package eu.dnetlib.functionality.index.solr;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import javax.annotation.Resource;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.client.solrj.response.SpellCheckResponse;
import org.apache.solr.client.solrj.response.SpellCheckResponse.Suggestion;
import org.dom4j.DocumentException;
import org.springframework.beans.factory.annotation.Required;
import org.springframework.util.StringUtils;
import org.z3950.zing.cql.CQLParseException;

import com.google.common.base.Function;
import com.google.common.collect.BiMap;
import com.google.common.collect.Iterables;

import eu.dnetlib.common.ws.dataprov.ResultsResponse;
import eu.dnetlib.data.index.IndexServiceException;
import eu.dnetlib.enabling.resultset.client.IterableResultSetClient;
import eu.dnetlib.functionality.index.cql.CqlTranslator;
import eu.dnetlib.functionality.index.cql.CqlValueTransformerMap;
import eu.dnetlib.functionality.index.model.Any.ValueType;
import eu.dnetlib.functionality.index.solr.cql.SolrTypeBasedCqlValueTransformerMapFactory;
import eu.dnetlib.functionality.index.solr.feed.DocumentFeeder;
import eu.dnetlib.functionality.index.solr.feed.DocumentMapperFactory;
import eu.dnetlib.functionality.index.solr.feed.FeedMode;
import eu.dnetlib.functionality.index.solr.feed.FeedResult;
import eu.dnetlib.functionality.index.solr.feed.IndexDocument;
import eu.dnetlib.functionality.index.solr.feed.InputDocumentFactory;
import eu.dnetlib.functionality.index.solr.query.BrowseAliases;
import eu.dnetlib.functionality.index.solr.query.IndexQuery;
import eu.dnetlib.functionality.index.solr.query.IndexQueryFactory;
import eu.dnetlib.functionality.index.solr.query.QueryLanguage;
import eu.dnetlib.functionality.index.solr.query.QueryResponseFactory;
import eu.dnetlib.functionality.index.solr.query.QueryResponseParser;
import eu.dnetlib.functionality.index.solr.query.Weights;
import eu.dnetlib.functionality.index.solr.suggest.Hint;
import eu.dnetlib.functionality.index.solr.utils.IndexField;
import eu.dnetlib.functionality.index.solr.utils.MetadataReference;
import eu.dnetlib.functionality.index.solr.utils.MetadataReferenceFactory;
import eu.dnetlib.functionality.index.solr.utils.ServiceTools;
import eu.dnetlib.functionality.index.solr.utils.XmlUtils;
import eu.dnetlib.miscutils.collections.Pair;

/**
 * SolrIndexServer.
 * 
 * @author claudio
 * 
 */
public class SolrIndexServer {

	/**
	 * logger.
	 */
	private static final Log log = LogFactory.getLog(SolrIndexServer.class); // NOPMD

	/**
	 * query factory.
	 */
	@Resource
	private transient IndexQueryFactory indexQueryFactory;

	/**
	 * document factory used for the feed process.
	 */
	@Resource
	private transient DocumentMapperFactory docMapperFactory;

	/**
	 * response factory.
	 */
	@Resource
	private transient QueryResponseFactory queryRespFactory;

	/**
	 * {@inheritDoc}
	 * 
	 * @see eu.dnetlib.functionality.index.solr.query.Weights
	 */
	@Resource
	private transient Weights weights;

	/**
	 * Alias map for browsing fields.
	 */
	@Resource
	private transient BrowseAliases browseAliases;

	/**
	 * {@inheritDoc}
	 * 
	 * @see eu.dnetlib.functionality.index.solr.utils.ServiceTools
	 */
	@Resource
	private transient ServiceTools serviceTools;

	/**
	 * {@inheritDoc}
	 * 
	 * @see eu.dnetlib.functionality.index.cql.CqlTranslator
	 */
	@Resource
	private transient CqlTranslator cqlTranslator;

	@Resource
	private transient MetadataReferenceFactory mdFactory;

	@Resource
	private SolrManager solrManager;

