package eu.dnetlib.data.hadoop;

import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import eu.dnetlib.rmi.data.hadoop.ClusterName;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.mapred.JobClient;
import org.apache.oozie.client.OozieClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Required;
import org.springframework.context.annotation.Lazy;

import com.google.common.collect.Maps;
import com.google.gson.Gson;

import eu.dnetlib.data.hadoop.hbase.HBaseAdminFactory;
import eu.dnetlib.data.hadoop.oozie.OozieClientFactory;
import eu.dnetlib.data.hadoop.mapreduce.JobClientFactory;

/**
 * Registry of the hadoop clients ({@link JobClient}, {@link OozieClient}, {@link HBaseAdmin}) available for each
 * {@link ClusterName}.
 * <p>
 * Which clients are created is driven by the JSON configuration passed to {@link #setEnabledClients(String)}: a map
 * from cluster name to a map of flags read under the keys <code>mapred</code>, <code>oozie</code> and
 * <code>hbase</code>. The clients are created on a background thread started by {@link #init()};
 * {@link #waitClients()} blocks until that thread is done (or the configured timeout expires).
 * </p>
 */
public class HadoopClientMap {

	private static final Log log = LogFactory.getLog(HadoopClientMap.class); // NOPMD by marko on 11/24/08 5:02 PM

	@Lazy
	@Autowired
	private JobClientFactory jobClientFactory;

	@Lazy
	@Autowired
	private OozieClientFactory oozieClientFactory;

	@Lazy
	@Autowired
	private HBaseAdminFactory hbaseAdminFactory;

	/** Maximum time, in seconds, that {@link #waitClients()} waits for the initialization to complete. */
	private int clientsInitTime;

	/** Parsed configuration: cluster name -> (client key -> enabled flag). */
	private Map<String, Map<String, String>> enabledClients = Maps.newHashMap();

	/**
	 * Clients registered so far. Written by the initialization thread and read by the callers of the getters, hence
	 * a concurrent map.
	 */
	private final Map<ClusterName, HadoopClients> clients = Maps.newConcurrentMap();

	/** Runs the initialization task; shut down by {@link #init()} as soon as the task has been queued. */
	private volatile ExecutorService executor = Executors.newSingleThreadExecutor();

	/**
	 * Logs the current configuration and starts, on a background thread, the initialization of the clients of every
	 * configured cluster.
	 * <p>
	 * The executor is shut down right after the task has been queued: the queued task still runs, the worker thread
	 * is released when it finishes, and {@link #waitClients()} can actually observe the termination. If this method
	 * is called again, a fresh executor is used.
	 * </p>
	 */
	public synchronized void init() {
		log.info("clients conf: " + getEnabledClients());
		if (executor.isShutdown()) {
			// a previous init() consumed the executor: a shut down executor rejects new tasks
			executor = Executors.newSingleThreadExecutor();
		}
		executor.execute(new Runnable() {

			@Override
			public void run() {
				for (final String name : enabledClients.keySet()) {
					doInit(name);
				}
			}
		});
		// without a shutdown request awaitTermination() can never return true
		executor.shutdown();
	}

	/**
	 * Creates the clients enabled for the given cluster and registers them.
	 * <p>
	 * Any failure is logged and rethrown wrapped in a {@link RuntimeException}; since this method is invoked from
	 * the loop in {@link #init()}, such a failure also stops the initialization of the remaining clusters.
	 * </p>
	 *
	 * @param name
	 *            the cluster name, as it appears in the configuration; must match a {@link ClusterName} constant
	 */
	private void doInit(final String name) {
		try {
			log.info("initializing clients for hadoop cluster: " + name);
			final ClusterName clusterName = ClusterName.valueOf(name);

			// the configuration is parsed without type information, so the flag values are not guaranteed to be
			// strings: read them as plain objects
			final Map<String, ?> clientsConf = enabledClients.get(name);

			final JobClient jobClient = isEnabled(clientsConf, "mapred") ? getJobClientFactory().newInstance(name) : null;
			final OozieClient oozieClient = isEnabled(clientsConf, "oozie") ? getOozieClientFactory().newInstance(clusterName) : null;
			final HBaseAdmin hbaseAdmin = isEnabled(clientsConf, "hbase") ? getHbaseAdminFactory().newInstance(clusterName) : null;

			clients.put(clusterName, new HadoopClients(jobClient, oozieClient, hbaseAdmin));
		} catch (final Throwable e) {
			log.error("Error initializing hadoop client for cluster: " + name, e);
			throw new RuntimeException(e);
		}
	}

	/**
	 * Tells whether a client is switched on in the configuration of a cluster.
	 *
	 * @param clientsConf
	 *            the flags of one cluster
	 * @param client
	 *            the client key (e.g. <code>mapred</code>)
	 * @return true only if the flag is present and its string form equals, ignoring case, <code>true</code>; this
	 *         accepts both the JSON boolean <code>true</code> and the JSON string <code>"true"</code>
	 */
	private static boolean isEnabled(final Map<String, ?> clientsConf, final String client) {
		final Object flag = clientsConf.get(client);
		return (flag != null) && Boolean.parseBoolean(flag.toString());
	}

