package eu.dnetlib.index.solr;

import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.common.SolrInputDocument;

import com.google.common.base.Function;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;

import eu.dnetlib.clients.index.model.document.IndexDocument;
import eu.dnetlib.clients.index.utils.IndexFieldUtility;
import eu.dnetlib.index.IndexCollection;
import eu.dnetlib.index.solr.model.SolrIndexDocument;
import eu.dnetlib.rmi.provision.IndexServiceException;

/**
 * The Class SolrIndexCollection.
 */
public class SolrIndexCollection implements IndexCollection {

	/**
	 * The log.
	 */
	private static final Log log = LogFactory.getLog(SolrIndexCollection.class); // NOPMD by marko on 11/24/08 5:02 PM

	/** The Constant STATUS_INDEX_OK. */
	public static final int STATUS_INDEX_OK = 0;

	/** The client. */
	private CloudSolrClient client;

	private boolean shutdown = false;

	/**
	 * The Constructor.
	 *
	 * @param newServer
	 *            the client
	 */
	public SolrIndexCollection(final CloudSolrClient newServer) {
		this.client = newServer;
		client.connect();
	}

	/**
	 * {@inheritDoc}
	 *
	 * @see eu.dnetlib.index.IndexCollection#add(eu.dnetlib.index.model.document.IndexDocument)
	 */
	@Override
	public boolean add(final IndexDocument doc) throws IndexServiceException {
		if (isShutdown()) throw new IndexServiceException("Please get another SolrIndexCollection: this has been shut down");

		final SolrIndexDocument solrDocument = (SolrIndexDocument) doc;
		try {
			final UpdateResponse response = client.add(solrDocument.getSolrDocument());
			return response.getStatus() == 0;
		} catch (final Exception e) {
			throw new IndexServiceException("Unable to add document", e);
		}
	}

	/**
	 * {@inheritDoc}
	 *
	 * @see eu.dnetlib.index.IndexCollection#addAll(java.util.Iterator)
	 */
	@Override
	public boolean addAll(final Iterator<IndexDocument> docs) throws IndexServiceException {
		if (isShutdown()) throw new IndexServiceException("Please get another SolrIndexCollection: this has been shut down");
		final Iterator<SolrInputDocument> solrDocs = Iterators.transform(docs, new Function<IndexDocument, SolrInputDocument>() {

			@Override
			public SolrInputDocument apply(final IndexDocument doc) {
				final SolrIndexDocument solrDocument = (SolrIndexDocument) doc;
				return solrDocument.getSolrDocument();
			}
		});

		try {
			final UpdateResponse response = client.add(Lists.newArrayList(solrDocs));
			return response.getStatus() == 0;
		} catch (final Exception e) {
			throw new IndexServiceException("Unable to add document", e);
		}
	}

	/**
	 * {@inheritDoc}
	 *
	 * @see eu.dnetlib.index.IndexCollection#addAll(java.util.Collection)
	 */
	@Override
	public boolean addAll(final Collection<IndexDocument> docs) throws IndexServiceException {
		if (isShutdown()) throw new IndexServiceException("Please get another SolrIndexCollection: this has been shut down");
		return addAll(docs.iterator());
	}

	/**
	 * {@inheritDoc}
	 *
	 * @see eu.dnetlib.index.IndexCollection#deleteIndex(java.lang.String)
	 */
	@Override
	public boolean deleteIndex(final String dsId) throws IndexServiceException {
		if (isShutdown()) throw new IndexServiceException("Please get another SolrIndexCollection: this has been shut down");
		return doDelete(IndexFieldUtility.DS_ID + " : \"" + dsId + "\"");
	}

	/**
	 * {@inheritDoc}
	 *
	 * @see eu.dnetlib.index.IndexCollection#deleteByQuery(java.lang.String, java.lang.String)
	 */
	@Override
	public boolean deleteByQuery(final String query, final String dsId) throws IndexServiceException {
		if (isShutdown()) throw new IndexServiceException("Please get another SolrIndexCollection: this has been shut down");
		if (StringUtils.isBlank(dsId)) return doDelete(query);
		return doDelete(query + " AND " + IndexFieldUtility.DS_ID + " : \"" + dsId + "\"");
	}

	/**
	 * Do delete.
	 *
	 * @param query
	 *            the query
	 * @return true, if do delete
	 * @throws IndexServiceException
	 *             the index service exception
	 */
	protected boolean doDelete(final String query) throws IndexServiceException {
		if (isShutdown()) throw new IndexServiceException("Please get another SolrIndexCollection: this has been shut down");
		try {
			log.debug("delete by query: " + query);
			return client.deleteByQuery(query).getStatus() == STATUS_INDEX_OK;
		} catch (final Exception e) {
			throw new IndexServiceException("unable to run delete by query: " + query, e);
		}
	}

	/**
	 * {@inheritDoc}
	 *
	 * @see eu.dnetlib.index.IndexCollection#commit()
	 */
	@Override
	public boolean commit() throws IndexServiceException {
		if (isShutdown()) throw new IndexServiceException("Please get another SolrIndexCollection: this has been shut down");
		try {
			log.info("performing commit");
			final UpdateResponse rsp = client.commit();
			log.info(String.format("commit completed in %s, status %s", rsp.getElapsedTime(), rsp.getStatus()));
			return rsp.getStatus() == STATUS_INDEX_OK;
		} catch (final Throwable e) {
			throw new IndexServiceException("unable to perform index commit", e);
		}
	}

	@Override
	public void shutdown() {
		try {
			client.close();
		} catch (IOException e) {
			throw new RuntimeException(e);
		}
		shutdown = true;
	}

	public boolean isShutdown() {
		return shutdown;
	}

	public void setShutdown(final boolean shutdown) {
		this.shutdown = shutdown;
	}

}
