package eu.dnetlib.functionality.index;

import java.io.StringReader;
import java.util.Map;

import eu.dnetlib.functionality.cql.CqlValueTransformerMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.solr.client.solrj.impl.CloudSolrServer;
import org.dom4j.Document;
import org.dom4j.DocumentException;
import org.dom4j.io.SAXReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Required;

import com.google.common.collect.Maps;

import eu.dnetlib.data.provision.index.rmi.IndexServiceException;

import eu.dnetlib.functionality.index.feed.DocumentMapperFactory;
import eu.dnetlib.functionality.index.model.Any.ValueType;
import eu.dnetlib.functionality.index.query.IndexQueryFactory;
import eu.dnetlib.functionality.index.query.SolrIndexQueryFactory;
import eu.dnetlib.functionality.index.query.SolrIndexQueryResponseFactory;
import eu.dnetlib.functionality.index.solr.cql.SolrTypeBasedCqlValueTransformerMapFactory;
import eu.dnetlib.functionality.index.solr.feed.SolrDocumentMapperFactory;
import eu.dnetlib.functionality.index.utils.IndexConfigFactory;
import eu.dnetlib.functionality.index.utils.MetadataReference;
import eu.dnetlib.functionality.index.utils.RemoteSolrAdministrator;
import eu.dnetlib.functionality.index.utils.ZkUtils;

/**
 * Solr-backed implementation of {@link IndexServerDAO}.
 * <p>
 * Each operation creates a new {@link CloudSolrServer} pointing at the address configured in the backend endpoint. The administrative
 * operations (create / update / schema lookup) shut that client down before returning, whereas
 * {@link #getIndexCollection(MetadataReference)} hands the client over to the returned {@link IndexCollection}.
 */
public class SolrIndexServerDAO extends AbstractBackendDescriptor implements IndexServerDAO {

	/**
	 * Class logger.
	 */
	private static final Log log = LogFactory.getLog(SolrIndexServerDAO.class); // NOPMD by marko on 11/24/08 5:02 PM

	/** Helper used to upload the index configuration to ZooKeeper. */
	@Autowired
	private ZkUtils zkUtils;

	/** The query response factory. */
	@Autowired
	private SolrIndexQueryResponseFactory queryResponseFactory;

	/** Factory returned by {@link #getIndexQueryFactory()}. */
	@Autowired
	private SolrIndexQueryFactory solrIndexQueryFactory;

	/** Factory returned by {@link #getDocumentMapperFactory()}. */
	@Autowired
	private SolrDocumentMapperFactory solrDocumentMapperFactory;

	/** Remote administration client used to create, reload and inspect collections. */
	private RemoteSolrAdministrator solrAdministrator;

	/** Factory building the CQL value transformer map from an index schema. */
	@Autowired
	private SolrTypeBasedCqlValueTransformerMapFactory tMapFactory;

	/**
	 * {@inheritDoc}
	 * <p>
	 * Does nothing when a collection named after {@code mdref} already exists. Otherwise the configuration derived from {@code fields} is
	 * uploaded to ZooKeeper and the collection is created through the Solr administrator.
	 *
	 * @param mdref
	 *            the metadata reference; its string form is used as collection and configuration name
	 * @param fields
	 *            the XML description of the index fields
	 * @throws IndexServiceException
	 *             wrapping any failure raised while connecting, uploading the configuration or creating the collection
	 * @see eu.dnetlib.functionality.index.IndexServerDAO#createIndexCollection(eu.dnetlib.functionality.index.utils.MetadataReference,
	 *      java.lang.String)
	 */
	@Override
	public void createIndexCollection(final MetadataReference mdref, final String fields) throws IndexServiceException {
		CloudSolrServer server = null;
		try {

			server = getServer();
			server.connect();

			if (!solrAdministrator.indexCollectionExists(mdref.toString(), server)) {
				final Map<String, String> p = getServiceProperties();
				final Map<String, String> params = buildConfigParams(p);

				zkUtils.uploadZookeperConfig(server.getZkStateReader().getZkClient(), mdref.toString(), parse(fields), params, false);
				solrAdministrator.createSolrIndex(p.get("host"), p.get("port"), mdref.toString(), p.get("numShards"), p.get("replicationFactor"),
						mdref.toString(), p.get("maxShardsPerNode"));
			}
			server.getZkStateReader().close();

		} catch (Exception e) {
			log.error("Error on creating IndexCollection", e);
			throw new IndexServiceException("Error on creating IndexCollection", e);
		} finally {
			if (server != null) {
				server.shutdown();
			}
		}
	}

