package eu.dnetlib.oai.mongo;

import java.util.List;
import java.util.stream.Collectors;
import javax.annotation.Resource;

import com.mongodb.BasicDBObjectBuilder;
import com.mongodb.DBObject;
import com.mongodb.MongoClient;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
import eu.dnetlib.cql.CqlTranslator;
import eu.dnetlib.enabling.tools.DnetStreamSupport;
import eu.dnetlib.oai.PublisherStoreDAO;
import eu.dnetlib.oai.RecordChangeDetector;
import eu.dnetlib.oai.conf.OAIConfigurationReader;
import eu.dnetlib.oai.sets.MongoSetCollection;
import eu.dnetlib.rmi.provision.MDFInfo;
import eu.dnetlib.rmi.provision.OaiPublisherException;
import eu.dnetlib.rmi.provision.OaiPublisherRuntimeException;
import net.sf.ehcache.Cache;
import net.sf.ehcache.Element;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Required;

public class MongoPublisherStoreDAO implements PublisherStoreDAO<MongoPublisherStore, DNetOAIMongoCursor> {

	private static final Log log = LogFactory.getLog(MongoPublisherStoreDAO.class); // NOPMD by marko on 11/24/08 5:02 PM

	@Autowired
	private MongoClient publisherMongoClient;

	/**
	 * Name of the collection with information about the OAI stores.
	 **/
	private String metadataCollection;

	@Autowired
	private RecordInfoGenerator recordInfoGenerator;
	@Autowired
	private MetadataExtractor metadataExtractor;
	@Autowired
	private CqlTranslator cqlTranslator;
	@Resource(name = "oaiConfigurationExistReader")
	private OAIConfigurationReader configuration;
	@Autowired
	private RecordChangeDetector recordChangeDetector;
	@Autowired
	private MongoSetCollection mongoSetCollection;

	/**
	 * Cache for oaistores. Keys are dbName-storeId, values are objects of type MongoPublisherStore.
	 */
	private Cache mongoOaistoreCache;

	/**
	 * Cache for oaistores. Keys is OAI metadata prefixes, values are objects of type MongoPublisherStore.
	 */
	private Cache mongoOaistoreCacheByMdPrefix;

	private boolean alwaysNewRecord;

	protected MongoDatabase getDB(final String dbName) {
		return this.publisherMongoClient.getDatabase(dbName);
	}

	@Override
	public List<MongoPublisherStore> listPublisherStores(final String dbName) {
		final MongoDatabase db = getDB(dbName);
		final FindIterable<DBObject> stores = db.getCollection(this.metadataCollection, DBObject.class).find();
		return DnetStreamSupport.generateStreamFromIterator(stores.iterator())
				.map(storeInfo -> createFromDBObject(storeInfo, db))
				.collect(Collectors.toList());
	}

	@Override
	public MongoPublisherStore getStore(final String storeId, final String dbName) {
		final DBObject storeInfo = getDB(dbName).getCollection(this.metadataCollection, DBObject.class).find(Filters.eq("id", storeId)).first();
		return this.createFromDBObject(storeInfo, getDB(dbName));
	}

	@Override
	public MongoPublisherStore getStore(final String mdfName, final String mdfInterpretation, final String mdfLayout, final String dbName) {
		return this.getStore(this.generateStoreId(mdfName, mdfInterpretation, mdfLayout), dbName);
	}

	@Override
	public MongoPublisherStore getStoreFor(final String targetMetadataPrefix, final String dbName) {

		final Element elem = this.mongoOaistoreCacheByMdPrefix.get(targetMetadataPrefix);

		if (elem != null) {
			return (MongoPublisherStore) elem.getObjectValue();
		} else {
			final MDFInfo info = this.configuration.getMetadataFormatInfo(targetMetadataPrefix);
			final MongoPublisherStore store =
					this.getStore(info.getSourceFormat(), info.getSourceInterpretation(), info.getSourceLayout(), dbName);
			this.mongoOaistoreCacheByMdPrefix.put(new Element(targetMetadataPrefix, store));
			return store;
		}
	}

