package eu.dnetlib.functionality.notification.app;

import java.net.MalformedURLException;
import java.net.URL;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import org.apache.cxf.jaxws.JaxWsProxyFactoryBean;
import org.apache.log4j.Logger;

import eu.dnetlib.api.functionality.NotificationService;
import eu.dnetlib.api.functionality.NotificationServiceException;
import eu.dnetlib.clients.functionality.alert.ws.AlertWebService;
import eu.dnetlib.clients.functionality.alert.ws.AlertWebServiceException;
import eu.dnetlib.domain.functionality.NotificationEvent;
import eu.dnetlib.domain.functionality.NotificationQuery;
import eu.dnetlib.domain.functionality.NotificationResult;
import eu.dnetlib.domain.functionality.NotificationSchedule;
import eu.dnetlib.domain.functionality.NotificationSubscription;
import eu.dnetlib.domain.functionality.ObjectPage;
import eu.dnetlib.domain.functionality.ResultPage;
import eu.dnetlib.functionality.notification.executor.NotificationQueryExecutor;
import eu.dnetlib.functionality.notification.executor.NotificationQueryExecutorException;
import gr.uoa.di.driver.app.DriverServiceImpl;

/**
 * This bean implements notification service using a notification service transaction helper to manage queries and events and a scheduled executor service to execute queries periodically.
 * <p>
 * Every enabled schedule is backed by at most one periodic task, tracked per query id. Access to the task registry and to the
 * alert service proxy cache is synchronized, because both are touched by the scheduler threads as well as by service calls.
 * <p>
 * The transaction helper and the thread pool size must be set before {@link #init()} is invoked or any schedule is enabled,
 * since both are dereferenced without further checks.
 * @author thanos@di.uoa.gr
 * @see eu.dnetlib.api.functionality.NotificationService
 * @see eu.dnetlib.domain.functionality.NotificationQuery
 * @see eu.dnetlib.domain.functionality.NotificationEvent
 * @see eu.dnetlib.functionality.notification.dao.NotificationDAO
 * @see eu.dnetlib.api.functionality.NotificationServiceException
 *
 */
public class NotificationServiceImpl extends DriverServiceImpl implements NotificationService {
	private static final Logger logger = Logger.getLogger(NotificationServiceImpl.class);
	/** Page size used when none is configured; a page size of 0 would make the paging loops spin forever. */
	private static final int DEFAULT_PAGE_SIZE = 100;
	
	private NotificationServiceTransactionHelper transactionHelper;
	/** Query executors keyed by the query language they support. */
	private final Map<String, NotificationQueryExecutor> executors;
	private final SortedSet<String> supportedQueryLanguages;
	/** Running periodic tasks keyed by query id; guarded by synchronizing on the map itself. */
	private final Map<String, ScheduledFuture<?>> schedules;
	private ScheduledExecutorService scheduler;
	/** Shared proxy factory; only used while holding the lock on {@link #alertServices}. */
	private final JaxWsProxyFactoryBean serviceFactory;
	/**
	 * Cached alert service proxies keyed by the string form of their address; guarded by synchronizing on the map itself.
	 * Strings are used as keys instead of URLs because URL.equals() and URL.hashCode() may perform blocking host name resolution.
	 */
	private final Map<String, AlertWebService> alertServices;
	private int pageSize = DEFAULT_PAGE_SIZE;
	
	/**
	 * Construct a new notification service implementation.
	 */
	public NotificationServiceImpl() {
		executors = new HashMap<String, NotificationQueryExecutor>();
		supportedQueryLanguages = new TreeSet<String>();
		schedules = new HashMap<String, ScheduledFuture<?>>();
		serviceFactory = new JaxWsProxyFactoryBean();
		serviceFactory.setServiceClass(AlertWebService.class);
		alertServices = new HashMap<String, AlertWebService>();
	}
	
	/**
	 * Set the transaction helper.
	 * @param transactionHelper the transaction helper to use to manage queries and events
	 */
	public void setTransactionHelper(final NotificationServiceTransactionHelper transactionHelper) {
		this.transactionHelper = transactionHelper;
	}
	
	/**
	 * Set the thread pool size.
	 * @param threadPoolSize the thread pool size of the underlying scheduled executor service
	 */
	public void setThreadPoolSize(final int threadPoolSize) {
		scheduler = Executors.newScheduledThreadPool(threadPoolSize);
	}
	
