package eu.dnetlib.data.dedup;

import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Predicate;
import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import eu.dnetlib.data.mapreduce.util.OafDecoder;
import eu.dnetlib.data.mapreduce.util.OafEntityDecoder;
import eu.dnetlib.data.proto.OafProtos.Oaf;
import eu.dnetlib.data.proto.OafProtos.Oaf.Builder;
import eu.dnetlib.data.proto.OafProtos.OafEntity;
import eu.dnetlib.data.transform.OafEntityMerger;
import eu.dnetlib.data.transform.SolrProtoMapper;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpDocumentNotFoundException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
import eu.dnetlib.functionality.index.client.IndexClient;
import eu.dnetlib.functionality.index.client.IndexClientException;
import eu.dnetlib.functionality.index.client.ResolvingIndexClientFactory;
import eu.dnetlib.functionality.index.client.response.LookupResponse;
import eu.dnetlib.functionality.modular.ui.dedup.SimilarityGroup;
import eu.dnetlib.pace.config.DedupConfig;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.solr.client.solrj.impl.CloudSolrServer;
import org.apache.solr.common.SolrInputDocument;
import org.dom4j.DocumentException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;

import javax.annotation.Resource;
import java.io.IOException;
import java.util.*;

/**
 * Data access object for the dedup index.
 * <p>
 * Read operations ({@link #search} and {@link #searchById}) go through the {@link IndexClient} obtained from the
 * {@link ResolvingIndexClientFactory}; {@link #commit(SimilarityGroup)} writes directly to Solr through a
 * {@link CloudSolrServer} created for the single call.
 */
public class DedupIndexDAO {

	private static final Log log = LogFactory.getLog(DedupIndexDAO.class);

	/** Two digits followed by a pipe at the start of an identifier; stripped by {@link #cleanId(String)}. */
	private static final String ID_PREFIX_REGEX = "^\\d\\d\\|";

	/** Entity type -> (field name -> path handed to {@link OafEntityDecoder#getFieldValues}). */
	private static final Map<String, Map<String, String>> paths = Maps.newHashMap();

	static {
		final Map<String, String> result = new HashMap<String, String>();
		result.put("provenance", "collectedfrom/value");
		result.put("title", "result/metadata/title/value");
		result.put("dateofacceptance", "result/metadata/dateofacceptance/value");
		result.put("description", "result/metadata/description/value");
		result.put("author", "result/author/metadata/fullname/value");

		final Map<String, String> organization = new HashMap<String, String>();
		organization.put("provenance", "collectedfrom/value");
		organization.put("legalname", "organization/metadata/legalname/value");
		organization.put("legalshortname", "organization/metadata/legalshortname/value");
		organization.put("websiteurl", "organization/metadata/websiteurl/value");
		organization.put("country", "organization/metadata/country/classid");

		final Map<String, String> person = new HashMap<String, String>();
		person.put("provenance", "collectedfrom/value");
		person.put("fullname", "person/metadata/fullname/value");

		paths.put("result", result);
		paths.put("organization", organization);
		paths.put("person", person);
	}

	@Resource
	private UniqueServiceLocator serviceLocator;

	/**
	 * The index client factory.
	 */
	@Autowired
	private ResolvingIndexClientFactory indexClientFactory;

	/** Created on first use by {@link #getIndexClient()}. */
	private IndexClient indexClient = null;

	@Value("${dnet.dedup.index.format}")
	private String indexFormat;

	@Value("${dnet.dedup.index.collection}")
	private String dedupIndexCollection;

