package eu.dnetlib.actionmanager.hack;

import java.io.IOException;
import java.io.StringReader;

import javax.xml.ws.wsaddressing.W3CEndpointReference;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.client.HTable;
import org.dom4j.Document;
import org.dom4j.io.SAXReader;
import org.springframework.beans.factory.annotation.Required;

import com.googlecode.sarasvati.Arc;
import com.googlecode.sarasvati.Engine;
import com.googlecode.sarasvati.NodeToken;

import eu.dnetlib.actionmanager.common.Agent;
import eu.dnetlib.actionmanager.common.Agent.AGENT_TYPE;
import eu.dnetlib.actionmanager.common.Operation;
import eu.dnetlib.actionmanager.common.Provenance;
import eu.dnetlib.actionmanager.hbase.HBaseActionManagerCore;
import eu.dnetlib.enabling.resultset.WorkflowCountingResultSetFactory;
import eu.dnetlib.enabling.resultset.client.ResultSetClientFactory;
import eu.dnetlib.workflow.AsyncJobNode;

/**
 * Asynchronous workflow node that reads the records of a result set (whose endpoint reference is
 * taken from the transient token attribute {@code "epr"}) and stores each of them through
 * {@link HBaseActionManagerCore#applyInfoPackageAction}.
 *
 * <p>
 * For every record the object identifier, the user and the datasource prefix are extracted with
 * XPath. The target set and the provenance are derived from the prefix of the object identifier.
 * Records without a user, or whose identifier prefix is not recognised, are logged and skipped.
 * </p>
 */
public class StoreInfoPackageJobNode extends AsyncJobNode {

	private static final Log log = LogFactory.getLog(StoreInfoPackageJobNode.class); // NOPMD by marko on 11/24/08 5:02 PM

	/** Identifier prefix of the records claimed from crossref. */
	private static final String CROSSREF_PREFIX = "crossref____::";

	/** Identifier prefix of the records claimed from driver. */
	private static final String DRIVER_PREFIX = "driver______::";

	private ResultSetClientFactory resultSetClientFactory;

	private HBaseActionManagerCore core;

	private String splitRuleId = "claimedMigration";

	private String defaultTrust;
	
	private WorkflowCountingResultSetFactory countingRSFactory;

	/**
	 * Stores every record of the input result set, then completes the token on the default arc.
	 *
	 * <p>
	 * The token is completed only after the table has been flushed and closed, so that a failure
	 * while persisting is reported with {@code failed(...)} instead of being raised after the token
	 * was already completed. Any error is logged together with its cause and reported exactly once.
	 * </p>
	 *
	 * @param engine
	 *            the workflow engine used to complete the token
	 * @param token
	 *            the current node token; its environment must provide the transient attribute
	 *            {@code "epr"} holding a {@link W3CEndpointReference}
	 */
	@Override
	protected void executeAsync(Engine engine, NodeToken token) {
		// NOTE(review): the SAXReader is used with its default settings; if the records can come
		// from untrusted sources, consider disabling DTDs / external entities.
		final SAXReader reader = new SAXReader();
		try {
			final HTable table = getTable();
			try {
				final W3CEndpointReference epr = (W3CEndpointReference) token.getEnv().getTransientAttribute("epr");
				int skipped = 0;
				for (String record : resultSetClientFactory.getClient(getCountingRSFactory().createCountingResultSet(epr, token))) {
					final Document doc = reader.read(new StringReader(record));
					final String id = doc.valueOf("//*[local-name()='objIdentifier']");
					final String user = doc.valueOf("//*[local-name()='user']");
					final String nsprefix = doc.valueOf("//*[local-name()='datasourceprefix']");

					if (user == null || user.trim().isEmpty()) {
						log.warn("********* Missing user for record: " + id);
						skipped++;
						continue;
					}

					final RequiredSet set;
					final Provenance provenance;
					if (id.startsWith(CROSSREF_PREFIX)) {
						set = RequiredSet.userclaim_search_crossref;
						provenance = Provenance.user_claim_doi;
					} else if (id.startsWith(DRIVER_PREFIX)) {
						set = RequiredSet.userclaim_search_driver;
						provenance = Provenance.user_claim_driver;
					} else {
						// Without a set the action cannot be built: skip the record instead of
						// failing the whole job with a NullPointerException.
						log.warn("I can't assign set and provenance for id " + id);
						log.warn(record);
						skipped++;
						continue;
					}

					final String trimmedUser = user.trim();
					final Agent agent = new Agent(trimmedUser, trimmedUser, AGENT_TYPE.human);

					core.applyInfoPackageAction(splitRuleId, set.toString(), agent, Operation.INSERT, record, provenance, defaultTrust, nsprefix, table);
				}
				if (skipped > 0) {
					log.warn("Skipped records (missing user or unknown id prefix): " + skipped);
				}
			} finally {
				// Always release the table, even when flushing the pending writes fails.
				try {
					table.flushCommits();
				} finally {
					table.close();
				}
			}
			engine.complete(token, Arc.DEFAULT_ARC);
		} catch (Throwable e) {
			log.error("Error storing EPR", e);
			failed(engine, token, e);
		}
	}

	/**
	 * Opens a new table handle using the configuration and the table name of the HBase client
	 * exposed by {@link #getCore()}. The caller is responsible for closing it.
	 *
	 * @return a new {@link HTable}
	 * @throws IOException
	 *             if the table cannot be opened
	 */
	private HTable getTable() throws IOException {
		return new HTable(core.getHbaseClient().getConfig(), core.getHbaseClient().getTableName());
	}

	/**
	 * @return the factory used to obtain a client iterating over the records of the result set
	 */
	public ResultSetClientFactory getResultSetClientFactory() {
		return resultSetClientFactory;
	}

	/**
	 * @param resultSetClientFactory
	 *            the factory used to obtain a client iterating over the records of the result set
	 */
	@Required
	public void setResultSetClientFactory(ResultSetClientFactory resultSetClientFactory) {
		this.resultSetClientFactory = resultSetClientFactory;
	}

	/**
	 * @return the action manager core used to store the records
	 */
	public HBaseActionManagerCore getCore() {
		return core;
	}

	/**
	 * @param core
	 *            the action manager core used to store the records
	 */
	@Required
	public void setCore(HBaseActionManagerCore core) {
		this.core = core;
	}

	/**
	 * @return the split rule identifier passed to the core for every record
	 */
	public String getSplitRuleId() {
		return splitRuleId;
	}

	/**
	 * @param splitRuleId
	 *            the split rule identifier passed to the core for every record
	 */
	@Required
	public void setSplitRuleId(String splitRuleId) {
		this.splitRuleId = splitRuleId;
	}

	/**
	 * @return the trust value passed to the core for every record
	 */
	public String getDefaultTrust() {
		return defaultTrust;
	}

	/**
	 * @param defaultTrust
	 *            the trust value passed to the core for every record
	 */
	@Required
	public void setDefaultTrust(String defaultTrust) {
		this.defaultTrust = defaultTrust;
	}

	/**
	 * @return the factory used to wrap the input result set into a counting result set
	 */
	public WorkflowCountingResultSetFactory getCountingRSFactory() {
		return countingRSFactory;
	}

	/**
	 * @param countingRSFactory
	 *            the factory used to wrap the input result set into a counting result set
	 */
	@Required
	public void setCountingRSFactory(WorkflowCountingResultSetFactory countingRSFactory) {
		this.countingRSFactory = countingRSFactory;
	}

}
