package eu.dnetlib.data.index;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.impl.CloudSolrServer;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.common.SolrInputDocument;

import eu.dnetlib.functionality.index.solr.feed.StreamingInputDocumentFactory;
import eu.dnetlib.miscutils.datetime.HumanTime;
import eu.dnetlib.miscutils.functional.UnaryFunction;

/**
 * Created by michele on 11/11/15.
 */
public class CloudIndexClient {

	private static final Log log = LogFactory.getLog(CloudIndexClient.class);
	private static final String INDEX_RECORD_RESULT_FIELD = "dnetResult";

	private final CloudSolrServer solrServer;

	protected CloudIndexClient(final CloudSolrServer solrServer) {
		this.solrServer = solrServer;
	}

	public int feed(final String record, final String indexDsId, final UnaryFunction<String, String> toIndexRecord) throws CloudIndexClientException {
		return feed(record, indexDsId, toIndexRecord, true);
	}

	public int feed(final String record, final String indexDsId, final UnaryFunction<String, String> toIndexRecord, final boolean commit)
			throws CloudIndexClientException {
		try {
			final SolrInputDocument doc = prepareSolrDocument(record, indexDsId, toIndexRecord);
			if ((doc == null) || doc.isEmpty()) throw new CloudIndexClientException("Invalid solr document");
			return feed(doc, commit);
		} catch (final Throwable e) {
			throw new CloudIndexClientException("Error feeding document", e);
		}
	}

	public int feed(final SolrInputDocument document) throws CloudIndexClientException {
		return feed(document, true);
	}

	public int feed(final SolrInputDocument document, final boolean commit) throws CloudIndexClientException {
		try {
			final UpdateResponse res = solrServer.add(document);
			log.debug("feed time for single records, elapsed time: " + HumanTime.exactly(res.getElapsedTime()));
			if (res.getStatus() != 0) { throw new CloudIndexClientException("bad status: " + res.getStatus()); }
			if (commit) {
				solrServer.commit();
			}
			return res.getStatus();
		} catch (final Throwable e) {
			throw new CloudIndexClientException("Error feeding document", e);
		}
	}

	public void feed(final List<SolrInputDocument> docs, final AfterFeedingCallback callback) throws CloudIndexClientException {
		feed(docs, callback, true);
	}

	public void feed(final List<SolrInputDocument> docs, final AfterFeedingCallback callback, final boolean commit) throws CloudIndexClientException {
		try {
			if (docs.isEmpty()) {
				log.debug("Empty list of documents. Calling callback, if needed.");
				if (callback != null) {
					callback.doAfterFeeding(null);
				}
				return;
			}
			final UpdateResponse res = solrServer.add(docs);

			log.debug("feed time for " + docs.size() + " records, elapsed tipe: : " + HumanTime.exactly(res.getElapsedTime()));

			if (commit) {
				solrServer.commit();
			}
			if (callback != null) {
				callback.doAfterFeeding(res);
			}
			if (res.getStatus() != 0) throw new CloudIndexClientException("bad status: " + res.getStatus());
		} catch (final Throwable e) {
			throw new CloudIndexClientException("Error feeding documents", e);
		}
	}

	public SolrInputDocument prepareSolrDocument(final String record, final String indexDsId, final UnaryFunction<String, String> toIndexRecord)
			throws CloudIndexClientException {
		try {
			final StreamingInputDocumentFactory documentFactory = new StreamingInputDocumentFactory();

			final String version = (new SimpleDateFormat("yyyy-MM-dd\'T\'hh:mm:ss\'Z\'")).format(new Date());
			final String indexRecord = toIndexRecord.evaluate(record);

			if (log.isDebugEnabled()) {
				log.debug("***************************************\nSubmitting index record:\n" + indexRecord + "\n***************************************\n");
			}

			return documentFactory.parseDocument(version, indexRecord, indexDsId, INDEX_RECORD_RESULT_FIELD);
		} catch (final Throwable e) {
			throw new CloudIndexClientException("Error creating solr document", e);
		}
	}

	public boolean isRecordIndexed(final String id) throws CloudIndexClientException {
		final QueryResponse res = query("objidentifier:\"" + id + "\"", null);
		return res.getResults().size() > 0;
	}

	public int remove(final String id) throws CloudIndexClientException {
		return remove(id, true);
	}

	public int remove(final String id, final boolean commit) throws CloudIndexClientException {
		try {
			final UpdateResponse res = solrServer.deleteByQuery("objidentifier:\"" + id + "\"");
			if (commit) {
				solrServer.commit();
			}
			return res.getResponse().size();
		} catch (final Throwable e) {
			throw new CloudIndexClientException("Error removing documents", e);
		}
	}

	public int count(final String query) throws CloudIndexClientException {
		final QueryResponse res = query(query, 0);
		return res.getResults().size();
	}

	public QueryResponse query(final String query, Integer rows) throws CloudIndexClientException {
		try {
			final SolrQuery solrQuery = new SolrQuery();
			solrQuery.setQuery(query);
			if(rows != null && rows >= 0) {
				solrQuery.setRows(rows);
			}
			return solrServer.query(solrQuery);
		} catch (final Throwable e) {
			throw new CloudIndexClientException("Error searching documents", e);
		}
	}

	public void close() {
		if (solrServer != null) {
			solrServer.shutdown();
		}
	}

	public interface AfterFeedingCallback {

		void doAfterFeeding(final UpdateResponse response);
	}
}
