package eu.dnetlib.data.hadoop;

import java.io.StringReader;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Set;
import javax.annotation.Resource;
import javax.xml.ws.Endpoint;

import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
import eu.dnetlib.miscutils.datetime.DateUtils;
import eu.dnetlib.rmi.data.hadoop.actionmanager.ActionManagerException;
import eu.dnetlib.rmi.data.hadoop.actionmanager.ActionManagerSet;
import eu.dnetlib.rmi.data.hadoop.actionmanager.ActionManagerSet.ImpactTypes;
import eu.dnetlib.rmi.data.hadoop.actionmanager.RawSet;
import eu.dnetlib.rmi.enabling.ISLookUpException;
import eu.dnetlib.rmi.enabling.ISLookUpService;
import eu.dnetlib.rmi.enabling.ISRegistryException;
import eu.dnetlib.rmi.enabling.ISRegistryService;
import org.antlr.stringtemplate.StringTemplate;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.dom4j.Attribute;
import org.dom4j.Document;
import org.dom4j.Element;
import org.dom4j.io.SAXReader;
import org.springframework.beans.factory.annotation.Required;

/**
 * Client used by the Hadoop / action manager code to read and update profiles through the
 * {@link ISLookUpService} and {@link ISRegistryService} obtained from the injected
 * {@link UniqueServiceLocator}.
 * <p>
 * It covers three kinds of profiles, as visible from the queries built below: ActionManagerSet
 * profiles (register, list, look up, update raw sets, delete), service properties of the
 * ActionManager and Hadoop services, and Hadoop job configuration profiles.
 * <p>
 * NOTE(review): set ids, raw set ids, job names and property names are concatenated verbatim into
 * the XQuery / XUpdate text. A value containing an apostrophe breaks (or alters) the query, so
 * callers must only pass trusted identifiers.
 */
public class HadoopIsClient {

	private static final Log log = LogFactory.getLog(HadoopIsClient.class); // NOPMD by marko on 11/24/08 5:02 PM

	/**
	 * endpoint builder: hostname, port and context are substituted in this order.
	 */
	private static final String ENDPOINT_TEMPLATE = "http://%s:%s/%s/services/actionManager";

	/**
	 * Common prefix of every query iterating over the ActionManagerSet profiles.
	 */
	private static final String FOR_EACH_SET_PROFILE =
			"for $x in collection('/db/DRIVER/ActionManagerSetDSResources/ActionManagerSetDSResourceType')";

	@Resource
	private UniqueServiceLocator serviceLocator;

	/**
	 * Template for ActionManagerSet profiles
	 */
	private StringTemplate actionManagerSetDsTemplate;

	/**
	 * Host name substituted into {@link #ENDPOINT_TEMPLATE}.
	 */
	private String hostname;

	/**
	 * Port substituted into {@link #ENDPOINT_TEMPLATE}.
	 */
	private String port;

	/**
	 * Web application context substituted into {@link #ENDPOINT_TEMPLATE}.
	 */
	private String context;

	/**
	 * Builds the query selecting the ActionManagerSet profile(s) whose SET/@id equals the given id.
	 *
	 * @param setId
	 *            the set identifier, inserted verbatim between apostrophes
	 * @param returnExpr
	 *            the expression placed after the {@code return} keyword
	 * @return the complete xquery
	 */
	private static String setByIdQuery(final String setId, final String returnExpr) {
		return FOR_EACH_SET_PROFILE + " where $x//SET/@id = '" + setId + "' return " + returnExpr;
	}

