package eu.dnetlib.datasource.publisher.clients;

import java.util.List;

import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.primitives.Ints;
import com.mongodb.BasicDBObject;
import com.mongodb.MongoClient;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Projections;
import eu.dnetlib.datasource.publisher.ApiException;
import eu.dnetlib.datasource.publisher.clients.utils.DatasourceFunctions;
import eu.dnetlib.datasource.publisher.model.*;
import eu.dnetlib.miscutils.datetime.DateUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.time.DateFormatUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cache.annotation.Cacheable;

import static com.mongodb.client.model.Filters.*;



/**
 * Created by claudio on 20/10/2016.
 */
public class MongoLoggerClient {

	private static final Log log = LogFactory.getLog(MongoLoggerClient.class);

	@Autowired
	private MongoClient datasourcePublisherMongoClient;
	@Value("${datasource.publisher.mongodb.collection}")
	private String collectionName;
	@Value("${datasource.publisher.mongodb.db}")
	private String dbName;
	@Value("${datasource.publisher.mongodb.query.limit}")
	private int limit;

	private static Bson fields = getFields();

	private static MongoCollection<Document> collection = null;

	private Bson getQuery(final String dsId) {
		return and(
				eq("parentDatasourceId", dsId),
				eq("system:profileFamily", "aggregator"),
				eq("system:isCompletedSuccessfully", "true"));
	}

	private synchronized MongoCollection<Document> getCollection() {
		if (collection == null) {
			log.info("inizializing mongodb collection ...");
			collection = datasourcePublisherMongoClient.getDatabase(dbName).getCollection(collectionName);
		}
		return collection;
	}

	@Cacheable("datasources-mongo-cache")
	public List<AggregationInfo> getAggregationHistory(final String dsId) throws ApiException {

		log.warn("getAggregationHistory(): not using cache");

		final Bson query = getQuery(dsId);

		final FindIterable<Document> iterable = getCollection().find(query).projection(fields).limit(limit).sort(dbo("system:startHumanDate", -1));
		final Iterable<AggregationInfo> transform = Iterables.transform(iterable, new Function<Document, AggregationInfo>() {

			@Override
			public AggregationInfo apply(final Document d) {

				AggregationInfo info = null;
				final AggregationStage stage = AggregationStage.parse(d.getString("system:wfName"));
				switch (stage) {

				case COLLECT:
					CollectionInfo cInfo = new CollectionInfo();
					cInfo.setAggregationStage(stage);
					final String collectionMode = d.getString("system:node:SELECT_MODE:selection");
					cInfo.setCollectionMode(StringUtils.isNotBlank(collectionMode) ? CollectionMode.valueOf(collectionMode) : null);
					cInfo.setNumberOfRecords(getNumberOfRecords(d));
					cInfo.setDate(getDate(d));
					info = cInfo;
					break;
				case TRANSFORM:
					TransformationInfo tInfo = new TransformationInfo();
					tInfo.setAggregationStage(stage);
					tInfo.setNumberOfRecords(getNumberOfRecords(d));
					tInfo.setDate(getDate(d));
					info = tInfo;
					break;
				}
				return info;
			}

			private String getDate(final Document d) {
				final String dateString = d.getString("system:startHumanDate");
				if (StringUtils.isBlank(dateString)) {
					return "";
				}
				return DateFormatUtils.format(new DateUtils().parse(dateString), DatasourceFunctions.DATE_FORMAT);
			}

			private Integer getNumberOfRecords(final Document d) {
				final String sinkSize = d.getString("mainlog:sinkSize");
				final String total = d.getString("mainlog:total");

				if(StringUtils.isNotBlank(sinkSize)) {
					return Ints.tryParse(sinkSize);
				} else if(StringUtils.isNotBlank(total)) {
					return Ints.tryParse(total);
				} else {
					return -1;
				}
			}
		});

		final Iterable<AggregationInfo> filter = Iterables.filter(transform, new Predicate<AggregationInfo>() {
			@Override
			public boolean apply(final AggregationInfo ai) {
				return ai.getNumberOfRecords() >= 0 && StringUtils.isNotBlank(ai.getDate());
			}
		});

		return Lists.newArrayList(filter);
	}

	private static Bson getFields() {
		return Projections.fields(
				eq("system:wfName", 1),
				eq("system:node:SELECT_MODE:selection", 1),
				eq("mainlog:sinkSize", 1),
				eq("mainlog:writeOps", 1),
				eq("mainlog:total", 1),
				eq("system:startHumanDate", 1),
				eq("system:profileName", 1));
	}

	private static BasicDBObject dbo(final String key, final Object value) {
		return new BasicDBObject(key, value);
	}


}