	/**
	 * {@inheritDoc}
	 * <p>
	 * Uploads the configuration derived from {@code fields} to ZooKeeper, releases the Solr client and then asks the Solr administrator to
	 * reload the collection.
	 *
	 * @param mdRef
	 *            the metadata reference; its string form is used as collection name
	 * @param fields
	 *            the already parsed description of the index fields
	 * @throws IndexServiceException
	 *             wrapping any failure raised while connecting, uploading the configuration or reloading the collection
	 */
	@Override
	public void updateIndexCollection(final MetadataReference mdRef, final Document fields) throws IndexServiceException {
		CloudSolrServer server = null;
		try {
			server = getServer();
			server.connect();

			final Map<String, String> p = getServiceProperties();
			final Map<String, String> params = buildConfigParams(p);

			zkUtils.uploadZookeperConfig(server.getZkStateReader().getZkClient(), mdRef.toString(), fields, params, true);
			server.getZkStateReader().close();
			server.shutdown();
			// The client has been released already: clear the reference so that the finally block does not shut it down a second time.
			server = null;

			solrAdministrator.reloadCollection(p.get("host"), p.get("port"), mdRef.toString());

		} catch (Exception e) {
			log.error("Error on updating IndexCollection", e);
			throw new IndexServiceException("Error on updating IndexCollection", e);
		} finally {
			// Only reached with a non-null server when something failed before the explicit shutdown above.
			if (server != null) {
				server.shutdown();
			}
		}
	}

	/**
	 * Builds the parameter map passed to the ZooKeeper configuration upload: the {@code numShards} and {@code replicationFactor} service
	 * properties plus one entry for every {@link IndexConfigFactory.CONFIG_PARAMS} value. Properties that are not set are stored as
	 * {@code null}.
	 *
	 * @param serviceProperties
	 *            the service properties to read the values from
	 * @return a new mutable map holding the configuration parameters
	 */
	private Map<String, String> buildConfigParams(final Map<String, String> serviceProperties) {
		final Map<String, String> params = Maps.newHashMap();

		params.put("numShards", serviceProperties.get("numShards"));
		params.put("replicationFactor", serviceProperties.get("replicationFactor"));

		for (IndexConfigFactory.CONFIG_PARAMS paramName : IndexConfigFactory.CONFIG_PARAMS.values()) {
			params.put(paramName.toString(), serviceProperties.get(paramName.toString()));
		}
		return params;
	}

	/**
	 * Parses the fields parameter into a dom4j document.
	 *
	 * @param fields
	 *            the XML description of the index fields
	 * @return the parsed document
	 * @throws IndexServiceException
	 *             if {@code fields} is not well-formed XML
	 */
	private Document parse(final String fields) throws IndexServiceException {
		// NOTE(review): the reader is used with its default configuration; if 'fields' can come from an untrusted source, consider
		// disabling DTDs / external entities on the SAXReader - TODO confirm with the callers.
		try {
			return new SAXReader().read(new StringReader(fields));
		} catch (DocumentException e) {
			throw new IndexServiceException("unable to parse fields: " + fields, e);
		}
	}

