package eu.dnetlib.functionality.index;

import com.google.common.collect.Maps;
import eu.dnetlib.data.provision.index.rmi.IndexServiceException;
import eu.dnetlib.functionality.index.cql.CqlValueTransformerMap;
import eu.dnetlib.functionality.index.feed.DocumentMapperFactory;
import eu.dnetlib.functionality.index.model.Any.ValueType;
import eu.dnetlib.functionality.index.query.IndexQueryFactory;
import eu.dnetlib.functionality.index.query.SolrIndexQueryFactory;
import eu.dnetlib.functionality.index.query.SolrIndexQueryResponseFactory;
import eu.dnetlib.functionality.index.solr.cql.SolrTypeBasedCqlValueTransformerMapFactory;
import eu.dnetlib.functionality.index.solr.feed.SolrDocumentMapperFactory;
import eu.dnetlib.functionality.index.utils.IndexConfigFactory;
import eu.dnetlib.functionality.index.utils.MetadataReference;
import eu.dnetlib.functionality.index.utils.RemoteSolrAdministrator;
import eu.dnetlib.functionality.index.utils.ZkUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.dom4j.Document;
import org.dom4j.DocumentException;
import org.dom4j.io.SAXReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Required;

import java.io.IOException;
import java.io.StringReader;
import java.util.Map;

/**
 * The Class SolrIndexServerDAO.
 */
public class SolrIndexServerDAO extends AbstractBackendDescriptor implements IndexServerDAO {

    /**
     * The log.
     */
    private static final Log log = LogFactory.getLog(SolrIndexServerDAO.class); // NOPMD by marko on 11/24/08 5:02 PM

    /**
     * The zk utils.
     */
    @Autowired
    private ZkUtils zkUtils;

    /**
     * The query response factory.
     */
    @Autowired
    private SolrIndexQueryResponseFactory queryResponseFactory;

    @Autowired
    private SolrIndexQueryFactory solrIndexQueryFactory;

    /**
     * The solr document mapper factory.
     */
    @Autowired
    private SolrDocumentMapperFactory solrDocumentMapperFactory;

    /**
     * The solr administrator.
     */
    private RemoteSolrAdministrator solrAdministrator;

    /**
     * The solr type based cql value transformer map.
     */
    @Autowired
    private SolrTypeBasedCqlValueTransformerMapFactory tMapFactory;

    /**
     * {@inheritDoc}
     *
     * @see eu.dnetlib.functionality.index.IndexServerDAO#createIndexCollection(eu.dnetlib.functionality.index.utils.MetadataReference,
     * java.lang.String)
     */
    @Override
    public void createIndexCollection(final MetadataReference mdref, final String fields) throws IndexServiceException {
        CloudSolrClient server = null;
        try {

            server = getServer();
            server.connect();
            final Map<String, String> p = getServiceProperties();

            if (!solrAdministrator.indexCollectionExists(mdref.toString(), p.get("host"), p.get("port"))) {
                Map<String, String> params = Maps.newHashMap();


                params.put("numShards", p.get("numShards"));
                params.put("replicationFactor", p.get("replicationFactor"));

                for (IndexConfigFactory.CONFIG_PARAMS param_Name : IndexConfigFactory.CONFIG_PARAMS.values()) {
                    params.put(param_Name.toString(), p.get(param_Name.toString()));
                }
                params.put(IndexConfigFactory.CONFIG_PARAMS.indexDataDir.toString(), mdref.toString());

                zkUtils.uploadZookeperConfig(server.getZkStateReader().getZkClient(), mdref.toString(), parse(fields), params, false);
                solrAdministrator.createSolrIndex(p.get("host"), p.get("port"), mdref.toString(), p.get("numShards"), p.get("replicationFactor"),
                        mdref.toString());
            }
            server.getZkStateReader().close();

        } catch (Exception e) {
            log.error("Error on creating IndexCollection", e);
            throw new IndexServiceException("Error on creating IndexCollection", e);
        } finally {
            if (server != null) {
                closeClient(server, "An error occur on closing client to solr");
            }
        }
    }

