package eu.dnetlib.data.mapreduce.dedup;

import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.junit.Before;
import org.junit.Test;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

import eu.dnetlib.pace.config.Config;
import eu.dnetlib.pace.config.DynConf;
import eu.dnetlib.pace.config.Type;
import eu.dnetlib.pace.distance.PaceDocumentDistance;
import eu.dnetlib.pace.model.Field;
import eu.dnetlib.pace.model.MapDocument;

/**
 * Runs {@link PaceDocumentDistance} repeatedly from many threads against the same pair of documents and the same
 * configuration.
 *
 * NOTE(review): the test has no assertion on the computed distance; it fails only if one of the submitted tasks
 * throws. Presumably it is meant as a smoke test for concurrent use of the distance algorithm -- confirm.
 */
public class DedupReducerTest {

	/** Size of the worker pool used by {@link #test()}. */
	private static final int THREADS = 100;

	/** Number of tasks submitted to the pool. */
	private static final int TASKS = 100;

	/** Number of distance computations performed by each task. */
	private static final int ITERATIONS = 100;

	@Before
	public void setUp() throws Exception {
	}

	/**
	 * Submits {@link #TASKS} tasks to a fixed pool of {@link #THREADS} threads; each task computes the distance between
	 * the same two documents {@link #ITERATIONS} times and prints every result. The method then waits for all tasks.
	 *
	 * @throws InterruptedException
	 *             if the test thread is interrupted while waiting for a task
	 * @throws ExecutionException
	 *             if any task threw while computing the distance
	 */
	@Test
	public void test() throws InterruptedException, ExecutionException {
		final Config cfg = DynConf.load("pace.model { title { algo = Levenstein, type = String, weight = 0.9, ignoreMissing = false }, "
				+ "	description { algo = Level2JaroWinkler, type = String, 	weight = 0.1, ignoreMissing = true } }");

		// The two documents differ only by one space in the title and by the last word of the description.
		final MapDocument resA = newDocument("A",
				"Lifting in Cygnus X-2 in a multi-wavelength campaign due to absorption of extended ADC emission",
				"We report results of one-day simultaneous multiwavelength observations ofCygnus X-2 using XMM");
		final MapDocument resB = newDocument("B",
				"Lifting in CygnusX-2 in a multi-wavelength campaign due to absorption of extended ADC emission",
				"We report results of one-day simultaneous multiwavelength observations ofCygnus X-2 using TYH");

		final ExecutorService ex = Executors.newFixedThreadPool(THREADS);
		try {
			final List<Future<Integer>> futures = Lists.newArrayList();
			for (int i = 0; i < TASKS; i++) {
				futures.add(ex.submit(new Callable<Integer>() {

					@Override
					public Integer call() {
						for (int j = 0; j < ITERATIONS; j++) {
							final double d = new PaceDocumentDistance().between(resA, resB, cfg);
							System.out.println(Thread.currentThread().getId() + "_" + j + " Result ---> " + d);
						}
						return 1;
					}
				}));
			}

			// get() rethrows any failure of a task wrapped in an ExecutionException, which fails the test.
			for (final Future<Integer> f : futures) {
				f.get();
			}
		} finally {
			// Always release the pool: without this the worker threads stay alive after the test returns,
			// and pending tasks keep running if one of the get() calls above has thrown.
			ex.shutdownNow();
		}
	}

	/**
	 * Builds a document holding one "title" and one "description" string field.
	 *
	 * @param id
	 *            identifier passed to the {@link MapDocument} constructor
	 * @param title
	 *            value of the "title" field
	 * @param description
	 *            value of the "description" field
	 * @return the new document
	 */
	private static MapDocument newDocument(final String id, final String title, final String description) {
		final Map<String, List<Field>> fields = Maps.newHashMap();
		fields.put("title", Lists.newArrayList(new Field(Type.String, "title", title)));
		fields.put("description", Lists.newArrayList(new Field(Type.String, "description", description)));
		return new MapDocument(id, fields);
	}
}
