package eu.dnetlib.functionality.index.solr;

import java.io.IOException;
import java.io.StringReader;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import javax.annotation.Resource;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.client.solrj.response.SpellCheckResponse;
import org.apache.solr.client.solrj.response.SpellCheckResponse.Suggestion;
import org.apache.solr.common.SolrDocument;
import org.dom4j.Document;
import org.dom4j.DocumentException;
import org.dom4j.io.SAXReader;
import org.springframework.beans.factory.annotation.Required;
import org.springframework.util.StringUtils;
import org.z3950.zing.cql.CQLParseException;

import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;

import eu.dnetlib.common.ws.dataprov.ResultsResponse;
import eu.dnetlib.data.index.IndexServiceException;
import eu.dnetlib.enabling.resultset.client.IterableResultSetClient;
import eu.dnetlib.functionality.index.cql.CqlTranslator;
import eu.dnetlib.functionality.index.solr.feed.DocumentFeeder;
import eu.dnetlib.functionality.index.solr.feed.DocumentMapperFactory;
import eu.dnetlib.functionality.index.solr.feed.FeedMode;
import eu.dnetlib.functionality.index.solr.feed.FeedResult;
import eu.dnetlib.functionality.index.solr.feed.IndexDocument;
import eu.dnetlib.functionality.index.solr.query.IndexQuery;
import eu.dnetlib.functionality.index.solr.query.IndexQueryFactory;
import eu.dnetlib.functionality.index.solr.query.QueryLanguage;
import eu.dnetlib.functionality.index.solr.query.QueryResponseFactory;
import eu.dnetlib.functionality.index.solr.query.QueryResponseParser;
import eu.dnetlib.functionality.index.solr.suggest.Hint;
import eu.dnetlib.functionality.index.solr.utils.IndexMap;
import eu.dnetlib.functionality.index.solr.utils.MetadataReference;
import eu.dnetlib.functionality.index.solr.utils.ServiceTools;
import eu.dnetlib.functionality.index.solr.utils.Weights;
import eu.dnetlib.miscutils.collections.Pair;

/**
 * SolrIndexServer.
 *
 * @author claudio
 *
 */
public class SolrIndexServer {

	/**
	 * logger.
	 */
	private static final Log log = LogFactory.getLog(SolrIndexServer.class);	// NOPMD

	/**
	 * query factory.
	 */
	@Resource
	private transient IndexQueryFactory indexQueryFactory;

	/**
	 * document factory used for the feed process.
	 */
	@Resource
	private transient DocumentMapperFactory docMapperFactory;

	/**
	 * response factory.
	 */
	@Resource
	private transient QueryResponseFactory queryRespFactory;

	/**
	 * contains handled index data structures.
	 */
	@Resource
	private transient IndexMap indexMap;
	
	/**
	 * {@inheritDoc}
	 * 
	 * @see eu.dnetlib.functionality.index.solr.utils.Weights
	 */
	@Resource
	private transient Weights weights;

	/**
	 * {@inheritDoc}
	 * 
	 * @see eu.dnetlib.functionality.index.solr.utils.ServiceTools
	 */
	@Resource
	private transient ServiceTools serviceTools;
	
	/**
	 * {@inheritDoc}
	 * 
	 * @see eu.dnetlib.functionality.index.cql.CqlTranslator
	 */
	@Resource
	private transient CqlTranslator cqlTranslator;
	
	/**
	 * if enabled, allows to create an index on the first feed it receives.
	 */
	private boolean registerOnFeed;	
	
	/**
	 * Thread pool used for the feeding process.
	 */
	private final transient ExecutorService threadPool = Executors.newCachedThreadPool();

	/**
	 * initializer.
	 *
	 * @throws IndexServiceException
	 * 				could happen 
	 */
	public void init() throws IndexServiceException {
		log.info("Initializing SolrIndexServer, trying to resume existing indexes");
	    final List<MetadataReference> indexNames = indexMap.getExistingIndexNames();

	    for (MetadataReference mdRef : indexNames) {
	        final List<String> dsId = serviceTools.getIndexDsIdsList(mdRef);

	        if (dsId != null) {
				weights.initialize();
	        	
	        	indexMap.register(parseFields(serviceTools.getIndexFields(mdRef)), mdRef, dsId);
	            log.info("loaded " + indexMap.getDsId(mdRef).size() + " dataStructure references for index " + mdRef.toString());
	        } else {
	            log.warn("couldn't find any referenced dataStructure lo load for index: " + mdRef.toString());
	        }
	    }
	    log.info("resume report: " + indexNames.size() + " indexes resumed: " + indexNames.toString());
	}
	