	/**
	 * Registers a new ActionManagerSet profile built from the configured template.
	 *
	 * @param set
	 *            the set to register
	 * @return the value returned by {@code ISRegistryService.registerProfile} (presumably the new profile id)
	 * @throws ActionManagerException
	 *             if a set with the same id already exists or the registration fails
	 */
	public String registerSetProfile(final ActionManagerSet set) throws ActionManagerException {
		if (existsSet(set.getId())) {
			throw new ActionManagerException("Set " + set.getId() + " already registered");
		}
		try {
			// work on a fresh copy so that attributes never accumulate on the shared template instance
			final StringTemplate template = new StringTemplate(actionManagerSetDsTemplate.getTemplate());
			template.setAttribute("serviceUri", String.format(ENDPOINT_TEMPLATE, hostname, port, context));
			template.setAttribute("set", set);
			template.setAttribute("latest", RawSet.newInstance());
			return serviceLocator.getService(ISRegistryService.class).registerProfile(template.toString());
		} catch (ISRegistryException e) {
			throw new ActionManagerException("Error registering set " + set, e);
		}
	}

	/**
	 * Lists all the ActionManagerSet profiles.
	 *
	 * @return the sets; empty when the lookup service call fails (the failure is only logged)
	 * @throws ActionManagerException
	 *             if the HDFS base path cannot be read or a profile cannot be parsed
	 */
	public List<ActionManagerSet> listSets() throws ActionManagerException {
		final String q = FOR_EACH_SET_PROFILE + " return $x";
		return getActionManagerSets(q);
	}

	/**
	 * Lists the ActionManagerSet profiles having a non-empty RAW_SETS/LATEST/@id.
	 *
	 * @return the sets; empty when the lookup service call fails (the failure is only logged)
	 * @throws ActionManagerException
	 *             if the HDFS base path cannot be read or a profile cannot be parsed
	 */
	public List<ActionManagerSet> listValidSets() throws ActionManagerException {
		final String q = FOR_EACH_SET_PROFILE + " \n"
				+ "where string-length($x//RAW_SETS/LATEST/@id/string()) > 0\n"
				+ "return $x";

		return getActionManagerSets(q);
	}

	/**
	 * Runs the given query and converts every returned profile into an {@link ActionManagerSet}.
	 *
	 * @param q
	 *            xquery returning ActionManagerSet profiles
	 * @return the parsed sets, empty if the lookup service reports an error
	 * @throws ActionManagerException
	 *             if the HDFS base path cannot be read or a profile cannot be parsed
	 */
	private List<ActionManagerSet> getActionManagerSets(final String q) throws ActionManagerException {
		final List<ActionManagerSet> list = Lists.newArrayList();

		try {
			final String basePath = getBasePathHDFS();
			for (String s : serviceLocator.getService(ISLookUpService.class).quickSearchProfile(q)) {
				list.add(getActionManagerSet(basePath, s));
			}
		} catch (ISLookUpException e) {
			// best effort: callers get an empty list, but the cause must be visible in the log
			log.error("Error accessing Sets, using query: " + q, e);
		}
		return list;
	}

	/**
	 * Retrieves the ActionManagerSet with the given id.
	 *
	 * @param setId
	 *            the set identifier
	 * @return the set described by the matching profile
	 * @throws ActionManagerException
	 *             if the lookup fails, the base path cannot be read or the profile cannot be parsed
	 * @throws ISLookUpException
	 *             declared for backward compatibility; lookup failures are wrapped in ActionManagerException
	 */
	public ActionManagerSet getSet(final String setId) throws ActionManagerException, ISLookUpException {
		final String q = setByIdQuery(setId, "$x");

		try {
			final String basePath = getBasePathHDFS();
			final String setProfile = serviceLocator.getService(ISLookUpService.class).getResourceProfileByQuery(q);
			return getActionManagerSet(basePath, setProfile);
		} catch (ISLookUpException e) {
			throw new ActionManagerException("Error accessing Sets, using query: " + q, e);
		}
	}