	/**
	 * Runs a user query against the dedup index, restricted to the given entity type and action set and to records
	 * that are not deleted by inference.
	 *
	 * @param type
	 *            entity type; also selects the field paths used to extract the requested fields
	 * @param userQuery
	 *            CQL fragment appended verbatim to the fixed part of the query
	 * @param actionSet
	 *            action set the records must belong to
	 * @param start
	 *            first bound passed to the index lookup
	 * @param rows
	 *            page size; the second bound passed to the index lookup is {@code start + rows - 1}
	 * @param fields
	 *            comma separated field names; blanks are trimmed and empty entries dropped
	 * @return the total reported by the index plus one field map per returned record
	 * @throws Exception
	 *             when the index client cannot be obtained or the lookup fails
	 */
	public OafResult search(final String type, final String userQuery, final String actionSet, final int start, final int rows, final String fields)
			throws Exception {

		final String cqlQuery =
				String.format("(>s=SOLR s.q.op=AND) and oaftype = %s and actionset exact \"%s\" and deletedbyinference = false and %s", type, actionSet,
						userQuery);

		final LookupResponse rsp = getIndexClient().lookup(cqlQuery, null, start, (start + rows) - 1);

		final List<String> fieldList = Lists.newLinkedList(Splitter.on(",").omitEmptyStrings().trimResults().split(fields));
		final List<Map<String, String>> resList = Lists.newLinkedList(Iterables.transform(toOaf(rsp), getOaf2FieldMapFunction(type, fieldList)));

		return new OafResult(rsp.getTotal(), resList);

	}

	/**
	 * Looks up a record by object identifier within an action set.
	 *
	 * @param actionSet
	 *            action set the record must belong to
	 * @param type
	 *            entity type, used to select the field paths
	 * @param objidentifier
	 *            identifier to match exactly
	 * @param fields
	 *            names of the fields to extract
	 * @return the total reported by the index plus one field map per returned record
	 * @throws Exception
	 *             when the index client cannot be obtained or the lookup fails
	 */
	public OafResult searchById(final String actionSet, final String type, final String objidentifier, final List<String> fields) throws Exception {
		final LookupResponse rsp = getIndexClient().lookup(byIdQuery(objidentifier, actionSet), null, 0, 1);

		final Iterable<Oaf> oafList = toOaf(rsp);

		final List<Map<String, String>> resList = Lists.newLinkedList(Iterables.transform(oafList, getOaf2FieldMapFunction(type, fields)));

		return new OafResult(rsp.getTotal(), resList);
	}

	/**
	 * Applies a similarity group to the index:
	 * <ol>
	 * <li>every document of the group is flagged as deleted by inference;</li>
	 * <li>a new representative is built by merging the group documents;</li>
	 * <li>every document listed as dissimilar is flagged as not deleted by inference;</li>
	 * <li>the previous root ids, except the one equal to the new representative's, are deleted;</li>
	 * <li>the changes are committed.</li>
	 * </ol>
	 *
	 * @param group
	 *            the group to persist
	 * @return {@code true} when both the add and the commit responses report status 0
	 * @throws Exception
	 *             when reading the current documents, mapping them or talking to Solr fails
	 */
	public boolean commit(final SimilarityGroup group) throws Exception {

		int commitStatus = 0;
		int addStatus = 0;

		final CloudSolrServer solrServer = getSolrServer();

		log.info("starting index update");

		try {
			final SolrProtoMapper mapper = initProtoMapper();

			final Function<Oaf, SolrInputDocument> oaf2solr = oaf2solr(group, mapper);
			final List<SolrInputDocument> buffer = Lists.newLinkedList();

			// mark as deleted all the documents in the group
			final List<Oaf> groupDocs = markDeleted(asOafBuilder(parseBase64(queryIndex(group.getGroup(), group.getActionSet()))));
			buffer.addAll(asIndexDocs(oaf2solr, groupDocs));

			// elect a new representative
			final SolrInputDocument newRoot = oaf2solr.apply(OafEntityMerger.merge(getDedupConf(group), newRootId(group), groupDocs).build());
			final String electedRootId = (String) newRoot.getFieldValue("objidentifier");
			buffer.add(newRoot);

			// mark as non deleted the documents taken away from the group
			final List<Oaf> dissimDocs = markUnDeleted(asOafBuilder(parseBase64(queryIndex(unique(group.getDissimilar()), group.getActionSet()))));
			buffer.addAll(asIndexDocs(oaf2solr, dissimDocs));

			log.debug(String.format("adding %d documents to index %s", buffer.size(), dedupIndexCollection));

			// add the changes to the server
			addStatus = solrServer.add(buffer).getStatus();
			log.debug("solr add status: " + addStatus);

			// delete the old representatives, avoiding to remove the current one (if it didn't change)
			log.debug(String.format("deleting %d documents from index %s", group.getRootIds().size(), dedupIndexCollection));
			final Predicate<String> isStaleRoot = new Predicate<String>() {

				@Override
				public boolean apply(final String rootId) {
					return !rootId.equals(electedRootId);
				}
			};
			for (final String rootId : Iterables.filter(group.getRootIds(), isStaleRoot)) {
				solrServer.deleteById(mapper.getRecordId(rootId, group.getActionSet()));
			}

			commitStatus = solrServer.commit().getStatus();

			log.debug("solr commit status: " + commitStatus);
		} finally {
			log.debug("closing solr zk client");
			// shutdown() rather than getZkStateReader().close(): the state reader is only created once the client
			// connects, so when a failure happens before the first Solr request the getter would yield null and the
			// resulting NullPointerException would hide the original error. shutdown() also releases the other
			// resources owned by the client.
			solrServer.shutdown();
		}

		return (addStatus == 0) && (commitStatus == 0);
	}

