package eu.dnetlib.data.search.app;

import eu.dnetlib.api.data.BrowseDataNotAvailableException;
import eu.dnetlib.api.data.IndexService;
import eu.dnetlib.api.data.IndexServiceException;
import eu.dnetlib.api.data.SearchService;
import eu.dnetlib.api.data.SearchServiceException;
import eu.dnetlib.api.enabling.ResultSetService;
import eu.dnetlib.data.search.app.plan.FieldRewriteRule;
import eu.dnetlib.data.search.app.plan.Query;
import eu.dnetlib.data.search.app.plan.QueryRewriteRule;
import eu.dnetlib.domain.ActionType;
import eu.dnetlib.domain.EPR;
import eu.dnetlib.domain.ResourceType;
import eu.dnetlib.domain.data.BrowseData;
import eu.dnetlib.domain.enabling.Notification;
import eu.dnetlib.domain.enabling.Vocabulary;
import eu.dnetlib.utils.cql.CqlBoolean;
import eu.dnetlib.utils.cql.CqlClause;
import eu.dnetlib.utils.cql.CqlException;
import eu.dnetlib.utils.cql.CqlRelation;
import gr.uoa.di.driver.app.DriverServiceImpl;
import gr.uoa.di.driver.enabling.ISLookUpException;
import gr.uoa.di.driver.enabling.issn.NotificationListener;
import gr.uoa.di.driver.enabling.resultset.ResultSet;
import gr.uoa.di.driver.enabling.resultset.ResultSetFactory;
import gr.uoa.di.driver.enabling.vocabulary.VocabularyLoader;
import gr.uoa.di.driver.util.ServiceLocator;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.log4j.Logger;

