package eu.dnetlib.data.hadoop.blackboard;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import com.google.common.base.Joiner;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import eu.dnetlib.data.hadoop.HadoopJob;
import eu.dnetlib.data.hadoop.mapreduce.MapreduceJobMonitor;
import eu.dnetlib.data.hadoop.utils.JobProfile;
import eu.dnetlib.data.hadoop.utils.ScanProperties;
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
import eu.dnetlib.rmi.data.hadoop.ClusterName;
import eu.dnetlib.rmi.data.hadoop.HadoopJobType.AdminJobType;
import eu.dnetlib.rmi.data.hadoop.HadoopServiceException;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.mapreduce.CopyTable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapreduce.Job;

/**
 * Blackboard action submitting HBase admin mapreduce jobs (currently only CopyTable) to the configured cluster.
 */
public class SubmitAdminJobAction extends SubmitMapreduceJobAction {

	/**
	 * logger.
	 */
	private static final Log log = LogFactory.getLog(SubmitAdminJobAction.class); // NOPMD by marko on 11/24/08 5:02 PM

	/**
	 * Submits an admin (HBase utility) mapreduce job to the cluster indicated by the "cluster" blackboard parameter.
	 *
	 * @see eu.dnetlib.data.hadoop.blackboard.SubmitMapreduceJobAction#submit(JobCompletion, BlackboardJob, String, JobProfile)
	 */
	@Override
	public void submit(final JobCompletion callback, final BlackboardJob bbJob, final String jobName, final JobProfile jobProfile) throws HadoopServiceException {
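		// the target cluster is resolved from the "cluster" blackboard parameter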
		final ClusterName clusterName = ClusterName.valueOf(bbJob.getParameters().get("cluster"));

		try {
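			// build the base job configuration from the cluster config, the job profile and the blackboard parameters,
			// then let initAdminJob apply the settings specific to the requested admin job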
			JobConf jobConf = prepareJob(getConf(clusterName), jobName, jobProfile, bbJob.getParameters());

			jobConf = initAdminJob(bbJob, jobName, jobProfile, jobConf);

			if (!hadoopClientMap.isMapreduceAvailable(clusterName))
				throw new HadoopServiceException("mapreduce not available for cluster: " + clusterName.toString());

			logJobDetails(jobConf);

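			// submit the job through the JobTracker client of the target cluster and register it,
			// wrapped in a monitor that notifies the callback upon completion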
			final RunningJob runningJob = hadoopClientMap.getJtClient(clusterName).submitJob(jobConf);
			final String jobId = newJobId(clusterName, runningJob.getID().getId());

			jobRegistry.registerJob(HadoopJob.newInstance(jobId, clusterName, jobProfile,
					new MapreduceJobMonitor(runningJob, callback)));

		} catch (IOException e) {
			throw new HadoopServiceException("error executing hadoop job: " + jobName, e);
		}
	}

	private JobConf initAdminJob(final BlackboardJob bbJob, final String jobName, final JobProfile jobProfile, final JobConf jobConf) throws IOException,
			HadoopServiceException {
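		// currently the only supported admin job is the HBase CopyTable utility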
		switch (AdminJobType.valueOf(jobName)) {
		case copytable:
			return copyTable(jobProfile, bbJob.getParameters(), jobConf);

		default:
			throw new HadoopServiceException("unknown admin job: " + jobName);
		}
	}

	/**
	 * Builds a CopyTable job. Mimics
	 * <p/>
	 * <pre>
	 * {@code
	 * bin/hbase org.apache.hadoop.hbase.mapreduce.CopyTable --starttime=1265875194289 --endtime=1265878794289
	 * --peer.adr=server1,server2,server3:2181:/hbase --families=myOldCf:myNewCf,cf2,cf3 TestTable
	 *
	 * Options:
	 * rs.class     hbase.regionserver.class of the peer cluster,
	 *              specify if different from current cluster
	 * rs.impl      hbase.regionserver.impl of the peer cluster,
	 * startrow     the start row
	 * stoprow      the stop row
	 * starttime    beginning of the time range (unixtime in millis)
	 *              without endtime means from starttime to forever
	 * endtime      end of the time range.  Ignored if no starttime specified.
	 * versions     number of cell versions to copy
	 * new.name     new table's name
	 * peer.adr     Address of the peer cluster given in the format
	 *              hbase.zookeeper.quorum:hbase.zookeeper.client.port:zookeeper.znode.parent
	 * families     comma-separated list of families to copy
	 *              To copy from cf1 to cf2, give sourceCfName:destCfName.
	 *              To keep the same name, just give "cfName"
	 * all.cells    also copy delete markers and deleted cells
	 *
	 * Args:
	 * tablename    Name of the table to copy
	 * }
	 * </pre>
	 *
	 * @param jobProfile the job profile
	 * @param parameters the parameters
	 * @param jobConf    the job conf
	 * @return the job conf
	 * @throws IOException            Signals that an I/O exception has occurred.
	 * @throws HadoopServiceException when the given parameters do not yield a valid copytable job
	 */
	private JobConf copyTable(final JobProfile jobProfile, final Map<String, String> parameters, final JobConf jobConf) throws IOException,
			HadoopServiceException {

		ScanProperties scan = jobProfile.getScanProperties();

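		// build the CopyTable command line style arguments from the job profile and the blackboard parameters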
		List<String> params = Lists.newArrayList();
		if (!scan.getFamilies().isEmpty()) {
			String families = "--families=" + Joiner.on(",").join(scan.getFamilies());
			log.debug("adding column families: " + families);
			params.add(families);
		}

		// copy on remote cluster?
		if (parameters.containsKey("peer.adr") && !StringUtils.equals(parameters.get("sourceCluster"), parameters.get("cluster"))) {
			String peerAdr = "--peer.adr=" + parameters.get("peer.adr");
			log.debug("adding peer address: " + peerAdr);
			params.add(peerAdr);
		}

		// sets the target table
		String targetTable = "--new.name=" + parameters.get("new.name");
		log.debug("adding target table: " + targetTable);
		params.add(targetTable);

		// sets the source table
		String sourceTable = parameters.get("hbase.mapreduce.inputtable");
		log.debug("adding source table: " + sourceTable);
		params.add(sourceTable);

		log.info("copy table params: " + params);
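		// hypothetical example of the resulting args: [--families=cf1,cf2, --new.name=copyOfTable, sourceTable]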

		final Job copyJob = CopyTable.createSubmittableJob(jobConf, Iterables.toArray(params, String.class));

		if (copyJob == null) throw new HadoopServiceException("invalid copytable parameters: " + params);

		final Configuration copyConf = copyJob.getConfiguration();

		return merge(jobConf, copyConf);
	}

	/**
	 * Merges the configuration produced by the CopyTable job into the given JobConf: every property of copyConf is
	 * copied over, overwriting entries with the same key.
	 *
	 * @param jobConf  the job conf to be enriched
	 * @param copyConf the configuration produced by CopyTable
	 * @return the updated job conf
	 */
	private JobConf merge(final JobConf jobConf, final Configuration copyConf) {
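		// copy every property produced by the CopyTable job into the original JobConf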
		for (Entry<String, String> e : copyConf) {
			jobConf.set(e.getKey(), e.getValue());
		}
		return jobConf;
	}

}
