package eu.dnetlib.functionality.index;

import com.google.common.base.Function;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import eu.dnetlib.data.provision.index.rmi.IndexServiceException;
import eu.dnetlib.functionality.index.model.document.IndexDocument;
import eu.dnetlib.functionality.index.model.util.SolrIndexDocument;
import eu.dnetlib.functionality.index.utils.IndexFieldUtility;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.common.SolrInputDocument;

import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;

/**
 * The Class SolrIndexCollection.
 */
public class SolrIndexCollection implements IndexCollection {

    /**
     * The Constant STATUS_INDEX_OK.
     */
    public static final int STATUS_INDEX_OK = 0;
    /**
     * The log.
     */
    private static final Log log = LogFactory.getLog(SolrIndexCollection.class); // NOPMD by marko on 11/24/08 5:02 PM
    /**
     * The server.
     */
    private CloudSolrClient server;

    private boolean shutdown = false;

    /**
     * The Constructor.
     *
     * @param newServer            the server
     * @param queryResponseFactory the query response factory
     */
    public SolrIndexCollection(final CloudSolrClient newServer) {
        this.server = newServer;
        server.connect();
    }

    /**
     * {@inheritDoc}
     *
     * @see eu.dnetlib.functionality.index.IndexCollection#add(eu.dnetlib.functionality.index.model.document.IndexDocument)
     */
    @Override
    public boolean add(final IndexDocument doc) throws IndexServiceException {
        if (isShutdown())
            throw new IndexServiceException("Please get another SolrIndexCollection: this has been shut down");
        final SolrIndexDocument solrDocument = (SolrIndexDocument) doc;
        try {
            final UpdateResponse response = server.add(solrDocument.getSolrDocument());
            return response.getStatus() == 0;
        } catch (final Exception e) {
            throw new IndexServiceException("Unable to add document", e);
        }
    }

    /**
     * {@inheritDoc}
     *
     * @see eu.dnetlib.functionality.index.IndexCollection#addAll(java.util.Iterator)
     */
    @Override
    public boolean addAll(final Iterator<IndexDocument> docs) throws IndexServiceException {
        if (isShutdown())
            throw new IndexServiceException("Please get another SolrIndexCollection: this has been shut down");
        final Iterator<SolrInputDocument> solrDocs = Iterators.transform(docs, new Function<IndexDocument, SolrInputDocument>() {

            @Override
            public SolrInputDocument apply(final IndexDocument doc) {
                final SolrIndexDocument solrDocument = (SolrIndexDocument) doc;
                return solrDocument.getSolrDocument();
            }
        });

        try {
            final UpdateResponse response = server.add(Lists.newArrayList(solrDocs));
            return response.getStatus() == 0;
        } catch (final Exception e) {
            throw new IndexServiceException("Unable to add document", e);
        }
    }

    /**
     * {@inheritDoc}
     *
     * @see eu.dnetlib.functionality.index.IndexCollection#addAll(java.util.Collection)
     */
    @Override
    public boolean addAll(final Collection<IndexDocument> docs) throws IndexServiceException {
        if (isShutdown())
            throw new IndexServiceException("Please get another SolrIndexCollection: this has been shut down");
        return addAll(docs.iterator());
    }

    /**
     * {@inheritDoc}
     *
     * @see eu.dnetlib.functionality.index.IndexCollection#deleteIndex(java.lang.String)
     */
    @Override
    public boolean deleteIndex(final String dsId) throws IndexServiceException {
        if (isShutdown())
            throw new IndexServiceException("Please get another SolrIndexCollection: this has been shut down");
        return doDelete(IndexFieldUtility.DS_ID + " : \"" + dsId + "\"");
    }

    /**
     * {@inheritDoc}
     *
     * @see eu.dnetlib.functionality.index.IndexCollection#deleteByQuery(java.lang.String, java.lang.String)
     */
    @Override
    public boolean deleteByQuery(final String query, final String dsId) throws IndexServiceException {
        if (isShutdown())
            throw new IndexServiceException("Please get another SolrIndexCollection: this has been shut down");
        if (StringUtils.isBlank(dsId)) return doDelete(query);
        return doDelete(query + " AND " + IndexFieldUtility.DS_ID + " : \"" + dsId + "\"");
    }

    /**
     * Do delete.
     *
     * @param query the query
     * @return true, if do delete
     * @throws IndexServiceException the index service exception
     */
    protected boolean doDelete(final String query) throws IndexServiceException {
        if (isShutdown())
            throw new IndexServiceException("Please get another SolrIndexCollection: this has been shut down");
        try {
            log.debug("delete by query: " + query);
            return server.deleteByQuery(query).getStatus() == STATUS_INDEX_OK;
        } catch (final Exception e) {
            throw new IndexServiceException("unable to run delete by query: " + query, e);
        }
    }

    /**
     * {@inheritDoc}
     *
     * @see eu.dnetlib.functionality.index.IndexCollection#commit()
     */
    @Override
    public boolean commit() throws IndexServiceException {
        if (isShutdown())
            throw new IndexServiceException("Please get another SolrIndexCollection: this has been shut down");
        try {
            log.info("performing commit");
            final UpdateResponse rsp = server.commit();
            log.info(String.format("commit completed in %s, status %s", rsp.getElapsedTime(), rsp.getStatus()));
            return rsp.getStatus() == STATUS_INDEX_OK;
        } catch (final Throwable e) {
            throw new IndexServiceException("unable to perform index commit", e);
        }
    }

    @Override
    public void shutdown() {
        try {
            server.close();
        } catch (IOException e) {
            log.error("Error on closing client", e);
        }
        shutdown = true;
    }

    public boolean isShutdown() {
        return shutdown;
    }

    public void setShutdown(final boolean shutdown) {
        this.shutdown = shutdown;
    }

}
