package eu.dnetlib.actionmanager.wf;

import java.util.Properties;
import java.util.concurrent.Callable;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapred.RunningJob;

import com.googlecode.sarasvati.NodeToken;

import eu.dnetlib.data.mapreduce.JobParams;
import eu.dnetlib.data.mapreduce.wf.SetupMapreduceJobNode;

/**
 * Base workflow node for map-reduce jobs that read from one HBase table and write to another.
 *
 * <p>
 * Subclasses supply the source and target table names; this class turns them into job properties
 * and, when the job hooks fire, runs optional callbacks found among the token's transient
 * environment attributes.
 * </p>
 */
public abstract class AbstractHbaseActionJobNode extends SetupMapreduceJobNode {

	private static final Log log = LogFactory.getLog(AbstractHbaseActionJobNode.class); // NOPMD by marko on 11/24/08 5:02 PM

	/** Environment attribute copied verbatim into the job properties when present. */
	private static final String SET_ATTRIBUTE = "set";

	/** Transient environment attribute holding the callback run by {@link #beforeCompleted}. */
	private static final String CALLBACK_COMPLETED = "callbackCompleted";

	/** Transient environment attribute holding the callback run by {@link #beforeFailed}. */
	private static final String CALLBACK_FAILED = "callbackFailed";

	/**
	 * Builds the job properties: the HBase source and target tables plus, if the token environment
	 * defines it, the {@code set} attribute.
	 *
	 * @param token
	 *            the current workflow token
	 * @return a new {@link Properties} instance holding the job parameters
	 */
	@Override
	protected Properties prepareJob(NodeToken token) {
		final Properties p = new Properties();

		// Properties.setProperty rejects null values, so the table getters must not return null.
		p.setProperty(JobParams.HBASE_SOURCE_TABLE, getSourceTable(token));
		p.setProperty(JobParams.HBASE_TARGET_TABLE, getTargetTable(token));

		final String set = token.getFullEnv().getAttribute(SET_ATTRIBUTE);
		if (set != null) {
			p.setProperty(SET_ATTRIBUTE, set);
		}

		return p;
	}

	/**
	 * Returns the name of the HBase table the job reads from.
	 *
	 * @param token
	 *            the current workflow token
	 * @return the source table name; must not be null
	 */
	protected abstract String getSourceTable(NodeToken token);

	/**
	 * Returns the name of the HBase table the job writes to.
	 *
	 * @param token
	 *            the current workflow token
	 * @return the target table name; must not be null
	 */
	protected abstract String getTargetTable(NodeToken token);

	/**
	 * Runs the {@code callbackCompleted} transient attribute, if it is a {@link Callable}.
	 *
	 * @param token
	 *            the current workflow token
	 * @param job
	 *            the map-reduce job (not used here)
	 */
	@Override
	protected void beforeCompleted(NodeToken token, RunningJob job) {
		invokeCallback(token, CALLBACK_COMPLETED);
	}

	/**
	 * Runs the {@code callbackFailed} transient attribute, if it is a {@link Callable}.
	 *
	 * @param token
	 *            the current workflow token
	 * @param job
	 *            the map-reduce job (not used here)
	 */
	@Override
	protected void beforeFailed(NodeToken token, RunningJob job) {
		invokeCallback(token, CALLBACK_FAILED);
	}

	/**
	 * Looks up a transient attribute and, when it is a {@link Callable}, invokes it. Attributes that
	 * are missing or of another type are ignored. Exceptions raised by the callback are logged with
	 * their stack trace and not propagated, so a faulty callback does not interrupt the hook.
	 *
	 * @param token
	 *            the current workflow token
	 * @param attributeName
	 *            name of the transient attribute holding the callback
	 */
	private void invokeCallback(NodeToken token, String attributeName) {
		final Object callback = token.getFullEnv().getTransientAttribute(attributeName);
		// instanceof is false for null, so no separate null check is needed.
		if (callback instanceof Callable<?>) {
			try {
				((Callable<?>) callback).call();
			} catch (Exception e) {
				// Pass the exception along so the cause and stack trace reach the log.
				log.error("Error executing callback: " + attributeName, e);
			}
		}
	}

}