	/**
	 * Parses an ActionManagerSet profile.
	 *
	 * @param basePath
	 *            HDFS base path, used as prefix of the path to the latest raw set
	 * @param profile
	 *            the profile XML
	 * @return the set, whose pathToLatest is {@code basePath/directory/latestId}
	 * @throws ActionManagerException
	 *             if the profile cannot be parsed or contains an unknown impact type
	 */
	private ActionManagerSet getActionManagerSet(final String basePath, final String profile) throws ActionManagerException {
		final SAXReader reader = new SAXReader();
		final ActionManagerSet set = new ActionManagerSet();

		try {
			final Document doc = reader.read(new StringReader(profile));
			final String latestId = doc.valueOf("//RAW_SETS/LATEST/@id");
			final String directory = doc.valueOf("//SET/@directory");

			set.setId(doc.valueOf("//SET/@id").trim());
			set.setName(doc.valueOf("//SET").trim());
			set.setImpact(ImpactTypes.valueOf(doc.valueOf("//IMPACT").trim()));
			set.setLatest(latestId, doc.valueOf("//RAW_SETS/LATEST/@creationDate"), doc.valueOf("//RAW_SETS/LATEST/@lastUpdate"));
			set.setDirectory(directory);

			final List<?> expiredNodes = doc.selectNodes("//RAW_SETS/EXPIRED");
			if (expiredNodes != null) {
				for (Object node : expiredNodes) {
					final Element ex = (Element) node;
					set.addExpired(ex.attributeValue("id"), ex.attributeValue("creationDate"), ex.attributeValue("lastUpdate"));
				}
			}

			set.setPathToLatest(basePath + "/" + directory + "/" + latestId);

			return set;
		} catch (Exception e) {
			throw new ActionManagerException("Error creating set from profile: " + profile, e);
		}
	}

	/**
	 * Deletes the profile of the given set.
	 *
	 * @param setId
	 *            the set identifier
	 * @return the value returned by {@code ISRegistryService.deleteProfile}
	 * @throws ActionManagerException
	 *             if the profile cannot be found or deleted
	 */
	public boolean deleteSetProfile(final String setId) throws ActionManagerException {
		final String q = setByIdQuery(setId, "$x//RESOURCE_IDENTIFIER/@value/string()");

		try {
			final String profId = serviceLocator.getService(ISLookUpService.class).getResourceProfileByQuery(q);
			return serviceLocator.getService(ISRegistryService.class).deleteProfile(profId);
		} catch (Exception e) {
			log.error("Error deleting set " + setId, e);
			throw new ActionManagerException("Error deleting set " + setId, e);
		}
	}

	/**
	 * Checks whether a set with the given id is registered.
	 *
	 * @param id
	 *            the set identifier
	 * @return true if at least one profile matches
	 * @throws ActionManagerException
	 *             if the lookup fails
	 */
	public boolean existsSet(final String id) throws ActionManagerException {
		final String q = setByIdQuery(id, "$x");
		try {
			return !serviceLocator.getService(ISLookUpService.class).quickSearchProfile(q).isEmpty();
		} catch (ISLookUpException e) {
			log.error("Error accessing Sets, using query: " + q, e);
			throw new ActionManagerException("Error running xquery: " + q, e);
		}
	}

	/**
	 * Checks whether any set profile contains a raw set (any child of RAW_SETS) with the given id.
	 *
	 * @param id
	 *            the raw set identifier
	 * @return true if at least one profile matches
	 * @throws ActionManagerException
	 *             if the lookup fails
	 */
	public boolean existsRawSet(final String id) throws ActionManagerException {
		final String q = FOR_EACH_SET_PROFILE + " where $x//RAW_SETS/*/@id = '" + id + "' return $x";
		try {
			return !serviceLocator.getService(ISLookUpService.class).quickSearchProfile(q).isEmpty();
		} catch (ISLookUpException e) {
			log.error("Error accessing RawSets, using query: " + q, e);
			throw new ActionManagerException("Error running xquery: " + q, e);
		}
	}

	/**
	 * Collects the RAW_SETS/LATEST/@id values of all the set profiles.
	 *
	 * @return the distinct ids returned by the query
	 * @throws ActionManagerException
	 *             if the lookup fails
	 */
	public Set<String> listValidRawSets() throws ActionManagerException {
		final String q = FOR_EACH_SET_PROFILE + " return $x//RAW_SETS/LATEST/@id/string()";
		try {
			final List<String> list = serviceLocator.getService(ISLookUpService.class).quickSearchProfile(q);
			return Sets.newHashSet(list);
		} catch (Exception e) {
			log.error("Error running xquery: " + q, e);
			throw new ActionManagerException("Error running xquery: " + q, e);
		}
	}

