package eu.dnetlib.oai.mongo;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

import com.google.common.base.Stopwatch;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import com.mongodb.BasicDBObject;
import com.mongodb.BasicDBObjectBuilder;
import com.mongodb.DBObject;
import com.mongodb.WriteConcern;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.IndexOptions;
import com.mongodb.client.model.Sorts;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.result.DeleteResult;
import com.mongodb.client.result.UpdateResult;
import eu.dnetlib.cql.CqlTranslator;
import eu.dnetlib.oai.PublisherField;
import eu.dnetlib.oai.PublisherStore;
import eu.dnetlib.oai.RecordChangeDetector;
import eu.dnetlib.oai.conf.OAIConfigurationReader;
import eu.dnetlib.oai.info.RecordInfo;
import eu.dnetlib.oai.info.SetInfo;
import eu.dnetlib.oai.parser.PublisherRecordParser;
import eu.dnetlib.oai.sets.MongoSetCollection;
import eu.dnetlib.rmi.provision.OaiPublisherRuntimeException;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.bson.conversions.Bson;
import org.bson.types.Binary;

public class MongoPublisherStore implements PublisherStore<DNetOAIMongoCursor> {

	private static final Log log = LogFactory.getLog(MongoPublisherStore.class); // NOPMD by marko on 11/24/08 5:02 PM

	private String id, metadataFormat, interpretation, layout;
	/**
	 * Keeps information about the fields to be created in mongo.
	 **/
	private List<PublisherField> mongoFields;

	private MongoCollection<DBObject> collection;
	private MongoCollection<DBObject> discardedCollection;

	private CqlTranslator cqlTranslator;
	private RecordInfoGenerator recordInfoGenerator;
	private MetadataExtractor metadataExtractor;

	private RecordChangeDetector recordChangeDetector;

	private MongoSetCollection mongoSetCollection;


	/**
	 * Used to generate the OAI identifiers compliant to the protocol. See
	 * http://www.openarchives.org/OAI/openarchivesprotocol.html#UniqueIdentifier.
	 */
	private String idScheme;
	/**
	 * Used to generate the OAI identifiers compliant to the protocol. See
	 * http://www.openarchives.org/OAI/openarchivesprotocol.html#UniqueIdentifier.
	 */
	private String idNamespace;

	private boolean alwaysNewRecord;

	public MongoPublisherStore() {
		super();
	}

	public MongoPublisherStore(final String id,
			final String metadataFormat,
			final String interpretation,
			final String layout,
			final MongoCollection<DBObject> collection,
			final List<PublisherField> mongoFields,
			final CqlTranslator cqlTranslator,
			final RecordInfoGenerator recordInfoGenerator,
			final String idScheme,
			final String idNamespace,
			final MetadataExtractor metadataExtractor,
			final RecordChangeDetector recordChangeDetector,
			final boolean alwaysNewRecord,
			final MongoDatabase mongodb) {
		super();
		this.id = id;
		this.metadataFormat = metadataFormat;
		this.interpretation = interpretation;
		this.layout = layout;
		this.collection = collection;
		this.discardedCollection = mongodb.getCollection("discarded-" + collection.getNamespace().getCollectionName(), DBObject.class);
		this.mongoFields = mongoFields;
		this.cqlTranslator = cqlTranslator;
		this.recordInfoGenerator = recordInfoGenerator;
		this.idScheme = idScheme;
		this.idNamespace = idNamespace;
		this.metadataExtractor = metadataExtractor;
		this.recordChangeDetector = recordChangeDetector;
		this.alwaysNewRecord = alwaysNewRecord;
	}

	@Override
	public RecordInfo getRecord(final String recordId) {
		final Bson query = Filters.eq(OAIConfigurationReader.ID_FIELD, recordId);
		final DBObject result = this.collection.find(query).first();
		return this.recordInfoGenerator.transformDBObject(result, true);
	}

	@Override
	public RecordInfo getRecord(final String recordId, final Function<String, String> unaryFunction) {
		final RecordInfo result = this.getRecord(recordId);
		if (result != null) {
			final String transformedBody = unaryFunction.apply(result.getMetadata());
			result.setMetadata(transformedBody);
		}
		return result;
	}

	@Override
	public DNetOAIMongoCursor getRecords(final String queryString, final boolean bodyIncluded, final int limit) {
		final FindIterable<DBObject> iter = loggedFindByQuery(queryString, limit);
		return new DNetOAIMongoCursor(iter.iterator(), bodyIncluded, this.recordInfoGenerator, this.metadataExtractor);
	}