	/**
	 * helper method, parses a list of fields.
	 * 
	 * @param fields
	 * 			the given fields
	 * @return
	 * 			the parsed fields
	 * @throws IndexServiceException
	 * 				if cannot parse the fields 
	 */
	private Document parseFields(final String fields) throws IndexServiceException {
		try {
			return new SAXReader().read(new StringReader(fields));
		} catch (DocumentException e) {
			throw new IndexServiceException(e);
		}
	}
	
	/**
	 * Creates an index.
	 * 
	 * @param dsId
	 * 			the given index dataStructure id
	 * @param mdRef
	 * 			the metadataReference that identifies the index.
	 * @param fields
	 * 			index fields used to build the schema.
	 * @throws IndexServiceException 
	 * 			could happen
	 */
	public void create(
			final String dsId,
			final MetadataReference mdRef,
			final String fields) throws IndexServiceException {
		
		log.info("registering DSId: " + dsId);
		indexMap.register(parseFields(fields), mdRef, Lists.newArrayList(dsId));
	}
	
	/**
	 * Given a set of documents matching a query, updates those that matches the given regex, 
	 * with a given replacement.
	 * 
	 * @param query
	 * @param mdRef
	 * @param fieldXPath
	 * @param regex
	 * @param replace
	 * @return
	 * @throws IndexServiceException 
	 * @throws Exception
	 */
	public int updateDocuments(
			final String query, 
			final MetadataReference mdRef, 
			final Map<String, String> fieldXPath, 
			final String regex, 
			final String replace) throws IndexServiceException {
		
		final ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<Object>(100);
		final Object sentinel = new Object();
		final SolrIndex index = indexMap.getIndexByMetadata(mdRef);
		
		final IndexQuery indexQuery = indexQueryFactory.getBaseQuery(query, mdRef, "all");
		final QueryResponse response = performQuery(indexQuery, mdRef);
		
		final Future<Integer> nUpdates = threadPool.submit(
				new IndexDocumentUpdater(queue, sentinel, fieldXPath, index, regex, replace));
		try {
			for (SolrDocument document : response.getResults()) {
				queue.put(document);
			}
			
			queue.put(sentinel);
			
			return nUpdates.get();
		} catch (InterruptedException e) {
			throw new IndexServiceException(e);
		} catch (ExecutionException e) {
			throw new IndexServiceException(e);
		}
	}
	
	/**
	 * feeds the documents provided by the {@link IterableResultSetClient} to the index specified by the dsId.
	 * 
	 * @param dsId
	 * @param version
	 * @param feedMode
	 * @param documentIterator
	 * @throws IndexServiceException
	 */
	public void feed(
			final String dsId, 
			final String version, 
			final FeedMode feedMode,
			final IterableResultSetClient documentIterator) throws IndexServiceException { 
		
		if (!indexMap.isRegistered(dsId) && isRegisterOnFeed()) {
			final MetadataReference mdRef = serviceTools.getMetadataRef(dsId);
			create(dsId, mdRef, serviceTools.getIndexFields(mdRef));
		}
		
		final SolrIndex index = indexMap.getIndexByDs(dsId);
		
		Function<String, IndexDocument> inputDocumentFunction;
				
		switch(feedMode) {
			case UPDATE:
				inputDocumentFunction = docMapperFactory.getUpdateMapper(index);
				break;
		
			default:
				inputDocumentFunction = docMapperFactory.getMetatadaMapper(index, version);
		}
		
		try {
			final FeedResult res = threadPool.submit(
					new DocumentFeeder(index, Iterables.transform(documentIterator, inputDocumentFunction))).get();
			cleanMarkedDocuments(dsId);

			if (feedMode.equals(FeedMode.REFRESH)) {
				deleteByVersion(dsId, version);
				// otherwise, solr automatically overwrites records with the same objIdentifier: INCREMENTAL mode.
			}

			index.getServer().commit();
			log.info("FeedResult: " + res.setTimeElapsed(System.currentTimeMillis()));
		} catch (InterruptedException e) {
			throw new IndexServiceException(e);
		} catch (ExecutionException e) {
			throw new IndexServiceException(e);
		} catch (SolrServerException e) {
			throw new IndexServiceException(e);
		} catch (IOException e) {
			throw new IndexServiceException(e);
		}
	}
	
