package eu.dnetlib.data.hadoop;

import java.io.IOException;
import java.util.Map;

import com.google.common.collect.Maps;
import com.google.gson.Gson;
import eu.dnetlib.data.hadoop.config.ClusterName;
import eu.dnetlib.data.hadoop.hbase.HBaseAdminFactory;
import eu.dnetlib.data.hadoop.mapred.JobClientFactory;
import eu.dnetlib.data.hadoop.oozie.OozieClientFactory;
import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.mapred.JobClient;
import org.apache.oozie.client.OozieClient;
import org.springframework.beans.factory.annotation.Required;


/**
 * Registry of the Hadoop-related clients (mapreduce, oozie, hbase) enabled for each cluster.
 *
 * <p>
 * The configuration is supplied as a JSON object keyed by cluster name, whose values are objects mapping a client
 * name ({@code "mapred"}, {@code "oozie"}, {@code "hbase"}) to an enabled flag, e.g.
 * {@code {"DM":{"mapred":"true","oozie":"false"}}}. A client is considered available only when its flag is
 * {@code "true"} (either the JSON string or the JSON boolean). The client getters delegate to the configured
 * factories after checking that the requested client is available.
 * </p>
 */
public class HadoopClientMap {

	private static final Log log = LogFactory.getLog(HadoopClientMap.class); // NOPMD by marko on 11/24/08 5:02 PM

	/** Configuration key of the mapreduce (jobtracker) client. */
	private static final String MAPRED = "mapred";

	/** Configuration key of the oozie client. */
	private static final String OOZIE = "oozie";

	/** Configuration key of the hbase client. */
	private static final String HBASE = "hbase";

	private JobClientFactory jobClientFactory;

	private OozieClientFactory oozieClientFactory;

	private HBaseAdminFactory hbaseAdminFactory;

	/** cluster name -> (client name -> enabled flag); never null. */
	private Map<String, Map<String, String>> enabledClients = Maps.newHashMap();

	/**
	 * @param name
	 *            the cluster to check
	 * @return true if the "mapred" client is enabled for the given cluster
	 */
	public boolean isMapreduceAvailable(final ClusterName name) {
		return isClientAvailable(name, MAPRED);
	}

	/**
	 * @param name
	 *            the cluster to check
	 * @return true if the "oozie" client is enabled for the given cluster
	 */
	public boolean isOozieAvailable(final ClusterName name) {
		return isClientAvailable(name, OOZIE);
	}

	/**
	 * @param name
	 *            the cluster to check
	 * @return true if the "hbase" client is enabled for the given cluster
	 */
	public boolean isHbaseAvailable(final ClusterName name) {
		return isClientAvailable(name, HBASE);
	}

	/**
	 * Looks up the enabled flag of a client for a cluster.
	 *
	 * @param name
	 *            the cluster to check
	 * @param clientName
	 *            the configuration key of the client
	 * @return true only if the cluster is configured and the client flag equals "true"
	 */
	private boolean isClientAvailable(final ClusterName name, final String clientName) {
		// A single lookup also covers clusters that are missing or mapped to null.
		final Map<String, String> clients = enabledClients.get(name.toString());
		return (clients != null) && "true".equals(clients.get(clientName));
	}

	/**
	 * Obtains a job client for the given cluster on behalf of a user.
	 *
	 * @param clusterName
	 *            the target cluster
	 * @param username
	 *            the user name handed to the job client factory
	 * @return the instance created by the configured {@link JobClientFactory}
	 * @throws HadoopServiceException
	 *             if mapreduce is not enabled for the cluster
	 * @throws IOException
	 *             declared for failures raised while creating the client
	 */
	public JobClient getJtClient(final ClusterName clusterName, final String username) throws HadoopServiceException, IOException {
		checkMapreduceAvailable(clusterName);
		return getJobClientFactory().newInstance(clusterName, username);
	}

	/**
	 * Obtains a job client for the given cluster.
	 *
	 * @param clusterName
	 *            the target cluster
	 * @return the instance created by the configured {@link JobClientFactory}
	 * @throws HadoopServiceException
	 *             if mapreduce is not enabled for the cluster
	 * @throws IOException
	 *             declared for failures raised while creating the client
	 */
	public JobClient getJtClient(final ClusterName clusterName) throws HadoopServiceException, IOException {
		checkMapreduceAvailable(clusterName);
		return getJobClientFactory().newInstance(clusterName);
	}

