package eu.dnetlib.data.hadoop;

import java.util.Date;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import eu.dnetlib.data.hadoop.blackboard.JobMonitor;
import eu.dnetlib.data.hadoop.utils.JobProfile;
import eu.dnetlib.rmi.data.hadoop.ClusterName;
import eu.dnetlib.rmi.data.hadoop.HadoopJobDescriptor;
import eu.dnetlib.rmi.data.hadoop.HadoopServiceException;

/**
 * Handle on a single Hadoop job: groups the job identifier, the cluster it was submitted to and its
 * profile with the {@link JobMonitor} that tracks it, and runs that monitor on a dedicated
 * single-threaded executor.
 */
public class HadoopJob {

	private static final Log log = LogFactory.getLog(HadoopJob.class); // NOPMD by marko on 11/24/08 5:02 PM

	/** Seconds the monitoring thread may sit idle before it is allowed to terminate. */
	private static final long MONITOR_IDLE_TIMEOUT_SECONDS = 60L;

	/**
	 * Defines the possible states of a Hadoop job.
	 */
	public static enum Status {
		PREP, RUNNING, SUCCEEDED, KILLED, FAILED, SUSPENDED, UNKNOWN
	}

	private final Executor executor = newMonitorExecutor();

	private final JobMonitor jobMonitor;

	private final JobProfile jobProfile;

	private final ClusterName clusterName;

	private final String id;

	/**
	 * Static factory for {@link HadoopJob} instances.
	 *
	 * @param id
	 *            identifier of the job
	 * @param clusterName
	 *            cluster associated with the job
	 * @param profile
	 *            profile describing the job
	 * @param jobMonitor
	 *            monitor used to track the job
	 * @return a new job handle; monitoring is not started until {@link #startMonitor()} is called
	 */
	public static HadoopJob newInstance(String id, ClusterName clusterName, JobProfile profile, JobMonitor jobMonitor) {
		return new HadoopJob(id, clusterName, profile, jobMonitor);
	}

	private HadoopJob(String id, ClusterName clusterName, JobProfile jobProfile, JobMonitor jobMonitor) {
		super();
		this.id = id;
		this.clusterName = clusterName;
		this.jobProfile = jobProfile;
		this.jobMonitor = jobMonitor;
	}

	/**
	 * Builds the executor that runs the job monitor.
	 *
	 * Same shape as {@link Executors#newSingleThreadExecutor()} (one worker, unbounded FIFO queue,
	 * default thread factory), except that the worker is allowed to time out. Without the timeout
	 * every job would keep a non-daemon thread parked after its monitor has finished, because the
	 * executor is never shut down explicitly.
	 */
	private static Executor newMonitorExecutor() {
		final ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, MONITOR_IDLE_TIMEOUT_SECONDS, TimeUnit.SECONDS,
				new LinkedBlockingQueue<Runnable>(), Executors.defaultThreadFactory());
		// a fresh worker is created on demand if another task is submitted after the timeout
		pool.allowCoreThreadTimeOut(true);
		return pool;
	}

	/**
	 * Submits the job monitor to this job's executor and returns without waiting for it to finish.
	 */
	public void startMonitor() {
		log.info("start monitoring for job: " + getId());
		executor.execute(jobMonitor);
	}

	/**
	 * @return the identifier this job was created with
	 */
	public String getId() {
		return id;
	}

	/**
	 * @return the Hadoop-side identifier, as reported by the job monitor
	 */
	public String getHadoopId() {
		return getJobMonitor().getHadoopId();
	}

	/**
	 * @return the monitor tracking this job
	 */
	public JobMonitor getJobMonitor() {
		return jobMonitor;
	}

	/**
	 * Translates the status string reported by the job monitor into a {@link Status}.
	 *
	 * @return the matching constant, or {@link Status#UNKNOWN} when the monitor reports {@code null} or a
	 *         value that is not the exact name of a {@link Status} constant
	 */
	public Status getStatus() {
		final String reported = getJobMonitor().getStatus();
		if (reported == null) {
			log.warn("job monitor reported no status for job: " + getId());
			return Status.UNKNOWN;
		}
		try {
			return Status.valueOf(reported);
		} catch (IllegalArgumentException e) {
			// valueOf rejects anything that is not an exact constant name; fall back instead of failing the caller
			log.warn("job monitor reported unrecognised status '" + reported + "' for job: " + getId());
			return Status.UNKNOWN;
		}
	}

	/**
	 * @return {@code true} when the current status is {@link Status#SUCCEEDED}, {@link Status#FAILED} or
	 *         {@link Status#KILLED}; {@code false} for every other status
	 */
	public boolean isComplete() {
		switch (getStatus()) {
		case SUCCEEDED:
		case FAILED:
		case KILLED:
			return true;
		default:
			return false;
		}
	}

	/**
	 * @return the start time reported by the job monitor
	 * @throws HadoopServiceException
	 *             propagated from the job monitor
	 */
	public Date getStartTime() throws HadoopServiceException {
		return jobMonitor.getStartTime();
	}

	/**
	 * @return the last activity time reported by the job monitor
	 */
	public Date getLastActivity() {
		return jobMonitor.getLastActivity();
	}

	/**
	 * @return the cluster associated with this job
	 */
	public ClusterName getClusterName() {
		return clusterName;
	}

	/**
	 * @return the profile describing this job
	 */
	public JobProfile getJobProfile() {
		return jobProfile;
	}

	/**
	 * Builds a descriptor carrying the profile name and description, the job id, the current status, the
	 * start and last-activity times rendered with {@link Date#toString()}, the Hadoop id and the tracker URL.
	 *
	 * Note: a {@code null} start time or last activity from the monitor results in a
	 * {@link NullPointerException} here.
	 *
	 * @return a descriptor reflecting the job state at the time of the call
	 * @throws HadoopServiceException
	 *             propagated from {@link #getStartTime()}
	 */
	public HadoopJobDescriptor asDescriptor() throws HadoopServiceException {
		// values are read in the constructor's argument order
		final String name = getJobProfile().getName();
		final String description = getJobProfile().getDescription();
		final String jobId = getId();
		final String status = getStatus().toString();
		final String startTime = getStartTime().toString();
		final String lastActivity = getLastActivity().toString();

		return new HadoopJobDescriptor(name, description, jobId, status, startTime, lastActivity, getJobMonitor().getHadoopId(),
				getJobMonitor().getTrackerUrl());
	}

}
