package eu.dnetlib.index.solr;

import java.io.IOException;
import java.io.StringReader;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.dom4j.Document;
import org.dom4j.DocumentException;
import org.dom4j.io.SAXReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Required;

import com.google.common.collect.Maps;

import eu.dnetlib.clients.index.model.Any.ValueType;
import eu.dnetlib.clients.index.query.IndexQueryFactory;
import eu.dnetlib.cql.CqlValueTransformerMap;
import eu.dnetlib.index.AbstractBackendDescriptor;
import eu.dnetlib.index.IndexCollection;
import eu.dnetlib.index.IndexServerDAO;
import eu.dnetlib.index.feed.DocumentMapperFactory;
import eu.dnetlib.index.query.SolrIndexQueryFactory;
import eu.dnetlib.index.query.SolrIndexQueryResponseFactory;
import eu.dnetlib.index.solr.cql.SolrTypeBasedCqlValueTransformerMapFactory;
import eu.dnetlib.index.solr.feed.SolrDocumentMapperFactory;
import eu.dnetlib.index.utils.IndexConfigFactory;
import eu.dnetlib.index.utils.RemoteSolrAdministrator;
import eu.dnetlib.index.utils.ZkServers;
import eu.dnetlib.index.utils.ZkUtils;
import eu.dnetlib.rmi.provision.IndexServiceException;
import eu.dnetlib.utils.MetadataReference;

/**
 * Solr-backed implementation of {@link IndexServerDAO}.
 * <p>
 * Every operation builds its own {@link CloudSolrClient} from the ZooKeeper address found in the backend endpoint
 * (see {@link #getClient()}); this class does not cache or pool clients. Methods that only need a client for the
 * duration of the call close it before returning. {@link #getIndexCollection(MetadataReference)} is the exception:
 * the client it creates is handed over to the returned {@link SolrIndexCollection}.
 */
public class SolrIndexServerDAO extends AbstractBackendDescriptor implements IndexServerDAO {

	/** The log. */
	private static final Log log = LogFactory.getLog(SolrIndexServerDAO.class); // NOPMD by marko on 11/24/08 5:02 PM

	/** Service property holding the number of shards of a collection. */
	private static final String NUM_SHARDS = "numShards";

	/** Service property holding the replication factor of a collection. */
	private static final String REPLICATION_FACTOR = "replicationFactor";

	/** The zk utils. */
	@Autowired
	private ZkUtils zkUtils;

	/** The query response factory. */
	@Autowired
	private SolrIndexQueryResponseFactory queryResponseFactory;

	/** The query factory returned by {@link #getIndexQueryFactory()}. */
	@Autowired
	private SolrIndexQueryFactory solrIndexQueryFactory;

	/** The solr document mapper factory. */
	@Autowired
	private SolrDocumentMapperFactory solrDocumentMapperFactory;

	/** The solr administrator. */
	private RemoteSolrAdministrator solrAdministrator;

	/** The solr type based cql value transformer map. */
	@Autowired
	private SolrTypeBasedCqlValueTransformerMapFactory tMapFactory;

	/**
	 * {@inheritDoc}
	 * <p>
	 * If the collection named after {@code mdref} does not exist yet, uploads its configuration to ZooKeeper and asks
	 * the Solr administrator to create it. Nothing is uploaded or created when the collection already exists.
	 *
	 * @param mdref
	 *            the metadata reference; its string form is used as collection and configuration name
	 * @param fields
	 *            the XML text describing the index fields
	 * @throws IndexServiceException
	 *             wrapping any failure raised while connecting, parsing, uploading or creating
	 * @see eu.dnetlib.index.IndexServerDAO#createIndexCollection(eu.dnetlib.utils.MetadataReference, java.lang.String)
	 */
	@Override
	public void createIndexCollection(final MetadataReference mdref, final String fields) throws IndexServiceException {

		// Guarded so that the message is not concatenated when debug logging is disabled.
		if (log.isDebugEnabled()) {
			log.debug("createIndexCollection  " + mdref.toString() + "   " + fields);
		}

		try (CloudSolrClient client = getClient()) {

			client.connect();

			final String collection = mdref.toString();
			if (!solrAdministrator.indexCollectionExists(collection, client)) {
				final Map<String, String> props = getServiceProperties();

				zkUtils.uploadZookeperConfig(client.getZkStateReader().getZkClient(), collection, parse(fields), buildConfigParams(props), true);
				solrAdministrator.createSolrIndex(props.get("host"), props.get("port"), collection, props.get(NUM_SHARDS),
						props.get(REPLICATION_FACTOR), collection, props.get("maxShardsPerNode"));
			}
			client.getZkStateReader().close();

		} catch (Exception e) {
			log.error("Error on creating IndexCollection", e);
			throw new IndexServiceException("Error on creating IndexCollection", e);
		}
	}