	/**
	 * {@inheritDoc}
	 * 
	 * @see eu.dnetlib.functionality.index.solr.cql.SolrTypeBasedCqlValueTransformerMapFactory
	 */
	@Resource
	private transient SolrTypeBasedCqlValueTransformerMapFactory tMapFactory;

	/**
	 * if enabled, allows to create an index on the first feed it receives.
	 */
	private boolean registerOnFeed;

	/**
	 * Thread pool used for the feeding process.
	 */
	private final transient ExecutorService threadPool = Executors.newCachedThreadPool();

	/**
	 * initializer.
	 * 
	 * @throws IndexServiceException
	 *             could happen
	 * @throws SolrServerException
	 */
	public void init() throws IndexServiceException, SolrServerException {
		log.info("Initializing SolrIndexServer, trying to resume existing indexes ");

		weights.initialize();
		browseAliases.initialize();
	}

	/**
	 * Creates an index.
	 * 
	 * @param dsId
	 *            the given index dataStructure id
	 * @param mdRef
	 *            the metadataReference that identifies the index.
	 * @param fields
	 *            index fields used to build the schema.
	 * @throws IndexServiceException
	 *             could happen
	 * @throws SolrServerException
	 */
	public void create(final String dsId, final MetadataReference mdRef, final String fields) throws SolrServerException, IndexServiceException {

		if (fields.isEmpty()) { throw new IndexServiceException("No result getting index layout informations"); }

		solrManager.registerServer(mdRef.toString(), XmlUtils.parse(fields));
		browseAliases.put(mdRef);
	}

	/**
	 * feeds the documents provided by the {@link IterableResultSetClient} to the index specified by the dsId.
	 * 
	 * @param dsId
	 * @param version
	 * @param feedMode
	 * @param documents
	 * @throws IndexServiceException
	 */
	public void feed(final String dsId, final String version, final FeedMode feedMode, final IterableResultSetClient documents) throws IndexServiceException {

		final MetadataReference mdRef = serviceTools.getMetadataRef(dsId);
		try {
			if (!solrManager.isRegistered(mdRef) && isRegisterOnFeed()) {
				create(dsId, mdRef, serviceTools.getIndexFields(mdRef));
			}

			final SolrServer solrServer = solrManager.getSolrServer(mdRef);
			final Function<String, IndexDocument> docMapper = docMapperFactory.getMetatadaMapper(solrManager, mdRef, dsId, version);

			final FeedResult res = threadPool.submit(new DocumentFeeder(solrServer, Iterables.transform(documents, docMapper))).get();
			cleanMarkedDocuments(dsId, mdRef);

			if (feedMode.equals(FeedMode.REFRESH)) {
				deleteByVersion(dsId, mdRef, version);
				// otherwise, solr automatically overwrites records with the same objIdentifier: INCREMENTAL mode.
			}

			solrServer.commit();
			log.info("FeedResult: " + res.setTimeElapsed(System.currentTimeMillis()));
		} catch (Exception e) {
			throw new IndexServiceException(e);
		}
	}

	/**
	 * method deletes all the documents that matches the specified cql query from the index that handles the given dsId.
	 * 
	 * @param query
	 * @param dsId
	 * @return
	 * @throws IndexServiceException
	 * @throws CQLParseException
	 * @throws IOException
	 * @throws SolrServerException
	 */
	public void deleteByQuery(final String query, final String dsId, final MetadataReference mdRef, final boolean ignoreDsId) throws IndexServiceException {

		final String cqlQuery = ignoreDsId ? query : query + " and " + IndexField.DS_ID + " exact " + dsId;
		try {
			final String luceneQuery = cqlTranslator.toLucene(cqlQuery);

			log.info("DELETE BY QUERY: " + luceneQuery + " on '" + mdRef.toString() + "' physical index");

			SolrServer solrServer = solrManager.getSolrServer(mdRef);

			solrServer.deleteByQuery(luceneQuery).getElapsedTime();
			solrServer.commit();
		} catch (CQLParseException e) {
			throw new IndexServiceException(e);
		} catch (IOException e) {
			throw new IndexServiceException(e);
		} catch (SolrServerException e) {
			throw new IndexServiceException(e);
		}
	}