	@Override
	public DNetOAIMongoCursor getRecords(final String queryString,
			final Function<String, String> unaryFunction,
			final boolean bodyIncluded,
			final int limit) {
		final FindIterable<DBObject> iter = loggedFindByQuery(queryString, limit);
		return new DNetOAIMongoCursor(iter.iterator(), unaryFunction, bodyIncluded, this.recordInfoGenerator, this.metadataExtractor);
	}

	private FindIterable<DBObject> loggedFindByQuery(final String queryString, final int limit) {
		final Bson query = parseQuery(queryString);
		long start = System.currentTimeMillis();
		Bson sortByIdAsc = Sorts.orderBy(Sorts.ascending("_id"));
		FindIterable<DBObject> iter = this.collection.find(query).sort(sortByIdAsc).limit(limit);
		long end = System.currentTimeMillis();
		log.debug("Query:" + query + "\ntime to get mongo iterable (ms): " + (end - start));
		return iter;
	}

	private Bson parseQuery(final String query) {
		try {
			return cqlTranslator.toMongo(query);
		} catch(Exception e ) {
			throw new OaiPublisherRuntimeException(e);
		}
	}


	@Override
	public List<PublisherField> getIndices() {
		return this.mongoFields;
	}

	/**
	 * <p>
	 * Ensure indices on the configuration-defined fields and on the system fields DATESTAMP_FIELD and LAST_COLLECTION_DATE_FIELD.
	 * <p>
	 * <p>
	 * Note that by default ID_FIELD, SET_FIELD, DELETED_FIELD, BODY_FIELD, UPDATED_FIELD are not indexed. If you want an index on those,
	 * then you have to specify it in the configuration file of the OAI Publisher: <br>
	 * <INDEX name="objIdentifier" repeatable="false">
	 * <SOURCE interpretation="transformed" layout="store" name="DMF" path="//*[local-name()='objIdentifier']"/>
	 * </INDEX>
	 * </p>
	 * <p>
	 * {@inheritDoc}
	 */
	@Override
	public void ensureIndices() {
		final IndexOptions indexOptions = new IndexOptions().background(true);
		final Stopwatch sw = Stopwatch.createUnstarted();
		sw.start();
		for (PublisherField field : this.mongoFields) {
			BasicDBObject mongoIdx = new BasicDBObject(field.getFieldName(), 1);
			log.debug("Creating index on store "+id+" : " + mongoIdx);
			this.collection.createIndex(mongoIdx, indexOptions);
		}
		log.debug("Creating index over : " + OAIConfigurationReader.DATESTAMP_FIELD);
		this.collection.createIndex(new BasicDBObject(OAIConfigurationReader.DATESTAMP_FIELD, 1), indexOptions);
		log.debug("Creating index over : " + OAIConfigurationReader.LAST_COLLECTION_DATE_FIELD);
		this.collection.createIndex(new BasicDBObject(OAIConfigurationReader.LAST_COLLECTION_DATE_FIELD, 1), indexOptions);
		sw.stop();
		log.info("All indexes have been updated in " + sw.elapsed(TimeUnit.MILLISECONDS) + " milliseconds");
	}

	/**
	 * Creates a compound index over the specified fields on the given store.
	 * <p>
	 * The creation is performed on the background
	 * </p>
	 *
	 * @param fieldNames
	 *            List of fields to be included in the compound index
	 * @theStore MongoPublisherStore where to create the index
	 */
	public void createCompoundIndex(final List<String> fieldNames) {
		if ((fieldNames == null) || fieldNames.isEmpty()) {
			log.fatal("No fields specified for the creation of the compound index");
		}
		BasicDBObjectBuilder theIndexBuilder = BasicDBObjectBuilder.start();
		for (String f : fieldNames) {
			theIndexBuilder.add(f, 1);
		}
		BasicDBObject theIndex = (BasicDBObject) theIndexBuilder.get();
		log.info("Creating index " + theIndex + " on " + this.getId());
		this.getCollection().createIndex(theIndex, new IndexOptions().background(true));
	}