	/**
	 * Reads id, creation date and last update of the latest raw set of the given set.
	 *
	 * @param set
	 *            the set identifier
	 * @return the latest raw set
	 * @throws ActionManagerException
	 *             if the lookup fails or the response does not contain the three expected fields
	 */
	public RawSet geLatestRawSet(final String set) throws ActionManagerException {
		final String q = setByIdQuery(set,
				"concat(' ' , $x//RAW_SETS/LATEST/@id, ' @@@ ', $x//RAW_SETS/LATEST/@creationDate, ' @@@ ', $x//RAW_SETS/LATEST/@lastUpdate, ' ')");
		try {
			// limit -1 keeps trailing empty fields, so an empty lastUpdate cannot shorten the array
			final String[] arr = serviceLocator.getService(ISLookUpService.class).getResourceProfileByQuery(q).split("@@@", -1);
			if (arr.length < 3) {
				throw new ActionManagerException("Unexpected result for the latest raw set of set " + set + ", using query: " + q);
			}
			return RawSet.newInstance(arr[0].trim(), arr[1].trim(), arr[2].trim());
		} catch (ISLookUpException e) {
			log.error("Error accessing Sets, using query: " + q, e);
			throw new ActionManagerException("Error running xquery: " + q, e);
		}
	}

	/**
	 * @return the value of the ActionManager service property "garbageTimeMargin"
	 * @throws ActionManagerException
	 *             if the property is missing, not unique or cannot be read
	 */
	public String getGarbageTimeMargin() throws ActionManagerException {
		return queryServiceProperty("garbageTimeMargin");
	}

	/**
	 * @return the value of the ActionManager service property "basePath"
	 * @throws ActionManagerException
	 *             if the property is missing, not unique or cannot be read
	 */
	public String getBasePathHDFS() throws ActionManagerException {
		return queryServiceProperty("basePath");
	}

	/**
	 * @return the value of the ActionManager service property "garbageRetainThreshold"
	 * @throws ActionManagerException
	 *             if the property is missing, not unique or cannot be read
	 */
	public String getGarbageRetainThreshold() throws ActionManagerException {
		return queryServiceProperty("garbageRetainThreshold");
	}

	/**
	 * Reads a property of the ActionManager service profile; exactly one value is expected.
	 *
	 * @param propertyName
	 *            the property key
	 * @return the property value
	 * @throws ActionManagerException
	 *             if the lookup fails, or the query returns zero or more than one value
	 */
	private String queryServiceProperty(final String propertyName) throws ActionManagerException {
		final String q = "for $x in /RESOURCE_PROFILE[.//RESOURCE_TYPE/@value='ActionManagerServiceResourceType'] return $x//SERVICE_PROPERTIES/PROPERTY[./@ key='"
				+ propertyName + "']/@value/string()";
		log.debug("quering for service property: " + q);
		try {
			final List<String> value = serviceLocator.getService(ISLookUpService.class).quickSearchProfile(q);
			return Iterables.getOnlyElement(value);
		} catch (ISLookUpException e) {
			final String msg = "Error accessing service profile, using query: " + q;
			log.error(msg, e);
			throw new ActionManagerException(msg, e);
		} catch (NoSuchElementException e) {
			// Iterables.getOnlyElement on an empty result
			final String msg = "missing service property: " + propertyName;
			log.error(msg, e);
			throw new ActionManagerException(msg, e);
		} catch (IllegalArgumentException e) {
			// Iterables.getOnlyElement on a result with several values
			final String msg = "found more than one service property: " + propertyName;
			log.error(msg, e);
			throw new ActionManagerException(msg, e);
		}
	}