	/**
	 * method deletes all the documents of a specified dsId whose {@link IndexMap}.DS_VERSION field is older than the specified
	 * mdFormatVersion.
	 * 
	 * @param dsId
	 * @param mdFormatVersion
	 * @return the time elapsed to complete the operation.
	 * @throws IndexServiceException
	 */
	public void deleteByVersion(final String dsId, final MetadataReference mdRef, final String mdFormatVersion) throws IndexServiceException {

		final String query = IndexField.DS_VERSION + " < \"" + InputDocumentFactory.getParsedDateField(mdFormatVersion) + "\"";
		deleteByQuery(query, dsId, mdRef, false);
	}

	/**
	 * method delete documents where IndexMap.DELETE_DOCUMENT field is true
	 * 
	 * @param dsId
	 * @return the time elapsed to complete the operation.
	 * @throws IndexServiceException
	 * @throws SolrServerException
	 * @throws IOException
	 * @throws CQLParseException
	 */
	public void cleanMarkedDocuments(final String dsId, final MetadataReference mdRef) throws IndexServiceException {

		final String query = IndexField.DELETE_DOCUMENT + " all true ";
		deleteByQuery(query, dsId, mdRef, false);
	}

	/**
	 * method queries the index to get the number of indexed documents of a specific dataStructure.
	 * 
	 * @param dsId
	 * @return the time elapsed to complete the operation.
	 * @throws IndexServiceException
	 * @throws CQLParseException
	 * @throws IOException
	 * @throws SolrServerException
	 * @throws DocumentException
	 */
	public ResultsResponse getNumberOfRecords(final String dsId) throws IndexServiceException {

		final MetadataReference mdRef = serviceTools.getMetadataRef(dsId);
		final IndexQuery query = indexQueryFactory.getIndexQuery(QueryLanguage.SOLR, IndexField.queryAll, mdRef, dsId);
		final QueryResponseParser lookupRes = this.lookup(query, mdRef);

		return newResultsResponse(lookupRes.getStatus(), lookupRes.getNumFound());
	}

	/**
	 * method performs a commit of the physical index delegated to handle the given dsId
	 * 
	 * @param dsId
	 *            needed to retrieve the physical index.
	 * @return the time (ms) elapsed to complete the operation.
	 * @throws IndexServiceException
	 */
	public long commit(final String dsId) throws IndexServiceException {
		try {
			return solrManager.getSolrServer(serviceTools.getMetadataRef(dsId)).commit().getElapsedTime();
		} catch (SolrServerException e) {
			throw new IndexServiceException(e);
		} catch (IOException e) {
			throw new IndexServiceException(e);
		}
	}

	/**
	 * method performs an optimization of the physical index delegated to handle the given dsId
	 * 
	 * @param dsId
	 *            needed to retrieve the physical index.
	 * @return the time (ms) elapsed to complete the operation.
	 * @throws IndexServiceException
	 */
	public long optimize(final String dsId) throws IndexServiceException {
		return optimizeCollection(serviceTools.getMetadataRef(dsId));
	}

	public long optimizeCollection(final MetadataReference mdRef) throws IndexServiceException {
		long start = System.currentTimeMillis();
		solrManager.getSolrAdministration().optimize(mdRef.toString());
		return System.currentTimeMillis() - start;
	}

	/**
	 * @return CqlValueTransformerMap
	 * @throws Exception
	 */
	public CqlValueTransformerMap getValueTransformerMap(final MetadataReference mdRef) throws Exception {
		Map<String, ValueType> schema = solrManager.getSolrAdministration().getFieldNamesAndTypes(mdRef.toString());
		return tMapFactory.getIt(schema);
	}

	/**
	 * 
	 * @param mdRef
	 * @return browsing aliases for given mdRef.
	 */
	public BiMap<String, String> getAliases(final MetadataReference mdRef) {
		return browseAliases.get(mdRef);
	}