	/**
	 * Set the page size.
	 * @param pageSize the page size to use when paging through enabled schedules and subscriptions; must be positive
	 * @throws IllegalArgumentException if the page size is not positive
	 */
	public void setPageSize(final int pageSize) {
		if (pageSize <= 0) // a non-positive page size would never terminate the paging loops
			throw new IllegalArgumentException("page size must be positive: " + pageSize);
		this.pageSize = pageSize;
	}
	
	/**
	 * Set the notification query executors.
	 * @param executors a set containing notification service executors to use
	 */
	public void setExecutors(final Set<NotificationQueryExecutor> executors) {
		for (NotificationQueryExecutor executor : executors) {
			supportedQueryLanguages.addAll(executor.getSupportedQueryLanguages());
			for (String queryLanguage : executor.getSupportedQueryLanguages())
				this.executors.put(queryLanguage, executor);
		}
	}
	
	/**
	 * Initialize the service and restore the periodic execution of every enabled schedule.
	 * Schedules whose query language is not supported are disabled; a failure on one schedule does not prevent the others from being restored.
	 */
	@Override
	public void init() {
		super.init();
		try {
			// NOTE(review): schedules may be disabled while paging by offset; if the helper queries live data this could skip entries - confirm
			for (int offset = 0; ; offset += pageSize) {
				final SortedSet<NotificationSchedule> enabledSchedules = transactionHelper.getEnabledSchedules(pageSize, offset);
				for (NotificationSchedule schedule : enabledSchedules)
					restoreSchedule(schedule);
				if (enabledSchedules.size() < pageSize)
					break;
			}
			logger.info("Notification service initialization complete");
		} catch (final NotificationServiceException e) {
			logger.warn("Error initializing notification service", e);
		}
	}
	
	/**
	 * Restore the execution of a single enabled schedule, or disable it if its query language is not supported.
	 * Never throws: any failure is logged so that the remaining schedules can still be restored.
	 * @param schedule the enabled schedule to restore
	 */
	private void restoreSchedule(final NotificationSchedule schedule) {
		final String queryId = schedule.getQueryId();
		try {
			final NotificationQuery query = transactionHelper.getQuery(queryId);
			if (query == null) { // query does not exist
				logger.warn("Error restoring execution of schedule " + queryId + ": query does not exist");
				return;
			}
			if (executors.containsKey(query.getQueryLanguage())) { // query language is supported, restore schedule execution
				schedule(queryId, schedule.getExecutionPeriod());
				logger.info("Restored execution of schedule " + queryId);
			} else { // query language is not supported, disable query
				transactionHelper.disableSchedule(queryId);
				logger.info("Disabled schedule " + queryId + " because query language " + query.getQueryLanguage() + " is not supported");
			}
		} catch (final NotificationServiceException e) {
			logger.warn("Error restoring execution of schedule " + queryId, e);
		} catch (final RuntimeException e) { // e.g. an invalid execution period rejected by the scheduler
			logger.warn("Error restoring execution of schedule " + queryId, e);
		}
	}
	
	@Override
	public SortedSet<String> getSupportedQueryLanguages() {
		return supportedQueryLanguages;
	}
	
	@Override
	public ObjectPage<NotificationQuery> getQueries(final int pageNumber, final int pageSize) throws NotificationServiceException {
		return transactionHelper.getQueries(pageNumber, pageSize);
	}
	
	@Override
	public void addQuery(final NotificationQuery query) throws NotificationServiceException {
		transactionHelper.addQuery(query);
	}
	
	@Override
	public void removeQuery(final String queryId) throws NotificationServiceException {
		transactionHelper.removeQuery(queryId);
		cancel(queryId); // stop any periodic execution right away instead of waiting for the task to notice
	}
	
	@Override
	public ObjectPage<NotificationSchedule> getSchedules(final int pageNumber, final int pageSize) throws NotificationServiceException {
		return transactionHelper.getSchedules(pageNumber, pageSize);
	}
	
	@Override
	public void addSchedule(final NotificationSchedule schedule) throws NotificationServiceException {
		transactionHelper.addSchedule(schedule);
	}
	
	@Override
	public void enableSchedule(final String queryId) throws NotificationServiceException {
		final NotificationSchedule schedule = transactionHelper.enableSchedule(queryId);
		try {
			schedule(queryId, schedule.getExecutionPeriod());
		} catch (final RuntimeException e) { // e.g. non-positive execution period or scheduler rejecting the task
			throw new NotificationServiceException("error scheduling query " + queryId, e);
		}
	}
	
	@Override
	public void disableSchedule(final String queryId) throws NotificationServiceException {
		transactionHelper.disableSchedule(queryId);
		cancel(queryId);
	}
	
