package eu.dnetlib.actionmanager.blackboard;

import java.text.ParseException;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import javax.annotation.Resource;

import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import eu.dnetlib.actionmanager.is.ISClient;
import eu.dnetlib.actionmanager.rmi.ActionManagerException;
import eu.dnetlib.actionmanager.set.ActionManagerSet;
import eu.dnetlib.actionmanager.set.RawSet;
import eu.dnetlib.data.hadoop.config.ClusterName;
import eu.dnetlib.data.hadoop.rmi.HadoopBlackboardActions;
import eu.dnetlib.data.hadoop.rmi.HadoopService;
import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import eu.dnetlib.enabling.is.registry.rmi.ISRegistryException;
import eu.dnetlib.enabling.is.registry.rmi.ISRegistryService;
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
import eu.dnetlib.enabling.tools.blackboard.BlackboardClientHandler;
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
import eu.dnetlib.enabling.tools.blackboard.BlackboardJobRegistry;
import eu.dnetlib.miscutils.datetime.DateUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Required;
import org.springframework.beans.factory.annotation.Value;

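/**
 * Launches ActionManager map/reduce jobs on the DM Hadoop cluster through the blackboard protocol
 * and garbage-collects expired raw action sets.
 * <p>
 * Jobs are submitted asynchronously to the {@link HadoopService}; completion is reported back to the
 * caller through a {@link JobCallback} registered via a {@link BlackboardJobListener}.
 * <p>
 * Minimal usage sketch (assumes the bean is wired by Spring with its required properties set, and that
 * {@code myCallback} is an application-provided {@link JobCallback}):
 *
 * <pre>
 * {@code
 * jobLauncher.executePromoteFromHDFSJob(Collections.singleton("myActionSetId"), myCallback);
 * }
 * </pre>
 */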
public class JobLauncher {

	public static final String ALL_SETS = "__ALL__";
	public static final String SEMICOLON = ";";
	public static final String COMMA = ",";
	public static final String SEQFILE_INPUTFORMAT = "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat";
	private static final String UPDATE_ACTION_PROFILE = "for $x in "
			+ "/RESOURCE_PROFILE[.//RESOURCE_TYPE/@value = 'ActionManagerSetDSResourceType' and .//SET/@id = '%s'] "
			+ "return update delete $x//RAW_SETS/EXPIRED[@id = '%s']";
	private static final Log log = LogFactory.getLog(JobLauncher.class); // NOPMD by marko on 11/24/08 5:02 PM
	private final Executor executor = Executors.newCachedThreadPool();

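	/**
	 * Locator used to resolve the D-Net services (Hadoop, IS lookup and IS registry) invoked by this class.
	 */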
	@Resource
	private UniqueServiceLocator serviceLocator;

	/**
	 * IS client used to resolve the action sets to promote from HDFS.
	 */
	private ISClient informationServiceClient;

	/**
	 * Blackboard client handler used to create and assign blackboard jobs.
	 */
	@Resource
	private BlackboardClientHandler blackboardClientHandler;

	/**
	 * Registry used to attach {@link BlackboardJobListener}s to blackboard jobs.
	 */
	@Resource
	private BlackboardJobRegistry jobRegistry;

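	/**
	 * IS client used by the garbage collection routine to read the HDFS base path, the retain threshold,
	 * the time margin and the list of valid action sets.
	 */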
	@Autowired
	private ISClient isClient;

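	/**
	 * Fully qualified class name of the mapper used by the promote m/r job.
	 */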
	@Value("${services.actionmanager.promote.mapper.class}")
	private String promoteJobMapperClass;

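	/**
	 * HBase table names; {@code dataTable} is used as the output table of the promote m/r job.
	 */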
	private String actionTable;
	private String dataTable;

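	/**
	 * Submits the m/r job that promotes the given raw action sets from HDFS into the data table.
	 * <p>
	 * When {@code sets} is null or empty all the sets registered in the IS are promoted; paths that do not
	 * exist on the DM cluster are skipped with a warning.
	 *
	 * @param sets     identifiers of the action sets to promote, or null/empty to promote all sets
	 * @param callback callback notified when the blackboard job completes
	 * @throws ActionManagerException when one of the requested sets does not exist
	 * @throws ISLookUpException      when querying the IS for the action sets fails
	 */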
	public void executePromoteFromHDFSJob(final Set<String> sets, final JobCallback callback) throws ActionManagerException, ISLookUpException {
		log.info("Starting promote from HDFS m/r job; sets=" + sets);

		final List<String> paths = Lists.newArrayList();
		if (sets == null || sets.isEmpty()) {

			for (ActionManagerSet set : informationServiceClient.listSets()) {
				paths.add(set.getPathToLatest());
			}

		} else {
			for (String setId : sets) {
				if (informationServiceClient.existsSet(setId)) {
					final ActionManagerSet set = informationServiceClient.getSet(setId);
					paths.add(set.getPathToLatest());
				} else {
					log.error("Invalid set " + setId);
					throw new ActionManagerException("Invalid set " + setId);
				}
			}
		}

		final HadoopService hadoopService = serviceLocator.getService(HadoopService.class);
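		// keep only the paths that actually exist on the DM cluster, warning about the missing ones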
		final List<String> existingPaths = Lists.newArrayList(Iterables.filter(paths, new Predicate<String>(){
			@Override
			public boolean apply(final String path) {

				try {
					final boolean exist = hadoopService.existHdfsPath(ClusterName.DM.toString(), path);
					if (!exist) {
						log.warn(String.format("path '%s' doesn't exist on the DM cluster", path));
					}
					return exist;
				} catch (HadoopServiceException e) {
					log.error(e);
					return false;
				}
			}
		}));

		final Map<String, String> params = Maps.newHashMap();
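		// multiple-inputs encoding: each entry is "<path>;<class>", entries joined by commas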
		params.put("mapred.input.dir.formats", Joiner.on(COMMA).join(Iterables.transform(existingPaths, new Function<String, String>() {
			@Override
			public String apply(final String path) {
				return path + SEMICOLON + SEQFILE_INPUTFORMAT;
			}
		})));
		params.put("mapred.input.dir.mappers", Joiner.on(COMMA).join(Iterables.transform(existingPaths, new Function<String, String>() {
			@Override
			public String apply(final String path) {
				return path + SEMICOLON + promoteJobMapperClass;
			}
		})));

		params.put("hbase.mapred.outputtable", dataTable);
		params.put("hbase.mapreduce.outputtable", dataTable);

		log.info("promoting HDFS rawsets: " + existingPaths);

		executeHDFS("promoteMultipleActionSetsJob", params, callback);
	}

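	/**
	 * Submits a m/r job to the {@link HadoopService} on the DM cluster through the blackboard protocol.
	 * The job is assigned asynchronously on a dedicated thread; its outcome is reported to the callback
	 * registered through the {@link BlackboardJobListener}.
	 *
	 * @param jobName  name of the m/r job to submit
	 * @param params   m/r job parameters, must not be null or empty
	 * @param callback callback notified about the job outcome
	 * @throws ActionManagerException when no job parameters are provided
	 */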
	private void executeHDFS(final String jobName, final Map<String, String> params, final JobCallback callback) throws ActionManagerException {

		if (params == null || params.isEmpty()) {
			throw new ActionManagerException("Missing m/r job parameters");
		}

		executor.execute(new Runnable() {

			@Override
			public void run() {
				try {
					final String serviceId = findHadoopServiceProfileID();
					final BlackboardJob bbJob = blackboardClientHandler.newJob(serviceId);

					bbJob.setAction(HadoopBlackboardActions.SUBMIT_MAPREDUCE_JOB.toString());
					bbJob.getParameters().put("job.name", jobName);
					bbJob.getParameters().put("cluster", ClusterName.DM.toString());
					bbJob.getParameters().putAll(params);

					jobRegistry.registerJobListener(bbJob, new BlackboardJobListener(callback));
					blackboardClientHandler.assign(bbJob);
				} catch (Exception e) {
					log.error("Error launching m/r job: " + jobName, e);
					throw new RuntimeException("Error launching m/r job: " + jobName, e);
				}
			}

		});
	}

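	/**
	 * Garbage-collects expired raw action sets: for each valid set, at most {@code garbageRetainThreshold}
	 * expired raw sets are inspected and those older than the configured time margin are deleted from HDFS,
	 * removing the corresponding EXPIRED entry from the ActionSet profile.
	 */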
	public void executeGarbageActions() throws ISRegistryException, HadoopServiceException, ParseException, ActionManagerException {
		final String basePath = isClient.getBasePathHDFS();
		final Integer garbageRetain = Integer.valueOf(isClient.getGarbageRetainThreshold());
		final Long dateLimit = DateUtils.now() - Long.valueOf(isClient.getGarbageTimeMargin());

		final HadoopService hadoopService = serviceLocator.getService(HadoopService.class);
		final ISRegistryService isRegistry = serviceLocator.getService(ISRegistryService.class);

		for (ActionManagerSet set : isClient.listValidSets()) {
			for (RawSet expired : Iterables.limit(set.getExpired(), garbageRetain)) {
				final Date d = org.apache.commons.lang.time.DateUtils.parseDate(expired.getLastUpdate(), ActionManagerSet.DATE_PATTERNS);
				if (d.getTime() < dateLimit) {

					log.info(String.format("removing raw action set %s/%s", set.getId(), expired.getId()));
					hadoopService.deleteHdfsPath(ClusterName.DM.toString(), basePath + "/" + set.getDirectory() + "/" + expired.getId());

					final String xUpdate = String.format(UPDATE_ACTION_PROFILE, set.getId(), expired.getId());
					log.info(String.format("updating ActionSet profile: %s", xUpdate));
					isRegistry.executeXUpdate(xUpdate);
				}
			}
		}
	}

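	/**
	 * Looks up in the IS the resource identifier of the registered HadoopService profile.
	 */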
	private String findHadoopServiceProfileID() throws Exception {
		return serviceLocator.getService(ISLookUpService.class).getResourceProfileByQuery(
				"collection('/db/DRIVER/ServiceResources/HadoopServiceResourceType')//RESOURCE_IDENTIFIER/@value/string()");
	}

	public String getActionTable() {
		return actionTable;
	}

	@Required
	public void setActionTable(final String actionTable) {
		this.actionTable = actionTable;
	}

	public String getDataTable() {
		return dataTable;
	}

	@Required
	public void setDataTable(final String dataTable) {
		this.dataTable = dataTable;
	}

	public ISClient getInformationServiceClient() {
		return informationServiceClient;
	}

	@Required
	public void setInformationServiceClient(final ISClient informationServiceClient) {
		this.informationServiceClient = informationServiceClient;
	}

}
