package eu.dnetlib.index.solr;

import java.io.IOException;
import java.io.StringReader;
import java.util.HashMap;
import java.util.Map;

import eu.dnetlib.clients.index.model.Any.ValueType;
import eu.dnetlib.clients.index.query.IndexQueryFactory;
import eu.dnetlib.utils.MetadataReference;
import eu.dnetlib.index.AbstractBackendDescriptor;
import eu.dnetlib.index.IndexCollection;
import eu.dnetlib.index.IndexServerDAO;
import eu.dnetlib.cql.CqlValueTransformerMap;
import eu.dnetlib.index.feed.DocumentMapperFactory;
import eu.dnetlib.index.query.SolrIndexQueryFactory;
import eu.dnetlib.index.query.SolrIndexQueryResponseFactory;
import eu.dnetlib.index.solr.cql.SolrTypeBasedCqlValueTransformerMapFactory;
import eu.dnetlib.index.solr.feed.SolrDocumentMapperFactory;
import eu.dnetlib.index.utils.IndexConfigFactory;
import eu.dnetlib.index.utils.RemoteSolrAdministrator;
import eu.dnetlib.index.utils.ZkUtils;
import eu.dnetlib.rmi.provision.IndexServiceException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.dom4j.Document;
import org.dom4j.DocumentException;
import org.dom4j.io.SAXReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Required;

/**
 * Solr-backed implementation of {@link IndexServerDAO}.
 * <p>
 * Collection management (create / update / schema lookup) is performed through short-lived
 * {@link CloudSolrClient} instances built from the configured endpoint address, together with the injected
 * {@link ZkUtils} and {@link RemoteSolrAdministrator} helpers.
 */
public class SolrIndexServerDAO extends AbstractBackendDescriptor implements IndexServerDAO {

	/**
	 * The log.
	 */
	private static final Log log = LogFactory.getLog(SolrIndexServerDAO.class); // NOPMD by marko on 11/24/08 5:02 PM

	/** Service property key: number of shards of a collection. */
	private static final String NUM_SHARDS = "numShards";

	/** Service property key: replication factor of a collection. */
	private static final String REPLICATION_FACTOR = "replicationFactor";

	/** Service property key: host passed to the solr administrator. */
	private static final String HOST = "host";

	/** Service property key: port passed to the solr administrator. */
	private static final String PORT = "port";

	/**
	 * The zk utils.
	 */
	@Autowired
	private ZkUtils zkUtils;

	/**
	 * The query response factory.
	 */
	@Autowired
	private SolrIndexQueryResponseFactory queryResponseFactory;

	/**
	 * The query factory returned by {@link #getIndexQueryFactory()}.
	 */
	@Autowired
	private SolrIndexQueryFactory solrIndexQueryFactory;

	/**
	 * The solr document mapper factory.
	 */
	@Autowired
	private SolrDocumentMapperFactory solrDocumentMapperFactory;

	/**
	 * The solr administrator.
	 */
	private RemoteSolrAdministrator solrAdministrator;

	/**
	 * The solr type based cql value transformer map.
	 */
	@Autowired
	private SolrTypeBasedCqlValueTransformerMapFactory tMapFactory;

	/**
	 * {@inheritDoc}
	 * <p>
	 * If no collection named after {@code mdref} exists yet, the configuration derived from {@code fields} is uploaded
	 * and the collection is created. Nothing is done when the collection already exists.
	 *
	 * @param mdref  the metadata reference; its string form is used as collection name
	 * @param fields the XML description of the index fields
	 * @throws IndexServiceException if anything fails while creating the collection or closing the client
	 */
	@Override
	public void createIndexCollection(final MetadataReference mdref, final String fields) throws IndexServiceException {
		final String collection = mdref.toString();

		// try-with-resources closes the client on every path; a failure on close no longer
		// replaces an exception thrown by the body (it is attached as suppressed instead).
		try (CloudSolrClient client = getClient()) {
			client.connect();
			final Map<String, String> props = getServiceProperties();

			if (!solrAdministrator.indexCollectionExists(collection, client, props.get(HOST), props.get(PORT))) {
				final Map<String, String> params = buildConfigParams(props, collection);

				zkUtils.uploadZookeperConfig(client.getZkStateReader().getZkClient(), collection, parse(fields), params, false);
				solrAdministrator.createSolrIndex(props.get(HOST), props.get(PORT), collection, props.get(NUM_SHARDS), props.get(REPLICATION_FACTOR),
						collection);
			}
		} catch (Exception e) {
			log.error("Error on creating IndexCollection", e);
			throw new IndexServiceException("Error on creating IndexCollection", e);
		}
	}

	/**
	 * {@inheritDoc}
	 * <p>
	 * Uploads the configuration derived from {@code fields}, overwriting the existing one, and then asks the solr
	 * administrator to reload the collection.
	 *
	 * @param mdRef  the metadata reference; its string form is used as collection name
	 * @param fields the parsed description of the index fields
	 * @throws IndexServiceException if anything fails while updating the collection or closing the client
	 */
	@Override
	public void updateIndexCollection(final MetadataReference mdRef, final Document fields) throws IndexServiceException {
		final String collection = mdRef.toString();
		try {
			final Map<String, String> props;

			// The client is only needed for the upload; it is closed (exactly once) before the reload is requested.
			try (CloudSolrClient client = getClient()) {
				client.connect();
				props = getServiceProperties();
				final Map<String, String> params = buildConfigParams(props, collection);
				zkUtils.uploadZookeperConfig(client.getZkStateReader().getZkClient(), collection, fields, params, true);
			}

			solrAdministrator.reloadCollection(props.get(HOST), props.get(PORT), collection);
		} catch (Exception e) {
			log.error("Error on updating IndexCollection", e);
			throw new IndexServiceException("Error on updating IndexCollection", e);
		}
	}