	@Override
	public void removeSchedule(final String queryId) throws NotificationServiceException {
		transactionHelper.removeSchedule(queryId);
		cancel(queryId); // stop any periodic execution right away instead of waiting for the task to notice
	}
	
	@Override
	public ObjectPage<NotificationEvent> getEvents(final int pageNumber, final int pageSize) throws NotificationServiceException {
		return transactionHelper.getEvents(pageNumber, pageSize);
	}
	
	@Override
	public ObjectPage<NotificationResult> getResults(final int pageNumber, final int pageSize) throws NotificationServiceException {
		return transactionHelper.getResults(pageNumber, pageSize);
	}
	
	@Override
	public NotificationResult getResult(final String queryId, final Date date, final String resultId) throws NotificationServiceException {
		return transactionHelper.getResult(queryId, date, resultId);
	}
	
	@Override
	public NotificationResult getPreviousResult(final String queryId, final Date date, final String resultId) throws NotificationServiceException {
		return transactionHelper.getPreviousResult(queryId, date, resultId);
	}
	
	@Override
	public ObjectPage<NotificationSubscription> getSubscriptions(final int pageNumber, final int pageSize) throws NotificationServiceException {
		return transactionHelper.getSubscriptions(pageNumber, pageSize);
	}
	
	@Override
	public void addSubscription(final NotificationSubscription subscription) throws NotificationServiceException {
		transactionHelper.addSubscription(subscription);
	}
	
	@Override
	public void enableSubscription(final String queryId, final URL alertService) throws NotificationServiceException {
		transactionHelper.enableSubscription(queryId, alertService);
	}
	
	@Override
	public void disableSubscription(final String queryId, final URL alertService) throws NotificationServiceException {
		transactionHelper.disableSubscription(queryId, alertService);
	}
	
	@Override
	public void removeSubscription(final String queryId, final URL alertService) throws NotificationServiceException {
		transactionHelper.removeSubscription(queryId, alertService);
	}
	
	@Override
	public ResultPage executeQuery(final String queryId, final String resultId, final Date fromDate, final Date toDate, final int limit, final int offset) throws NotificationServiceException {
		try {
			final NotificationQuery query = transactionHelper.getQuery(queryId);
			if (query == null) { // query does not exist
				cancel(queryId); // just in case
				throw new NotificationServiceException("error executing query " + queryId + ": query does not exist");
			}
			final NotificationQueryExecutor executor = executors.get(query.getQueryLanguage());  
			if (executor == null) { // unsupported query language
				cancel(queryId); // just in case
				throw new NotificationServiceException("error executing query " + queryId + ": query language " + query.getQueryLanguage() + " is not supported");
			}
			return executor.execute(query, resultId, fromDate, toDate, limit, offset);
		} catch (final NotificationQueryExecutorException e) {
			throw new NotificationServiceException("error executing query " + queryId, e);
		}
	}
	
	/**
	 * Start the periodic execution of a query, replacing (and cancelling) any periodic execution already registered for it.
	 * The first execution is submitted without delay; the period is expressed in minutes.
	 * @param queryId the id of the query to execute periodically
	 * @param executionPeriod the execution period in minutes
	 */
	private void schedule(final String queryId, final long executionPeriod) {
		final Runnable task = new Runnable() {
			@Override
			public void run() {
				// A periodic task that throws is silently never run again by the executor, so nothing may escape from here.
				try {
					runScheduledQuery(queryId);
				} catch (final MalformedURLException e) {
					logger.warn("Error executing query " + queryId, e);
				} catch (final NotificationQueryExecutorException e) {
					logger.warn("Error executing query " + queryId, e);
				} catch (final NotificationServiceException e) {
					logger.warn("Error executing query " + queryId, e);
				} catch (final RuntimeException e) {
					logger.warn("Error executing query " + queryId, e);
				}
			}
		};
		final ScheduledFuture<?> previous;
		synchronized (schedules) {
			// Register under the lock so that a task which starts immediately cannot cancel itself before it is registered.
			previous = schedules.put(queryId, scheduler.scheduleAtFixedRate(task, 0L, executionPeriod, TimeUnit.MINUTES));
		}
		if (previous != null) // never leave an orphaned task running for the same query
			previous.cancel(false);
	}
	