	private void dropDiscarded(final String source) {
		if (StringUtils.isBlank(source)) {
			log.debug("Dropping discarded records from publisherStore " + this.id);
			this.discardedCollection.drop();
		} else {
			log.debug("Dropping discarded records for source " + source + " from publisherStore " + this.id);
			this.discardedCollection.deleteMany(Filters.eq(OAIConfigurationReader.SET_FIELD, source));
		}
	}

	@Override
	public int feed(final Iterable<String> records, final String source) {
		final BlockingQueue<Object> queue = new ArrayBlockingQueue<Object>(80);
		final Object sentinel = new Object();
		this.dropDiscarded(source);
		final Date feedDate = new Date();
		final Thread background = new Thread(() -> {
			// For fast feeding we want to use a collection with unack write concern
			final MongoCollection<DBObject> unackCollection = MongoPublisherStore.this.collection.withWriteConcern(WriteConcern.UNACKNOWLEDGED);
			while (true) {
				try {
					final Object record = queue.take();
					if (record == sentinel) {
						break;
					}
					safeFeedRecord((String) record, source, feedDate, unackCollection);
				} catch (final InterruptedException e) {
					log.fatal("got exception in background thread", e);
					throw new IllegalStateException(e);
				}
			}
		});
		background.start();
		final long startFeed = feedDate.getTime();
		try {
			log.info("feeding publisherStore " + this.id);
			for (final String record : records) {
				queue.put(record);
			}
			queue.put(sentinel);
			log.info("finished feeding publisherStore " + this.id);

			background.join();
		} catch (final InterruptedException e) {
			throw new IllegalStateException(e);
		}
		final long endFeed = System.currentTimeMillis();
		log.fatal("OAI STORE " + this.id + " FEEDING COMPLETED IN " + (endFeed - startFeed) + "ms");
		this.setDeletedFlags(feedDate, source);
		//Let's add the set here so we can avoid to add in the workflow another job that upsert the sets.
		if(StringUtils.isNotBlank(source)) {
			this.upsertSets(Lists.newArrayList(source));
		}
		return this.count();
	}

	/**
	 * Launches the thread that flags the records to be considered as 'deleted'.
	 * <p>
	 * The datestamp of the deleted records must be updated as well, according to the OAI specs available at
	 * http://www.openarchives.org/OAI/openarchivesprotocol.html#DeletedRecords: if a repository does keep track of deletions then the
	 * datestamp of the deleted record must be the date and time that it was deleted.
	 * </p>
	 *
	 * @param feedDate
	 * @param source
	 */
	private void setDeletedFlags(final Date feedDate, final String source) {
		// get the collection with ACKNOWLEDGE Write concern
		final MongoCollection<DBObject> ackCollection = this.collection.withWriteConcern(WriteConcern.ACKNOWLEDGED);
		final Thread deletedSetter = new Thread(new Runnable() {

			@Override
			public void run() {
				Bson filter = Filters.and(Filters.eq(OAIConfigurationReader.DELETED_FIELD, false),
						Filters.lt(OAIConfigurationReader.LAST_COLLECTION_DATE_FIELD, feedDate));

				if (!StringUtils.isBlank(source)) {
					filter = Filters.and(filter, Filters.eq(OAIConfigurationReader.SET_FIELD, mongoSetCollection.normalizeSetSpec(source)));
				}
				log.debug("Delete flag query: " + filter);
				final BasicDBObject update = new BasicDBObject("$set",
						BasicDBObjectBuilder.start(OAIConfigurationReader.DELETED_FIELD, true).append(OAIConfigurationReader.DATESTAMP_FIELD, feedDate)
								.append(OAIConfigurationReader.UPDATED_FIELD, true).get());
				log.debug("Updating as: " + update.toString());
				final UpdateResult updateResult = ackCollection.updateMany(filter, update, new UpdateOptions().upsert(false));
				log.debug("Deleted flags set for source: " + source + " #records = " + updateResult.getModifiedCount());
			}
		});

		deletedSetter.start();
		try {
			deletedSetter.join();
		} catch (final InterruptedException e) {
			throw new IllegalStateException(e);
		}
	}

	@Override
	public void drop() {
		this.collection.drop();
	}

	@Override
	public void drop(final String queryString) {
		Bson query = parseQuery(queryString);
		final DeleteResult deleteResult = this.collection.deleteMany(query);
		log.debug("Deleted by query: " + queryString + " #deleted: " + deleteResult.getDeletedCount());
	}

	@Override
	public int count() {
		return (int) this.collection.count();
	}