public class SearchServiceImpl extends DriverServiceImpl
		implements SearchService {
	
	/** Class-wide log4j logger. */
	private static final Logger logger = Logger.getLogger(SearchServiceImpl.class);
	
	// Collaborators and configuration. All of them are null until the
	// corresponding setter has been called.
	private ServiceLocator<IndexService> indexLocator = null;
	private ServiceLocator<ResultSetService> resultSetLocator = null;
	private ResultSetFactory rsFactory = null;
	// browse field name -> vocabulary name
	private Map<String, String> vocabularyNames = null;
	// browse field name -> loader used to fetch that field's vocabulary
	private Map<String, VocabularyLoader> vocabularyLoaders = null;
	// rules applied to the whole query, in list order
	private List<QueryRewriteRule> queryRules = null;
	// CQL index (field) name -> rule applied to relations on that field
	private Map<String, FieldRewriteRule> fieldRules = null;
	
	// Single background thread that periodically runs the browse data update.
	private ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
	// Period between two runs of the browse data update task, in minutes.
	private int browseUpdatePeriod = 30;
	// Set by the notification listener and cleared by the update task, which
	// runs on the executor thread. volatile makes a write performed by one
	// thread visible to the other.
	private volatile boolean updateBrowseData = true;
	
	// <key1, <key2, value>> is <fieldname, <prefix, browse data>>
	private final Map<String, Map<String, BrowseData>> browseData;
	
	/**
	 * Returns the locator used to obtain the index service.
	 *
	 * @return the configured locator, or {@code null} if none has been set
	 */
	public ServiceLocator<IndexService> getIndexLocator() {
		return this.indexLocator;
	}

	/**
	 * Sets the locator used to obtain the index service.
	 *
	 * @param indexLocator the locator to use from now on
	 */
	public void setIndexLocator(final ServiceLocator<IndexService> indexLocator) {
		this.indexLocator = indexLocator;
	}

	/**
	 * Returns the locator for the result set service.
	 *
	 * @return the configured locator, or {@code null} if none has been set
	 */
	public ServiceLocator<ResultSetService> getResultSetLocator() {
		return this.resultSetLocator;
	}

	/**
	 * Sets the locator for the result set service.
	 *
	 * @param resultSetLocator the locator to use from now on
	 */
	public void setResultSetLocator(final ServiceLocator<ResultSetService> resultSetLocator) {
		this.resultSetLocator = resultSetLocator;
	}

	/**
	 * Returns the mapping from browse field name to vocabulary name.
	 *
	 * @return the configured map itself (not a copy), or {@code null} if
	 *         none has been set
	 */
	public Map<String, String> getVocabularyNames() {
		return this.vocabularyNames;
	}

	/**
	 * Sets the mapping from browse field name to vocabulary name. The map is
	 * stored as given, without copying.
	 *
	 * @param vocabularyNames field name to vocabulary name
	 */
	public void setVocabularyNames(final Map<String, String> vocabularyNames) {
		this.vocabularyNames = vocabularyNames;
	}

	/**
	 * Returns the mapping from browse field name to the loader that fetches
	 * the vocabulary of that field.
	 *
	 * @return the configured map itself (not a copy), or {@code null} if
	 *         none has been set
	 */
	public Map<String, VocabularyLoader> getVocabularyLoaders() {
		return this.vocabularyLoaders;
	}

	/**
	 * Sets the mapping from browse field name to vocabulary loader. The map
	 * is stored as given, without copying.
	 *
	 * @param vocabularyLoaders field name to vocabulary loader
	 */
	public void setVocabularyLoaders(final Map<String, VocabularyLoader> vocabularyLoaders) {
		this.vocabularyLoaders = vocabularyLoaders;
	}

	/**
	 * Returns the factory used to turn an EPR into a result set.
	 *
	 * @return the configured factory, or {@code null} if none has been set
	 */
	public ResultSetFactory getRsFactory() {
		return this.rsFactory;
	}

	/**
	 * Sets the factory used to turn an EPR into a result set.
	 *
	 * @param rsFactory the factory to use from now on
	 */
	public void setRsFactory(final ResultSetFactory rsFactory) {
		this.rsFactory = rsFactory;
	}

	/**
	 * Returns the configured field rewrite rules.
	 *
	 * @return a live view of the rules currently registered, or an empty
	 *         collection if no rules have been set yet (the rules are
	 *         optional, so an unset state is not an error)
	 */
	public Collection<FieldRewriteRule> getFieldRules() {
		if (fieldRules == null) {
			return Collections.<FieldRewriteRule>emptyList();
		}
		return fieldRules.values();
	}

	/**
	 * Replaces the field rewrite rules, indexing them by the field name each
	 * rule reports. When several rules name the same field a warning is
	 * logged and the rule that comes last in iteration order is kept.
	 *
	 * @param fieldRules the rules to register
	 */
	public void setFieldRules(Collection<FieldRewriteRule> fieldRules) {
		final Map<String, FieldRewriteRule> byField =
				new HashMap<String, FieldRewriteRule>();
		// publish the (still empty) map first, then fill it in place
		this.fieldRules = byField;

		for (Iterator<FieldRewriteRule> it = fieldRules.iterator(); it.hasNext();) {
			final FieldRewriteRule current = it.next();
			final String fieldName = current.getFieldName();

			final boolean duplicate = byField.containsKey(fieldName);
			if (duplicate) {
				logger.warn("Multiple rules for field " + fieldName);
				logger.warn("Keeping last rule " + current.getName());
			}
			byField.put(fieldName, current);
		}
	}

	/**
	 * Returns the query rewrite rules, in the order they are applied.
	 *
	 * @return the configured list itself (not a copy), or {@code null} if
	 *         none has been set
	 */
	public List<QueryRewriteRule> getQueryRules() {
		return this.queryRules;
	}

	/**
	 * Sets the query rewrite rules. The list is stored as given, without
	 * copying; rules are applied in list order.
	 *
	 * @param queryRules the rules to apply to every incoming query
	 */
	public void setQueryRules(final List<QueryRewriteRule> queryRules) {
		this.queryRules = queryRules;
	}

	public SearchServiceImpl() {
		super();
		this.browseData = Collections.synchronizedMap(
				new HashMap<String, Map<String, BrowseData>>());
	}
	
	/**
	 * Initializes the service: checks that vocabulary names and loaders are
	 * configured for exactly the same fields, registers every browse field
	 * with no data yet, schedules the periodic browse data update and
	 * subscribes to the notifications that flag the browse data as stale.
	 *
	 * @throws IllegalStateException if the key sets of the vocabulary names
	 *         and vocabulary loaders differ
	 */
	@Override
	public void init() {
		super.init();

		final boolean sameFields =
				vocabularyNames.keySet().equals(vocabularyLoaders.keySet());
		if (!sameFields) {
			throw new IllegalStateException(
					"The keysets of the vocabulary names and loaders beans differ: "
					+ vocabularyNames.keySet() + " =!= "
					+ vocabularyLoaders.keySet());
		}

		// a null value marks a known field whose browse data is not loaded yet
		for (Iterator<String> it = vocabularyNames.keySet().iterator(); it.hasNext();) {
			browseData.put(it.next(), null);
		}

		// first run immediately, then once every browseUpdatePeriod minutes
		final Runnable updateTask = new VocabularyUpdateTask();
		executor.scheduleAtFixedRate(
				updateTask, 0, browseUpdatePeriod, TimeUnit.MINUTES);

		// repositories and indexes: every kind of change is of interest
		final ResourceType[] fullyWatched = {
				ResourceType.REPOSITORYSERVICERESOURCETYPE,
				ResourceType.INDEXDSRESOURCETYPE };
		final ActionType[] allActions = {
				ActionType.CREATE, ActionType.UPDATE, ActionType.DELETE };
		for (ResourceType resourceType : fullyWatched) {
			for (ActionType action : allActions) {
				this.subscribe(action, resourceType);
			}
		}
		// vocabularies: only updates
		this.subscribe(ActionType.UPDATE, ResourceType.VOCABULARYDSRESOURCETYPE);

		this.addNotificationListener(new BrowseNotificationListener());
	}
	
	/**
	 * Work-in-progress replacement for {@link #search(String)}. Query
	 * dispatching and result set creation are still switched off (see the
	 * TODOs below), so for every well-formed query this method currently
	 * logs a warning and returns {@code null}.
	 *
	 * @param text the CQL query text
	 * @return the result set EPR; at present always {@code null}
	 * @throws SearchServiceException if {@code text} is not valid CQL; the
	 *         parser's exception is attached as the cause
	 */
	public String search_new(String text) throws SearchServiceException {
	
		logger.info("Received query: " + text);
	
		// TODO: no query parsing --> i.e. no index exclusion
		Query query = null;
		try {
			query = new Query(text);
			
		} catch (CqlException e) {
			String msg = "Invalid CQL query: " + text;
			logger.warn(msg, e);
			// keep the parser error as the cause so callers can see why
			throw new SearchServiceException(msg, e);
		}
		
		// TODO: enable cache? cache already in web ui
				
		// TODO: no planning --> i.e. no list of isv
		// TODO: no optimization --> i.e. selecting best isv
	
		// schedule query for execution and start operators
		logger.debug("Dispatch query execution...");
		//TODO: turn back on
		//dispatchQueryExecution(query);
		// TODO: removed if (query.getOperator() == null) {
		// placeholder for the operator check above; query cannot be null here
		if (query == null) {
			logger.warn("Failed to schedule query execution. Return null epr.");
			return null;
		}
		
		// form query result set and keep in set of running result sets
		logger.debug("Create query result set...");
		// TODO: turn back on
		String epr = null; // createQueryResultSet(query);
		if (epr == null) {
			logger.warn("Failed to create pull result set. Return null epr.");
			return null;
		}
		// TODO: what happens if create pull rs fails?
		
		// TODO: clean timed-out queries
		
		if (logger.isDebugEnabled()) {
			logger.debug("Return epr for query " + text + " : " + epr);
		}
		return epr;
	}

	/**
	 * Rewrites the given CQL query with the configured rules and runs it
	 * against the index service.
	 *
	 * @param text the CQL query text
	 * @return the EPR of the result set produced by the index, never
	 *         {@code null}
	 * @throws SearchServiceException if the query is not valid CQL, if the
	 *         index call fails (the underlying exception is attached as the
	 *         cause in both cases) or if the index returns no EPR
	 */
	@Override
	public EPR search(String text) throws SearchServiceException {
		EPR epr = null;
		
		logger.info("Received Query: " + text);
		
		try {
			long time = System.currentTimeMillis();
		
			Query query = rewrite(new Query(text));
	
			IndexService index = getIndexLocator().getService();
			
			if (logger.isDebugEnabled()) {
				logger.debug("index lookup (all, query="
						+ query.toString() + ", DMF, index)");
			}
			// TODO: remove query= part			
			epr = index.indexLookup(
					"all", "query=" + query.toString(), "DMF", "index");
			
			if (logger.isDebugEnabled()) {
				logger.debug("epr = " + epr);
			}
			
			time = System.currentTimeMillis() - time;
			logger.debug("index query lasted " + time + " msec");
		
		} catch (IndexServiceException ise) {
			logger.error("Error calling index.", ise);
			throw new SearchServiceException("Error calling index.", ise);
		} catch (CqlException cqle) {
			logger.error("Bad CQL query.", cqle);
			// report the real problem instead of blaming the index
			throw new SearchServiceException("Bad CQL query: " + text, cqle);
		}
		
		if (epr == null) {
			throw new SearchServiceException(
					"Index returned null result set id.");
		}
		
		return epr;
	}
	
	/**
	 * Applies all rewrite rules to a query: first the query rules, in list
	 * order, then the field rules on every relation of the resulting CQL
	 * tree. A query rule that fails is logged and skipped; the remaining
	 * rules are still applied.
	 *
	 * @param query the parsed query to rewrite
	 * @return a new query built from the rewritten CQL tree
	 * @throws SearchServiceException if the rewritten tree does not
	 *         serialize to valid CQL; the parser's exception is attached as
	 *         the cause
	 */
	Query rewrite(Query query) throws SearchServiceException {		
		if (logger.isDebugEnabled()) {
			logger.debug("Apply query rules on " + query + " ...");
		}
		
		if (queryRules != null) {
			for (QueryRewriteRule queryRule: queryRules) {
				try {
					if (logger.isDebugEnabled()) {
						// name the rule being applied, not just the query
						logger.debug("Apply rule " + queryRule
								+ " on " + query + " ...");
					}
					query = queryRule.apply(query);
					if (logger.isDebugEnabled()) {
						logger.debug("Rewritten query is " + query);
					}
					
				} catch (CqlException cqle) {
					// log and continue with other rules if one fails
					logger.warn("Rule failed: " + cqle.getMessage(), cqle);
				}
			}
		}
		
		if (logger.isDebugEnabled()) {
			logger.debug("Apply field rules on " + query + " ...");
		}
		
		CqlClause root = rewrite(query.getCqlQuery().getRoot());
		try {
			query = new Query(root.toCqlString());
		} catch (CqlException cqle) {
			String msg = "Malformed CQL: " + root.toCqlString();
			logger.warn(msg, cqle);
			throw new SearchServiceException(msg, cqle);
		}
		if (logger.isDebugEnabled()) {
			logger.debug("Rewrite yields " + query);
		}
		
		return query;
	}
	
	/**
	 * Recursively applies the field rewrite rules to a CQL clause. Boolean
	 * clauses are rebuilt from their rewritten operands, term clauses are
	 * left untouched and a relation is replaced by the output of the rule
	 * registered for its index, if there is one. A rule that fails is logged
	 * and the relation is kept as it was.
	 *
	 * @param clause the clause to rewrite
	 * @return the rewritten clause, or {@code clause} itself when no field
	 *         rules are configured or none applies
	 * @throws IllegalArgumentException if the clause type is not one of
	 *         BOOLEAN, TERM or RELATION
	 */
	CqlClause rewrite(CqlClause clause) {
		// field rules are optional: check before touching them, also for logging
		if (fieldRules == null) {
			return clause;
		}
		if (logger.isDebugEnabled()) {
			logger.debug("Field rules are : " + fieldRules.toString());
		}
		
		switch (clause.type) {
		case BOOLEAN:
			CqlBoolean bool = (CqlBoolean) clause;
			clause = new CqlBoolean(rewrite(bool.getLeft()),
					bool.getOperator(), rewrite(bool.getRight()));
			break;
		case TERM:
			// no rewrite
			break;
		case RELATION:
			CqlRelation relation = (CqlRelation) clause;
			FieldRewriteRule rule = fieldRules.get(relation.getIndex());
			if (rule != null) {
				try {
					if (logger.isDebugEnabled()) {
						logger.debug("Apply rule " + rule.getName()
								+ " on " + clause.toCqlString() + " ...");
					}
					clause = rule.apply(relation);
					if (logger.isDebugEnabled()) {
						logger.debug("New clause is " + clause.toCqlString());
					}
				} catch (CqlException cqle) {
					// log and continue with other rules if one fails
					logger.warn("Rule failed: " + cqle.getMessage(), cqle);
				}
			}
			break;
		default:
			throw new IllegalArgumentException("Unknown CQL clause type.");
		}
		
		return clause; 
	}
	
	/**
	 * Rewrites the given CQL query and asks the index service for browsing
	 * statistics grouped by the given fields. The request sent to the index
	 * has the form {@code query=<cql>&groupby=<field1>,<field2>,...}.
	 *
	 * @param text the CQL query text
	 * @param fields the fields to group by; must contain at least one entry
	 * @return the EPR of the statistics result set, never {@code null}
	 * @throws SearchServiceException if the query is empty or not valid CQL,
	 *         if no fields are given, if the index call fails or if the
	 *         index returns no EPR; parser and index exceptions are attached
	 *         as the cause
	 */
	@Override
	public EPR refine(String text, Collection<String> fields)
			throws SearchServiceException {
			
		EPR epr = null;
		
		try {
			Query query = rewrite(new Query(text));
		
			if (query.toString() == null || query.toString().trim().length() == 0) {
				throw new SearchServiceException("No query is specified in refine.");
			}
			if (fields == null || fields.size() == 0) {
				throw new SearchServiceException("No fields specified in refine.");
			}
		
			logger.info("Received Refine Query: "
				+ query.toString() + " for fields: " + fields);
	
			long time = System.currentTimeMillis();
			
			logger.debug("looking for index");
			IndexService index = getIndexLocator().getService();
			
			// local, single-threaded buffer: no need for a synchronized StringBuffer
			StringBuilder buffer = new StringBuilder();
			buffer.append("query=").append(query.toString()).append("&groupby=");
			for (Iterator<String> iter = fields.iterator(); iter.hasNext();) {
				buffer.append(iter.next());
				if (iter.hasNext()) {
					buffer.append(",");
				}
			}
			if (logger.isDebugEnabled()) {
				logger.debug("index refine (" + buffer.toString()
						+ ", all, DMF, index)");
			}
			
			epr = index.getBrowsingStatistics(
					buffer.toString(), "all", "DMF", "index");
				
			if (logger.isDebugEnabled()) {
				logger.debug("epr = " + epr);
			}
			
			time = System.currentTimeMillis() - time;
			logger.debug("index query lasted " + time + " msec");
			
		} catch (IndexServiceException ise) {
			logger.error("Error calling index.", ise);
			// pass the caught exception on as the cause
			throw new SearchServiceException("Error calling index.", ise);
			
		} catch (CqlException cqle) {
			logger.error("Bad CQL query.", cqle);
			// report the real problem instead of blaming the index
			throw new SearchServiceException("Bad CQL query: " + text, cqle);
		}
	
		if (epr == null) {
			throw new SearchServiceException(
					"Index returned null result set id.");
		}
		
		return epr;
	}
	
	/**
	 * Looks up the precomputed browse data of a field. The prefix is trimmed
	 * and lower-cased before the lookup; a {@code null} or blank prefix
	 * selects the entry stored under the {@code null} key, which holds the
	 * data for all prefixes.
	 *
	 * @param prefix the prefix to browse by, may be {@code null}
	 * @param field the browse field
	 * @return the stored browse data, or a new empty {@link BrowseData} when
	 *         nothing is stored for the prefix
	 * @throws BrowseDataNotAvailableException if the field is a configured
	 *         browse field whose data has not been loaded yet
	 * @throws SearchServiceException if the field is not a configured browse
	 *         field
	 */
	@Override
	public BrowseData browse(String prefix, String field)
			throws BrowseDataNotAvailableException, SearchServiceException {

		final Map<String, BrowseData> perPrefix = browseData.get(field);
		if (perPrefix == null) {
			// unknown field versus known field without data yet
			if (!vocabularyNames.keySet().contains(field)) {
				throw new SearchServiceException(
						"Invalid browse field '" + field + "'.");
			}
			throw new BrowseDataNotAvailableException("Browse for field '"
					+ field + "' is currenty being updated.");
		}

		String lookupKey = null;
		if (prefix != null) {
			final String normalized = prefix.trim().toLowerCase();
			if (normalized.length() != 0) {
				lookupKey = normalized;
			}
		}

		final BrowseData stored = perPrefix.get(lookupKey);
		if (stored != null) {
			return stored;
		}
		return new BrowseData();
	}

	/* TODO: turn back on later
	public Object createEntry(Object key) throws Exception {
		
		Object entry = null;
		
		try {
			CacheKey ckey = (CacheKey) key;
			Query query = results.get(ckey.getBdId());
			if (logger.isDebugEnabled()) {
				logger.debug("Results cache miss for record: "
						+ ckey.getRecordNumber() + " -- rs: " + ckey.bdId);
			}
			
			// if createEntry is called, then ckey.getRecordNumber()
			// is not in cache -- advance until it is found
			// ASSUMPTION: cache never deletes a page, everything
			//             previously received is available
			while (ckey.getRecordNumber() > query.getNumofRecords()) {
				
				if (logger.isDebugEnabled()) {
					logger.debug("Get next page of results ("
							+ "received: " + query.getNumofRecords() + ")");
				}
				
				List<String> page = query.getOperator().getNextPage();
				int count = query.getNumofRecords();
				
				if (page == null) {
					logger.debug("No page found, retry...");
					if (!query.getOperator().isOpen()) {
						break;
					}
					Thread.sleep(100);
					continue;
				} else {
					if (logger.isDebugEnabled()) {
						logger.debug("Found page of " + page.size() + " recs.");
					}
				}
				
				for (int i = 0; i < page.size(); i++) {
						
					CacheKey newKey = new CacheKey(query, ++count);
					Element newElement = new Element(newKey, page.get(i));
					resultsCache.put(newElement);
					if (logger.isDebugEnabled()) {
						logger.debug("Added to cache: "
								+ i + " -- " + page.get(i));
					}
				
					if (count == ckey.getRecordNumber()) {
						if (logger.isDebugEnabled()) {
							logger.debug("Found cache miss at position "
									+ i + " in page.");
						}
						entry = page.get(i);
					}
				}
				
				if (logger.isDebugEnabled()) {
					logger.debug("Update consumed records to: " + count);
				}
				query.setNumofRecords(count);
			}
			
		} catch (NullPointerException npe) {
			logger.warn("Null key passed to cache lookup.", npe);
			
		} catch (ClassCastException cce) {
			logger.warn("Invalid key class passed to cahce lookup.", cce);
		}
		
		if (logger.isDebugEnabled()) {
			logger.debug("Results cache miss returns: " + entry);
		}
		
		return entry;
	}

	protected String createQueryResultSet(Query query) {
		
		String epr = null;

		try {
			long time = System.currentTimeMillis();
			// TODO: what is the role of CreatePushRSType?  (null arg)
			epr = resultSetService.createPullRS(
					SEARCH_SERVICE, query.getBdId(),
					PAGE_SIZE, RESULT_SET_TIMEOUT, null);
			if (logger.isDebugEnabled()) {
				logger.debug("result set creation with epr " + epr);
			}
			
			query.setEpr(epr);
			
			time = System.currentTimeMillis() - time;
			if (logger.isDebugEnabled()) {
				logger.debug("result set creation lasted: " + (time / 1000.0)
						+ " sec.");
			}
			
		} catch (GenericFaultMessage gfm) {
			logger.warn("Cannot create query result set.", gfm);
		}

		return epr;
	}

	protected void dispatchQueryExecution(Query query) {
		
		long time  = System.currentTimeMillis();
		
		// follow static plan (b) <-- (a):
		//   (a) one index operator per index service
		//   (b) one merge operator
		
		IndexOperator[] indexes = new IndexOperator[indexClients.size()];
		for (int i = 0; i < indexes.length; i++) {
			indexes[i] = new IndexOperator(indexClients.get(i), query, 10);
			indexes[i].setRsFactory(rsFactory);
			indexes[i].start();
		}
		MergeOperator merge = new MergeOperator(indexes, query, 10);
		merge.start();

		time = System.currentTimeMillis() - time;
		if (logger.isDebugEnabled()) {
			logger.debug("operator schedule lasted: "
					+ (time / 1000.0) + " sec.");
		}
		
		query.setOperator(merge);
	}
	*/
	
	class VocabularyUpdateTask implements Runnable {
		
		public void run() {

			if (updateBrowseData) {
				logger.debug("Updating browse data");
				updateBrowseData = false;
				
				for (String field : vocabularyNames.keySet()) {
					try {
						String vocabularyName = vocabularyNames.get(field);
						VocabularyLoader loader = vocabularyLoaders.get(field);
						Vocabulary vocabulary = loader
								.getVocabulary(vocabularyName);
						Map<String, BrowseData> data = null;

						data = new HashMap<String, BrowseData>();
						// null key has value for all prefixes
						data.put(null, new BrowseData());
						String query = field + "=\"";

						if (logger.isDebugEnabled()) {
							logger.debug("start browse update for field '"
									+ field + "'...");
						}

						for (String encoding : vocabulary.getEncodings()) {
							String value = vocabulary.getEnglishName(encoding);
							String prefix = value.substring(0, 1).toLowerCase();

							if (logger.isDebugEnabled()) {
								logger.debug("query for '" + field + " = "
										+ encoding + "'...");
							}
							int count = countResults(query + encoding + "\"");

							if (logger.isDebugEnabled()) {
								logger.debug("found " + count + " docs for '"
										+ encoding + "'...");
							}
							if (data.get(prefix) == null) {
								data.put(prefix, new BrowseData());
							}
							data.get(prefix).addFieldEntry(field, encoding,
									count);
							data.get(null)
									.addFieldEntry(field, encoding, count);
						}
						if (logger.isDebugEnabled()) {
							logger.debug("completed browse update for field '"
									+ field + "'.");
						}

						browseData.put(field, data);
						if (logger.isDebugEnabled()) {
							logger.debug("updated browse data for field '"
									+ field + "'.");
						}

					} catch (ISLookUpException e) {
						logger.warn("Failed to load vocabulary for " + field);
					}
				}
			} else {
				logger.debug("No reason to update browse data. See you in " +
						browseUpdatePeriod + " minutes.");
			}
		}
		
		int countResults(String query) {
			int size = 0;
			
			try {
				EPR epr = search(query);
				ResultSet<String> resultset = rsFactory.createResultSet(epr);
				size = resultset.size();
			
			} catch (SearchServiceException e) {
				logger.warn("Browse Update query '"
						+ query + "' failed. Return 0.");
			}
			
			return size;
		}
	}
	
	/**
	 * Listener that flags the browse data as stale whenever a notification
	 * about a vocabulary, a repository service or an index arrives. The
	 * actual recomputation is left to the next run of the scheduled update
	 * task.
	 */
	class BrowseNotificationListener implements NotificationListener {

		/**
		 * Requests a browse data update if the notification concerns one of
		 * the watched resource types; other notifications are ignored.
		 *
		 * @param notification the notification received
		 */
		public void processNotification(Notification notification) {
			logger.debug("Received notification for object of type: "
					+ notification.getResourceType());

			final boolean affectsBrowseData =
					notification.getResourceType().equals(
							ResourceType.VOCABULARYDSRESOURCETYPE)
					|| notification.getResourceType().equals(
							ResourceType.REPOSITORYSERVICERESOURCETYPE)
					|| notification.getResourceType().equals(
							ResourceType.INDEXDSRESOURCETYPE);

			if (!affectsBrowseData) {
				return;
			}
			updateBrowseData = true;
		}
	}
}
