package eu.dnetlib.data.hadoop.blackboard;

import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;

import eu.dnetlib.rmi.data.hadoop.ClusterName;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.OozieClientException;

import eu.dnetlib.data.hadoop.HadoopJob;
import eu.dnetlib.data.hadoop.oozie.OozieJobMonitor;
import eu.dnetlib.rmi.data.hadoop.HadoopServiceException;
import eu.dnetlib.data.hadoop.utils.JobProfile;
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;

/**
 * Submit action that launches a job through Oozie.
 *
 * <p>
 * The job configuration is assembled from three sources (cluster configuration, job profile definition and the
 * blackboard job parameters), handed to the {@link OozieClient} of the target cluster, and the resulting job is
 * registered in the job registry together with an {@link OozieJobMonitor}.
 * </p>
 *
 * <p>
 * {@code hadoopClientMap}, {@code jobRegistry}, {@code getConf} and {@code newJobId} are not declared in this class;
 * they are inherited members.
 * </p>
 */
public class SubmitOozieJobAction extends AbstractSubmitAction {

	private static final Log log = LogFactory.getLog(SubmitOozieJobAction.class); // NOPMD by marko on 11/24/08 5:02 PM

	/**
	 * Builds the job configuration, runs it on the Oozie server of the requested cluster and registers the
	 * running job.
	 *
	 * @param callback
	 *            completion callback handed over to the {@link OozieJobMonitor}
	 * @param bbJob
	 *            blackboard job; its parameters must contain the {@code cluster} entry and are merged into the job
	 *            configuration
	 * @param jobName
	 *            name of the job, used in log and error messages
	 * @param jobProfile
	 *            profile providing the job definition
	 * @throws HadoopServiceException
	 *             when the {@code cluster} parameter is missing or unknown, when Oozie is not available for the
	 *             cluster, or when the Oozie client fails to run the job
	 */
	@Override
	public void submit(final JobCompletion callback, final BlackboardJob bbJob, final String jobName, final JobProfile jobProfile)
			throws HadoopServiceException {

		final ClusterName clusterName = resolveClusterName(bbJob);

		try {
			final Properties jobConf = prepareJob(getConf(clusterName), jobName, jobProfile, bbJob.getParameters());

			// Guarded: rendering the whole merged configuration is wasted work when debug logging is off.
			if (log.isDebugEnabled()) {
				log.debug("oozie job configuration:\n" + jobConf);
			}

			if (!hadoopClientMap.isOozieAvailable(clusterName)) {
				throw new HadoopServiceException("oozie not available for cluster: " + clusterName);
			}

			logJobDetails(jobConf);

			final OozieClient oozieClient = hadoopClientMap.getOozieClient(clusterName);
			final String internalId = oozieClient.run(jobConf);
			final String jobId = newJobId(clusterName, internalId);

			jobRegistry.registerJob(HadoopJob.newInstance(jobId, clusterName, jobProfile,
					new OozieJobMonitor(oozieClient, internalId, callback)));

		} catch (final OozieClientException e) {
			throw new HadoopServiceException("error executing hadoop job: " + jobName, e);
		}
	}

	/**
	 * Reads the {@code cluster} blackboard parameter and converts it to a {@link ClusterName}, reporting bad input
	 * through the declared {@link HadoopServiceException} instead of a raw runtime exception.
	 *
	 * @param bbJob
	 *            blackboard job carrying the parameters
	 * @return the cluster the job must be submitted to
	 * @throws HadoopServiceException
	 *             when the parameter is absent or is not a known cluster name
	 */
	private ClusterName resolveClusterName(final BlackboardJob bbJob) throws HadoopServiceException {
		final String name = bbJob.getParameters().get("cluster");
		if (name == null) {
			throw new HadoopServiceException("missing mandatory blackboard parameter: cluster");
		}
		try {
			// NOTE(review): assumes ClusterName.valueOf has enum semantics (IllegalArgumentException on unknown names) - confirm.
			return ClusterName.valueOf(name);
		} catch (final IllegalArgumentException e) {
			throw new HadoopServiceException("unknown cluster: " + name, e);
		}
	}

	/**
	 * Assembles the job properties. Sources are applied in order: cluster configuration, job definition from the
	 * profile, blackboard parameters. A key set by a later source replaces the value set by an earlier one.
	 *
	 * @param configuration
	 *            cluster configuration
	 * @param jobName
	 *            name of the job, only used for logging
	 * @param jobProfile
	 *            profile providing the job definition entries
	 * @param parameters
	 *            blackboard job parameters
	 * @return the merged properties
	 */
	private Properties prepareJob(final Configuration configuration, final String jobName, final JobProfile jobProfile, final Map<String, String> parameters) {

		log.info("creating job: " + jobName);

		final Properties p = new Properties();

		merge(p, configuration);
		merge(p, jobProfile.getJobDefinition().entrySet());
		merge(p, parameters.entrySet());

		return p;
	}

	/**
	 * Copies every entry into the given properties, overwriting keys that are already present.
	 * {@link Properties#setProperty(String, String)} rejects {@code null} keys and values with a
	 * {@link NullPointerException}.
	 *
	 * @param p
	 *            target properties, modified in place
	 * @param entrySet
	 *            entries to copy
	 */
	private void merge(final Properties p, final Iterable<Entry<String, String>> entrySet) {
		for (final Entry<String, String> e : entrySet) {
			p.setProperty(e.getKey(), e.getValue());
		}
	}

	/**
	 * Logs each configuration entry at debug level, one log statement per entry. Does nothing when debug logging is
	 * disabled.
	 *
	 * @param jobConf
	 *            job configuration to dump
	 */
	protected void logJobDetails(final Properties jobConf) {
		// Checked once up front rather than once per entry.
		if (!log.isDebugEnabled()) {
			return;
		}
		for (final Entry<?, ?> e : jobConf.entrySet()) {
			log.debug("\n" + e.getKey().toString() + " : " + e.getValue().toString());
		}
	}

}
