package eu.dnetlib.data.hadoop.blackboard;

import java.io.StringReader;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

import javax.annotation.Resource;

import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.dom4j.Document;
import org.dom4j.DocumentException;
import org.dom4j.Node;
import org.dom4j.io.SAXReader;
import org.springframework.beans.factory.annotation.Autowired;

import com.google.common.collect.Sets;

import eu.dnetlib.data.hadoop.config.ConfigurationEnumerator;
import eu.dnetlib.data.hadoop.utils.JobProfile;
import eu.dnetlib.data.hadoop.utils.ScanFactory;
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
import eu.dnetlib.enabling.tools.blackboard.BlackboardServerAction;
import eu.dnetlib.enabling.tools.blackboard.BlackboardServerHandler;
import eu.dnetlib.rmi.data.hadoop.ClusterName;
import eu.dnetlib.rmi.data.hadoop.HadoopBlackboardActions;
import eu.dnetlib.rmi.data.hadoop.HadoopJobType;
import eu.dnetlib.rmi.data.hadoop.HadoopServiceException;
import eu.dnetlib.rmi.enabling.ISLookUpDocumentNotFoundException;
import eu.dnetlib.rmi.enabling.ISLookUpException;
import eu.dnetlib.rmi.enabling.ISLookUpService;

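/**
 * Base class for the blackboard actions that drive Hadoop jobs. {@link #execute(BlackboardServerHandler, BlackboardJob)}
 * marks the job as ongoing and runs {@link #executeAsync(BlackboardServerHandler, BlackboardJob)} on a cached thread
 * pool, so subclasses only implement the job-specific submission logic.
 *
 * <p>A minimal sketch of a hypothetical subclass is shown below; the class name, the parameter keys and the accessor
 * used to read the blackboard parameters are illustrative assumptions, not part of this module's API.</p>
 *
 * <pre>{@code
 * public class SubmitExampleJobAction extends AbstractHadoopAction {
 *
 *     protected void executeAsync(final BlackboardServerHandler handler, final BlackboardJob job)
 *             throws HadoopServiceException {
 *         // read the parameters attached to the blackboard job (accessor name assumed)
 *         final Map<String, String> params = job.getParameters();
 *         // resolve the job profile registered in the IS for the given job name (parameter key is illustrative)
 *         final JobProfile profile = loadISJobConfiguration(params.get("job.name"), params);
 *         // ... submit the job using the Configuration returned by getConf(...) for the target cluster,
 *         // then report completion or failure through the handler ...
 *     }
 * }
 * }</pre>
 */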
public abstract class AbstractHadoopAction implements BlackboardServerAction<HadoopBlackboardActions> {

	private static final Log log = LogFactory.getLog(AbstractHadoopAction.class); // NOPMD by marko on 11/24/08 5:02 PM

	/**
	 * Special I/O hdfs property names for which we support relative paths.
	 */
	public static final Set<String> HDFS_SPECIAL_PROPERTIES = Sets.newHashSet("mapred.input.dir", "mapred.output.dir");

	@Resource
	protected UniqueServiceLocator serviceLocator;

	@Autowired
	protected ConfigurationEnumerator configurationEnumerator;

	@Autowired
	private ScanFactory scanFactory;

	private final Executor executor = Executors.newCachedThreadPool();

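	/**
	 * Performs the actual action. Implementations are invoked on a worker thread, so they may block while
	 * configuring, submitting or monitoring the Hadoop job.
	 *
	 * @param handler the blackboard handler used to report the job status
	 * @param job     the blackboard job carrying the action parameters
	 * @throws HadoopServiceException when the job cannot be configured or executed
	 */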
	protected abstract void executeAsync(final BlackboardServerHandler handler, final BlackboardJob job) throws HadoopServiceException;

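	/**
	 * Marks the job as ongoing and delegates to {@link #executeAsync(BlackboardServerHandler, BlackboardJob)} on the
	 * internal executor; any failure is logged and reported back through the handler.
	 */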
	@Override
	public void execute(final BlackboardServerHandler handler, final BlackboardJob job) {
		executor.execute(() -> {
			try {
				handler.ongoing(job);
				executeAsync(handler, job);
			} catch (final Throwable e) {
				log.error(ExceptionUtils.getStackTrace(e));
				handler.failed(job, e);
			}
		});
	}

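	/**
	 * Reads the HadoopJobConfigurationDSResourceType profile registered in the Information Service for the given
	 * job name and parses it into a {@link JobProfile}.
	 *
	 * @param jobName  the name of the Hadoop job configuration profile
	 * @param bbParams the blackboard parameters, used to resolve the optional scan properties
	 * @throws HadoopServiceException when the profile cannot be found or read
	 */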
	protected JobProfile loadISJobConfiguration(final String jobName, final Map<String, String> bbParams) throws HadoopServiceException {

		log.info("reading job configuration profile: " + jobName);

		try {
			final String profile = serviceLocator.getService(ISLookUpService.class).getResourceProfileByQuery(
					"/RESOURCE_PROFILE[.//RESOURCE_TYPE/@value = 'HadoopJobConfigurationDSResourceType' and .//HADOOP_JOB/@name='" + jobName + "']");
			return parseJobProfile(profile, bbParams);
		} catch (final ISLookUpDocumentNotFoundException e) {
			throw new HadoopServiceException("cannot find job profile: " + jobName, e);
		} catch (final ISLookUpException e) {
			throw new HadoopServiceException("unable to read job profile: " + jobName, e);
		}
	}

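	/**
	 * Parses the job profile XML: description, name, type, static configuration properties, required parameters and,
	 * when a SCAN element is present, the scan properties.
	 */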
	private JobProfile parseJobProfile(final String profile, final Map<String, String> bbParams) throws HadoopServiceException {
		final JobProfile jobProfile = new JobProfile();
		try {
			final Document doc = new SAXReader().read(new StringReader(profile));

			log.debug("setting job description");
			jobProfile.setDescription(doc.valueOf("//DESCRIPTION"));

			log.debug("setting job name");
			jobProfile.setName(doc.valueOf("//HADOOP_JOB/@name"));

			log.debug("setting job type");
			jobProfile.setJobType(HadoopJobType.valueOf(doc.valueOf("//HADOOP_JOB/@type")));

			log.debug("setting job static configuration");
			for (final Object o : doc.selectNodes("//STATIC_CONFIGURATION/PROPERTY")) {
				final Node node = (Node) o;
				jobProfile.getJobDefinition().put(node.valueOf("./@key"), node.valueOf("./@value"));
			}

			log.debug("setting job required parameters");
			for (final Object o : doc.selectNodes("//JOB_INTERFACE/PARAM[./@required = 'true']")) {
				final Node node = (Node) o;
				jobProfile.getRequiredParams().add(node.valueOf("./@name"));
			}

			if (doc.selectSingleNode("//SCAN") != null) {
				jobProfile.setScanProperties(scanFactory.parseScanProperties(doc, bbParams));
			}

		} catch (final DocumentException e) {
			throw new HadoopServiceException("cannot parse job profile", e);
		}

		if (jobProfile.isEmpty()) throw new HadoopServiceException("job configuration is empty");

		return jobProfile;
	}

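	/**
	 * Returns the Hadoop {@link Configuration} associated with the given cluster.
	 */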
	protected Configuration getConf(final ClusterName clusterName) {
		return configurationEnumerator.get(clusterName);
	}

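	/**
	 * Checks whether the given property is one of the {@link #HDFS_SPECIAL_PROPERTIES}, has a non-empty value and
	 * that value is not already an absolute hdfs:// URI, i.e. it is expressed as a relative path.
	 */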
	protected boolean checkHdfsProperty(final Entry<String, String> e) {
		return HDFS_SPECIAL_PROPERTIES.contains(e.getKey()) && !e.getValue().isEmpty() && !e.getValue().startsWith("hdfs://");
	}

	public UniqueServiceLocator getServiceLocator() {
		return serviceLocator;
	}

}
