package eu.dnetlib.datasource.publisher.clients;

import java.util.List;
import java.util.Queue;
import java.util.concurrent.*;

import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.*;
import eu.dnetlib.datasource.publisher.ApiException;
import eu.dnetlib.datasource.publisher.model.*;
import eu.dnetlib.datasource.publisher.clients.utils.IndexDsInfo;
import eu.dnetlib.datasource.publisher.clients.utils.IndexRecordsInfo;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;

/**
 * Created by claudio on 20/10/2016.
 */
public class DatasourceInfoRetriever {

	private static final Log log = LogFactory.getLog(DatasourceInfoRetriever.class);

	@Autowired
	private MongoLoggerClient mongoLoggerClient;

	@Autowired
	private DatasourceIndexClient datasourceIndexClient;

	@Autowired
	private ISLookupClient lookupClient;

	@Autowired
	private JdbcDatasourceDao jdbcDatasourceDao;

	private ListeningExecutorService service;

	private final static int WORKERS = 100;

	@Value("${datasource.publisher.timeout}")
	private int timeout = 10;

	public DatasourceInfoRetriever() {
		service = MoreExecutors.listeningDecorator(
				new ScheduledThreadPoolExecutor(WORKERS,
						new ThreadFactoryBuilder().setNameFormat("datasource-info-retriever-%d").build()));
	}

	public IdentifiersResponse listIds() throws ApiException {
		return new IdentifiersResponse(jdbcDatasourceDao.listIds());
	}

	public ClientResponse getInfo(final String dsId) {

		final CountDownLatch outerLatch = new CountDownLatch(3);
		final Queue<Throwable> errors = new ConcurrentLinkedQueue<>();
		final DatasourceResponse datasourceResponse = new DatasourceResponse();

		Futures.addCallback(getAggregationHistory(dsId), new FutureCallback<List<AggregationInfo>>() {
			public void onSuccess(List<AggregationInfo> info) {
				setAggregationHistory(datasourceResponse, info);
				outerLatch.countDown();
			}
			public void onFailure(Throwable e) {
				errors.offer(e);
				outerLatch.countDown();
			}
		});

		Futures.addCallback(getDatasourceDetails(dsId), new FutureCallback<Datasource>() {
			@Override
			public void onSuccess(final Datasource datasource) {
				datasourceResponse.setDatasource(datasource);
				outerLatch.countDown();
			}

			@Override
			public void onFailure(final Throwable e) {
				errors.offer(e);
				outerLatch.countDown();
			}
		});

		Futures.addCallback(calculateCurrentIndexDsInfo(), new FutureCallback<IndexDsInfo>() {

			public void onSuccess(final IndexDsInfo info) {

				final CountDownLatch innerLatch = new CountDownLatch(1);

				Futures.addCallback(getIndexInfo(dsId, info), new FutureCallback<IndexRecordsInfo>() {
					public void onSuccess(IndexRecordsInfo info) {
						datasourceResponse.setIndexRecords(info.getCount()).setLastIndexingDate(info.getDate());
						innerLatch.countDown();
					}
					public void onFailure(Throwable e) {
						errors.offer(e);
						innerLatch.countDown();
					}
				});
				waitLatch(innerLatch, errors, timeout);

				outerLatch.countDown();
			}

			public void onFailure(final Throwable e) {
				errors.offer(e);
				outerLatch.countDown();
			}
		});

		waitLatch(outerLatch, errors, timeout);

		if (!errors.isEmpty()) {
			datasourceResponse.getResponseHeader().setError(Joiner.on("\n").skipNulls().join(Iterables.transform(errors, new Function<Throwable, String>() {
				@Override
				public String apply(final Throwable e) {
					return e.getMessage();
				}
			})));
			log.error(Joiner.on("\n").skipNulls().join(Iterables.transform(errors, new Function<Throwable, String>() {
				@Override
				public String apply(final Throwable e) {
					return ExceptionUtils.getFullStackTrace(e);
				}
			})));
		}

		return new ClientResponse().datasourceInfo(datasourceResponse).errors(errors);
	}

	private ListenableFuture<Datasource> getDatasourceDetails(final String dsId) {
		return service.submit(new Callable<Datasource>() {
			@Override
			public Datasource call() throws ApiException {
				return jdbcDatasourceDao.getDatasource(dsId);
			}
		});
	}

	private ListenableFuture<IndexDsInfo> calculateCurrentIndexDsInfo() {
		return service.submit(new Callable<IndexDsInfo>() {
				public IndexDsInfo call() throws ApiException {
					return lookupClient.calculateCurrentIndexDsInfo();
				}
			});
	}

	private ListenableFuture<IndexRecordsInfo> getIndexInfo(final String dsId, final IndexDsInfo info) {
		return service.submit(new Callable<IndexRecordsInfo>() {
			public IndexRecordsInfo call() throws ApiException {
				return datasourceIndexClient.getIndexInfo(dsId, info);
			}
		});
	}

	private ListenableFuture<List<AggregationInfo>> getAggregationHistory(final String dsId) {
		return service.submit(new Callable<List<AggregationInfo>>() {
			public List<AggregationInfo> call() throws ApiException {
				return mongoLoggerClient.getAggregationHistory(dsId);
			}
		});
	}

	private void setAggregationHistory(DatasourceResponse datasourceResponse, final List<AggregationInfo> info) {
		datasourceResponse.setAggregationHistory(info);
		if (!info.isEmpty()) {
			datasourceResponse.setLastCollection(Iterables.find(info, new Predicate<AggregationInfo>() {
				@Override
				public boolean apply(final AggregationInfo a) {
					return AggregationStage.COLLECT.equals(a.getAggregationStage());
				}
			})).setLastTransformation(Iterables.find(info, new Predicate<AggregationInfo>() {
				@Override
				public boolean apply(final AggregationInfo a) {
					return AggregationStage.TRANSFORM.equals(a.getAggregationStage());
				}
			}));
		}
	}

	private void waitLatch(final CountDownLatch latch, final Queue<Throwable> errors, final int waitSeconds) {
		try {
			if (!latch.await(waitSeconds, TimeUnit.SECONDS)) {
				errors.offer(new TimeoutException("Waiting for requests to complete has timed out."));
			}
		} catch (final InterruptedException e) {
			errors.offer(e);
		}
	}

}