	/**
	 * {@inheritDoc}
	 * <p>
	 * The returned collection wraps a newly created Solr client whose default collection is {@code mdref}; the client is not shut down by
	 * this method.
	 *
	 * @see eu.dnetlib.functionality.index.IndexServerDAO#getIndexCollection(eu.dnetlib.functionality.index.utils.MetadataReference)
	 */
	@Override
	public IndexCollection getIndexCollection(final MetadataReference mdref) throws IndexServiceException {
		CloudSolrServer newServer = getServer(mdref);
		return new SolrIndexCollection(newServer);
	}

	/**
	 * {@inheritDoc}
	 * <p>
	 * The Solr client created for the lookup is shut down before returning, also when the lookup fails.
	 *
	 * @see eu.dnetlib.functionality.index.IndexServerDAO#getSchema(MetadataReference)
	 */
	@Override
	public Map<String, ValueType> getSchema(final MetadataReference mdRef) throws IndexServiceException {
		final CloudSolrServer server = getServer(mdRef);
		try {
			return solrAdministrator.getFieldNamesAndTypes(mdRef.toString(), server);
		} finally {
			// Release the client even when the schema lookup throws, otherwise it would leak.
			server.shutdown();
		}
	}

	/**
	 * {@inheritDoc}
	 * <p>
	 * The map is built from the schema returned by {@link #getSchema(MetadataReference)}.
	 *
	 * @see eu.dnetlib.functionality.index.IndexServerDAO#getCqlValueTransformerMap(MetadataReference)
	 */
	@Override
	public CqlValueTransformerMap getCqlValueTransformerMap(final MetadataReference mdRef) throws IndexServiceException {
		return tMapFactory.getIt(getSchema(mdRef));
	}

	/**
	 * {@inheritDoc}
	 *
	 * @see eu.dnetlib.functionality.index.IndexServerDAO#getDocumentMapperFactory()
	 */
	@Override
	public DocumentMapperFactory getDocumentMapperFactory() throws IndexServiceException {
		return solrDocumentMapperFactory;
	}

	/**
	 * {@inheritDoc}
	 * <p>
	 * Note: this implementation creates a new client for {@code mdRef} and shuts that instance down. This class keeps no reference to the
	 * clients handed out by {@link #getIndexCollection(MetadataReference)}, so those are not affected by this call.
	 *
	 * @see eu.dnetlib.functionality.index.IndexServerDAO#shutdown(MetadataReference)
	 */
	@Override
	public void shutdown(final MetadataReference mdRef) throws IndexServiceException {
		getServer(mdRef).shutdown();

	}

	/**
	 * Gets a server with the default collection set according to the given mdRef.
	 *
	 * @param mdRef
	 *            the metadata reference whose string form becomes the default collection
	 * @return a new server instance; the caller is responsible for shutting it down
	 * @throws IndexServiceException
	 *             the index service exception
	 */
	private CloudSolrServer getServer(final MetadataReference mdRef) throws IndexServiceException {
		CloudSolrServer server = getServer();
		server.setDefaultCollection(mdRef.toString());
		return server;
	}

	/**
	 * Creates a new server for the address found under the {@code ADDRESS} key of the backend endpoint.
	 *
	 * @return a new server instance; the caller is responsible for shutting it down
	 */
	private CloudSolrServer getServer() {
		String address = getEndpoint().get(ADDRESS);
		log.info("connecting to address: " + address);
		return new CloudSolrServer(address);

	}

	/**
	 * Gets the solr administrator.
	 *
	 * @return the solrAdministrator
	 */
	public RemoteSolrAdministrator getSolrAdministrator() {
		return solrAdministrator;
	}

	/**
	 * Sets the solr administrator.
	 *
	 * @param solrAdministrator
	 *            the solrAdministrator to set
	 */
	@Required
	public void setSolrAdministrator(final RemoteSolrAdministrator solrAdministrator) {
		this.solrAdministrator = solrAdministrator;
	}

	/**
	 * {@inheritDoc}
	 *
	 * @return the injected Solr query factory
	 */
	@Override
	public IndexQueryFactory getIndexQueryFactory() {
		return solrIndexQueryFactory;
	}

}