	@Override
	public int count(final String queryString) {
		if (StringUtils.isBlank(queryString)) return (int) this.collection.count();
		Bson query = parseQuery(queryString);
		return (int) this.collection.count(query);
	}

	public List<String> getDistinctSetNamesFromRecords() {
		log.info("Going to ask for all distinct sets in the oaistore " + this.id + ": this may take a long time...");
		return Lists.newArrayList(this.collection.distinct(OAIConfigurationReader.SET_FIELD, String.class));
	}

	// ***********************************************************************************************//
	// Feed utilities
	// ***********************************************************************************************//
	private boolean safeFeedRecord(final String record, final String source, final Date feedDate, final MongoCollection<DBObject> unackCollection) {
		try {
			if (!record.isEmpty()) { return feedRecord(record, source, feedDate, unackCollection); }
		} catch (final Throwable e) {
			log.error("Got unhandled exception while parsing record", e);
			this.discardedCollection.insertOne(new BasicDBObject(OAIConfigurationReader.SET_FIELD, source).append(OAIConfigurationReader.BODY_FIELD, record));
		}
		return false;
	}

	/**
	 * Feed the record to the store.
	 *
	 * @return true if the record is new, false otherwise
	 */
	private boolean feedRecord(final String record, final String source, final Date feedDate, final MongoCollection<DBObject> unackCollection) {
		final PublisherRecordParser parser = new PublisherRecordParser(this.mongoFields);
		log.debug("configured parser for fields: "+this.mongoFields);
		final Multimap<String, String> recordProperties = parser.parseRecord(record, source);
		String id;
		String oaiID;
		if (recordProperties.containsKey(OAIConfigurationReader.ID_FIELD)) {
			id = recordProperties.get(OAIConfigurationReader.ID_FIELD).iterator().next();
			oaiID = getOAIIdentifier(id);
			if (isNewRecord(oaiID)) {
				feedNew(oaiID, record, recordProperties, feedDate, unackCollection);
				return true;
			} else {
				if (isChanged(oaiID, record)) {
					updateRecord(oaiID, record, recordProperties, feedDate, unackCollection);
				} else {
					// it is not changed, I only have to update the last collection date
					handleRecord(oaiID, feedDate, unackCollection);
				}
			}
		} else {
			log.error("parsed record seems invalid -- no identifier property with name: " + OAIConfigurationReader.ID_FIELD);
			log.error("Extracted property map: \n"+recordProperties);
			log.debug("from: \n"+record);
			this.discardedCollection
					.insertOne(new BasicDBObject(OAIConfigurationReader.SET_FIELD, source).append(OAIConfigurationReader.BODY_FIELD, record).append(
							OAIConfigurationReader.DATESTAMP_FIELD, feedDate));
		}
		return false;
	}

	private BasicDBObject createBasicObject(final String oaiID, final String record, final Multimap<String, String> recordProperties) {
		final BasicDBObject obj = new BasicDBObject();
		for (final String key : recordProperties.keySet()) {
			if (key.equals(OAIConfigurationReader.ID_FIELD)) {
				obj.put(key, oaiID);
			} else {
				final Collection<String> values = recordProperties.get(key);
				if (key.equals(OAIConfigurationReader.SET_FIELD)) {

					final Iterable<String> setSpecs =
							values.stream().map(s -> MongoPublisherStore.this.mongoSetCollection.normalizeSetSpec(s)).collect(Collectors.toList());
					obj.put(key, setSpecs);
				} else {
					// let's check if the key is the name of a repeatable field or not
					final PublisherField keyField = Iterables.find(this.mongoFields, field -> field.getFieldName().equals(key), null);
					if (keyField == null) {
						log.warn("Expected field to index: " + key + " could not be found, but we keep going...");
					}
					if ((keyField != null) && !keyField.isRepeatable()) {
						if ((values != null) && !values.isEmpty()) {
							obj.put(key, values.iterator().next());
						}
					} else {
						obj.put(key, values);
					}
				}
			}
		}
		try {
			obj.put(OAIConfigurationReader.BODY_FIELD, createCompressRecord(record));
			obj.put(OAIConfigurationReader.DELETED_FIELD, false);
			return obj;
		} catch (final IOException e) {
			throw new OaiPublisherRuntimeException(e);
		}
	}