	@Override
	public MongoPublisherStore createStore(final String mdfName, final String mdfInterpretation, final String mdfLayout, final String dbName)
			throws OaiPublisherException {
		final MongoDatabase db = getDB(dbName);
		final DBObject store = createMetadataEntry(mdfName, mdfInterpretation, mdfLayout);
		final MongoCollection<DBObject> metadata = db.getCollection(this.metadataCollection, DBObject.class);
		metadata.insertOne(store);
		final MongoPublisherStore theStore = this.createFromDBObject(store, db);
		return theStore;

	}

	@Override
	public boolean deleteStore(final String storeId, final String dbName) {

		final MongoDatabase db = getDB(dbName);
		final MongoCollection<DBObject> metadata = db.getCollection(this.metadataCollection, DBObject.class);
		final DBObject storeDeleted = metadata.findOneAndDelete(Filters.eq("id", storeId));
		if (storeDeleted == null) {
			return false;
		} else {
			db.getCollection(storeId).drop();
			// TODO: should drop entries related to mdPrefix served by the store we are deleting, not all of them.
			this.mongoSetCollection.dropOAISets(dbName);
			log.debug("Deleted oaistore " + storeId + ", db: " + dbName);
			return true;
		}
	}

	@Override
	public boolean deleteFromStore(final String storeId, final String dbName, final String set) {
		final MongoDatabase db = getDB(dbName);
		final MongoCollection<DBObject> metadata = db.getCollection(this.metadataCollection, DBObject.class);
		final DBObject storeInfo = metadata.find(Filters.eq("id", storeId)).first();
		if (storeInfo == null) {
			return false;
		} else {
            db.getCollection(storeId).deleteMany(Filters.eq(OAIConfigurationReader.SET_FIELD, this.mongoSetCollection.normalizeSetSpec(set)));
            this.mongoSetCollection.dropSet(dbName, set);
			log.debug("Deleted set " + set + " from oaistore " + storeId + ", db: " + dbName);
			return true;
		}
	}

	@Override
	public boolean deleteFromStore(final String mdfName, final String mdfInterpretation, final String mdfLayout, final String dbName, final String set) {
		return this.deleteFromStore(this.generateStoreId(mdfName, mdfInterpretation, mdfLayout), dbName, set);
	}

	@Override
	public boolean deleteStore(final String mdfName, final String mdfInterpretation, final String mdfLayout, final String dbName) {
		return this.deleteStore(this.generateStoreId(mdfName, mdfInterpretation, mdfLayout), dbName);
	}

	public void ensureIndex(final MongoPublisherStore store) {
		if (store == null) { throw new OaiPublisherRuntimeException("Can't ensure index on null store"); }
		final Thread t = new Thread() {

			@Override
			public void run() {
				store.ensureIndices();
			}
		};
		t.start();
	}

	public void ensureIndex(final String dbName) {
		final List<MongoPublisherStore> stores = this.listPublisherStores(dbName);
		for (final MongoPublisherStore s : stores) {
			s.ensureIndices();
		}

	}