	/**
	 * Builds the CQL used to fetch one record by identifier inside an action set.
	 * <p>
	 * NOTE(review): both values are interpolated verbatim between double quotes; a value containing a double quote
	 * would alter the query. Confirm that callers only pass trusted identifiers.
	 */
	private String byIdQuery(final String id, final String actionSet) {
		return "objidentifier exact \"" + id + "\" and actionset exact \"" + actionSet + "\"";
	}

	/** Lazily decodes the records of a lookup response. */
	private Iterable<Oaf> toOaf(final LookupResponse rsp) {
		return Iterables.transform(rsp.getRecords(), getXml2OafFunction());
	}

	/** Lazily decodes the given index records. */
	private Iterable<Oaf> parseBase64(final Iterable<String> r) {
		return Iterables.transform(r, getXml2OafFunction());
	}

	/**
	 * Returns a function that takes the text between the first {@code >} and the following {@code <} of a record,
	 * base64-decodes it and parses the bytes as an {@link Oaf}.
	 */
	private Function<String, Oaf> getXml2OafFunction() {
		return new Function<String, Oaf>() {

			@Override
			public Oaf apply(final String s) {
				final String base64 = StringUtils.substringBefore(StringUtils.substringAfter(s, ">"), "<");
				try {
					final byte[] oafBytes = Base64.decodeBase64(base64);
					return OafDecoder.decode(oafBytes).getOaf();
				} catch (final Throwable e) {
					// keep the cause, otherwise the reason of the decoding failure is lost
					throw new IllegalArgumentException("unable to decode base64 encoded Oaf object: " + base64, e);
				}
			}
		};
	}

	/** Creates the proto-to-Solr mapper from the index layout fields of the configured metadata format profile. */
	private SolrProtoMapper initProtoMapper() throws DocumentException, ISLookUpException, ISLookUpDocumentNotFoundException {
		return new SolrProtoMapper(
				serviceLocator
						.getService(ISLookUpService.class)
						.getResourceProfileByQuery(
								"collection('')//RESOURCE_PROFILE[.//RESOURCE_TYPE/@value = 'MDFormatDSResourceType' and .//NAME='" + indexFormat
										+ "']//LAYOUT[@name='index']/FIELDS"));
	}

	/** Creates a new Solr client bound to the dedup collection; the caller is responsible for releasing it. */
	private CloudSolrServer getSolrServer() {
		final String zk = getIndexSolrUrlZk();
		log.info(String.format("initializing solr client for collection %s, zk url: %s", dedupIndexCollection, zk));
		final CloudSolrServer solrServer = new CloudSolrServer(zk);
		solrServer.setDefaultCollection(dedupIndexCollection);

		return solrServer;
	}