	/**
	 * Guard shared by the getJtClient overloads.
	 *
	 * @param clusterName
	 *            the target cluster
	 * @throws HadoopServiceException
	 *             if mapreduce is not enabled for the cluster
	 */
	private void checkMapreduceAvailable(final ClusterName clusterName) throws HadoopServiceException {
		if (!isMapreduceAvailable(clusterName)) {
			throw new HadoopServiceException("jobtracker is not available for cluster " + clusterName.toString());
		}
	}

	/**
	 * Obtains an oozie client for the given cluster.
	 *
	 * @param name
	 *            the target cluster
	 * @return the instance created by the configured {@link OozieClientFactory}
	 * @throws HadoopServiceException
	 *             if oozie is not enabled for the cluster
	 */
	public OozieClient getOozieClient(final ClusterName name) throws HadoopServiceException {
		if (!isOozieAvailable(name)) {
			throw new HadoopServiceException("oozie is not available for cluster " + name.toString());
		}
		return getOozieClientFactory().newInstance(name);
	}

	/**
	 * Obtains an hbase admin for the given cluster.
	 *
	 * @param name
	 *            the target cluster
	 * @return the instance created by the configured {@link HBaseAdminFactory}
	 * @throws HadoopServiceException
	 *             if hbase is not enabled for the cluster
	 */
	public HBaseAdmin getHbaseAdmin(final ClusterName name) throws HadoopServiceException {
		if (!isHbaseAvailable(name)) {
			throw new HadoopServiceException("hbase is not available for cluster " + name.toString());
		}
		return getHbaseAdminFactory().newInstance(name);
	}

	// ////////// configuration accessors

	/**
	 * @return the current configuration serialized as JSON; flags are emitted as strings
	 */
	public String getEnabledClients() {
		return new Gson().toJson(enabledClients);
	}

	/**
	 * Replaces the configuration with the one parsed from the given JSON.
	 *
	 * <p>
	 * The JSON is parsed untyped and then copied into a properly typed map, converting every flag to its string form.
	 * Parsing straight into a raw Map would store JSON booleans as Boolean objects, which never equal the string
	 * "true" and would therefore be reported as disabled. Empty or null JSON yields an empty configuration instead
	 * of a null map.
	 * </p>
	 *
	 * @param enabledClients
	 *            JSON object of the form {"cluster":{"client":"true"}}
	 */
	@Required
	public void setEnabledClients(final String enabledClients) {
		final Map<?, ?> parsed = new Gson().fromJson(enabledClients, Map.class);
		final Map<String, Map<String, String>> normalized = Maps.newHashMap();

		if (parsed != null) {
			for (final Map.Entry<?, ?> cluster : parsed.entrySet()) {
				final String clusterName = String.valueOf(cluster.getKey());
				final Map<String, String> flags = Maps.newHashMap();

				if (cluster.getValue() instanceof Map) {
					for (final Map.Entry<?, ?> flag : ((Map<?, ?>) cluster.getValue()).entrySet()) {
						final Object value = flag.getValue();
						flags.put(String.valueOf(flag.getKey()), (value == null) ? null : String.valueOf(value));
					}
				} else {
					// Not a JSON object: report it and keep the cluster with no enabled clients,
					// rather than failing later with a ClassCastException during lookups.
					log.warn("ignoring malformed enabled clients entry for cluster " + clusterName);
				}
				normalized.put(clusterName, flags);
			}
		}
		this.enabledClients = normalized;
	}

	/**
	 * @return the factory used to create job clients
	 */
	public JobClientFactory getJobClientFactory() {
		return jobClientFactory;
	}

	/**
	 * @param jobClientFactory
	 *            the factory used to create job clients
	 */
	@Required
	public void setJobClientFactory(final JobClientFactory jobClientFactory) {
		this.jobClientFactory = jobClientFactory;
	}

	/**
	 * @return the factory used to create oozie clients
	 */
	public OozieClientFactory getOozieClientFactory() {
		return oozieClientFactory;
	}

	/**
	 * @param oozieClientFactory
	 *            the factory used to create oozie clients
	 */
	@Required
	public void setOozieClientFactory(final OozieClientFactory oozieClientFactory) {
		this.oozieClientFactory = oozieClientFactory;
	}

	/**
	 * @return the factory used to create hbase admins
	 */
	public HBaseAdminFactory getHbaseAdminFactory() {
		return hbaseAdminFactory;
	}

	/**
	 * @param hbaseAdminFactory
	 *            the factory used to create hbase admins
	 */
	@Required
	public void setHbaseAdminFactory(final HBaseAdminFactory hbaseAdminFactory) {
		this.hbaseAdminFactory = hbaseAdminFactory;
	}

}