	private MongoPublisherStore createFromDBObject(final DBObject storeInfo, final MongoDatabase db) {
		if (storeInfo == null) { return null; }
		final String storeId = (String) storeInfo.get("id");
		final String mdFormat = (String) storeInfo.get("metadataFormat");
		final String mdInterpreation = (String) storeInfo.get("interpretation");
		final String mdLayout = (String) storeInfo.get("layout");
		final String k = db.getName() + "-" + storeId;

		final Element elem = this.mongoOaistoreCache.get(k);
		if (elem != null) {
			log.debug("Store retrieved from cache and alwaysNewRecord is" + this.alwaysNewRecord);
			final MongoPublisherStore store = (MongoPublisherStore) elem.getObjectValue();
			store.setAlwaysNewRecord(this.alwaysNewRecord);
			log.debug(store);
			return store;
		} else {
			log.debug("Store retrieved, cache miss,  alwaysNewRecord is" + this.alwaysNewRecord);
			log.fatal("Not using cache to create oaistore from dbObject: " + k);
			final MongoPublisherStore store = new MongoPublisherStore(storeId, mdFormat, mdInterpreation, mdLayout, db.getCollection(storeId, DBObject.class),
					this.configuration.getFields(mdFormat, mdInterpreation, mdLayout), this.cqlTranslator, this.recordInfoGenerator,
					this.configuration.getIdScheme(),
					this.configuration.getIdNamespace(), this.metadataExtractor, this.recordChangeDetector, this.alwaysNewRecord, db);
			store.setMongoSetCollection(this.mongoSetCollection);
			this.mongoOaistoreCache.put(new Element(k, store));
			return store;
		}
	}

	private DBObject createMetadataEntry(final String mdfName, final String mdfInterpretation, final String mdfLayout) {
		final DBObject info = BasicDBObjectBuilder.start("id", generateStoreId(mdfName, mdfInterpretation, mdfLayout)).append("metadataFormat", mdfName)
				.append("interpretation", mdfInterpretation).append("layout", mdfLayout).get();
		return info;

	}

	private String generateStoreId(final String mdfName, final String mdfInterpretation, final String mdfLayout) {
		return mdfName + "-" + mdfLayout + "-" + mdfInterpretation;
	}

	public String getMetadataCollection() {
		return this.metadataCollection;
	}

	@Required
	public void setMetadataCollection(final String metadataCollection) {
		this.metadataCollection = metadataCollection;
	}

	public OAIConfigurationReader getConfiguration() {
		return this.configuration;
	}

	public void setConfiguration(final OAIConfigurationReader configuration) {
		this.configuration = configuration;
	}

	public RecordInfoGenerator getRecordInfoGenerator() {
		return this.recordInfoGenerator;
	}

	public void setRecordInfoGenerator(final RecordInfoGenerator recordInfoGenerator) {
		this.recordInfoGenerator = recordInfoGenerator;
	}

	public MetadataExtractor getMetadataExtractor() {
		return this.metadataExtractor;
	}

	public void setMetadataExtractor(final MetadataExtractor metadataExtractor) {
		this.metadataExtractor = metadataExtractor;
	}

	public RecordChangeDetector getRecordChangeDetector() {
		return this.recordChangeDetector;
	}

	public void setRecordChangeDetector(final RecordChangeDetector recordChangeDetector) {
		this.recordChangeDetector = recordChangeDetector;
	}

	public MongoSetCollection getMongoSetCollection() {
		return this.mongoSetCollection;
	}

	public void setMongoSetCollection(final MongoSetCollection mongoSetCollection) {
		this.mongoSetCollection = mongoSetCollection;
	}

	public boolean isAlwaysNewRecord() {
		return this.alwaysNewRecord;
	}

	public void setAlwaysNewRecord(final boolean alwaysNewRecord) {
		this.alwaysNewRecord = alwaysNewRecord;
	}

	public Cache getMongoOaistoreCache() {
		return this.mongoOaistoreCache;
	}

	@Required
	public void setMongoOaistoreCache(final Cache mongoOaistoreCache) {
		this.mongoOaistoreCache = mongoOaistoreCache;
	}

	public Cache getMongoOaistoreCacheByMdPrefix() {
		return this.mongoOaistoreCacheByMdPrefix;
	}

	@Required
	public void setMongoOaistoreCacheByMdPrefix(final Cache mongoOaistoreCacheByMdPrefix) {
		this.mongoOaistoreCacheByMdPrefix = mongoOaistoreCacheByMdPrefix;
	}

	public CqlTranslator getCqlTranslator() {
		return cqlTranslator;
	}

	public void setCqlTranslator(final CqlTranslator cqlTranslator) {
		this.cqlTranslator = cqlTranslator;
	}

}