	/**
	 * method deletes all the documents that matches the specified cql 
	 * query from the index that handles the given dsId.
	 * 
	 * @param query
	 * @param dsId
	 * @return
	 * @throws IndexServiceException 
	 * @throws CQLParseException
	 * @throws IOException
	 * @throws SolrServerException
	 */
	public long deleteByQuery(final String query, final String dsId, final boolean ignoreDsId) 
			throws IndexServiceException  {
		
		final String cqlQuery = ignoreDsId ? 
				query : 
				query + " and " + IndexMap.DS_ID + " exact " + dsId;
		try {
			final String luceneQuery = cqlTranslator.toLucene(cqlQuery);
			final MetadataReference mdRef = indexMap.getMdRefById(dsId);

			log.info("DELETE BY QUERY: " + luceneQuery + " on '" + mdRef.toString() + "' physical index");
			
			return indexMap.getIndexByDs(dsId).getServer().deleteByQuery(luceneQuery).getElapsedTime();
		} catch (CQLParseException e) {
			throw new IndexServiceException(e);
		} catch (IOException e) {
			throw new IndexServiceException(e);
		} catch (SolrServerException e) {
			throw new IndexServiceException(e);
		}
	}
	
	/**
	 * method deletes all the documents of a specified dsId whose {@link IndexMap}.DS_VERSION 
	 * field is older than the specified mdFormatVersion. 
	 * 
	 * @param dsId
	 * @param mdFormatVersion
	 * @return
	 * 		the time elapsed to complete the operation.
	 * @throws IndexServiceException 
	 */
	public long deleteByVersion(final String dsId, final String mdFormatVersion) 
		throws IndexServiceException {

		final String query = IndexMap.DS_VERSION + " < \""
				+ docMapperFactory.getDateMapper().apply(mdFormatVersion) + "\"";

		return deleteByQuery(query, dsId, false);
	}
	
	/**
	 * method delete documents where IndexMap.DELETE_DOCUMENT field is true 
	 * 
	 * @param dsId
	 * @return
	 * 		the time elapsed to complete the operation.
	 * @throws IndexServiceException 
	 * @throws SolrServerException
	 * @throws IOException
	 * @throws CQLParseException
	 */
	public long cleanMarkedDocuments(final String dsId) throws IndexServiceException {
	
		final String query = IndexMap.DELETE_DOCUMENT + " all true ";

		return deleteByQuery(query, dsId, false);
	}

	/**
	 * method queries the index to get the number of indexed documents
	 * of a specific dataStructure.
	 * 
	 * @param dsId
	 * @return
	 * 		the time elapsed to complete the operation.
	 * @throws IndexServiceException 
	 * @throws CQLParseException
	 * @throws IOException
	 * @throws SolrServerException
	 * @throws DocumentException
	 */
	public ResultsResponse getNumberOfRecords(final String dsId) throws IndexServiceException {

		final MetadataReference mdRef = indexMap.dsMetadataReference(dsId);
		final IndexQuery query = indexQueryFactory.getIndexQuery(QueryLanguage.SOLR, IndexMap.queryAll, mdRef, dsId);
		final QueryResponseParser lookupRes = this.lookup(query, mdRef);
		 
		return newResultsResponse(lookupRes.getStatus(), lookupRes.getNumFound());
	}
	
	/**
	 * method performs a commit of the physical index delegated to handle the given dsId
	 * 
	 * @param dsId
	 * 			needed to retrieve the physical index.
	 * @return
	 * 			the time (ms) elapsed to complete the operation.
	 * @throws IndexServiceException 
	 */
	public long commit(final String dsId) throws IndexServiceException  {
		try {
			return indexMap.getIndexByDs(dsId).getServer().commit().getElapsedTime();
		} catch (SolrServerException e) {
			throw new IndexServiceException(e);
		} catch (IOException e) {
			throw new IndexServiceException(e);
		}
	}

