package eu.dnetlib.data.hadoop.action;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import com.google.common.base.Joiner;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import eu.dnetlib.data.hadoop.HadoopJob;
import eu.dnetlib.data.hadoop.config.ClusterName;
import eu.dnetlib.data.hadoop.mapred.MapreduceJobMonitor;
import eu.dnetlib.data.hadoop.rmi.HadoopJobType.AdminJobType;
import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
import eu.dnetlib.data.hadoop.utils.JobProfile;
import eu.dnetlib.data.hadoop.utils.ScanProperties;
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.mapreduce.CopyTable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapreduce.Job;

/**
 * The Class SubmitAdminJobAction.
 */
public class SubmitAdminJobAction extends SubmitMapreduceJobAction {

    /**
     * logger.
     */
    private static final Log log = LogFactory.getLog(SubmitAdminJobAction.class); // NOPMD by marko on 11/24/08 5:02 PM

    /*
     * (non-Javadoc)
     *
     * @see
     * eu.dnetlib.data.hadoop.action.SubmitMapreduceJobAction#executeAsync(eu.dnetlib.enabling.tools.blackboard.BlackboardServerHandler,
     * eu.dnetlib.enabling.tools.blackboard.BlackboardJob)
     */
    @Override
    public void submit(final JobCompletion callback, final BlackboardJob bbJob, String jobName, JobProfile jobProfile) throws HadoopServiceException {
        final ClusterName clusterName = ClusterName.valueOf(bbJob.getParameters().get("cluster"));

        try {
            JobConf jobConf = prepareJob(getConf(clusterName), jobName, jobProfile, bbJob.getParameters());

            jobConf = initAdminJob(bbJob, jobName, jobProfile, jobConf);

            if (!hadoopClientMap.isMapreduceAvailable(clusterName))
                throw new HadoopServiceException("mapreduce not available for cluster: " + clusterName.toString());

            logJobDetails(jobConf);

            final RunningJob runningJob = hadoopClientMap.getJtClient(clusterName).submitJob(jobConf);
            final String jobId = newJobId(clusterName, runningJob.getID().getId());

            jobRegistry.registerJob(HadoopJob.newInstance(jobId, clusterName, jobProfile,
                    new MapreduceJobMonitor(runningJob, callback)));

        } catch (IOException e) {
            throw new HadoopServiceException("error executing hadoop job: " + jobName, e);
        }
    }

    private JobConf initAdminJob(final BlackboardJob bbJob, final String jobName, final JobProfile jobProfile, final JobConf jobConf) throws IOException,
            HadoopServiceException {
        switch (AdminJobType.valueOf(jobName)) {
            case copytable:
                return copyTable(jobProfile, bbJob.getParameters(), jobConf);

            default:
                throw new HadoopServiceException("unknown admin job: " + jobName);
        }
    }

    /**
     * Builds a CopyTable job. Mimics
     * <p/>
     * <pre>
     * {@code
     * bin/hbase org.apache.hadoop.hbase.mapreduce.CopyTable --starttime=1265875194289 --endtime=1265878794289
     * --peer.adr=server1,server2,server3:2181:/hbase --families=myOldCf:myNewCf,cf2,cf3 TestTable <code>
     *
     * Options:
     * rs.class     hbase.regionserver.class of the peer cluster,
     *              specify if different from current cluster
     * rs.impl      hbase.regionserver.impl of the peer cluster,
     * startrow     the start row
     * stoprow      the stop row
     * starttime    beginning of the time range (unixtime in millis)
     *              without endtime means from starttime to forever
     * endtime      end of the time range.  Ignored if no starttime specified.
     * versions     number of cell versions to copy
     * new.name     new table's name
     * peer.adr     Address of the peer cluster given in the format
     *              hbase.zookeeer.quorum:hbase.zookeeper.client.port:zookeeper.znode.parent
     * families     comma-separated list of families to copy
     *              To copy from cf1 to cf2, give sourceCfName:destCfName.
     *              To keep the same name, just give "cfName"
     * all.cells    also copy deleteFromHBase markers and deleted cells
     *
     * Args:
     * tablename    Name of the table to copy
     * }
     * </pre>
     *
     * @param jobProfile the job profile
     * @param parameters the parameters
     * @param jobConf    the job conf
     * @return the job conf
     * @throws IOException            Signals that an I/O exception has occurred.
     * @throws HadoopServiceException
     */
    private JobConf copyTable(final JobProfile jobProfile, final Map<String, String> parameters, final JobConf jobConf) throws IOException,
            HadoopServiceException {

        ScanProperties scan = jobProfile.getScanProperties();

        List<String> params = Lists.newArrayList();
        if (!scan.getFamilies().isEmpty()) {
            String families = "--families=" + Joiner.on(",").join(scan.getFamilies());
            log.debug("adding column families: " + families);
            params.add(families);
        }

        // copy on remote cluster?
        if (parameters.containsKey("peer.adr") && !StringUtils.equals(parameters.get("sourceCluster"), parameters.get("cluster"))) {
            String peerAdr = "--peer.adr=" + parameters.get("peer.adr");
            log.debug("adding peer address: " + peerAdr);
            params.add(peerAdr);
        }

        // sets the target table
        String targetTable = "--new.name=" + parameters.get("new.name");
        log.debug("adding target table: " + targetTable);
        params.add(targetTable);

        // sets the source table
        String sourceTable = parameters.get("hbase.mapreduce.inputtable");
        log.debug("adding source table: " + sourceTable);
        params.add(sourceTable);

        log.info("copy table params: " + params);

        final Job copyJob = CopyTable.createSubmittableJob(jobConf, Iterables.toArray(params, String.class));

        if (copyJob == null) throw new HadoopServiceException("invalid copytable parameters: " + params);

        final Configuration copyConf = copyJob.getConfiguration();

        return merge(jobConf, copyConf);
    }

    /**
     * Merge.
     *
     * @param jobConf  the job conf
     * @param copyConf the copy conf
     * @return the job conf
     */
    private JobConf merge(final JobConf jobConf, final Configuration copyConf) {
        for (Entry<String, String> e : copyConf) {
            jobConf.set(e.getKey(), e.getValue());
        }
        return jobConf;
    }

}
