package eu.dnetlib.data.hadoop;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

import eu.dnetlib.enabling.tools.AbstractBaseService;
import eu.dnetlib.enabling.tools.blackboard.NotificationHandler;
import eu.dnetlib.rmi.data.hadoop.ClusterName;
import eu.dnetlib.rmi.data.hadoop.HadoopJobDescriptor;
import eu.dnetlib.rmi.data.hadoop.HadoopService;
import eu.dnetlib.rmi.data.hadoop.HadoopServiceException;
import eu.dnetlib.rmi.data.hadoop.hbase.HBaseRowDescriptor;
import org.apache.hadoop.conf.Configuration;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Required;

/**
 * The Class HadoopServiceImpl.
 * <p>
 * Service endpoint exposing HBase table management, HDFS path management and hadoop job
 * bookkeeping. Cluster operations are delegated to {@link HadoopServiceCore}, job tracking
 * to {@link JobRegistry}. Checked {@link IOException}s raised by the core layer are
 * rethrown wrapped in {@link HadoopServiceException}, preserving the original cause.
 */
public class HadoopServiceImpl extends AbstractBaseService implements HadoopService {

	/** Notification handler, invoked when an IS blackboard notification arrives. */
	private NotificationHandler notificationHandler;

	/** The hadoop service core, performing the actual HBase/HDFS operations. */
	@Autowired
	private HadoopServiceCore hadoopServiceCore;

	/** The registry tracking the hadoop jobs submitted through this service. */
	@Autowired
	private JobRegistry jobRegistry;

	/**
	 * Lists the available job names.
	 * <p>
	 * NOTE(review): this currently always returns an empty list — job discovery appears
	 * not to be implemented; confirm whether this is intentional.
	 *
	 * @return an empty, mutable list
	 * @throws HadoopServiceException declared for interface compatibility; never thrown here
	 */
	@Override
	public List<String> listAvailableJobs() throws HadoopServiceException {
		return Lists.newArrayList();
	}

	/**
	 * Lists the jobs currently tracked for the given cluster.
	 *
	 * @param clusterName the cluster name
	 * @return the job descriptors registered for that cluster
	 * @throws HadoopServiceException when clusterName is not a valid {@link ClusterName}
	 */
	@Override
	public List<HadoopJobDescriptor> listJobs(final String clusterName) throws HadoopServiceException {
		return jobRegistry.listJobs(checkExists(clusterName));
	}

	/**
	 * Removes the given job from the registry.
	 * <p>
	 * NOTE(review): this only unregisters the job locally; it does not appear to signal
	 * the cluster to stop the running job — verify against {@link JobRegistry#unregisterJob}.
	 *
	 * @param jobId the job identifier
	 * @return always {@code true}
	 * @throws HadoopServiceException propagated from the registry
	 */
	@Override
	public boolean killJob(final String jobId) throws HadoopServiceException {
		jobRegistry.unregisterJob(jobId);
		return true;
	}

	/**
	 * Forwards an IS blackboard notification to the configured {@link NotificationHandler}.
	 *
	 * @see eu.dnetlib.enabling.tools.AbstractBaseService#notify(String, String, String, String)
	 */
	@Override
	public void notify(final String subscriptionId, final String topic, final String isId, final String message) {
		getNotificationHandler().notified(subscriptionId, topic, isId, message);
	}

	/**
	 * Lists the HBase tables available on the given cluster.
	 *
	 * @param clusterName the cluster name
	 * @return the table names
	 * @throws HadoopServiceException when the cluster is unknown or the lookup fails
	 */
	@Override
	public List<String> listHbaseTables(final String clusterName) throws HadoopServiceException {
		try {
			return hadoopServiceCore.listTables(checkExists(clusterName));
		} catch (IOException e) {
			throw new HadoopServiceException(e);
		}
	}

	/**
	 * Ensures the given HBase table exists with the given column families, creating or
	 * adjusting it as needed.
	 *
	 * @param clusterName the cluster name
	 * @param tableName   the table name
	 * @param columns     the required column family names
	 * @return always {@code true} on success
	 * @throws HadoopServiceException when the cluster is unknown or the operation fails
	 */
	@Override
	public boolean ensureHbaseTable(final String clusterName, final String tableName, final Set<String> columns) throws HadoopServiceException {
		try {
			hadoopServiceCore.ensureTable(checkExists(clusterName), tableName, columns);
			return true;
		} catch (IOException e) {
			throw new HadoopServiceException(e);
		}
	}

	/**
	 * Creates an HBase table with the given column families.
	 *
	 * @param clusterName the cluster name
	 * @param tableName   the table name
	 * @param columns     the column family names
	 * @return always {@code true} on success
	 * @throws HadoopServiceException when the cluster is unknown or the creation fails
	 */
	@Override
	public boolean createHbaseTable(final String clusterName, final String tableName, final Set<String> columns) throws HadoopServiceException {
		try {
			hadoopServiceCore.createTable(checkExists(clusterName), tableName, columns);
			return true;
		} catch (IOException e) {
			throw new HadoopServiceException(e);
		}
	}

