package eu.dnetlib.data.hadoop.blackboard;

import java.net.URI;
import java.util.List;
import java.util.Map;

import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import eu.dnetlib.data.hadoop.HadoopClientMap;
import eu.dnetlib.data.hadoop.HadoopIsClient;
import eu.dnetlib.data.hadoop.JobRegistry;
import eu.dnetlib.data.hadoop.utils.JobProfile;
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
import eu.dnetlib.enabling.tools.blackboard.BlackboardServerHandler;
import eu.dnetlib.miscutils.datetime.DateUtils;
import eu.dnetlib.miscutils.functional.hash.Hashing;
import eu.dnetlib.rmi.data.hadoop.ClusterName;
import eu.dnetlib.rmi.data.hadoop.HadoopServiceException;
import eu.dnetlib.rmi.enabling.ISLookUpException;
import eu.dnetlib.rmi.enabling.ISLookUpService;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.springframework.beans.factory.annotation.Autowired;

/**
 * Base class for blackboard actions that submit a Hadoop job described by a job profile.
 *
 * <p>
 * The class loads the job profile named by the {@code job.name} blackboard parameter, checks that all the parameters
 * required by the profile are present, delegates the actual submission to
 * {@link #submit(JobCompletion, BlackboardJob, String, JobProfile)} and keeps the {@code RUNNING_INSTANCES},
 * {@code CUMULATIVE_RUN} and {@code LAST_SUBMISSION_DATE} values of the job profile up to date through XQuery updates.
 * </p>
 */
public abstract class AbstractSubmitAction extends AbstractHadoopAction {

	private static final Log log = LogFactory.getLog(AbstractSubmitAction.class); // NOPMD by marko on 11/24/08 5:02 PM

	/** Blackboard parameter holding the name of the job to submit. */
	private static final String JOB_NAME_PARAM = "job.name";

	/** Blackboard parameter that, when set to {@code "true"}, turns the submission into a no-op. */
	private static final String SIMULATION_PARAM = "simulation";

	@Autowired
	protected HadoopClientMap hadoopClientMap;

	@Autowired
	protected JobRegistry jobRegistry;

	@Autowired
	private HadoopIsClient isClient;

	/**
	 * Performs the actual job submission.
	 *
	 * @param callback
	 *            callback created by {@link #newCompletionCallback(BlackboardServerHandler, BlackboardJob)}
	 * @param job
	 *            the blackboard job that requested the submission
	 * @param jobName
	 *            value of the {@code job.name} blackboard parameter
	 * @param jobProfile
	 *            the job profile loaded for {@code jobName}
	 * @throws HadoopServiceException
	 *             when the job cannot be submitted
	 */
	protected abstract void submit(final JobCompletion callback, final BlackboardJob job, final String jobName, final JobProfile jobProfile)
			throws HadoopServiceException;

	/**
	 * Loads the job profile, validates the job parameters and then either submits the job or, for a simulation,
	 * immediately reports the blackboard job as done.
	 *
	 * @param handler
	 *            blackboard handler used to report the job outcome
	 * @param job
	 *            the blackboard job to execute
	 * @throws HadoopServiceException
	 *             when a required parameter is missing or the submission fails
	 */
	@Override
	protected void executeAsync(final BlackboardServerHandler handler, final BlackboardJob job) throws HadoopServiceException {
		final String jobName = job.getParameters().get(JOB_NAME_PARAM);
		final JobProfile jobProfile = loadISJobConfiguration(jobName, job.getParameters());

		validateJobParams(handler, job, jobName, jobProfile);

		if (isSimulation(job)) {
			log.info(String.format("simulating job: '%s', done!", jobName));
			handler.done(job);
			return;
		}

		submit(newCompletionCallback(handler, job), job, jobName, jobProfile);
		// the counters are touched only after submit() returned without throwing
		updateJobStatus(jobName);
	}

	/**
	 * @param job
	 *            the blackboard job
	 * @return true only when the {@code simulation} parameter is exactly {@code "true"}
	 */
	private boolean isSimulation(final BlackboardJob job) {
		// a missing (null) or blank value never equals "true", so no separate checks are needed
		return "true".equals(job.getParameters().get(SIMULATION_PARAM));
	}

	/**
	 * Records a new submission on the job profile: cumulative run counter, running instances counter and last
	 * submission date, in this order.
	 *
	 * @param jobName
	 *            name of the submitted job
	 */
	protected void updateJobStatus(final String jobName) {
		incrementCumulativeRun(jobName);
		incrementRunningJobs(jobName);
		updateDate(jobName);
	}

