package eu.dnetlib.data.hadoop;

import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import eu.dnetlib.rmi.data.hadoop.ClusterName;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Required;

import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

import eu.dnetlib.data.hadoop.HadoopJob.Status;
import eu.dnetlib.rmi.data.hadoop.HadoopJobDescriptor;
import eu.dnetlib.rmi.data.hadoop.HadoopServiceException;
import eu.dnetlib.data.hadoop.utils.HadoopUtils;

/**
 * In-memory registry of running/completed {@link HadoopJob}s, keyed by job id.
 *
 * <p>Holds at most {@code maxJobs} entries; when full, the least-recently-active
 * <em>completed</em> job is evicted before a new one is registered. Note that the
 * backing {@link HashBiMap} is not thread-safe, and this class adds no
 * synchronization — concurrent registration/unregistration must be guarded by
 * callers (NOTE(review): confirm whether the service invokes this from a single
 * thread).
 */
public class JobRegistry {

	private static final Log log = LogFactory.getLog(JobRegistry.class); // NOPMD by marko on 11/24/08 5:02 PM

	/** Maximum number of jobs retained in the registry; injected via {@link #setMaxJobs(int)}. */
	private int maxJobs;

	/** Job id &rarr; job. A BiMap so an already-registered job instance can be resolved back to its id. */
	private final BiMap<String, HadoopJob> jobs = HashBiMap.create();

	/**
	 * Registers the given job and starts its monitor, evicting the oldest completed
	 * job first if the registry is full. Registering the same job instance twice is
	 * idempotent: the existing id is returned and no new monitor is started.
	 *
	 * @param hadoopJob the job to register
	 * @return the registered job id
	 * @throws HadoopServiceException if eviction of an old job fails
	 */
	public String registerJob(HadoopJob hadoopJob) throws HadoopServiceException {

		if (jobs.containsValue(hadoopJob)) { return jobs.inverse().get(hadoopJob); }

		// Evict BEFORE inserting, and at >= capacity: with the previous check
		// (size() > getMaxJobs()) the map could grow to maxJobs + 1 entries.
		if (jobs.size() >= getMaxJobs()) {
			removeOldestProcess();
		}

		jobs.put(hadoopJob.getId(), hadoopJob);
		log.info("Registered hadoop job " + hadoopJob.getId());
		hadoopJob.startMonitor();

		return hadoopJob.getId();
	}

	/**
	 * Returns the status of the job with the given id.
	 *
	 * @param id the job id
	 * @return the current job status
	 * @throws NullPointerException if no job with that id is registered
	 *         (NOTE(review): findJob returns null for unknown ids — callers are
	 *         expected to pass valid ids; confirm against service entry points)
	 */
	public Status getJobStatus(String id) {
		return findJob(id).getStatus();
	}

	/**
	 * Looks up a job by id.
	 *
	 * @param id the job id
	 * @return the job, or {@code null} if not registered
	 */
	public HadoopJob findJob(String id) {
		return jobs.get(id);
	}

	/**
	 * Kills the monitor of the job with the given id and removes it from the registry.
	 *
	 * @param id the job id
	 * @throws HadoopServiceException if the id is not registered
	 */
	public void unregisterJob(String id) throws HadoopServiceException {

		if (!jobs.containsKey(id)) { throw new HadoopServiceException("unable to unregister job, could not find jobId in registry: " + id); }

		log.info("unregistering job: " + id);
		jobs.get(id).getJobMonitor().kill();
		jobs.remove(id);
	}

	/**
	 * Evicts the completed job with the oldest last-activity timestamp, if any.
	 * Jobs that are still running are never evicted; if every registered job is
	 * still running, nothing is removed and the registry may temporarily exceed
	 * {@code maxJobs}.
	 *
	 * @throws HadoopServiceException if unregistering the selected job fails
	 */
	private void removeOldestProcess() throws HadoopServiceException {
		Date oldDate = new Date();
		String oldId = null;

		for (Entry<String, HadoopJob> e : jobs.entrySet()) {
			final HadoopJob hadoopJob = e.getValue();

			if (hadoopJob.isComplete()) {
				final Date date = hadoopJob.getLastActivity();
				if (date.before(oldDate)) {
					oldDate = date;
					oldId = e.getKey();
				}
			}
		}

		if (oldId != null) {
			unregisterJob(oldId);
		} else {
			// Nothing evictable: every job is still running. Surface this so capacity
			// overruns are visible in the logs instead of failing silently.
			log.warn("job registry is full (" + jobs.size() + " jobs) but no completed job is available for eviction");
		}
	}

	/**
	 * Lists descriptors of the registered jobs that belong to the given cluster.
	 *
	 * @param clusterName the cluster to filter by
	 * @return descriptors of the matching jobs (possibly empty)
	 */
	public List<HadoopJobDescriptor> listJobs(ClusterName clusterName) {
		Map<String, HadoopJob> filtered = Maps.filterValues(jobs, HadoopUtils.filterByCluster(clusterName));
		return Lists.newArrayList(Iterables.transform(filtered.entrySet(), HadoopUtils.hadoopJobDescriptor()));
	}

	@Required
	public void setMaxJobs(final int maxJobs) {
		this.maxJobs = maxJobs;
	}

	public int getMaxJobs() {
		return maxJobs;
	}

}
