package eu.dnetlib.index.feed;

import java.util.concurrent.Callable;
import java.util.stream.Stream;

import eu.dnetlib.clients.index.model.document.IndexDocument;
import eu.dnetlib.clients.index.utils.IndexFieldUtility;
import eu.dnetlib.index.IndexCollection;
import eu.dnetlib.rmi.provision.IndexServiceException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * {@link Callable} task that consumes a stream of {@link IndexDocument}s, hands the ones
 * whose status is {@code OK} to an {@link IndexCollection} and tallies the outcome of every
 * document in a {@link FeedResult}.
 */
public class DocumentFeeder implements Callable<FeedResult> {

	/**
	 * Logger shared by all instances of this class.
	 */
	private static final Log log = LogFactory.getLog(DocumentFeeder.class);

	/**
	 * The index collection that receives the documents.
	 */
	private final IndexCollection indexCollection;

	/**
	 * The documents to feed; consumed (and closed) by a single invocation of {@link #call()}.
	 */
	private final Stream<IndexDocument> docStream;

	/**
	 * Creates a feeder for the given target and documents.
	 *
	 * @param indexCollection
	 *            the collection each {@code OK} document is added to
	 * @param docStream
	 *            the documents to process; it is traversed once and closed by {@link #call()}
	 */
	public DocumentFeeder(final IndexCollection indexCollection, final Stream<IndexDocument> docStream) {
		this.indexCollection = indexCollection;
		this.docStream = docStream;
	}

	/**
	 * Processes every document of the stream and returns the accumulated counters.
	 * <p>
	 * The stream is closed once it has been traversed, also when processing fails, so that
	 * any resource backing it is released. Since a stream can be traversed only once, this
	 * method must not be invoked more than once per instance.
	 * </p>
	 *
	 * @return the {@link FeedResult} created at the beginning of the call (initialised with
	 *         the current time in milliseconds) and updated once per document
	 * @throws IllegalStateException
	 *             if a document reports a status other than {@code OK}, {@code MARKED} or {@code ERROR}
	 * @see Callable#call()
	 */
	@Override
	public FeedResult call() {
		final FeedResult res = new FeedResult(System.currentTimeMillis());
		// try-with-resources: Stream is AutoCloseable and may wrap an I/O source.
		try (Stream<IndexDocument> docs = docStream) {
			docs.forEach(doc -> feed(doc, res));
		}
		return res;
	}

	/**
	 * Handles a single document according to its status and records the outcome in {@code res}.
	 *
	 * @param doc
	 *            the document to handle
	 * @param res
	 *            the result object whose counters are updated
	 * @throws IllegalStateException
	 *             if the document status is not one of the handled values
	 */
	private void feed(final IndexDocument doc, final FeedResult res) {
		switch (doc.getStatus()) {

		case OK:
			boolean added = false;
			try {
				added = indexCollection.add(doc);
			} catch (IndexServiceException e) {
				// Best effort: a failing document must not abort the whole feed.
				// Passing the exception as second argument keeps its stack trace in the log.
				log.error("error adding document to the index", e);
			}
			if (added) {
				res.add();
			} else {
				// Rejected or failed additions are accounted as marked.
				res.mark();
			}
			break;

		case MARKED:
			res.mark();
			log.debug("skipping record: " + doc.getFieldValue(IndexFieldUtility.INDEX_RECORD_ID));
			break;

		case ERROR:
			res.skip();
			log.info("Error on record: " + doc.getFieldValue(IndexFieldUtility.INDEX_RECORD_ID));
			break;

		default:
			throw new IllegalStateException("unknown document status: " + doc.getStatus());
		}
	}

}
