package eu.dnetlib.functionality.index.feed;

import java.util.concurrent.Callable;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import eu.dnetlib.functionality.index.IndexCollection;
import eu.dnetlib.functionality.index.model.document.IndexDocument;
import eu.dnetlib.functionality.index.utils.IndexFieldUtility;

/**
 * Callable task that feeds a sequence of {@link IndexDocument}s into an {@link IndexCollection} and records the outcome
 * of every document in a {@link FeedResult}.
 */
public class DocumentFeeder implements Callable<FeedResult> {

	/** The Constant log. */
	private static final Log log = LogFactory.getLog(DocumentFeeder.class);

	/** The index collection the documents are added to. */
	private final IndexCollection indexCollection;

	/** The documents to feed. */
	private final Iterable<IndexDocument> records;

	/**
	 * Instantiates a new document feeder.
	 * 
	 * @param indexCollection
	 *            the index collection that receives the documents
	 * @param records
	 *            the documents to feed; iterated once per invocation of {@link #call()}
	 */
	public DocumentFeeder(final IndexCollection indexCollection, final Iterable<IndexDocument> records) {
		this.indexCollection = indexCollection;
		this.records = records;
	}

	/**
	 * Iterates over the records and dispatches on each document's status:
	 * <ul>
	 * <li>OK: the document is passed to {@code indexCollection.add(doc)}; {@code res.add()} is called when that returns
	 * true, {@code res.mark()} otherwise;</li>
	 * <li>MARKED: {@code res.mark()} is called and the record id is logged at debug level;</li>
	 * <li>ERROR: {@code res.skip()} is called and the record id is logged at info level.</li>
	 * </ul>
	 * 
	 * @return the feed result accumulated over all records
	 * @throws IllegalStateException
	 *             if a document reports a status other than OK, MARKED or ERROR; the remaining records are not processed
	 * @throws Exception
	 *             declared by {@link Callable#call()}
	 * @see java.util.concurrent.Callable#call()
	 */
	@Override
	public FeedResult call() throws Exception {
		// NOTE(review): the timestamp is presumably the feed start time - confirm against FeedResult.
		final FeedResult res = new FeedResult(System.currentTimeMillis());
		for (final IndexDocument doc : records) {
			switch (doc.getStatus()) {

			case OK:
				// A document for which add() returns false is counted via mark(), not add().
				if (indexCollection.add(doc)) {
					res.add();
				} else {
					res.mark();
				}
				break;

			case MARKED:
				res.mark();
				// Guarded so the record id is only looked up and concatenated when debug output is enabled.
				if (log.isDebugEnabled()) {
					log.debug("skipping record: " + doc.getFieldValue(IndexFieldUtility.INDEX_RECORD_ID));
				}
				break;

			case ERROR:
				res.skip();
				log.info("Error on record: " + doc.getFieldValue(IndexFieldUtility.INDEX_RECORD_ID));
				break;

			default:
				// Report the offending status so the failure can be diagnosed from the message alone.
				throw new IllegalStateException("unknown document status: " + doc.getStatus());
			}
		}
		return res;
	}

}