	/**
	 * Builds the parameters handed to {@link ZkUtils#uploadZookeperConfig} for the given collection.
	 *
	 * @param props      the service properties to read the values from
	 * @param collection the collection name, also used as index data dir
	 * @return a new mutable map of configuration parameters
	 */
	private Map<String, String> buildConfigParams(final Map<String, String> props, final String collection) {
		final Map<String, String> params = new HashMap<>();

		params.put(NUM_SHARDS, props.get(NUM_SHARDS));
		params.put(REPLICATION_FACTOR, props.get(REPLICATION_FACTOR));

		for (IndexConfigFactory.CONFIG_PARAMS param : IndexConfigFactory.CONFIG_PARAMS.values()) {
			params.put(param.toString(), props.get(param.toString()));
		}
		// Must come after the loop: it overrides whatever value the service properties hold for indexDataDir.
		params.put(IndexConfigFactory.CONFIG_PARAMS.indexDataDir.toString(), collection);
		return params;
	}

	/**
	 * Parses the fields parameter.
	 *
	 * @param fields the fields
	 * @return the document
	 * @throws IndexServiceException the index service exception
	 */
	private Document parse(final String fields) throws IndexServiceException {
		try {
			// NOTE(review): SAXReader is used with its default configuration; if 'fields' can come from an
			// untrusted source, DTD / external entity processing should be disabled here.
			return new SAXReader().read(new StringReader(fields));
		} catch (DocumentException e) {
			throw new IndexServiceException("unable to parse fields: " + fields, e);
		}
	}

	/**
	 * {@inheritDoc}
	 * <p>
	 * The returned collection wraps a newly created client whose default collection is {@code mdref}. The client is
	 * pinged once; a failing ping is logged but does not prevent the collection from being returned.
	 */
	@Override
	public IndexCollection getIndexCollection(final MetadataReference mdref) throws IndexServiceException {
		final CloudSolrClient newServer = getClient(mdref);
		try {
			newServer.ping();
		} catch (SolrServerException | IOException e) {
			// Best effort, as before: report through the logger instead of stderr and carry on.
			log.warn("Unable to ping index collection " + mdref, e);
		}
		return new SolrIndexCollection(newServer);
	}

	/**
	 * {@inheritDoc}
	 * <p>
	 * The client used for the lookup is always closed, also when the lookup itself fails.
	 */
	@Override
	public Map<String, ValueType> getSchema(final MetadataReference mdRef) throws IndexServiceException {
		try (CloudSolrClient server = getClient(mdRef)) {
			return solrAdministrator.getFieldNamesAndTypes(mdRef.toString(), server);
		} catch (IOException e) {
			throw new IndexServiceException("Error on closing client", e);
		}
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public CqlValueTransformerMap getCqlValueTransformerMap(final MetadataReference mdRef) throws IndexServiceException {
		return tMapFactory.getIt(getSchema(mdRef));
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public DocumentMapperFactory getDocumentMapperFactory() throws IndexServiceException {
		return solrDocumentMapperFactory;
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public void shutdown(final MetadataReference mdRef) throws IndexServiceException {
		try {
			// NOTE(review): getClient(mdRef) builds a new client, so this closes a freshly created instance
			// rather than one previously handed out by getIndexCollection -- confirm this is intended.
			getClient(mdRef).close();
		} catch (IOException e) {
			throw new IndexServiceException("Error on closing client", e);
		}

	}

	/**
	 * Gets a server with the default collection set according to the given mdRef.
	 * <p>
	 * A new client is created on every call; the caller is responsible for closing it.
	 *
	 * @param mdRef the md ref
	 * @return a server instance
	 * @throws IndexServiceException the index service exception
	 */
	private CloudSolrClient getClient(final MetadataReference mdRef) throws IndexServiceException {
		final CloudSolrClient client = getClient();
		client.setDefaultCollection(mdRef.toString());
		return client;
	}

	/**
	 * Creates a new client for the address configured in the endpoint.
	 * <p>
	 * A new client is created on every call; the caller is responsible for closing it.
	 *
	 * @return a server instance
	 */
	private CloudSolrClient getClient() {
		final String address = getEndpoint().get(ADDRESS);
		log.info("connecting to address: " + address);
		return new CloudSolrClient(address);

	}

	/**
	 * Gets the solr administrator.
	 *
	 * @return the solrAdministrator
	 */
	public RemoteSolrAdministrator getSolrAdministrator() {
		return solrAdministrator;
	}

	/**
	 * Sets the solr administrator.
	 *
	 * @param solrAdministrator the solrAdministrator to set
	 */
	@Required
	public void setSolrAdministrator(final RemoteSolrAdministrator solrAdministrator) {
		this.solrAdministrator = solrAdministrator;
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public IndexQueryFactory getIndexQueryFactory() {
		return solrIndexQueryFactory;
	}

}