	/**
	 * Adds one to the {@code RUNNING_INSTANCES} value of the job profile.
	 *
	 * @param jobName
	 *            name of the job
	 */
	protected void incrementRunningJobs(final String jobName) {
		log.debug("increment #running jobs: " + jobName);
		updateCountElement(jobName, "RUNNING_INSTANCES", "+ 1");
	}

	/**
	 * Subtracts one from the {@code RUNNING_INSTANCES} value of the job profile.
	 *
	 * @param jobName
	 *            name of the job
	 */
	protected void decrementRunningJobs(final String jobName) {
		log.debug("decrement #running jobs: " + jobName);
		updateCountElement(jobName, "RUNNING_INSTANCES", "- 1");
	}

	/**
	 * Adds one to the {@code CUMULATIVE_RUN} value of the job profile.
	 *
	 * @param jobName
	 *            name of the job
	 */
	protected void incrementCumulativeRun(final String jobName) {
		log.debug("increment #cumulative runs for job: " + jobName);
		updateCountElement(jobName, "CUMULATIVE_RUN", "+ 1");
	}

	/**
	 * Sets the {@code LAST_SUBMISSION_DATE} value of the job profile to the current time, formatted by
	 * {@code DateUtils.now_ISO8601()}.
	 *
	 * @param jobName
	 *            name of the job
	 */
	protected void updateDate(final String jobName) {
		log.info("increment last submission date for job: " + jobName);
		final String xquery =
				"for $x in collection('')/RESOURCE_PROFILE" + jobProfilePredicate(jobName)
						+ "  return update value $x//LAST_SUBMISSION_DATE/@value with '" + DateUtils.now_ISO8601() + "' ";

		isClient.executeXUpdate(xquery);
	}

	/**
	 * Replaces a counter element under {@code STATUS} of the job profile with its current value modified by
	 * {@code delta}.
	 *
	 * @param jobName
	 *            name of the job
	 * @param element
	 *            name of the counter element
	 * @param delta
	 *            arithmetic suffix appended to the current value, e.g. {@code "+ 1"}
	 */
	private void updateCountElement(final String jobName, final String element, final String delta) {
		final String xquery =
				"let $x := //RESOURCE_PROFILE" + jobProfilePredicate(jobName) + ", $tot := $x//STATUS/" + element + "/@value/number() " + delta
						+ " return update replace $x//STATUS/" + element + " with <" + element + " value='{$tot}' />";

		isClient.executeXUpdate(xquery);
	}

	/**
	 * Builds the predicate (square brackets included) that selects the Hadoop job configuration profile of a job.
	 *
	 * @param jobName
	 *            name of the job
	 * @return the predicate, with the job name escaped for use in a single-quoted literal
	 */
	private static String jobProfilePredicate(final String jobName) {
		return "[.//RESOURCE_TYPE/@value='HadoopJobConfigurationDSResourceType' and .//HADOOP_JOB/@name='" + escapeXQueryLiteral(jobName) + "']";
	}

	/**
	 * Escapes a value so that it can be embedded in a single-quoted XQuery string literal.
	 *
	 * @param value
	 *            the raw value; null is rendered as {@code "null"}, exactly as plain string concatenation would do
	 * @return the value with {@code &} replaced by {@code &amp;} and each apostrophe doubled
	 */
	private static String escapeXQueryLiteral(final String value) {
		// '&' must be handled first, otherwise the entity produced here would be escaped again
		return String.valueOf(value).replace("&", "&amp;").replace("'", "''");
	}

	/**
	 * Derives a job identifier from the cluster name and an internal identifier.
	 *
	 * @param clusterName
	 *            the cluster the job runs on
	 * @param internalId
	 *            an identifier whose {@code toString()} value is hashed together with the cluster name
	 * @return {@code "job-"} followed by the md5 of the concatenated string forms of the two arguments
	 */
	protected String newJobId(final ClusterName clusterName, final Object internalId) {
		return "job-" + Hashing.md5(clusterName.toString() + internalId.toString());
	}