	/**
	 * Waits for the initialization started by {@link #init()} to finish, for at most {@link #getClientsInitTime()}
	 * seconds.
	 *
	 * @return true if the initialization thread terminated (successfully or not) within the timeout, false if the
	 *         timeout elapsed first
	 * @throws InterruptedException
	 *             if the calling thread is interrupted while waiting
	 */
	public boolean waitClients() throws InterruptedException {
		return executor.awaitTermination(getClientsInitTime(), TimeUnit.SECONDS);
	}

	/**
	 * @param name
	 *            the cluster
	 * @return the result of {@link HadoopClients#getJtClient()} for the given cluster
	 * @throws IllegalArgumentException
	 *             if no clients are registered for the cluster
	 */
	public JobClient getJtClient(final ClusterName name) {
		return getClients(name).getJtClient();
	}

	/**
	 * @param name
	 *            the cluster
	 * @return the result of {@link HadoopClients#isMapredAvailable()} for the given cluster
	 * @throws IllegalArgumentException
	 *             if no clients are registered for the cluster
	 */
	public boolean isMapreduceAvailable(final ClusterName name) {
		return getClients(name).isMapredAvailable();
	}

	/**
	 * @param name
	 *            the cluster
	 * @return the result of {@link HadoopClients#getOozieClient()} for the given cluster
	 * @throws IllegalArgumentException
	 *             if no clients are registered for the cluster
	 */
	public OozieClient getOozieClient(final ClusterName name) {
		return getClients(name).getOozieClient();
	}

	/**
	 * @param name
	 *            the cluster
	 * @return the result of {@link HadoopClients#isOozieAvailable()} for the given cluster
	 * @throws IllegalArgumentException
	 *             if no clients are registered for the cluster
	 */
	public boolean isOozieAvailable(final ClusterName name) {
		return getClients(name).isOozieAvailable();
	}

	/**
	 * @param name
	 *            the cluster
	 * @return the result of {@link HadoopClients#getHbaseAdmin()} for the given cluster
	 * @throws IllegalArgumentException
	 *             if no clients are registered for the cluster
	 */
	public HBaseAdmin getHbaseAdmin(final ClusterName name) {
		return getClients(name).getHbaseAdmin();
	}

	/**
	 * Returns the clients registered for the given cluster.
	 *
	 * @param name
	 *            the cluster
	 * @return the registered clients, never null
	 * @throws IllegalArgumentException
	 *             if no clients are registered for the cluster: it is not configured, its initialization failed or
	 *             has not completed yet
	 */
	public HadoopClients getClients(final ClusterName name) {
		final HadoopClients hadoopClients = clients.get(name);
		if (hadoopClients == null) {
			throw new IllegalArgumentException("cluster " + name + " is currently disabled");
		}
		return hadoopClients;
	}

	// //////////

	/**
	 * @return the current clients configuration, serialized as JSON
	 */
	public String getEnabledClients() {
		return new Gson().toJson(enabledClients);
	}

	/**
	 * Sets the clients configuration.
	 *
	 * @param enabledClients
	 *            a JSON object mapping each cluster name to an object of client flags; when the parser yields no
	 *            map (e.g. for a null or empty string) the configuration is reset to an empty map, so that
	 *            {@link #init()} simply has nothing to initialize
	 */
	@Required
	@SuppressWarnings("unchecked")
	public void setEnabledClients(final String enabledClients) {
		// unchecked: Gson is given the raw Map type, the declared value types are NOT enforced by the parser
		final Map<String, Map<String, String>> parsed = new Gson().fromJson(enabledClients, Map.class);
		if (parsed != null) {
			this.enabledClients = parsed;
		} else {
			this.enabledClients = Maps.newHashMap();
		}
	}

	public JobClientFactory getJobClientFactory() {
		return jobClientFactory;
	}

	public void setJobClientFactory(JobClientFactory jobClientFactory) {
		this.jobClientFactory = jobClientFactory;
	}

	public OozieClientFactory getOozieClientFactory() {
		return oozieClientFactory;
	}

	public void setOozieClientFactory(OozieClientFactory oozieClientFactory) {
		this.oozieClientFactory = oozieClientFactory;
	}

	public HBaseAdminFactory getHbaseAdminFactory() {
		return hbaseAdminFactory;
	}

	public void setHbaseAdminFactory(HBaseAdminFactory hbaseAdminFactory) {
		this.hbaseAdminFactory = hbaseAdminFactory;
	}

	/**
	 * @return the maximum time, in seconds, that {@link #waitClients()} waits
	 */
	public int getClientsInitTime() {
		return clientsInitTime;
	}

	/**
	 * @param clientsInitTime
	 *            the maximum time, in seconds, that {@link #waitClients()} waits
	 */
	@Required
	public void setClientsInitTime(int clientsInitTime) {
		this.clientsInitTime = clientsInitTime;
	}

}