	/**
	 * method performs an optimization of the physical index delegated to handle the given dsId
	 * 
	 * @param dsId
	 * 			needed to retrieve the physical index.
	 * @return
	 * 			the time (ms) elapsed to complete the operation.
	 * @throws IndexServiceException 
	 */
	public long optimize(final String dsId) throws IndexServiceException {
		try {
			return indexMap.getIndexByDs(dsId).getServer().optimize().getElapsedTime();
		} catch (SolrServerException e) {
			throw new IndexServiceException(e);
		} catch (IOException e) {
			throw new IndexServiceException(e);
		}
	}
	
	/**
	 * method returns list of registered Store Data Structure identifiers.
	 * 
	 * @return a list of available indexes
	 */
	public String[] getIndexList() {

		return indexMap.getDsIdArray();
	}
	
	/**
	 * method returns list of registered Store Data Structure identifiers as comma-separated string.
	 * @return comma separated list index identifiers
	 */
	public String getIndexListCSV() {
	
		return StringUtils.arrayToCommaDelimitedString(getIndexList());
	}
	
	/**
	 * 
	 * 
	 * @param term
	 * @param dsId
	 * @param mdRef
	 * @return
	 * @throws IndexServiceException 
	 * @throws SolrServerException
	 */
	public Pair<String, Hint> suggest(final QueryLanguage lang, final String term, final MetadataReference mdRef, final String dsId) 
			throws IndexServiceException  {
		
		IndexQuery query = indexQueryFactory.getSuggestionQuery(lang, term, mdRef, dsId);
		
		log.info("running suggestion query: " + query.toString());
		
		final QueryResponse rsp = performQuery(query, mdRef); 
		final SpellCheckResponse splCk = rsp.getSpellCheckResponse();
		
		final Hint hint = new Hint();
		String newQuery = "";
		
		if (splCk != null) {
			
			for (Suggestion s : splCk.getSuggestions()) {
				final String alternateTerm = Iterables.getFirst(s.getAlternatives(), s.getToken());
				if(!alternateTerm.equalsIgnoreCase(s.getToken())) {
					hint.addHint(s.getToken(), alternateTerm);
				}
			}
			
			switch (lang) {
				case CQL:  newQuery = query.applyHints(splCk.getSuggestionMap()).getQueryRoot().toCQL(); break;
				case SOLR: newQuery = splCk.getCollatedResult(); break;
				default: throw new IndexServiceException("Unknow query language: " + lang);
			}				
			query = indexQueryFactory.getIndexQuery(lang, newQuery, mdRef, dsId);
			query.setRows(0).set("spellcheck.build", false);
			
			final long spellcheck = performQuery(query, mdRef).getResults().getNumFound();
			
			final long numFound = rsp.getResults().getNumFound();
			log.info("numFound: " + numFound + ", spellcheck: " + spellcheck);
			hint.setAutofollow(spellcheck > numFound);
		}			
		
		return new Pair<String, Hint>(newQuery, hint);
	}
	
	public QueryResponse performQuery(final IndexQuery query, final MetadataReference mdRef) throws IndexServiceException {
		try {
			return indexMap.getIndexByMetadata(mdRef).getServer().query(query);
		} catch (SolrServerException e) {
			throw new IndexServiceException(e);
		}
	}
	
	private QueryResponseParser getResponseParser(final QueryResponse rsp, final MetadataReference mdRef) {
		return queryRespFactory.getQueryResponseParser(rsp, mdRef);	
	}
	
	/**
	 * 
	 * @param idxId
	 * @param query
	 * @param fromPosition
	 * @param toPosition
	 * @return List<String> of matching records
	 * @throws IndexServiceException 
	 */
	public QueryResponseParser lookup(final IndexQuery query, final MetadataReference mdRef) 
			throws IndexServiceException {
		
		if (log.isDebugEnabled()) {
			log.debug("performing query: " + query.toString());
		}
		
		return getResponseParser(performQuery(query, mdRef), mdRef);
	}
	
	/////////////////////////// helper
	
	private ResultsResponse newResultsResponse(final String status, final long total) {
		final ResultsResponse res = new ResultsResponse();
		res.setStatus(status);
		res.setTotal((int)total);
		return res;
	}
	
	public IndexMap getIndexMap() {
		return indexMap;
	}

	public boolean isRegisterOnFeed() {
		return registerOnFeed;
	}

	@Required
	public void setRegisterOnFeed(final boolean registerOnFeed) {
		this.registerOnFeed = registerOnFeed;
	}

}