	/**
	 * method returns list of registered Store Data Structure identifiers.
	 * 
	 * @return a list of available indexes
	 * @throws IndexServiceException
	 */
	public String[] getIndexList() throws IndexServiceException {
		return Iterables.toArray(serviceTools.listDsIds(), String.class);
	}

	/**
	 * method returns list of registered Store Data Structure identifiers as comma-separated string.
	 * 
	 * @return comma separated list index identifiers
	 * @throws IndexServiceException
	 */
	public String getIndexListCSV() throws IndexServiceException {
		return StringUtils.arrayToCommaDelimitedString(getIndexList());
	}

	public QueryResponse performQuery(final IndexQuery query, final MetadataReference mdRef) throws IndexServiceException {
		try {
			return solrManager.getSolrServer(mdRef).query(query);
		} catch (SolrServerException e) {
			throw new IndexServiceException(e);
		} catch (IOException e) {
			throw new IndexServiceException(e);
		}
	}

	/**
	 * 
	 * @param idxId
	 * @param query
	 * @param fromPosition
	 * @param toPosition
	 * @return List<String> of matching records
	 * @throws IndexServiceException
	 */
	public QueryResponseParser lookup(final IndexQuery query, final MetadataReference mdRef) throws IndexServiceException {

		if (log.isDebugEnabled()) {
			log.debug("performing query: " + query.toString());
		}

		return getResponseParser(performQuery(query, mdRef), mdRef);
	}

	/**
	 * 
	 * @param lang
	 * @param term
	 * @param mdRef
	 * @param dsId
	 * @return
	 * @throws IndexServiceException
	 */
	public Pair<String, Hint> suggest(final QueryLanguage lang, final String term, final MetadataReference mdRef, final String dsId)
			throws IndexServiceException {

		IndexQuery query = indexQueryFactory.getSuggestionQuery(lang, term, mdRef, dsId);

		log.info("running suggestion query: " + query.toString());

		final QueryResponse rsp = performQuery(query, mdRef);
		final SpellCheckResponse splCk = rsp.getSpellCheckResponse();

		final Hint hint = new Hint();
		String newQuery = "";

		if (splCk != null) {

			for (Suggestion s : splCk.getSuggestions()) {
				final String alternateTerm = Iterables.getFirst(s.getAlternatives(), s.getToken());
				if (!alternateTerm.equalsIgnoreCase(s.getToken())) {
					hint.addHint(s.getToken(), alternateTerm);
				}
			}

			switch (lang) {
			case CQL:
				newQuery = query.applyHints(splCk.getSuggestionMap()).getQueryRoot().toCQL();
				break;
			case SOLR:
				newQuery = splCk.getCollatedResult();
				break;
			default:
				throw new IndexServiceException("Unknow query language: " + lang);
			}
			query = indexQueryFactory.getIndexQuery(lang, newQuery, mdRef, dsId);
			query.setRows(0).set("spellcheck.build", false);

			final long spellcheck = performQuery(query, mdRef).getResults().getNumFound();

			final long numFound = rsp.getResults().getNumFound();
			log.info("numFound: " + numFound + ", spellcheck: " + spellcheck);
			hint.setAutofollow(spellcheck > numFound);
		}

		return new Pair<String, Hint>(newQuery, hint);
	}

	// ///////////////////////// helper

	private QueryResponseParser getResponseParser(final QueryResponse rsp, final MetadataReference mdRef) {
		return queryRespFactory.getQueryResponseParser(rsp, mdRef);
	}

	private ResultsResponse newResultsResponse(final String status, final long total) {
		final ResultsResponse res = new ResultsResponse();
		res.setStatus(status);
		res.setTotal((int) total);
		return res;
	}

	public boolean isRegisterOnFeed() {
		return registerOnFeed;
	}

	@Required
	public void setRegisterOnFeed(final boolean registerOnFeed) {
		this.registerOnFeed = registerOnFeed;
	}

	public SolrManager getSolrManager() {
		return solrManager;
	}

}