	/**
	 * Makes the given raw set the latest one of a set: the attributes of the current LATEST element
	 * are copied to a new EXPIRED element, then LATEST is overwritten and the profile is updated.
	 *
	 * @param set
	 *            the set identifier
	 * @param rawSet
	 *            the new latest raw set
	 * @throws ActionManagerException
	 *             if the profile cannot be read, lacks the RAW_SETS/LATEST element or cannot be updated
	 */
	public void addLatestRawSet(final String set, final RawSet rawSet) throws ActionManagerException {
		final String q = setByIdQuery(set, "$x");
		try {
			final String profile = serviceLocator.getService(ISLookUpService.class).getResourceProfileByQuery(q);
			final Document doc = new SAXReader().read(new StringReader(profile));
			final String profId = doc.valueOf("//RESOURCE_IDENTIFIER/@value");
			final Element latest = (Element) doc.selectSingleNode("//RAW_SETS/LATEST");
			final Element rawSets = (Element) doc.selectSingleNode("//RAW_SETS");
			if ((latest == null) || (rawSets == null)) {
				// explicit failure instead of an anonymous NullPointerException as the cause
				throw new IllegalStateException("profile " + profId + " has no RAW_SETS/LATEST element");
			}
			final Element expired = rawSets.addElement("EXPIRED");

			for (Object o : latest.attributes()) {
				final Attribute a = (Attribute) o;
				expired.addAttribute(a.getName(), a.getValue());
			}

			latest.addAttribute("id", rawSet.getId());
			latest.addAttribute("creationDate", rawSet.getCreationDate());
			latest.addAttribute("lastUpdate", rawSet.getLastUpdate());

			serviceLocator.getService(ISRegistryService.class).updateProfile(profId, doc.asXML(), "ActionManagerSetDSResourceType");
		} catch (Exception e) {
			log.error("Error updating profile of set: " + set, e);
			throw new ActionManagerException("Error running xquery: " + q, e);
		}
	}

	/**
	 * Sets the lastUpdate attribute of the raw set with the given id to the current time.
	 *
	 * @param rawSet
	 *            the raw set identifier
	 * @throws ActionManagerException
	 *             if the update fails
	 */
	public void updateLastUpdate(final String rawSet) throws ActionManagerException {
		final String q = FOR_EACH_SET_PROFILE + "//RAW_SETS/*[@id='" + rawSet
				+ "']/@lastUpdate return update replace $x with '" + DateUtils.now_ISO8601() + "'";

		try {
			serviceLocator.getService(ISRegistryService.class).executeXUpdate(q);
		} catch (Exception e) {
			log.error("Error updating lastUpdate using query: " + q, e);
			throw new ActionManagerException("Error updating lastUpdate using query: " + q, e);
		}
	}

	/**
	 * Retrieves the Hadoop job configuration profile with the given job name.
	 *
	 * @param jobName
	 *            value of HADOOP_JOB/@name
	 * @return the profile
	 * @throws ISLookUpException
	 *             if the lookup fails
	 */
	public String getJobProfile(final String jobName) throws ISLookUpException {
		return serviceLocator.getService(ISLookUpService.class).getResourceProfileByQuery(
				"/RESOURCE_PROFILE[.//RESOURCE_TYPE/@value = 'HadoopJobConfigurationDSResourceType' and .//HADOOP_JOB/@name='" + jobName + "']");
	}

	/**
	 * Reads a property of the Hadoop service profile; exactly one value is expected.
	 *
	 * @param key
	 *            the property key
	 * @return the property value
	 * @throws ISLookUpException
	 *             if the lookup fails
	 * @throws IllegalStateException
	 *             if the query does not return exactly one value
	 */
	public String queryForServiceProperty(final String key) throws ISLookUpException {
		return getServiceConfigValue("for $x in /RESOURCE_PROFILE[.//RESOURCE_TYPE/@value='HadoopServiceResourceType'] return $x//SERVICE_PROPERTIES/PROPERTY[./@ key='"
				+ key + "']/@value/string()");
	}