	/**
	 * @param record
	 * @throws IOException
	 */
	public Binary createCompressRecord(final String record) throws IOException {
		final ByteArrayOutputStream os = new ByteArrayOutputStream();
		final ZipOutputStream zos = new ZipOutputStream(os);
		final ZipEntry entry = new ZipEntry(OAIConfigurationReader.BODY_FIELD);
		zos.putNextEntry(entry);
		zos.write(record.getBytes());
		zos.closeEntry();
		zos.flush();
		zos.close();
		return new Binary(os.toByteArray());
	}

	private void feedNew(final String oaiID,
			final String record,
			final Multimap<String, String> recordProperties,
			final Date feedDate,
			final MongoCollection<DBObject> unackCollection) {
		log.debug("New record received. Assigned oai id: " + oaiID);
		final DBObject obj = this.createBasicObject(oaiID, record, recordProperties);
		obj.put(OAIConfigurationReader.LAST_COLLECTION_DATE_FIELD, feedDate);
		obj.put(OAIConfigurationReader.DATESTAMP_FIELD, feedDate);
		obj.put(OAIConfigurationReader.UPDATED_FIELD, false);
		unackCollection.insertOne(obj);
		this.upsertSets(recordProperties.get(OAIConfigurationReader.SET_FIELD));
	}

	private void updateRecord(final String oaiID,
			final String record,
			final Multimap<String, String> recordProperties,
			final Date feedDate,
			final MongoCollection<DBObject> unackCollection) {
		log.debug("updating record " + oaiID);
		final BasicDBObject obj = this.createBasicObject(oaiID, record, recordProperties);
		obj.put(OAIConfigurationReader.LAST_COLLECTION_DATE_FIELD, feedDate);
		obj.put(OAIConfigurationReader.DATESTAMP_FIELD, feedDate);
		obj.put(OAIConfigurationReader.UPDATED_FIELD, true);
		final Bson oldObj = Filters.eq(OAIConfigurationReader.ID_FIELD, oaiID);
		unackCollection.replaceOne(oldObj, obj, new UpdateOptions().upsert(true));
		//		unackCollection.updateOne(oldObj, new Document("$set",obj), new UpdateOptions().upsert(true));
		this.upsertSets(recordProperties.get(OAIConfigurationReader.SET_FIELD));
	}

	public void upsertSets(final Iterable<String> setNames) {
		// feed the list of sets, if any
		if (setNames != null) {
			for (final String setName : setNames) {
				if (StringUtils.isNotBlank(setName)) {
					final SetInfo set = new SetInfo();
					final String setSpec = this.mongoSetCollection.normalizeSetSpec(setName);
					set.setSetSpec(setSpec);
					set.setSetName(setName);
					set.setSetDescription("This set contains metadata records whose provenance is " + setName);
					set.setEnabled(true);
					final String query = "(" + OAIConfigurationReader.SET_FIELD + " = \"" + setSpec + "\") ";
					set.setQuery(query);
					this.mongoSetCollection.upsertSet(set, false, getCollection().getNamespace().getDatabaseName());
				}
			}
		}
	}

	private void handleRecord(final String oaiID, final Date lastCollectionDate, final MongoCollection<DBObject> unackCollection) {
		log.debug("handling unchanged record " + oaiID);
		final Bson oldObj = Filters.eq(OAIConfigurationReader.ID_FIELD, oaiID);
		final BasicDBObject update = new BasicDBObject("$set", new BasicDBObject(OAIConfigurationReader.LAST_COLLECTION_DATE_FIELD, lastCollectionDate));
		unackCollection.updateOne(oldObj, update, new UpdateOptions().upsert(true));
	}

	private boolean isNewRecord(final String oaiIdentifier) {
		if (this.alwaysNewRecord || (this.collection.count() == 0)) { return true; }
		return this.collection.find(Filters.eq(OAIConfigurationReader.ID_FIELD, oaiIdentifier)).first() == null;
	}

	// ***********************************************************************************************//
	// Setters / Getters / Basic utilities
	// ***********************************************************************************************//

	private boolean isChanged(final String oaiID, final String record) {
		final RecordInfo oldRecord = getRecord(oaiID);
		if (oldRecord == null) { return StringUtils.isBlank(record); }
		return this.recordChangeDetector.differs(oldRecord.getMetadata(), record);
	}

	private String getOAIIdentifier(final String id) {
		return this.idScheme + ":" + this.idNamespace + ":" + id;
	}

