package eu.dnetlib.functionality.index.solr;

import java.io.IOException;
import java.io.StringReader;
import java.text.ParseException;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import javax.xml.parsers.ParserConfigurationException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.client.solrj.response.SpellCheckResponse;
import org.apache.solr.client.solrj.response.SpellCheckResponse.Suggestion;
import org.apache.solr.common.SolrDocument;
import org.dom4j.Document;
import org.dom4j.DocumentException;
import org.dom4j.io.SAXReader;
import org.springframework.beans.factory.annotation.Required;
import org.springframework.util.StringUtils;
import org.xml.sax.SAXException;
import org.z3950.zing.cql.CQLParseException;

import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;

import eu.dnetlib.common.ws.dataprov.ResultsResponse;
import eu.dnetlib.enabling.resultset.client.IterableResultSetClient;
import eu.dnetlib.functionality.index.cql.CqlTranslator;
import eu.dnetlib.functionality.index.solr.SolrIndexServiceImpl.FeedMode;
import eu.dnetlib.functionality.index.solr.feed.FeedResult;
import eu.dnetlib.functionality.index.solr.feed.IndexDocument;
import eu.dnetlib.functionality.index.solr.feed.IndexDocument.STATUS;
import eu.dnetlib.functionality.index.solr.feed.InputDocumentFactory;
import eu.dnetlib.functionality.index.solr.query.IndexQuery;
import eu.dnetlib.functionality.index.solr.query.IndexQueryFactory;
import eu.dnetlib.functionality.index.solr.query.QueryLanguage;
import eu.dnetlib.functionality.index.solr.query.QueryResponseFactory;
import eu.dnetlib.functionality.index.solr.query.QueryResponseParser;
import eu.dnetlib.functionality.index.solr.suggest.Hint;
import eu.dnetlib.functionality.index.solr.utils.IndexMap;
import eu.dnetlib.functionality.index.solr.utils.MetadataReference;
import eu.dnetlib.functionality.index.solr.utils.ServiceTools;
import eu.dnetlib.miscutils.collections.Pair;

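/**
 * Facade over the Solr indexes tracked by an {@link IndexMap}: it registers index data structures and
 * feeds, updates, queries, deletes from, commits and optimizes the underlying physical indexes.
 *
 * @author claudio
 *
 */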
public class SolrIndexServer {

	/**
	 * Class logger.
	 */
	private static final Log log = LogFactory.getLog(SolrIndexServer.class);

	/**
	 * Factory building the {@link IndexQuery} instances run against the indexes.
	 */
	private IndexQueryFactory indexQueryFactory;

	/**
	 * Factory building the {@link IndexDocument} instances used by the feed process.
	 */
	private InputDocumentFactory documentFactory;

	/**
	 * Factory wrapping Solr responses into {@link QueryResponseParser} instances.
	 */
	private QueryResponseFactory queryResponseFactory;

	/**
	 * Registry of the handled index data structures.
	 */
	private IndexMap indexMap;

	/**
	 * Query used to match all the records. It is never reassigned, hence final.
	 */
	private static final String queryAll = "*";

	/**
	 * Helper used by {@link #init()} to look up the dataStructure ids and the fields of an index.
	 *
	 * @see eu.dnetlib.functionality.index.solr.utils.ServiceTools
	 */
	private ServiceTools serviceTools;

	/**
	 * Translator used to turn CQL queries into Lucene syntax.
	 *
	 * @see eu.dnetlib.functionality.index.cql.CqlTranslator
	 */
	private CqlTranslator translator;

	/**
	 * Resumes the indexes already known to the index map.
	 * <p>
	 * For every existing index name the referenced dataStructure ids are looked up; when some are
	 * found the index is registered again together with its parsed field definitions, otherwise a
	 * warning is logged and the index is left unregistered.
	 *
	 * @throws Exception
	 * 			if any lookup, parsing or registration step fails
	 */
	public void init() throws Exception {
		log.info("Initializing SolrIndexServer, trying to resume existing indexes");
		final List<String> existing = indexMap.getExistingIndexNames();

		for (final String name : existing) {
			final MetadataReference reference = indexMap.getMetadataReference(name);
			final List<String> dsIds = serviceTools.getIndexDsIdsList(reference);

			// nothing to register without dataStructure ids: report and move on
			if (dsIds == null) {
				log.warn("couldn't find any referenced dataStructure lo load for index: " + name);
				continue;
			}

			final Document fields = parseFields(serviceTools.getIndexFields(reference));
			indexMap.register(fields, reference, dsIds);
			log.info("loaded " + indexMap.getDsId(name).size() + " dataStructure references for index " + name);
		}

		log.info("resume report: " + existing.size() + " indexes resumed: " + existing.toString());
	}
	
	/**
	 * Parses the XML description of the index fields into a dom4j {@link Document}.
	 *
	 * @param fields
	 * 			XML text describing the fields
	 * @return
	 * 			the parsed document
	 * @throws DocumentException
	 * 			if the text cannot be parsed
	 */
	private Document parseFields(final String fields) throws DocumentException {
		final SAXReader reader = new SAXReader();
		final StringReader source = new StringReader(fields);
		return reader.read(source);
	}
	
	/**
	 * Registers a new index dataStructure in the index map.
	 *
	 * @param dsId
	 * 			id of the index dataStructure to register
	 * @param mdRef
	 * 			metadata reference the dataStructure belongs to
	 * @param fields
	 * 			XML text describing the index fields
	 * @throws Exception
	 * 			if the fields cannot be parsed or the registration fails
	 */
	public void create(final String dsId, final MetadataReference mdRef, final String fields) throws Exception {

		log.info("registering DSId: " + dsId);

		final Document parsedFields = parseFields(fields);
		indexMap.register(parsedFields, mdRef, Lists.newArrayList(dsId));
	}
	
	/**
	 * Runs the given query and hands every matching document to an {@link IndexDocumentUpdater}
	 * running on a background thread, returning the value produced by that updater.
	 * <p>
	 * Documents are passed through a bounded queue (capacity 100) and the end of the stream is
	 * signalled by a sentinel object. NOTE(review): the exact use of {@code fieldXPath},
	 * {@code regex} and {@code replace} is defined by {@code IndexDocumentUpdater}, which is not
	 * visible here.
	 * <p>
	 * The worker executor is always shut down before returning. If this method fails before the
	 * sentinel is queued, the worker is interrupted so that it does not wait on the queue forever.
	 *
	 * @param query
	 * 			query selecting the documents to update
	 * @param mdRef
	 * 			metadata reference identifying the index
	 * @param fieldXPath
	 * 			field/xpath mapping forwarded to the updater
	 * @param regex
	 * 			regular expression forwarded to the updater
	 * @param replace
	 * 			replacement forwarded to the updater
	 * @return
	 * 			the result of the updater task (presumably the number of updated documents)
	 * @throws Exception
	 * 			if the query fails, the thread is interrupted or the updater task fails
	 */
	public int updateDocuments(
			final String query, 
			final MetadataReference mdRef, 
			final Map<String, String> fieldXPath, 
			final String regex, 
			final String replace) throws Exception {
		
		final ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<Object>(100);
		final Object sentinel = new Object();
		final SolrIndex index = indexMap.getIndexByMetadata(mdRef);
		
		final IndexQuery indexQuery = indexQueryFactory.getBaseQuery(query, mdRef, "all");
		final QueryResponse response = performQuery(indexQuery, mdRef);
		
		// created only once the query succeeded, so a failing query never leaves an executor behind
		final ExecutorService executor = Executors.newSingleThreadExecutor();
		try {
			final Future<Integer> nUpdates = executor.submit(
					new IndexDocumentUpdater(queue, sentinel, fieldXPath, index, regex, replace));
			
			for (SolrDocument document : response.getResults()) {
				queue.put(document);
			}
			
			queue.put(sentinel);
			
			return nUpdates.get();
		} finally {
			// on success the task has already completed; on failure this interrupts a worker
			// that may still be blocked waiting for the sentinel
			executor.shutdownNow();
		}
	}
	
	/**
	 * Feeds the documents provided by the {@link IterableResultSetClient} to the index that handles
	 * the given dsId, then commits the index and logs the resulting {@link FeedResult}.
	 * <p>
	 * In {@code FULLTEXT} mode the documents are fed as fulltext additions. In every other mode they
	 * are fed as records and the documents marked for deletion are removed afterwards; in
	 * {@code REFRESH} mode the records older than the given version are deleted as well.
	 *
	 * @param dsId
	 * 			id of the index dataStructure to feed
	 * @param version
	 * 			version assigned to the fed records
	 * @param feedMode
	 * 			the feed strategy to apply
	 * @param documentIterator
	 * 			source of the documents to feed
	 * @throws SolrServerException
	 * @throws IOException
	 * @throws CQLParseException 
	 * @throws ParseException 
	 * @throws SAXException 
	 * @throws ParserConfigurationException 
	 */
	public void feed(final String dsId, final String version, final FeedMode feedMode, final IterableResultSetClient documentIterator) 
		throws SolrServerException, IOException, CQLParseException, ParseException, ParserConfigurationException, SAXException {

		final FeedResult feedResult = new FeedResult(System.currentTimeMillis());
		final SolrIndex target = indexMap.getIndexByDs(dsId);

		if (!feedMode.equals(FeedMode.FULLTEXT)) {
			doFeedRecords(target, version, feedMode, documentIterator, feedResult);
			cleanMarkedDocuments(dsId);

			// otherwise, solr automatically overwrites records with the same objIdentifier: INCREMENTAL mode.
			if (feedMode.equals(FeedMode.REFRESH)) {
				deleteByVersion(dsId, version);
			}
		} else {
			doFeedFulltext(target, documentIterator, feedResult);
		}

		commit(dsId);
		feedResult.setTimeElapsed(System.currentTimeMillis());
		log.info("FeedResult: " + feedResult.toString());
	}

	/**
	 * Builds an {@link IndexDocument} for every incoming record and processes it according to its status.
	 *
	 * @param index
	 * 			the index to feed
	 * @param version
	 * 			version passed to the document factory
	 * @param feedMode
	 * 			the feed mode (not used by this method)
	 * @param documentIterator
	 * 			source of the records
	 * @param res
	 * 			feed result to update
	 * @throws SolrServerException
	 * @throws IOException
	 */
	private void doFeedRecords(
			final SolrIndex index, 
			final String version, 
			final FeedMode feedMode, 
			final IterableResultSetClient documentIterator, 
			final FeedResult res) throws SolrServerException, IOException {

		for (final String record : documentIterator) {
			parseDocumentStatus(documentFactory.getInputDocument(index, version, record), index, res);
		}
	}

	/**
	 * Feeds fulltext metadata: for every incoming item whose status is OK, the record it refers to
	 * is fetched from the index, enriched with the fulltext id and used as the new document
	 * content; when the record cannot be found the document is flagged with an error. The document
	 * is then processed according to its status.
	 *
	 * TODO feed documents w/o needing to retrieve the original document from the index: simply adding 
	 * 		the new field
	 */
	private void doFeedFulltext(final SolrIndex index, final IterableResultSetClient documentIterator, final FeedResult res) 
		throws SolrServerException, IOException, CQLParseException, ParserConfigurationException, SAXException {

		for (final String metadata : documentIterator) {

			final IndexDocument fullText = documentFactory.getFullTextDocument(new IndexDocument(index), metadata);

			if (fullText.getStatus().equals(STATUS.OK)) {

				final String id = (String) fullText.getFieldValue(IndexMap.INDEX_RECORD_ID);
				final SolrDocument indexed = getSingleDocument(index, id);

				if (indexed == null) {
					fullText.setError(new IllegalStateException("document id not found"));
				} else {
					indexed.addField(IndexMap.FULLTEXT_ID, fullText.getFieldValue(IndexMap.FULLTEXT_ID));
					fullText.setContent(indexed);
				}
			}

			parseDocumentStatus(fullText, index, res);
		}
	}
	
	/**
	 * Fetches from the given index the one document whose record id equals {@code recordId}.
	 *
	 * @param index
	 * 			the index to query
	 * @param recordId
	 * 			the record identifier to look for
	 * @return
	 * 			the matching document, or null (after logging a warning) when nothing matches
	 * @throws CQLParseException
	 * @throws IOException
	 * @throws RuntimeException
	 * 			wrapping the SolrServerException when the query cannot be executed
	 * @throws IllegalArgumentException
	 * 			raised by {@link Iterables#getOnlyElement(Iterable)} when more than one document matches
	 */
	private SolrDocument getSingleDocument(final SolrIndex index, final String recordId) throws CQLParseException, IOException {

		final MetadataReference reference = indexMap.dsMetadataReference(index.getDsId());
		final String cql = String.format("%s = \"%s\"", IndexMap.INDEX_RECORD_ID, recordId);
		final IndexQuery lookup = indexQueryFactory.getBaseQuery(cql, reference, "all");

		try {
			final QueryResponse response = index.getServer().query(lookup);
			return Iterables.getOnlyElement(response.getResults());
		} catch (NoSuchElementException e) {
			// empty result set: not an error for the callers, they deal with the null
			log.warn("cannot find record with id: " + recordId);
			return null;
		} catch (SolrServerException e) {
			throw new RuntimeException("cannot fetch single document from index: " + reference.toString(), e);
		}
	}
	
	/**
	 * Updates the FeedResult according to the document's {@link IndexDocument.STATUS}; a document
	 * whose status is OK is also added to the index.
	 *
	 * @param document
	 * 			document whose status must be inspected
	 * @param index
	 * 			index receiving the documents in OK status
	 * @param res
	 * 			FeedResult to update
	 * @throws SolrServerException
	 * @throws IOException
	 * @throws IllegalStateException
	 * 			if the status is none of OK, MARKED, ERROR
	 */
	private void parseDocumentStatus(final IndexDocument document, final SolrIndex index, final FeedResult res) 
		throws SolrServerException, IOException {

		final STATUS status = document.getStatus();

		switch (status) {

		case ERROR:
			res.skip();
			return;

		case MARKED:
			res.mark();
			return;

		case OK:
			index.add(document);
			res.add();
			return;

		default:
			throw new IllegalStateException("unknow document status");
		}
	}
	
	/**
	 * Deletes all the documents matching the given cql query from the index that handles the
	 * given dsId. Unless {@code ignoreDsId} is set, the query is restricted to the documents of
	 * that dsId.
	 *
	 * @param query
	 * 			cql query selecting the documents to delete
	 * @param dsId
	 * 			id used to locate the physical index (and to restrict the query)
	 * @param ignoreDsId
	 * 			when true the query is used as is, without the dsId restriction
	 * @return
	 * 			the elapsed time reported by the delete response
	 * @throws CQLParseException
	 * @throws IOException
	 * @throws SolrServerException
	 */
	public long deleteByQuery(final String query, final String dsId, final boolean ignoreDsId) 
		throws CQLParseException, IOException, SolrServerException {

		final String cqlQuery;
		if (ignoreDsId) {
			cqlQuery = query;
		} else {
			cqlQuery = query + " and " + IndexMap.DS_ID + " exact " + dsId;
		}

		final String luceneQuery = translator.toLucene(cqlQuery);
		final String indexName = indexMap.getIndexName(dsId);

		log.info("DELETE BY QUERY: " + luceneQuery + " on '" + indexName + "' physical index");

		final SolrIndex index = indexMap.getIndexByDs(dsId);
		return index.getServer().deleteByQuery(luceneQuery).getElapsedTime();
	}
	
	/**
	 * Deletes the documents of the given dsId whose {@link IndexMap}.DS_VERSION field is lower
	 * than the given mdFormatVersion (as parsed by the document factory).
	 *
	 * @param dsId
	 * 			id of the dataStructure whose documents are deleted
	 * @param mdFormatVersion
	 * 			version threshold: older documents are removed
	 * @return
	 * 		the time elapsed to complete the operation.
	 * @throws SolrServerException
	 * @throws IOException
	 * @throws CQLParseException
	 * @throws ParseException
	 */
	public long deleteByVersion(final String dsId, final String mdFormatVersion) 
		throws SolrServerException, IOException, CQLParseException, ParseException {

		final String olderThanVersion = 
				IndexMap.DS_VERSION + " < \"" + documentFactory.getParsedDateField(mdFormatVersion) + "\"";

		final boolean ignoreDsId = false;
		return deleteByQuery(olderThanVersion, dsId, ignoreDsId);
	}
	
	/**
	 * Deletes the documents of the given dsId matching the query
	 * {@code IndexMap.DELETE_DOCUMENT all true}.
	 *
	 * @param dsId
	 * 			id of the dataStructure to clean
	 * @return
	 * 		the time elapsed to complete the operation.
	 * @throws SolrServerException
	 * @throws IOException
	 * @throws CQLParseException
	 */
	public long cleanMarkedDocuments(final String dsId) 
		throws SolrServerException, IOException, CQLParseException {

		return deleteByQuery(IndexMap.DELETE_DOCUMENT + " all true ", dsId, false);
	}

	/**
	 * Queries the index with the match-all query to get the number of indexed documents of a
	 * specific dataStructure.
	 *
	 * @param dsId
	 * 			id of the dataStructure to count
	 * @return
	 * 		a response carrying the lookup status and the number of documents found.
	 * @throws CQLParseException
	 * @throws IOException
	 * @throws SolrServerException
	 * @throws DocumentException
	 */
	public ResultsResponse getNumberOfRecords(final String dsId) 
		throws CQLParseException, IOException, SolrServerException, DocumentException {

		final MetadataReference reference = indexMap.dsMetadataReference(dsId);
		final IndexQuery matchAll = indexQueryFactory.getIndexQuery(QueryLanguage.CQL, queryAll, reference, dsId);
		final QueryResponseParser parsed = lookup(matchAll, reference);

		final String status = parsed.getStatus();
		return newResultsResponse(status, parsed.getNumFound());
	}
	
	/**
	 * Commits the physical index delegated to handle the given dsId.
	 *
	 * @param dsId
	 * 			needed to retrieve the physical index.
	 * @return
	 * 			the elapsed time reported by the commit response.
	 * @throws SolrServerException
	 * @throws IOException
	 * @throws SAXException 
	 * @throws ParserConfigurationException 
	 */
	public long commit(final String dsId) 
		throws SolrServerException, IOException, ParserConfigurationException, SAXException {

		final SolrIndex index = indexMap.getIndexByDs(dsId);
		return index.getServer().commit().getElapsedTime();
	}

	/**
	 * Optimizes the physical index delegated to handle the given dsId.
	 *
	 * @param dsId
	 * 			needed to retrieve the physical index.
	 * @return
	 * 			the elapsed time reported by the optimize response.
	 * @throws SolrServerException
	 * @throws IOException
	 * @throws SAXException 
	 * @throws ParserConfigurationException 
	 */
	public long optimize(final String dsId) 
		throws SolrServerException, IOException, ParserConfigurationException, SAXException {

		final SolrIndex index = indexMap.getIndexByDs(dsId);
		return index.getServer().optimize().getElapsedTime();
	}
	
	/**
	 * Returns the dataStructure identifiers held by the index map.
	 * 
	 * @return the array of dataStructure ids, as provided by {@link IndexMap#getDsIdArray()}
	 */
	public String[] getIndexList() {

		return indexMap.getDsIdArray();
	}
	
	/**
	 * Returns the identifiers provided by {@link #getIndexList()} joined into a single
	 * comma-separated string.
	 *
	 * @return comma separated list of index identifiers
	 */
	public String getIndexListCSV() {

		final String[] identifiers = getIndexList();
		return StringUtils.arrayToCommaDelimitedString(identifiers);
	}
	
	/**
	 * Runs a suggestion query for the given term and builds the corresponding {@link Hint}.
	 * <p>
	 * When the response carries no spell-check section, an empty query and an empty hint are
	 * returned. Otherwise every suggestion whose first alternative differs (ignoring case) from
	 * the original token is recorded in the hint, a corrected query is derived (by applying the
	 * suggestions for CQL, or by taking the collated result for SOLR) and executed with zero
	 * rows; the hint is flagged as autofollow when the corrected query finds more results than
	 * the original one.
	 *
	 * @param lang
	 * 			language of the query
	 * @param term
	 * 			the term (query) to get suggestions for
	 * @param mdRef
	 * 			metadata reference identifying the index
	 * @param dsId
	 * 			dataStructure id passed to the query factory
	 * @return
	 * 			the corrected query (empty when no spell-check data is available) paired with the hint
	 * @throws SolrServerException
	 * 			if a query fails, the language is not supported, or a CQL/IO error occurs (wrapped)
	 */
	public Pair<String, Hint> suggest(QueryLanguage lang, String term, MetadataReference mdRef, String dsId) throws SolrServerException {
		try {
			final IndexQuery query = indexQueryFactory.getSuggestionQuery(lang, term, mdRef, dsId);

			log.info("running suggestion query: " + query.toString());

			final QueryResponse firstPass = performQuery(query, mdRef);
			final SpellCheckResponse spellCheck = firstPass.getSpellCheckResponse();
			final long numFound = firstPass.getResults().getNumFound();

			final Hint hint = new Hint();

			// no spell-check data: nothing to suggest
			if (spellCheck == null) {
				return new Pair<String, Hint>("", hint);
			}

			for (final Suggestion suggestion : spellCheck.getSuggestions()) {
				final String best = Iterables.getFirst(suggestion.getAlternatives(), suggestion.getToken());
				if (best.equalsIgnoreCase(suggestion.getToken())) {
					continue;
				}
				hint.addHint(suggestion.getToken(), best);
			}

			final String newQuery;
			switch (lang) {
				case CQL:
					newQuery = query.applyHints(spellCheck.getSuggestionMap()).getQueryRoot().toCQL();
					break;
				case SOLR:
					newQuery = spellCheck.getCollatedResult();
					break;
				default:
					throw new SolrServerException("Unknow query language: " + lang);
			}

			// count-only run of the corrected query
			final IndexQuery corrected = indexQueryFactory.getIndexQuery(lang, newQuery, mdRef, dsId);
			corrected.setRows(0).set("spellcheck.build", false);

			final long spellcheck = performQuery(corrected, mdRef).getResults().getNumFound();

			log.info("numFound: " + numFound + ", spellcheck: " + spellcheck);
			hint.setAutofollow(spellcheck > numFound);

			return new Pair<String, Hint>(newQuery, hint);

		} catch (CQLParseException e) {
			throw new SolrServerException(e);
		} catch (IOException e) {
			throw new SolrServerException(e);
		}
	}
	
	/**
	 * Executes the query against the index associated to the given metadata reference.
	 *
	 * @param query
	 * 			the query to run
	 * @param mdRef
	 * 			metadata reference used to pick the index
	 * @return
	 * 			the raw Solr response
	 * @throws SolrServerException
	 */
	public QueryResponse performQuery(IndexQuery query, MetadataReference mdRef) throws SolrServerException {
		final SolrIndex index = indexMap.getIndexByMetadata(mdRef);
		return index.getServer().query(query);
	}
	
	/**
	 * Wraps a Solr response into a {@link QueryResponseParser} built by the response factory.
	 *
	 * @param rsp
	 * 			the Solr response to wrap
	 * @param mdRef
	 * 			metadata reference passed along to the factory
	 * @return
	 * 			the parser created by the factory
	 */
	private QueryResponseParser getResponseParser(QueryResponse rsp, MetadataReference mdRef) {
		return queryResponseFactory.getQueryResponseParser(rsp, mdRef);	
	}
	
	/**
	 * Executes the query on the index associated to the given metadata reference and wraps the
	 * response into a {@link QueryResponseParser}. The query is logged at debug level.
	 *
	 * @param query
	 * 			the query to run
	 * @param mdRef
	 * 			metadata reference identifying the index
	 * @return
	 * 			parser giving access to the response
	 * @throws SolrServerException
	 * @throws CQLParseException
	 * @throws IOException
	 * @throws DocumentException
	 */
	public QueryResponseParser lookup(final IndexQuery query, final MetadataReference mdRef) 
		throws SolrServerException, CQLParseException, IOException, DocumentException {

		if (log.isDebugEnabled()) {
			log.debug("performing query: " + query.toString());
		}

		final QueryResponse response = performQuery(query, mdRef);
		return getResponseParser(response, mdRef);
	}
	
	/////////////////////////// helper
	
	/**
	 * Builds a {@link ResultsResponse} with the given status and total.
	 * <p>
	 * The response stores the total as an int: values outside the int range are clamped to
	 * {@link Integer#MAX_VALUE} / {@link Integer#MIN_VALUE} instead of silently wrapping around.
	 *
	 * @param status
	 * 			status to report
	 * @param total
	 * 			number of results found
	 * @return
	 * 			the populated response
	 */
	private ResultsResponse newResultsResponse(final String status, final long total) {
		final ResultsResponse res = new ResultsResponse();
		res.setStatus(status);
		// saturate before narrowing: a plain (int) cast would wrap large counts to bogus values
		final long clamped = Math.max(Integer.MIN_VALUE, Math.min(Integer.MAX_VALUE, total));
		res.setTotal((int) clamped);
		return res;
	}
	
	/////////////////////////// setters and getters
	
	/**
	 * @return the factory used to build index queries
	 */
	public IndexQueryFactory getQueryFactory() {
		return indexQueryFactory;
	}
	
	/**
	 * Sets the factory used to build index queries.
	 *
	 * @param indexQueryFactory
	 * 			the query factory
	 */
	@Required
	public void setQueryFactory(final IndexQueryFactory indexQueryFactory) {
		this.indexQueryFactory = indexQueryFactory;
	}
	
	/**
	 * Sets the document factory used for the feed process.
	 *
	 * @param documentFactory
	 * 			the document factory
	 */
	@Required
	public void setDocumentFactory(final InputDocumentFactory documentFactory) {
		this.documentFactory = documentFactory;
	}
	
	/**
	 * Sets the factory used to wrap Solr responses.
	 *
	 * @param queryResponseFactory
	 * 			the response factory
	 */
	@Required
	public void setQueryResponseFactory(final QueryResponseFactory queryResponseFactory) {
		this.queryResponseFactory = queryResponseFactory; 
	}
	
	/**
	 * Sets the service tools helper.
	 *
	 * @param serviceTools
	 * 			the service tools
	 */
	@Required
	public void setServiceTools(final ServiceTools serviceTools) {
		this.serviceTools = serviceTools;
	}

	/**
	 * Sets the map holding the handled index data structures.
	 *
	 * @param indexMap
	 * 			the index map
	 */
	@Required
	public void setIndexMap(final IndexMap indexMap) {
		this.indexMap = indexMap;
	}
	
	/**
	 * @return the map holding the handled index data structures
	 */
	public IndexMap getIndexMap() {
		return indexMap;
	}

	/**
	 * Sets the CQL translator.
	 *
	 * @param translator
	 * 			the translator
	 */
	@Required
	public void setTranslator(final CqlTranslator translator) {
		this.translator = translator;
	}

}