	/**
	 * {@inheritDoc}
	 * <p>
	 * Uploads the given field configuration to ZooKeeper under the name of {@code mdRef} and then asks the Solr
	 * administrator to reload the collection.
	 *
	 * @param mdRef
	 *            the metadata reference; its string form is used as collection and configuration name
	 * @param fields
	 *            the already parsed document describing the index fields
	 * @throws IndexServiceException
	 *             wrapping any failure raised while connecting, uploading or reloading
	 */
	@Override
	public void updateIndexCollection(final MetadataReference mdRef, final Document fields) throws IndexServiceException {
		try (CloudSolrClient client = getClient()) {

			client.connect();

			// Read the service properties once, as createIndexCollection does, instead of once per lookup.
			final Map<String, String> props = getServiceProperties();
			final String collection = mdRef.toString();

			zkUtils.uploadZookeperConfig(client.getZkStateReader().getZkClient(), collection, fields, buildConfigParams(props), true);
			client.getZkStateReader().close();
			solrAdministrator.reloadCollection(props.get("host"), props.get("port"), collection);

		} catch (Exception e) {
			log.error("Error on updating IndexCollection", e);
			throw new IndexServiceException("Error on updating IndexCollection", e);
		}
	}

	/**
	 * Builds the parameter map passed to the ZooKeeper configuration upload: the shard count, the replication factor
	 * and one entry per {@link IndexConfigFactory.CONFIG_PARAMS} constant, each looked up by name in {@code props}.
	 * Properties that are missing from {@code props} are stored with a {@code null} value.
	 *
	 * @param props
	 *            the service properties to read from
	 * @return a new mutable map of configuration parameters
	 */
	private Map<String, String> buildConfigParams(final Map<String, String> props) {
		final Map<String, String> params = Maps.newHashMap();
		params.put(NUM_SHARDS, props.get(NUM_SHARDS));
		params.put(REPLICATION_FACTOR, props.get(REPLICATION_FACTOR));

		for (IndexConfigFactory.CONFIG_PARAMS configParam : IndexConfigFactory.CONFIG_PARAMS.values()) {
			final String key = configParam.toString();
			params.put(key, props.get(key));
		}
		return params;
	}

	/**
	 * Parses the fields parameter.
	 *
	 * @param fields
	 *            the XML text to parse
	 * @return the parsed document
	 * @throws IndexServiceException
	 *             if the text is not well-formed XML
	 */
	private Document parse(final String fields) throws IndexServiceException {
		// NOTE(review): the SAXReader is used with its default configuration, so DTDs and external entities are not
		// explicitly disabled here. If 'fields' can come from an untrusted caller, harden the reader against XXE.
		try {
			return new SAXReader().read(new StringReader(fields));
		} catch (DocumentException e) {
			throw new IndexServiceException("unable to parse fields: " + fields, e);
		}
	}

	/**
	 * {@inheritDoc}
	 * <p>
	 * The client created here is passed to the returned collection and is not closed by this method.
	 *
	 * @see eu.dnetlib.index.IndexServerDAO#getIndexCollection(eu.dnetlib.utils.MetadataReference)
	 */
	@Override
	public IndexCollection getIndexCollection(final MetadataReference mdref) throws IndexServiceException {
		return new SolrIndexCollection(getClient(mdref));
	}

