package eu.dnetlib.enabling.resultset;

import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

import javax.xml.ws.wsaddressing.W3CEndpointReference;

import com.google.common.collect.Lists;

import eu.dnetlib.enabling.tools.ServiceResolver;
import eu.dnetlib.miscutils.collections.MappedCollection;
import eu.dnetlib.miscutils.functional.UnaryFunction;

public class ParallelMappedResultSet extends MappedResultSet {

	private ExecutorService executor;

	public ParallelMappedResultSet(final W3CEndpointReference source, final UnaryFunction<String, String> mapper, final ServiceResolver serviceResolver,
			final ExecutorService executor) {
		super(source, mapper, serviceResolver);
		this.executor = executor;
	}

	@Override
	public List<String> getResult(int fromPosition, int toPosition) {

//		log.info("Parallel page " + (toPosition - fromPosition + 1));

		List<Future<String>> results = Lists.newArrayList();

		for (final String input : getResultFromSource(fromPosition, toPosition)) {
			results.add(executor.submit(new Callable<String>() {
				@Override
				public String call() throws Exception {
					return getMapper().evaluate(input);
				}
			}));
		}

		return MappedCollection.listMap(results, new UnaryFunction<String, Future<String>>() {
			@Override
			public String evaluate(Future<String> arg) {
				try {
					return arg.get();
				} catch (InterruptedException e) {
					throw new IllegalStateException(e);
				} catch (ExecutionException e) {
					throw new IllegalStateException(e);
				}
			}
		});
	}

}