	/**
	 * Reads the address of the solr protocol from the index service profile.
	 *
	 * @throws IllegalStateException
	 *             when the lookup service fails or returns a blank value
	 */
	private String getIndexSolrUrlZk() {
		try {
			return getResourceProfileByQuery(
					"for $x in /RESOURCE_PROFILE[.//RESOURCE_TYPE/@value='IndexServiceResourceType'] return $x//PROTOCOL[./@name='solr']/@address/string()");
		} catch (final ISLookUpException e) {
			throw new IllegalStateException("unable to read solr ZK url from service profile", e);
		}
	}

	/**
	 * Runs the xquery on the lookup service.
	 *
	 * @throws IllegalStateException
	 *             when the result is blank
	 */
	private String getResourceProfileByQuery(final String xquery) throws ISLookUpException {
		log.debug("quering for service property: " + xquery);
		final String res = serviceLocator.getService(ISLookUpService.class).getResourceProfileByQuery(xquery);
		if (StringUtils.isBlank(res)) throw new IllegalStateException("unable to find unique service property, xquery: " + xquery);
		return res;
	}

	/**
	 * Returns a function turning an {@link Oaf} into a flat map holding:
	 * <ul>
	 * <li>{@code id}: the cleaned entity id;</li>
	 * <li>{@code idList}: comma separated cleaned ids of the entity children, or the entity id when there are none;</li>
	 * <li>{@code groupSize}: the number of ids in {@code idList};</li>
	 * <li>one entry per requested field, its non-null values joined by {@code "; "}.</li>
	 * </ul>
	 *
	 * @param type
	 *            entity type selecting the field paths
	 * @param fields
	 *            names of the fields to extract
	 */
	private Function<Oaf, Map<String, String>> getOaf2FieldMapFunction(final String type, final List<String> fields) {
		final Map<String, String> typePaths = paths.get(type);

		return new Function<Oaf, Map<String, String>>() {

			@Override
			public Map<String, String> apply(final Oaf oaf) {

				final OafEntityDecoder ed = OafDecoder.decode(oaf).decodeEntity();
				final Map<String, String> res = Maps.newHashMap();
				final String oafId = cleanId(oaf.getEntity().getId());
				final List<String> idList = Lists.newArrayList(Iterables.transform(oaf.getEntity().getChildrenList(), new Function<OafEntity, String>() {

					@Override
					public String apply(final OafEntity e) {
						return cleanId(e.getId());
					}
				}));
				if (idList.isEmpty()) {
					idList.add(oafId);
				}
				res.put("id", oafId);
				res.put("idList", Joiner.on(",").join(idList));
				// idList holds at least the entity id at this point, so its size is always >= 1
				res.put("groupSize", String.valueOf(idList.size()));

				// fail with an explicit message instead of a bare NullPointerException on an unsupported type
				if ((typePaths == null) && !fields.isEmpty())
					throw new IllegalArgumentException("unsupported entity type: '" + type + "', expected one of " + paths.keySet());

				for (final String fieldName : fields) {
					res.put(fieldName, Joiner.on("; ").skipNulls().join(ed.getFieldValues(typePaths.get(fieldName))));
				}

				return res;
			}
		};
	}

	/** Removes the first match of {@link #ID_PREFIX_REGEX} from the identifier. */
	private String cleanId(final String id) {
		return id.replaceFirst(ID_PREFIX_REGEX, "");
	}

	/** Returns the index client, asking the factory for it on first use. */
	private IndexClient getIndexClient() throws IndexClientException, ISLookUpDocumentNotFoundException, ISLookUpException {
		if (indexClient == null) {
			indexClient = indexClientFactory.getClient(indexFormat, "index", "dedup", "solr");
		}
		return indexClient;
	}