	/**
	 * Creates an HBase table from the given serialized table configuration.
	 *
	 * @param clusterName        the cluster name
	 * @param tableName          the table name
	 * @param tableConfiguration the serialized table configuration
	 * @return always {@code true} on success
	 * @throws HadoopServiceException when the cluster is unknown or the creation fails
	 */
	@Override
	public boolean createConfiguredHbaseTable(final String clusterName, final String tableName, final String tableConfiguration) throws HadoopServiceException {
		try {
			hadoopServiceCore.createTable(checkExists(clusterName), tableName, tableConfiguration);
			return true;
		} catch (IOException e) {
			throw new HadoopServiceException(e);
		}
	}

	/**
	 * Truncates the given HBase table (removes all rows, keeps the schema).
	 *
	 * @param clusterName the cluster name
	 * @param tableName   the table name
	 * @return always {@code true} on success
	 * @throws HadoopServiceException when the cluster is unknown or the truncation fails
	 */
	@Override
	public boolean truncateHbaseTable(final String clusterName, final String tableName) throws HadoopServiceException {
		try {
			hadoopServiceCore.truncateTable(checkExists(clusterName), tableName);
			return true;
		} catch (IOException e) {
			throw new HadoopServiceException(e);
		}
	}

	/**
	 * Drops the given HBase table.
	 *
	 * @param clusterName the cluster name
	 * @param tableName   the table name
	 * @return always {@code true} on success
	 * @throws HadoopServiceException when the cluster is unknown or the drop fails
	 */
	@Override
	public boolean dropHbaseTable(final String clusterName, final String tableName) throws HadoopServiceException {
		try {
			hadoopServiceCore.dropTable(checkExists(clusterName), tableName);
			return true;
		} catch (IOException e) {
			throw new HadoopServiceException(e);
		}
	}

	/**
	 * Checks whether the given HBase table exists.
	 *
	 * @param clusterName the cluster name
	 * @param tableName   the table name
	 * @return {@code true} when the table exists
	 * @throws HadoopServiceException when the cluster is unknown or the check fails
	 */
	@Override
	public boolean existHbaseTable(final String clusterName, final String tableName) throws HadoopServiceException {
		try {
			return hadoopServiceCore.existTable(checkExists(clusterName), tableName);
		} catch (IOException e) {
			throw new HadoopServiceException(e);
		}
	}

	/**
	 * Returns the hadoop configuration of the given cluster as a plain key/value map.
	 *
	 * @param clusterName the cluster name
	 * @return the configuration entries
	 * @throws HadoopServiceException when the cluster is unknown
	 */
	@Override
	public Map<String, String> getClusterConfiguration(final String clusterName) throws HadoopServiceException {
		// NB: "getClusterConiguration" (sic) is the core API's spelling; fixing it would
		// require touching HadoopServiceCore and all of its callers.
		final Configuration conf = hadoopServiceCore.getClusterConiguration(checkExists(clusterName));
		final Map<String, String> res = Maps.newHashMap();
		for (Entry<String, String> e : conf) {
			res.put(e.getKey(), e.getValue());
		}

		return res;
	}

	/**
	 * Deletes the given HDFS path.
	 *
	 * @param clusterName the cluster name
	 * @param path        the HDFS path
	 * @return the outcome reported by the core layer
	 * @throws HadoopServiceException when the cluster is unknown or the deletion fails
	 */
	@Override
	public boolean deleteHdfsPath(final String clusterName, final String path) throws HadoopServiceException {
		return hadoopServiceCore.deleteFromHdfs(checkExists(clusterName), path);
	}

	/**
	 * Checks whether the given HDFS path exists.
	 *
	 * @param clusterName the cluster name
	 * @param path        the HDFS path
	 * @return {@code true} when the path exists
	 * @throws HadoopServiceException when the cluster is unknown or the check fails
	 */
	@Override
	public boolean existHdfsPath(final String clusterName, final String path) throws HadoopServiceException {
		return hadoopServiceCore.existHdfsPath(checkExists(clusterName), path);
	}

	/**
	 * Creates an HDFS directory.
	 *
	 * @param clusterName the cluster name
	 * @param path        the HDFS path to create
	 * @param force       whether to replace a pre-existing path
	 * @return the outcome reported by the core layer
	 * @throws HadoopServiceException when the cluster is unknown or the creation fails
	 */
	@Override
	public boolean createHdfsDirectory(final String clusterName, final String path, final boolean force) throws HadoopServiceException {
		return hadoopServiceCore.createHdfsDir(checkExists(clusterName), path, force);
	}

	/**
	 * Lists the names of the configured clusters.
	 *
	 * @return the cluster names
	 * @throws HadoopServiceException wrapping any failure
	 */
	@Override
	public List<String> listClusters() throws HadoopServiceException {
		try {
			return ClusterName.asStringList();
		} catch (Throwable e) {
			// NOTE(review): Throwable is deliberately broad here, presumably to convert
			// enum initialization Errors into a service fault — confirm before narrowing.
			throw new HadoopServiceException(e);
		}
	}

