package eu.dnetlib.index.action;

import java.util.Map;
import java.util.UUID;

import eu.dnetlib.enabling.resultset.client.ResultSetClient;
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
import eu.dnetlib.enabling.tools.blackboard.BlackboardServerAction;
import eu.dnetlib.enabling.tools.blackboard.BlackboardServerHandler;
import eu.dnetlib.index.actors.*;
import eu.dnetlib.index.feed.FeedMode;
import eu.dnetlib.rmi.common.ResultSet;
import eu.dnetlib.rmi.provision.IndexServiceException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.quartz.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Required;

import static org.quartz.SimpleScheduleBuilder.simpleSchedule;
import static org.quartz.TriggerBuilder.newTrigger;

/**
 * The Class FeedIndexAction.
 */
public class FeedIndexAction extends AbstractIndexAction implements BlackboardServerAction<IndexAction> {

	/**
	 * The Constant log.
	 */
	private static final Log log = LogFactory.getLog(FeedIndexAction.class);

	/**
	 * The actor map.
	 */
	@Autowired
	private ActorMap actorMap;

	/**
	 * The feed actor factory.
	 */
	@Autowired
	private IndexFeedActorFactory feedActorFactory;

	/**
	 * The result set client factory.
	 */
	@Autowired
	private transient ResultSetClient resultSetClient;

	/**
	 * Start deferred jobs, used to keep resultsets alive.
	 */
	private Scheduler jobScheduler;

	/**
	 * ResultSet keep alive JobDetail.
	 */
	private transient JobDetail rsKeepaliveJob;

	/**
	 * ResultSet keepAlive trigger's repeat delay.
	 */

	private long repeatDelay;

	/**
	 * {@inheritDoc}
	 *
	 * @see BlackboardServerAction#execute(BlackboardServerHandler,
	 * BlackboardJob)
	 */
	@Override
	public void execute(final BlackboardServerHandler handler, final BlackboardJob job) throws Exception {
		handler.ongoing(job);
		log.info("FEED job set to ONGOING");

		final JobDetail tmp = getJobScheduler().getJobDetail(new JobKey(ResultsetKeepAliveJob.JOB_NAME, ResultsetKeepAliveJob.JOB_GROUP));
		final String epr = getEpr(job);
		final String triggerId = UUID.randomUUID().toString();
		final String dsId = getIndexDSId(job);
		final FeedMode feedMode = getFeedMode(job);
		final String backendId = getBackend(job);
		final boolean emptyResult = getEmptyResult(job);
		if (backendId == null) throw new IndexServiceException("No backend identifier information in CREATE message");

		if (tmp == null) {
			log.fatal("re-registering job detail");
			getJobScheduler().addJob(getRsKeepaliveJob(), true);
		}
		log.debug("\n\n scheduling resultSet keepalive trigger: " + triggerId + "\n\n");
		getJobScheduler().scheduleJob(getResultsetTrigger(epr, triggerId));
		if (!actorMap.hasActor(backendId)) {
			actorMap.addActor(backendId, feedActorFactory.newInstance());
		}

		final ResultSet<?> resultSet = ResultSet.fromJson(epr);

		final Iterable<String> records = resultSetClient.iter(resultSet, String.class);

		actorMap.getActor(backendId)
				.feedIndex(dsId, feedMode, records, newRSKeepAliveCallback(triggerId), newBBActorCallback(handler, job), backendId, emptyResult);
	}

	private boolean getEmptyResult(BlackboardJob job) {

		if (job.getParameters().containsKey("emptyResult")) {
			final String emptyResult = job.getParameters().get("emptyResult").toLowerCase().trim();
			return "true".equals(emptyResult);

		} else return false;
	}

	/**
	 * Constructor for triggers used by the resultSet keepAlive job.
	 *
	 * @param rsEpr     resultSet epr to keep alive.
	 * @param triggerId trigger identifier.
	 * @return a new org.quartz.SimpleTrigger instance.
	 */
	private SimpleTrigger getResultsetTrigger(final String rsEpr, final String triggerId) {

		final SimpleTrigger trigger = newTrigger()
				.withIdentity(triggerId, ResultsetKeepAliveJob.JOB_GROUP)
				.withSchedule(simpleSchedule().repeatForever().withIntervalInMilliseconds(getRepeatDelay())).forJob(getRsKeepaliveJob()).build();
		trigger.getJobDataMap().put(BBParam.RS_EPR, rsEpr);
		return trigger;
	}

	/**
	 * Constructor for a blackboard callback to handle the job termination.
	 *
	 * @param handler the handler
	 * @param job     the BB job.
	 * @return a new eu.dnetlib.functionality.index.solr.actors.BlackboardActorCallback
	 */
	private BlackboardActorCallback newBBActorCallback(final BlackboardServerHandler handler, final BlackboardJob job) {
		return new BlackboardActorCallback() {

			@Override
			public void setJobDone(final Map<String, String> params) {
				log.info(job.getAction() + " job set to DONE");
				job.getParameters().put("added", "" + params.get("added"));
				job.getParameters().put("skipped", "" + params.get("skipped"));
				job.getParameters().put("marked", "" + params.get("marked"));
				job.getParameters().put("time", "" + params.get("time"));
				handler.done(job);
			}

			@Override
			public void setJobFailed(final Throwable exception) {
				log.error(job.getAction() + " job set to FAILED ", exception);
				handler.failed(job, exception);
			}
		};
	}

	/**
	 * Constructor for a resultSet keepAlive callbacks.
	 *
	 * @param triggerId trigger identifier.
	 * @return a new eu.dnetlib.functionality.index.solr.actors.ResultsetKeepAliveCallback
	 */
	private ResultsetKeepAliveCallback newRSKeepAliveCallback(final String triggerId) {
		return () -> {
			try {
				log.info("\n\n unscheduling resultSet keepalive trigger: " + triggerId + "\n\n");
				jobScheduler.unscheduleJob(new TriggerKey(triggerId, ResultsetKeepAliveJob.JOB_GROUP));
			} catch (SchedulerException e) {
				log.warn("cannot unschedule RSKeepAlive triggerId: " + triggerId);
				throw new RuntimeException(e); // NOPMD
			}
		};
	}

	/**
	 * Gets the rs keepalive job.
	 *
	 * @return the rsKeepaliveJob
	 */
	public JobDetail getRsKeepaliveJob() {
		return rsKeepaliveJob;
	}

	/**
	 * Sets the rs keepalive job.
	 *
	 * @param rsKeepaliveJob the rsKeepaliveJob to set
	 */
	@Required
	public void setRsKeepaliveJob(final JobDetail rsKeepaliveJob) {
		this.rsKeepaliveJob = rsKeepaliveJob;
	}

	/**
	 * Gets the job scheduler.
	 *
	 * @return the jobScheduler
	 */
	public Scheduler getJobScheduler() {
		return jobScheduler;
	}

	/**
	 * Sets the job scheduler.
	 *
	 * @param jobScheduler the jobScheduler to set
	 */
	@Required
	public void setJobScheduler(final Scheduler jobScheduler) {
		this.jobScheduler = jobScheduler;
	}

	/**
	 * Gets the repeat delay.
	 *
	 * @return the repeatDelay
	 */
	public long getRepeatDelay() {
		return repeatDelay;
	}

	/**
	 * Sets the repeat delay.
	 *
	 * @param repeatDelay the repeatDelay to set
	 */
	@Required
	public void setRepeatDelay(final long repeatDelay) {
		this.repeatDelay = repeatDelay;
	}

}