	/**
	 * Execute one scheduled run of a query and alert the enabled subscriptions if an event was generated.
	 * The periodic execution is cancelled when the query or schedule no longer exists, the schedule is disabled or the query language is not supported.
	 * @param queryId the id of the query to execute
	 * @throws MalformedURLException if the address of this service is not a valid URL
	 * @throws NotificationQueryExecutorException if the query executor fails
	 * @throws NotificationServiceException if the transaction helper fails
	 */
	private void runScheduledQuery(final String queryId) throws MalformedURLException, NotificationQueryExecutorException, NotificationServiceException {
		final NotificationQuery query = transactionHelper.getQuery(queryId);
		if (query == null) { // query does not exist
			cancel(queryId); // just in case
			logger.warn("Error executing query " + queryId + ": query does not exist");
			return;
		}
		final NotificationSchedule schedule = transactionHelper.getSchedule(queryId);
		if (schedule == null) { // schedule does not exist
			cancel(queryId); // just in case
			logger.warn("Error executing query " + queryId + ": schedule does not exist");
			return;
		}
		if (!schedule.isEnabled()) { // schedule is disabled
			cancel(queryId); // just in case
			logger.warn("Error executing query " + queryId + ": schedule is disabled");
			return;
		}
		final NotificationQueryExecutor executor = executors.get(query.getQueryLanguage());  
		if (executor == null) { // unsupported query language
			cancel(queryId); // just in case
			logger.warn("Error executing query " + queryId + ": query language " + query.getQueryLanguage() + " is not supported");
			return;
		}
		final NotificationEvent event = executor.execute(query, schedule);
		if (event == null) {
			logger.info("Query " + queryId + " did not generate an event");
			return;
		}
		logger.info("Query " + queryId + " generated event " + event);
		alertSubscriptions(event);
	}
	
	/**
	 * Alert every enabled subscription of the event's query, one page at a time.
	 * @param event the event to alert subscriptions about
	 * @throws MalformedURLException if the address of this service is not a valid URL
	 * @throws NotificationServiceException if the enabled subscriptions cannot be retrieved
	 */
	private void alertSubscriptions(final NotificationEvent event) throws MalformedURLException, NotificationServiceException {
		final URL notificationService = new URL(getServiceEPR().getAddress());
		for (int offset = 0; ; offset += pageSize) { // alert enabled subscriptions
			final SortedSet<NotificationSubscription> subscriptions = transactionHelper.getEnabledSubscriptions(event.getQueryId(), pageSize, offset);
			for (NotificationSubscription subscription : subscriptions)
				alertSubscription(notificationService, subscription, event);
			if (subscriptions.size() < pageSize)
				break;
		}
	}
	
	/**
	 * Alert a single subscription about an event.
	 * Never throws: a failure is logged so that the remaining subscriptions are still alerted.
	 * @param notificationService the address of this notification service, passed on to the alert service
	 * @param subscription the subscription to alert
	 * @param event the event to alert the subscription about
	 */
	private void alertSubscription(final URL notificationService, final NotificationSubscription subscription, final NotificationEvent event) {
		try {
			getAlertService(subscription.getAlertService()).alert(notificationService, event);
			logger.info("Alerted subscription " + subscription + " about event " + event);
		} catch (final AlertWebServiceException e) {
			logger.warn("Error alerting subscription " + subscription + " about event " + event, e);
		} catch (final NotificationServiceException e) {
			logger.warn("Error alerting subscription " + subscription + " about event " + event, e);
		} catch (final RuntimeException e) { // e.g. transport failure raised by the web service proxy
			logger.warn("Error alerting subscription " + subscription + " about event " + event, e);
		}
	}
	
	/**
	 * Get the alert service proxy for an address, creating and caching it on first use.
	 * @param address the address of the alert service
	 * @return the alert service proxy for the address
	 * @throws NotificationServiceException if the proxy factory returns no proxy for the address
	 */
	private AlertWebService getAlertService(final URL address) throws NotificationServiceException {
		final String key = address.toString();
		synchronized (alertServices) { // also serializes setAddress() + create() on the shared factory
			AlertWebService alertService = alertServices.get(key);
			if (alertService == null) { // not yet cached
				serviceFactory.setAddress(key);
				alertService = (AlertWebService) serviceFactory.create();
				if (alertService == null) // error connecting
					throw new NotificationServiceException("error connecting to alert service " + address);
				alertServices.put(key, alertService);
			}
			return alertService;
		}
	}

	/**
	 * Stop the periodic execution of a query, if any, and forget about it. An execution already in progress is not interrupted.
	 * @param queryId the id of the query whose periodic execution to stop
	 */
	private void cancel(final String queryId) {
		final ScheduledFuture<?> future;
		synchronized (schedules) {
			future = schedules.remove(queryId);
		}
		if (future != null)
			future.cancel(false);
	}
}