    @Override
    public void updateIndexCollection(final MetadataReference mdRef, final Document fields) throws IndexServiceException {
        CloudSolrClient server = null;
        try {
            server = getServer();
            server.connect();
            Map<String, String> params = Maps.newHashMap();

            params.put("numShards", getServiceProperties().get("numShards"));
            params.put("replicationFactor", getServiceProperties().get("replicationFactor"));

            for (IndexConfigFactory.CONFIG_PARAMS param_Name : IndexConfigFactory.CONFIG_PARAMS.values()) {
                params.put(param_Name.toString(), getServiceProperties().get(param_Name.toString()));
            }
            params.put(IndexConfigFactory.CONFIG_PARAMS.indexDataDir.toString(), mdRef.toString());
            zkUtils.uploadZookeperConfig(server.getZkStateReader().getZkClient(), mdRef.toString(), fields, params, true);
            server.getZkStateReader().close();
            closeClient(server, "An error occur on closing client to solr");
            solrAdministrator.reloadCollection(getServiceProperties().get("host"), getServiceProperties().get("port"), mdRef.toString());

        } catch (Exception e) {
            log.error("Error on updating IndexCollection", e);
            throw new IndexServiceException("Error on updating IndexCollection", e);
        } finally {
            if (server != null) {
                closeClient(server, "An Error occur on closing the solr client");
            }
        }
    }

    protected void closeClient(CloudSolrClient server, String message) throws IndexServiceException {
        try {
            server.close();
        } catch (IOException e) {
            throw new IndexServiceException(message, e);
        }
    }

    /**
     * Parses the fields parameter.
     *
     * @param fields the fields
     * @return the document
     * @throws IndexServiceException the index service exception
     */
    private Document parse(final String fields) throws IndexServiceException {
        try {
            return new SAXReader().read(new StringReader(fields));
        } catch (DocumentException e) {
            throw new IndexServiceException("unable to parse fields: " + fields, e);
        }
    }

    /**
     * {@inheritDoc}
     *
     * @see eu.dnetlib.functionality.index.IndexServerDAO#getIndexCollection(eu.dnetlib.functionality.index.utils.MetadataReference)
     */
    @Override
    public IndexCollection getIndexCollection(final MetadataReference mdref) throws IndexServiceException {
        CloudSolrClient newServer = getServer(mdref);
        return new SolrIndexCollection(newServer);
    }

    /**
     * {@inheritDoc}
     *
     * @see eu.dnetlib.functionality.index.IndexServerDAO#getSchema()
     */
    @Override
    public Map<String, ValueType> getSchema(final MetadataReference mdRef) throws IndexServiceException {
        CloudSolrClient client = getServer(mdRef);
        Map<String, ValueType> fields = solrAdministrator.getFieldNamesAndTypes(mdRef.toString(), client);
        closeClient(client, "Cannot close the connection to the CloudSolrClient");
        return fields;
    }

    /**
     * {@inheritDoc}
     *
     * @see eu.dnetlib.functionality.index.IndexServerDAO#getCqlValueTransformerMap()
     */
    @Override
    public CqlValueTransformerMap getCqlValueTransformerMap(final MetadataReference mdRef) throws IndexServiceException {
        return tMapFactory.getIt(getSchema(mdRef));
    }

    /**
     * {@inheritDoc}
     *
     * @see eu.dnetlib.functionality.index.IndexServerDAO#getDocumentMapperFactory()
     */
    @Override
    public DocumentMapperFactory getDocumentMapperFactory() throws IndexServiceException {
        return solrDocumentMapperFactory;
    }

    /**
     * {@inheritDoc}
     *
     * @see eu.dnetlib.functionality.index.IndexServerDAO#shutdown()
     */
    @Override
    public void shutdown(final MetadataReference mdRef) throws IndexServiceException {
        closeClient(getServer(mdRef), "Error on closing client");

    }

    /**
     * Gets a server with the default collection set according to the given mdRef.
     *
     * @param mdRef the md ref
     * @return a server instance
     * @throws IndexServiceException the index service exception
     */
    private CloudSolrClient getServer(final MetadataReference mdRef) throws IndexServiceException {
        CloudSolrClient server = getServer();
        server.setDefaultCollection(mdRef.toString());
        return server;
    }

    /**
     * Gets the server.
     *
     * @return a server instance
     */
    private CloudSolrClient getServer() {
        String address = getEndpoint().get(ADDRESS);
        log.info("connecting to address: " + address);
        return new CloudSolrClient(address);

    }

    /**
     * Gets the solr administrator.
     *
     * @return the solrAdministrator
     */
    public RemoteSolrAdministrator getSolrAdministrator() {
        return solrAdministrator;
    }

    /**
     * Sets the solr administrator.
     *
     * @param solrAdministrator the solrAdministrator to set
     */
    @Required
    public void setSolrAdministrator(final RemoteSolrAdministrator solrAdministrator) {
        this.solrAdministrator = solrAdministrator;
    }

    @Override
    public IndexQueryFactory getIndexQueryFactory() {
        return solrIndexQueryFactory;
    }

}