	/**
	 * {@inheritDoc}
	 * <p>
	 * The client used for the lookup is closed before returning, also when the lookup fails.
	 *
	 * @throws IndexServiceException
	 *             if the lookup fails, or wrapping the {@link IOException} raised while closing the client
	 * @see eu.dnetlib.index.IndexServerDAO#getSchema(MetadataReference)
	 */
	@Override
	public Map<String, ValueType> getSchema(final MetadataReference mdRef) throws IndexServiceException {
		// Close the very client that served the request. Delegating to shutdown(mdRef) would close a different,
		// newly built client and leave this one open.
		try (CloudSolrClient client = getClient(mdRef)) {
			return solrAdministrator.getFieldNamesAndTypes(mdRef.toString(), client);
		} catch (IOException e) {
			throw new IndexServiceException(e);
		}
	}

	/**
	 * {@inheritDoc}
	 *
	 * @see eu.dnetlib.index.IndexServerDAO#getCqlValueTransformerMap(MetadataReference)
	 */
	@Override
	public CqlValueTransformerMap getCqlValueTransformerMap(final MetadataReference mdRef) throws IndexServiceException {
		return tMapFactory.getIt(getSchema(mdRef));
	}

	/**
	 * {@inheritDoc}
	 *
	 * @see eu.dnetlib.index.IndexServerDAO#getDocumentMapperFactory()
	 */
	@Override
	public DocumentMapperFactory getDocumentMapperFactory() throws IndexServiceException {
		return solrDocumentMapperFactory;
	}

	/**
	 * {@inheritDoc}
	 * <p>
	 * NOTE(review): clients are not cached by this class, so this builds a new client for {@code mdRef} and closes it
	 * straight away; it cannot reach clients handed out earlier. Confirm whether callers rely on anything more.
	 *
	 * @throws IndexServiceException
	 *             wrapping the {@link IOException} raised while closing the client
	 * @see eu.dnetlib.index.IndexServerDAO#shutdown(MetadataReference)
	 */
	@Override
	public void shutdown(final MetadataReference mdRef) throws IndexServiceException {
		try (CloudSolrClient client = getClient(mdRef)) {
			// Nothing to do: the client is only opened in order to be closed.
		} catch (IOException e) {
			throw new IndexServiceException(e);
		}
	}

	/**
	 * Gets a client with the default collection set according to the given mdRef.
	 *
	 * @param mdRef
	 *            the md ref; its string form becomes the default collection
	 * @return a new client instance, to be closed by the caller
	 * @throws IndexServiceException
	 *             the index service exception
	 */
	private CloudSolrClient getClient(final MetadataReference mdRef) throws IndexServiceException {
		final CloudSolrClient client = getClient();
		client.setDefaultCollection(mdRef.toString());
		return client;
	}

	/**
	 * Builds a new client from the ZooKeeper address configured in the backend endpoint.
	 *
	 * @return a new client instance, to be closed by the caller
	 */
	private CloudSolrClient getClient() {
		final String address = getEndpoint().get(ADDRESS);
		log.info("connecting to address: " + address);

		final ZkServers zk = ZkServers.newInstance(address);
		return new CloudSolrClient.Builder(zk.getHosts(), zk.getChroot()).build();
	}

	/**
	 * Gets the solr administrator.
	 *
	 * @return the solrAdministrator
	 */
	public RemoteSolrAdministrator getSolrAdministrator() {
		return solrAdministrator;
	}

	/**
	 * Sets the solr administrator.
	 *
	 * @param solrAdministrator
	 *            the solrAdministrator to set
	 */
	@Required
	public void setSolrAdministrator(final RemoteSolrAdministrator solrAdministrator) {
		this.solrAdministrator = solrAdministrator;
	}

	/**
	 * {@inheritDoc}
	 *
	 * @return the Solr specific query factory
	 */
	@Override
	public IndexQueryFactory getIndexQueryFactory() {
		return solrIndexQueryFactory;
	}

}