	/**
	 * Describes the given HBase table (schema details as strings).
	 *
	 * @param clusterName the cluster name
	 * @param tableName   the table name
	 * @return the table description lines
	 * @throws HadoopServiceException when the cluster is unknown or the lookup fails
	 */
	@Override
	public List<String> describeHbaseTable(final String clusterName, final String tableName) throws HadoopServiceException {
		try {
			return hadoopServiceCore.describeTable(checkExists(clusterName), tableName);
		} catch (IOException e) {
			throw new HadoopServiceException(e);
		}
	}

	/**
	 * Describes a single HBase row.
	 *
	 * @param clusterName the cluster name
	 * @param tableName   the table name
	 * @param rowKey      the row key
	 * @return the row descriptor
	 * @throws HadoopServiceException when the cluster is unknown or the lookup fails
	 */
	@Override
	public HBaseRowDescriptor describeHBaseColumn(final String clusterName, final String tableName, final String rowKey) throws HadoopServiceException {
		try {
			return hadoopServiceCore.describeRow(checkExists(clusterName), tableName, rowKey);
		} catch (IOException e) {
			throw new HadoopServiceException(e);
		}
	}

	/**
	 * Describes a set of HBase rows.
	 *
	 * @param clusterName the cluster name
	 * @param tableName   the table name
	 * @param rowKeys     the row keys to describe
	 * @return a map from row key to its descriptor
	 * @throws HadoopServiceException when the cluster is unknown or the lookup fails
	 */
	@Override
	public Map<String, HBaseRowDescriptor> describeHBaseColumns(final String clusterName, final String tableName, final List<String> rowKeys) throws HadoopServiceException {
		try {
			return hadoopServiceCore.describeRows(checkExists(clusterName), tableName, rowKeys);
		} catch (IOException e) {
			throw new HadoopServiceException(e);
		}
	}

	/**
	 * Returns the serialized descriptor of the given HBase table.
	 *
	 * @param clusterName the cluster name
	 * @param tableName   the table name
	 * @return the serialized table descriptor
	 * @throws HadoopServiceException when the cluster is unknown or the lookup fails
	 */
	@Override
	public String describeHBaseTableConfiguration(final String clusterName, final String tableName) throws HadoopServiceException {
		try {
			return hadoopServiceCore.getHBaseTableDescriptor(checkExists(clusterName), tableName);
		} catch (IOException e) {
			throw new HadoopServiceException(e);
		}
	}

	/**
	 * Deletes the columns described by the given descriptor from an HBase table.
	 *
	 * @param clusterName the cluster name
	 * @param tableName   the table name
	 * @param column      the row/column descriptor to delete
	 * @return always {@code true} on success
	 * @throws HadoopServiceException when the cluster is unknown or the deletion fails
	 */
	@Override
	public boolean deleteHBaseColumn(final String clusterName, final String tableName, final HBaseRowDescriptor column) throws HadoopServiceException {
		try {
			hadoopServiceCore.deleteColumnsFromHBase(checkExists(clusterName), tableName, Lists.newArrayList(column));
			return true;
		} catch (IOException e) {
			throw new HadoopServiceException(e);
		}
	}

	/**
	 * Deletes the columns described by the given descriptors from an HBase table.
	 *
	 * @param clusterName the cluster name
	 * @param tableName   the table name
	 * @param column      the row/column descriptors to delete
	 * @return always {@code true} on success
	 * @throws HadoopServiceException when the cluster is unknown or the deletion fails
	 */
	@Override
	public boolean deleteHBaseColumns(final String clusterName, final String tableName, final List<HBaseRowDescriptor> column) throws HadoopServiceException {
		try {
			hadoopServiceCore.deleteColumnsFromHBase(checkExists(clusterName), tableName, column);
			return true;
		} catch (IOException e) {
			throw new HadoopServiceException(e);
		}
	}

	///////////////////

	/**
	 * Check exists.
	 *
	 * @param clusterName
	 *            the cluster name
	 * @return the cluster name
	 * @throws HadoopServiceException
	 *             the hadoop service exception
	 */
	private ClusterName checkExists(final String clusterName) throws HadoopServiceException {
		try {
			return ClusterName.valueOf(clusterName);
		} catch (final IllegalArgumentException e) {
			// preserve the original exception as the cause instead of dropping it
			throw new HadoopServiceException("Invalid cluster name: " + clusterName, e);
		}
	}

	/**
	 * Gets the notification handler.
	 *
	 * @return the notification handler
	 */
	public NotificationHandler getNotificationHandler() {
		return notificationHandler;
	}

	/**
	 * Sets the notification handler.
	 *
	 * @param notificationHandler
	 *            the new notification handler
	 */
	@Required
	public void setNotificationHandler(final NotificationHandler notificationHandler) {
		this.notificationHandler = notificationHandler;
	}

}
