package eu.dnetlib.functionality.index.solr;

import java.io.IOException;
import java.util.UUID;

import javax.annotation.Resource;

import org.apache.commons.codec.binary.Base64;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.solr.client.solrj.SolrServerException;
import org.dom4j.DocumentException;
import org.quartz.JobDetail;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.SimpleTrigger;
import org.z3950.zing.cql.CQLParseException;

import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.registry.ISRegistryDocumentNotFoundException;
import eu.dnetlib.enabling.is.registry.rmi.ISRegistryException;
import eu.dnetlib.enabling.tools.blackboard.AbstractBlackboardNotificationHandler;
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
import eu.dnetlib.enabling.tools.blackboard.BlackboardServerHandler;
import eu.dnetlib.enabling.tools.blackboard.NotificationHandler;
import eu.dnetlib.functionality.index.solr.actors.BlackboardActorCallback;
import eu.dnetlib.functionality.index.solr.actors.ResultsetKeepAliveCallback;
import eu.dnetlib.functionality.index.solr.actors.ResultsetKeepAliveJob;

/**
 * @author claudio
 *
 */
public class SolrIndexNotificationHandler 
	extends AbstractBlackboardNotificationHandler<BlackboardServerHandler> 
	implements NotificationHandler {

	/**
	 * logger.
	 */
	private static final Log log = LogFactory.getLog(SolrIndexNotificationHandler.class); 

	/**
	 * index service.
	 */
	@Resource(name = "solrIndexService")
	private SolrIndexServiceImpl indexService;
	
	@Resource
	private JobDetail resultsetKeepaliveJob;
	
	/**
	 * Start deferred jobs, used to keep resultsets alive.
	 */
	@Resource(name = "keepAliveResultsetJobScheduler")
	private Scheduler jobScheduler;
	
	@Override
	protected void processJob(BlackboardJob job) {
		log.info("processing solr index job: " + job.getAction());
		
		final Action action = Action.valueOf(job.getAction());
		try {
			switch (action) {
			
			case CREATE: 
				performCreateIndex(job);
				break;
				
			case FEED:
				performFeedIndex(job);
				break;

			case DELETE:
				performDelete(job);
				break;
				
			case TRANSFORM:
				performTransform(job);
				break;				
				
			case IDENTIFY:
				job.getParameters().put(BBParam.SERVICE_ID, indexService.identify());
				break;
			
			default:
				job.setError("unsupported Action: " + action.name());
				log.warn("unsupported message action: " + action.name());
				throw new IllegalArgumentException("unsupported message action: " + action.name());				
			}
		} catch (Throwable e) {
			log.error(job.getAction() + " job set to FAILED ", e);
			getBlackboardHandler().failed(job, e);
		}
	}
	
	private void performTransform(final BlackboardJob job) throws ISLookUpException, IOException {
		getBlackboardHandler().ongoing(job);
		log.info("TRANSFORM job set to ONGOING");

		indexService.mergeIndex(job, newBBActorCallback(job));
	}

	private void performCreateIndex(final BlackboardJob job) throws Exception {
		getBlackboardHandler().ongoing(job);
		log.info("CREATE job set to ONGOING");

		final String profileId = indexService.createIndex(job, newBBActorCallback(job));
		job.getParameters().put("id", profileId);
	}
	
	private void performDelete(final BlackboardJob job) throws ISRegistryDocumentNotFoundException, ISLookUpException, ISRegistryException {
		getBlackboardHandler().ongoing(job);
		log.info("DELETE job set to ONGOING");

		indexService.deleteIndex(job, newBBActorCallback(job));
	}
	
	private void performFeedIndex(final BlackboardJob job) 
		throws SchedulerException, ISLookUpException, ISRegistryException, CQLParseException, IOException, SolrServerException, DocumentException {
		
		getBlackboardHandler().ongoing(job);
		log.info("FEED job set to ONGOING");

		JobDetail tmp = jobScheduler.getJobDetail(ResultsetKeepAliveJob.JOB_NAME, ResultsetKeepAliveJob.JOB_GROUP);
	
		if (tmp == null) {
			log.fatal("re-registering job detail");
			jobScheduler.addJob(resultsetKeepaliveJob, true);
		} 			
		final String epr = new String(Base64.decodeBase64(job.getParameters().get(BBParam.RS_EPR).getBytes()));
		final String triggerId = UUID.randomUUID().toString();
		
		log.debug("\n\n scheduling resultSet keepalive trigger: " + triggerId + "\n\n");
		jobScheduler.scheduleJob(getResultsetTrigger(epr, triggerId));
		
		indexService.feedIndex(job, newRSKeepAliveCallback(triggerId), newBBActorCallback(job));
	}

	/////////////////// helpers
	
	/**
	 * constructor for a blackboard callback to handle the job termination.
	 */
	private BlackboardActorCallback newBBActorCallback(final BlackboardJob job) {
		return new BlackboardActorCallback() {
			@Override
			public void setJobDone() {
				log.info(job.getAction() + " job set to DONE");
				getBlackboardHandler().done(job);
			}
			@Override
			public void setJobFailed(Throwable e) {
				log.error(job.getAction() + " job set to FAILED ", e);
				getBlackboardHandler().failed(job, e);
			}
		};
	}
	
	private ResultsetKeepAliveCallback newRSKeepAliveCallback(final String triggerId) {
		return new ResultsetKeepAliveCallback() {
			@Override
			public void unschedule() {
				try {
					log.info("\n\n unscheduling resultSet keepalive trigger: " + triggerId + "\n\n");
					jobScheduler.unscheduleJob(triggerId, ResultsetKeepAliveJob.JOB_GROUP);
				} catch (SchedulerException e) {
					log.warn("cannot unschedule RSKeepAlive triggerId: " + triggerId);
					throw new RuntimeException(e);
				}
			}
		};
	}
	
	private SimpleTrigger getResultsetTrigger(final String rsEpr, final String triggerId) {
		final long repeatDelay = 20000;
		final SimpleTrigger trigger = new SimpleTrigger(
				triggerId,
				ResultsetKeepAliveJob.JOB_GROUP,
                SimpleTrigger.REPEAT_INDEFINITELY,
                repeatDelay);
		trigger.getJobDataMap().put(BBParam.RS_EPR, rsEpr);
		trigger.setJobName(ResultsetKeepAliveJob.JOB_NAME);
		trigger.setJobGroup(ResultsetKeepAliveJob.JOB_GROUP);
		return trigger;
	}

}