	/**
	 * Lazily maps each id to the single index record matching it within the action set.
	 * <p>
	 * Any failure for an id, including a lookup that does not yield exactly one record, is logged and rethrown as a
	 * {@link RuntimeException} when the corresponding element is consumed.
	 */
	private Iterable<String> queryIndex(final Iterable<String> ids, final String actionset) {
		return Iterables.transform(ids, new Function<String, String>() {

			@Override
			public String apply(final String id) {
				try {
					final LookupResponse rsp = getIndexClient().lookup(byIdQuery(id, actionset), null, 0, 1);

					log.debug(String.format("query index for id '%s', found '%d'", id, rsp.getTotal()));

					return Iterables.getOnlyElement(rsp.getRecords());
				} catch (final Throwable e) {
					final String msg = "unable to query id: " + id;
					// pass the throwable separately so that the stack trace is logged too
					log.error(msg, e);
					throw new RuntimeException(msg, e);
				}
			}
		});
	}

	/** Builds every given builder after setting its deletedbyinference flag to {@code true}. */
	private List<Oaf> markDeleted(final Iterable<Oaf.Builder> builders) {
		return Lists.newArrayList(Iterables.transform(builders, new Function<Oaf.Builder, Oaf>() {

			@Override
			public Oaf apply(final Oaf.Builder builder) {
				// TODO add more changes to the Oaf object here as needed.
				builder.getDataInfoBuilder().setDeletedbyinference(true);
				return builder.build();
			}
		}));
	}

	/** Builds every given builder after setting its deletedbyinference flag to {@code false}. */
	private List<Oaf> markUnDeleted(final Iterable<Oaf.Builder> builders) {
		return Lists.newArrayList(Iterables.transform(builders, new Function<Oaf.Builder, Oaf>() {

			@Override
			public Oaf apply(final Oaf.Builder builder) {
				// TODO add more changes to the Oaf object here as needed.
				builder.getDataInfoBuilder().setDeletedbyinference(false);
				return builder.build();
			}
		}));
	}

	/** Lazily wraps each {@link Oaf} in a builder initialised from it. */
	private Iterable<Oaf.Builder> asOafBuilder(final Iterable<Oaf> oaf) {
		return Iterables.transform(oaf, new Function<Oaf, Oaf.Builder>() {

			@Override
			public Builder apply(final Oaf oaf) {
				return Oaf.newBuilder(oaf);
			}
		});
	}

	/**
	 * Derives the id of the new representative: the smallest id of the group, with everything up to its last
	 * {@code ::} replaced by the {@code dedup_wf_001::} prefix.
	 */
	private String newRootId(final SimilarityGroup group) {
		return "dedup_wf_001::" + Collections.min(group.getGroup()).replaceFirst("^.*::", "");
	}

	/** Eagerly maps the given protos to index documents. */
	private List<SolrInputDocument> asIndexDocs(final Function<Oaf, SolrInputDocument> mapper, final Iterable<Oaf> protos) {
		return Lists.newArrayList(Iterables.transform(protos, mapper));
	}

	/**
	 * Returns a function mapping an {@link Oaf} to a Solr document using the date and action set of the group; any
	 * mapping failure is rethrown as {@link IllegalArgumentException}.
	 */
	private Function<Oaf, SolrInputDocument> oaf2solr(final SimilarityGroup group, final SolrProtoMapper mapper) {
		return new Function<Oaf, SolrInputDocument>() {

			@Override
			public SolrInputDocument apply(final Oaf oaf) {
				try {
					return mapper.map(oaf, group.getDate(), "", group.getActionSet());
				} catch (final Throwable e) {
					throw new IllegalArgumentException("unable to map proto to index document", e);
				}
			}
		};
	}

	/** Collects the union of all the value sets of the map. */
	private Set<String> unique(final Map<String, Set<String>> map) {
		final Set<String> res = Sets.newHashSet();
		for (final Set<String> ids : map.values()) {
			res.addAll(ids);
		}

		return res;
	}

	/** Loads the default dedup configuration for the entity type and action set of the group. */
	private DedupConfig getDedupConf(final SimilarityGroup group) throws IOException {
		final Map<String, String> config = Maps.newHashMap();
		config.put("entityType", group.getEntityType().getType());
		config.put("configurationId", group.getActionSet());
		return DedupConfig.loadDefault(config);
	}

}