	/**
	 * Resolves the default library path by appending the {@code defaultLibPath} service property to the given file
	 * system prefix.
	 *
	 * @param defaultFs
	 *            prefix prepended to the configured library path
	 * @return the string form of the resulting {@link Path}
	 * @throws HadoopServiceException
	 *             when the lookup of the service property fails with an {@link ISLookUpException}
	 * @throws IllegalStateException
	 *             when the lookup does not return exactly one value
	 * @throws IllegalArgumentException
	 *             when the concatenated string is rejected by {@link URI#create(String)}
	 */
	protected String getDefaultLibPath(final String defaultFs) throws HadoopServiceException {
		try {
			final String libPath = queryForServiceProperty("defaultLibPath");
			final Path path = new Path(URI.create(defaultFs + libPath));

			return path.toString();
		} catch (ISLookUpException e) {
			throw new HadoopServiceException("unable to get default lib path", e);
		}
	}

	/**
	 * Checks that the blackboard job carries every parameter required by the job profile. On failure the blackboard
	 * job is reported as failed and the same exception is thrown to the caller.
	 *
	 * @param handler
	 *            blackboard handler used to report the failure
	 * @param bbJob
	 *            the blackboard job whose parameters are checked
	 * @param jobName
	 *            name of the job, used in the error message
	 * @param jobProfile
	 *            profile listing the required parameters
	 * @throws HadoopServiceException
	 *             when at least one required parameter is missing
	 */
	protected void validateJobParams(final BlackboardServerHandler handler, final BlackboardJob bbJob, final String jobName, final JobProfile jobProfile)
			throws HadoopServiceException {
		if (bbJob.getParameters().keySet().containsAll(jobProfile.getRequiredParams())) { return; }

		final String msg =
				"required parameter is missing for job: " + jobName + ", required params: " + jobProfile.getRequiredParams() + "\n\nmissing params: "
						+ Sets.difference(jobProfile.getRequiredParams(), bbJob.getParameters().keySet());
		log.error(msg);
		final HadoopServiceException e = new HadoopServiceException(msg);
		handler.failed(bbJob, e);
		throw e;
	}

	/**
	 * Creates the callback handed to {@link #submit(JobCompletion, BlackboardJob, String, JobProfile)}. The callback
	 * forwards the outcome to the blackboard handler and decrements the running instances counter of the job.
	 *
	 * @param handler
	 *            blackboard handler notified of the outcome
	 * @param bbJob
	 *            the blackboard job being tracked
	 * @return the completion callback
	 */
	protected JobCompletion newCompletionCallback(final BlackboardServerHandler handler, final BlackboardJob bbJob) {
		final String jobName = bbJob.getParameters().get(JOB_NAME_PARAM);
		return new JobCompletion() {

			@Override
			public void done(final Map<String, String> properties) {
				try {
					if (properties != null) {
						bbJob.getParameters().putAll(properties);
					}
					log.info(jobName + " completed successfully");
					handler.done(bbJob);
				} finally {
					// the counter must go down even when notifying the blackboard fails
					decrementRunningJobs(jobName);
				}
			}

			@Override
			public void failed(final String msg, final Throwable e) {
				try {
					log.error(msg, e);
					handler.failed(bbJob, e);
				} finally {
					// the counter must go down even when notifying the blackboard fails
					decrementRunningJobs(jobName);
				}
			}
		};
	}

	/**
	 * Looks up a property of the Hadoop service profile.
	 *
	 * @param key
	 *            the property key
	 * @return the property value
	 * @throws ISLookUpException
	 *             when the lookup fails
	 */
	private String queryForServiceProperty(final String key) throws ISLookUpException {
		return getServiceConfigValue("for $x in /RESOURCE_PROFILE[.//RESOURCE_TYPE/@value='HadoopServiceResourceType'] return $x//SERVICE_PROPERTIES/PROPERTY[./@ key='"
				+ key + "']/@value/string()");
	}

	/**
	 * Runs the given xquery against the lookup service and returns its single result.
	 *
	 * @param xquery
	 *            the query to run
	 * @return the only element of the result
	 * @throws ISLookUpException
	 *             when the lookup fails
	 * @throws IllegalStateException
	 *             when the result is null or does not contain exactly one element
	 */
	private String getServiceConfigValue(final String xquery) throws ISLookUpException {
		log.debug("quering for service property: " + xquery);
		final List<String> urls = serviceLocator.getService(ISLookUpService.class).quickSearchProfile(xquery);
		if (urls == null || urls.size() != 1) { throw new IllegalStateException("unable to find unique service property, xquery: " + xquery); }
		return Iterables.getOnlyElement(urls);
	}

}