	@Override
	public int hashCode() {
		final int prime = 31;
		int result = 1;
		result = (prime * result) + ((this.collection == null) ? 0 : this.collection.hashCode());
		result = (prime * result) + ((this.id == null) ? 0 : this.id.hashCode());
		result = (prime * result) + ((this.interpretation == null) ? 0 : this.interpretation.hashCode());
		result = (prime * result) + ((this.layout == null) ? 0 : this.layout.hashCode());
		result = (prime * result) + ((this.metadataFormat == null) ? 0 : this.metadataFormat.hashCode());
		return result;
	}

	@Override
	public boolean equals(final Object obj) {
		if (this == obj) { return true; }
		if (obj == null) { return false; }
		if (!(obj instanceof MongoPublisherStore)) { return false; }
		final MongoPublisherStore other = (MongoPublisherStore) obj;
		if (this.collection == null) {
			if (other.collection != null) { return false; }
		} else if (!this.collection.equals(other.collection)) { return false; }
		if (this.id == null) {
			if (other.id != null) { return false; }
		} else if (!this.id.equals(other.id)) { return false; }
		if (this.interpretation == null) {
			if (other.interpretation != null) { return false; }
		} else if (!this.interpretation.equals(other.interpretation)) { return false; }
		if (this.layout == null) {
			if (other.layout != null) { return false; }
		} else if (!this.layout.equals(other.layout)) { return false; }
		if (this.metadataFormat == null) {
			if (other.metadataFormat != null) { return false; }
		} else if (!this.metadataFormat.equals(other.metadataFormat)) { return false; }
		return true;
	}

	public MongoCollection<DBObject> getCollection() {
		return this.collection;
	}

	public void setCollection(final MongoCollection<DBObject> collection) {
		this.collection = collection;
	}

	public MongoCollection<DBObject> getDiscardedCollection() {
		return this.discardedCollection;
	}

	public void setDiscardedCollection(final MongoCollection<DBObject> discardedCollection) {
		this.discardedCollection = discardedCollection;
	}

	public String getIdScheme() {
		return this.idScheme;
	}

	public void setIdScheme(final String idScheme) {
		this.idScheme = idScheme;
	}

	public String getIdNamespace() {
		return this.idNamespace;
	}

	public void setIdNamespace(final String idNamespace) {
		this.idNamespace = idNamespace;
	}

	public RecordInfoGenerator getRecordInfoGenerator() {
		return this.recordInfoGenerator;
	}

	public void setRecordInfoGenerator(final RecordInfoGenerator recordInfoGenerator) {
		this.recordInfoGenerator = recordInfoGenerator;
	}

	public MetadataExtractor getMetadataExtractor() {
		return this.metadataExtractor;
	}

	public void setMetadataExtractor(final MetadataExtractor metadataExtractor) {
		this.metadataExtractor = metadataExtractor;
	}

	public RecordChangeDetector getRecordChangeDetector() {
		return this.recordChangeDetector;
	}

	public void setRecordChangeDetector(final RecordChangeDetector recordChangeDetector) {
		this.recordChangeDetector = recordChangeDetector;
	}

	@Override
	public String getId() {
		return this.id;
	}

	public void setId(final String id) {
		this.id = id;
	}

	@Override
	public String getMetadataFormat() {
		return this.metadataFormat;
	}

	public void setMetadataFormat(final String metadataFormat) {
		this.metadataFormat = metadataFormat;
	}

	@Override
	public String getInterpretation() {
		return this.interpretation;
	}

	public void setInterpretation(final String interpretation) {
		this.interpretation = interpretation;
	}

	@Override
	public String getLayout() {
		return this.layout;
	}

	public void setLayout(final String layout) {
		this.layout = layout;
	}

	public MongoSetCollection getMongoSetCollection() {
		return this.mongoSetCollection;
	}

	public void setMongoSetCollection(final MongoSetCollection mongoSetCollection) {
		this.mongoSetCollection = mongoSetCollection;
	}

	public List<PublisherField> getMongoFields() {
		return this.mongoFields;
	}

	public void setMongoFields(final List<PublisherField> mongoFields) {
		this.mongoFields = mongoFields;
	}

	public boolean isAlwaysNewRecord() {
		return this.alwaysNewRecord;
	}

	public void setAlwaysNewRecord(final boolean alwaysNewRecord) {
		this.alwaysNewRecord = alwaysNewRecord;
	}



}