	/**
	 * Replaces a STATUS counter element of a job profile with its current value combined with delta.
	 *
	 * @param jobName
	 *            value of HADOOP_JOB/@name
	 * @param element
	 *            name of the element below STATUS
	 * @param delta
	 *            expression fragment appended to the current value (presumably something like "+ 1")
	 */
	public void updateCountElement(final String jobName, final String element, final String delta) {
		final String xquery =
				"let $x := //RESOURCE_PROFILE[" + ".//RESOURCE_TYPE/@value='HadoopJobConfigurationDSResourceType' and .//HADOOP_JOB/@name='" + jobName
						+ "'], $tot := $x//STATUS/" + element + "/@value/number() " + delta + " return update replace $x//STATUS/" + element + " with <"
						+ element + " value='{$tot}' />";

		executeXUpdate(xquery);
	}

	/**
	 * Sets LAST_SUBMISSION_DATE/@value of the given job profile to the current time.
	 *
	 * @param jobName
	 *            value of HADOOP_JOB/@name
	 */
	public void updateDate(final String jobName) {
		log.info("increment last submission date for job: " + jobName);
		executeXUpdate("for $x in collection('')/RESOURCE_PROFILE["
				+ ".//RESOURCE_TYPE/@value='HadoopJobConfigurationDSResourceType' and .//HADOOP_JOB/@name='"
				+ jobName + "'] " + " return update value $x//LAST_SUBMISSION_DATE/@value with '" + DateUtils.now_ISO8601() + "' ");
	}

	/**
	 * Runs the query and returns its single result.
	 *
	 * @param xquery
	 *            the query to run
	 * @return the only value returned by the query
	 * @throws ISLookUpException
	 *             if the lookup fails
	 * @throws IllegalStateException
	 *             if the query does not return exactly one value
	 */
	private String getServiceConfigValue(final String xquery) throws ISLookUpException {
		log.debug("quering for service property: " + xquery);
		final List<String> urls = serviceLocator.getService(ISLookUpService.class).quickSearchProfile(xquery);
		if ((urls == null) || (urls.size() != 1)) {
			throw new IllegalStateException("unable to find unique service property, xquery: " + xquery);
		}
		return Iterables.getOnlyElement(urls);
	}

	/**
	 * Executes an xupdate through the registry service.
	 *
	 * @param xupdate
	 *            the statement to run
	 * @return the value returned by the registry service, or false if it raised an ISRegistryException
	 */
	public boolean executeXUpdate(final String xupdate) {
		try {
			log.debug("running xupdate: " + xupdate);
			return serviceLocator.getService(ISRegistryService.class).executeXUpdate(xupdate);
		} catch (final ISRegistryException e) {
			log.error("unable to run xupdate: " + xupdate, e);
			return false;
		}
	}

	/**
	 * Returns the part of the set profile identifier preceding the first underscore.
	 *
	 * @param set
	 *            the set identifier
	 * @return the result of the substring-before query
	 * @throws ISLookUpException
	 *             if the lookup fails
	 */
	public String getActionSetProfileId(final String set) throws ISLookUpException {
		final String xquery = "substring-before(" + FOR_EACH_SET_PROFILE + " "
				+ "return $x[.//SET/@id = '%s']//RESOURCE_IDENTIFIER/@value/string(), '_')";
		return serviceLocator.getService(ISLookUpService.class).getResourceProfileByQuery(String.format(xquery, set));
	}

	public StringTemplate getActionManagerSetDsTemplate() {
		return actionManagerSetDsTemplate;
	}

	@Required
	public void setActionManagerSetDsTemplate(final StringTemplate actionManagerSetDsTemplate) {
		this.actionManagerSetDsTemplate = actionManagerSetDsTemplate;
	}

	public String getHostname() {
		return hostname;
	}

	@Required
	public void setHostname(final String hostname) {
		this.hostname = hostname;
	}

	public String getPort() {
		return port;
	}

	@Required
	public void setPort(final String port) {
		this.port = port;
	}

	public String getContext() {
		return context;
	}

	@Required
	public void setContext(final String context) {
		this.context = context;
	}
}
