package eu.dnetlib.data.dedup;

import java.io.IOException;
import java.util.*;
import java.util.stream.Collectors;
import javax.annotation.Resource;

import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.protobuf.GeneratedMessage;
import eu.dnetlib.data.mapreduce.util.OafDecoder;
import eu.dnetlib.data.mapreduce.util.OafEntityDecoder;
import eu.dnetlib.data.proto.OafProtos.Oaf;
import eu.dnetlib.data.transform.AbstractProtoMapper;
import eu.dnetlib.data.transform.OafEntityMerger;
import eu.dnetlib.data.transform.SolrProtoMapper;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpDocumentNotFoundException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
import eu.dnetlib.functionality.index.client.IndexClient;
import eu.dnetlib.functionality.index.client.IndexClientException;
import eu.dnetlib.functionality.index.client.response.LookupResponse;
import eu.dnetlib.functionality.index.client.solr.SolrIndexClient;
import eu.dnetlib.functionality.index.client.solr.SolrIndexClientFactory;
import eu.dnetlib.functionality.modular.ui.dedup.SimilarityGroup;
import eu.dnetlib.pace.config.DedupConfig;
import eu.dnetlib.pace.model.Field;
import eu.dnetlib.pace.model.FieldDef;
import eu.dnetlib.pace.model.FieldValueImpl;
import eu.dnetlib.pace.model.ProtoDocumentBuilder;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.common.SolrInputDocument;
import org.dom4j.DocumentException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;

/**
 * Data access object for the dedup index collection.
 *
 * <p>
 * Offers read access ({@link #search} and {@link #searchById}) returning the indexed Oaf records as flat field maps, and a write operation
 * ({@link #commit}) that re-indexes a similarity group: group members are flagged as deleted by inference, a merged representative is
 * added, records removed from the group are flagged as not deleted, and obsolete representatives are removed.
 * </p>
 *
 * <p>
 * NOTE(review): the index client is created lazily without synchronization; confirm how this bean is shared before relying on it from
 * concurrent callers.
 * </p>
 */
public class DedupIndexDAO {

	private static final Log log = LogFactory.getLog(DedupIndexDAO.class);

	/** Matches the two-digit prefix followed by a pipe that is stripped from entity identifiers, e.g. {@code 50|}. */
	private static final String ID_PREFIX_REGEX = "^\\d\\d\\|";

	/** Read-only lookup table: entity type -&gt; (field name -&gt; path of the field inside the entity message). */
	private static final Map<String, Map<String, String>> paths;

	static {
		final Map<String, String> resultPaths = new HashMap<>();
		resultPaths.put("provenance", "collectedfrom/value");
		resultPaths.put("title", "result/metadata/title/value");
		resultPaths.put("dateofacceptance", "result/metadata/dateofacceptance/value");
		resultPaths.put("description", "result/metadata/description/value");
		resultPaths.put("author", "result/metadata/author/fullname");

		final Map<String, String> organizationPaths = new HashMap<>();
		organizationPaths.put("provenance", "collectedfrom/value");
		organizationPaths.put("legalname", "organization/metadata/legalname/value");
		organizationPaths.put("legalshortname", "organization/metadata/legalshortname/value");
		organizationPaths.put("websiteurl", "organization/metadata/websiteurl/value");
		organizationPaths.put("country", "organization/metadata/country/classid");

		final Map<String, String> personPaths = new HashMap<>();
		personPaths.put("provenance", "collectedfrom/value");
		personPaths.put("fullname", "person/metadata/fullname/value");

		final Map<String, Map<String, String>> byType = new HashMap<>();
		byType.put("result", Collections.unmodifiableMap(resultPaths));
		byType.put("organization", Collections.unmodifiableMap(organizationPaths));
		byType.put("person", Collections.unmodifiableMap(personPaths));

		paths = Collections.unmodifiableMap(byType);
	}

	@Resource
	private UniqueServiceLocator serviceLocator;

	/**
	 * The index client factory.
	 */
	@Autowired
	private SolrIndexClientFactory indexClientFactory;

	/** Lazily created by {@link #getIndexClient()}. */
	private IndexClient indexClient = null;

	@Value("${dnet.dedup.index.collection}")
	private String dedupIndexCollection;

	/**
	 * Runs a paged query against the dedup index, restricted to the given entity type and action set and to records that are not deleted
	 * by inference.
	 *
	 * @param type
	 *            the oaf type to search for; also selects the field paths used to build the result maps
	 * @param userQuery
	 *            CQL fragment appended (with {@code and}) to the fixed part of the query
	 * @param actionSet
	 *            the action set the records must belong to
	 * @param start
	 *            index of the first record to fetch
	 * @param rows
	 *            number of records to fetch; the lookup range passed to the client is {@code start .. start + rows - 1}
	 * @param fields
	 *            comma separated list of field names to include in each result map; blank entries are ignored and {@code null} is treated
	 *            as no additional fields
	 * @return the total reported by the index together with one field map per fetched record
	 * @throws Exception
	 *             when the index client cannot be obtained or the lookup fails
	 */
	public OafResult search(final String type, final String userQuery, final String actionSet, final int start, final int rows, final String fields)
			throws Exception {

		final String cqlQuery =
				String.format("(>s=SOLR s.q.op=AND) and oaftype = %s and actionset exact \"%s\" and deletedbyinference = false and %s", type, actionSet,
						userQuery);

		final LookupResponse rsp = getIndexClient().lookup(cqlQuery, null, start, (start + rows) - 1);

		// a null field list would make the Splitter fail: treat it as an empty one
		final List<String> fieldList =
				Lists.newLinkedList(Splitter.on(",").omitEmptyStrings().trimResults().split(StringUtils.defaultString(fields)));
		final List<Map<String, String>> resList = Lists.newLinkedList(Iterables.transform(toOaf(rsp), getOaf2FieldMapFunction(type, fieldList)));

		return new OafResult(rsp.getTotal(), resList);
	}

	/**
	 * Looks up the record with the given object identifier in the given action set.
	 *
	 * @param actionSet
	 *            the action set the record must belong to
	 * @param type
	 *            the entity type, used to select the field paths for the result maps
	 * @param objidentifier
	 *            the identifier to match exactly
	 * @param fields
	 *            names of the fields to include in each result map
	 * @return the total reported by the index together with one field map per fetched record
	 * @throws Exception
	 *             when the index client cannot be obtained or the lookup fails
	 */
	public OafResult searchById(final String actionSet, final String type, final String objidentifier, final List<String> fields) throws Exception {
		final String cqlQuery = "objidentifier exact \"" + objidentifier + "\" and actionset exact \"" + actionSet + "\"";

		final LookupResponse rsp = getIndexClient().lookup(cqlQuery, null, 0, 1);

		final List<Map<String, String>> resList = Lists.newLinkedList(Iterables.transform(toOaf(rsp), getOaf2FieldMapFunction(type, fields)));

		return new OafResult(rsp.getTotal(), resList);
	}

	/**
	 * Writes the given similarity group to the index.
	 *
	 * <p>
	 * The documents fed to the index are, in order: every member of the group flagged as deleted by inference, the representative obtained
	 * by merging those members, and every record listed as dissimilar flagged as not deleted by inference. After feeding, the previous
	 * root records of the group are removed, except the one whose identifier equals the new representative's, and the index is committed.
	 * </p>
	 *
	 * @param group
	 *            the similarity group to persist
	 * @return {@code true} when both the feed and the commit report status {@code 0}
	 * @throws Exception
	 *             when the index client or the mapper cannot be set up, or when any index operation fails
	 */
	public boolean commit(final SimilarityGroup group) throws Exception {

		log.info("starting index update");

		final SolrIndexClient solrClient = (SolrIndexClient) getIndexClient();
		final SolrProtoMapper mapper = initProtoMapper();

		final Function<Oaf, SolrInputDocument> oaf2solr = oaf2solr(group, mapper);
		final List<SolrInputDocument> buffer = new ArrayList<>();

		// mark as deleted all the documents in the group
		final List<Oaf> groupDocs = markDeleted(asOafBuilder(parseBase64(queryIndex(group.getGroup(), group.getActionSet()))));
		buffer.addAll(asIndexDocs(oaf2solr, groupDocs));

		// elect a new representative
		final SolrInputDocument newRoot = oaf2solr.apply(OafEntityMerger.merge(getDedupConf(group), newRootId(group), groupDocs).build());
		final String newRootId = (String) newRoot.getFieldValue("objidentifier");
		buffer.add(newRoot);

		// mark as non deleted the documents taken away from the group
		final List<Oaf> dissimDocs = markUnDeleted(asOafBuilder(parseBase64(queryIndex(unique(group.getDissimilar()), group.getActionSet()))));
		buffer.addAll(asIndexDocs(oaf2solr, dissimDocs));

		log.debug(String.format("adding %d documents to index %s", buffer.size(), dedupIndexCollection));

		// add the changes to the server
		final int addStatus = solrClient.feed(buffer);
		log.debug("solr add status: " + addStatus);

		// delete the old representatives, avoiding to remove the current one (if it didn't change)
		log.debug(String.format("deleting %d documents from index %s", group.getRootIds().size(), dedupIndexCollection));
		for (final String rootId : Iterables.filter(group.getRootIds(), candidate -> !candidate.equals(newRootId))) {
			solrClient.remove(mapper.getRecordId(rootId, group.getActionSet()));
		}

		final int commitStatus = solrClient.commit().getStatus();
		log.debug("solr commit status: " + commitStatus);

		return (addStatus == 0) && (commitStatus == 0);
	}

	/** Lazily decodes the records of a lookup response into Oaf objects. */
	private Iterable<Oaf> toOaf(final LookupResponse rsp) {
		return Iterables.transform(rsp.getRecords(), getXml2OafFunction());
	}

	/** Lazily decodes the given index records into Oaf objects. */
	private Iterable<Oaf> parseBase64(final Iterable<String> r) {
		return Iterables.transform(r, getXml2OafFunction());
	}

	/**
	 * Provides the function that turns an index record into an Oaf object.
	 *
	 * <p>
	 * The payload is the text found between the first {@code >} of the record and the next {@code <}; it is base64-decoded and the
	 * resulting bytes are handed to {@link OafDecoder}.
	 * </p>
	 *
	 * @return a function throwing {@link IllegalArgumentException} (carrying the original failure as its cause) when a record cannot be
	 *         decoded
	 */
	protected Function<String, Oaf> getXml2OafFunction() {
		return s -> {
			final String base64 = StringUtils.substringBefore(StringUtils.substringAfter(s, ">"), "<");
			try {
				return OafDecoder.decode(Base64.decodeBase64(base64)).getOaf();
			} catch (final Throwable e) {
				// keep the cause: without it the actual decoding problem is lost
				throw new IllegalArgumentException("unable to decode base64 encoded Oaf object: " + base64, e);
			}
		};
	}

	/**
	 * Creates the proto-to-solr mapper from the index layout fields of the OPENAIRE metadata format profile.
	 */
	private SolrProtoMapper initProtoMapper() throws DocumentException, ISLookUpException, ISLookUpDocumentNotFoundException {
		return new SolrProtoMapper(
				serviceLocator
						.getService(ISLookUpService.class)
						.getResourceProfileByQuery(
								"collection('')//RESOURCE_PROFILE["
										+ ".//RESOURCE_TYPE/@value = 'MDFormatDSResourceType' and "
										+ ".//NAME='OPENAIRE']//LAYOUT[@name='index']/FIELDS"));
	}

	/**
	 * Runs the given xquery through the lookup service.
	 *
	 * @throws IllegalStateException
	 *             when the lookup returns a blank result
	 */
	private String getResourceProfileByQuery(final String xquery) throws ISLookUpException {
		log.debug("quering for service property: " + xquery);
		final String res = serviceLocator.getService(ISLookUpService.class).getResourceProfileByQuery(xquery);
		if (StringUtils.isBlank(res)) throw new IllegalStateException("unable to find unique service property, xquery: " + xquery);
		return res;
	}

	/**
	 * Provides the function that flattens an Oaf object into a map of strings.
	 *
	 * <p>
	 * Each map contains {@code id} (the entity id without its prefix), {@code idList} (comma separated, prefix-less ids of the entity
	 * children, or the entity id itself when there are no children), {@code groupSize} (the number of ids in {@code idList}) and one entry
	 * per requested field, whose values are joined with {@code "; "}.
	 * </p>
	 *
	 * @param type
	 *            the entity type selecting the field paths
	 * @param fields
	 *            names of the fields to extract
	 * @return a function throwing {@link IllegalArgumentException} when fields are requested for an entity type without known paths
	 */
	protected Function<Oaf, Map<String, String>> getOaf2FieldMapFunction(final String type, final List<String> fields) {
		final Map<String, String> typePaths = paths.get(type);
		return oaf -> {

			final Map<String, String> res = Maps.newHashMap();
			final String oafId = cleanId(oaf.getEntity().getId());

			// collect into an ArrayList explicitly: the list may need the fallback element added below
			final List<String> idList = oaf.getEntity().getChildrenList().stream()
					.map(child -> cleanId(child.getId()))
					.collect(Collectors.toCollection(ArrayList::new));
			if (idList.isEmpty()) {
				idList.add(oafId);
			}
			res.put("id", oafId);
			res.put("idList", Joiner.on(",").join(idList));
			res.put("groupSize", String.valueOf(idList.size()));

			if ((typePaths == null) && !fields.isEmpty()) throw new IllegalArgumentException("unsupported entity type: " + type);

			for (final String fieldName : fields) {
				final List<String> values = getFieldValues((GeneratedMessage) oaf.getEntity(), fieldName, typePaths.get(fieldName));
				res.put(fieldName, Joiner.on("; ").skipNulls().join(values));
			}

			return res;
		};
	}

	/** Extracts the values found at the given path of the message as strings, skipping null values. */
	private List<String> getFieldValues(final GeneratedMessage m, final String fieldName, final String path) {
		return new SolrDocumentMapper().processPath(m, fieldName, path).stream()
				.filter(Objects::nonNull)
				.map(Object::toString)
				.collect(Collectors.toCollection(LinkedList::new));
	}

	/**
	 * Exposes the path processing inherited from {@link AbstractProtoMapper} for a field identified only by its name.
	 */
	class SolrDocumentMapper extends AbstractProtoMapper {

		/**
		 * Wraps the field name in a {@link FieldDef} and delegates to the inherited path processing.
		 */
		public List<Object> processPath(final GeneratedMessage m, final String fieldName, final String path) {
			final FieldDef fd = new FieldDef();
			fd.setName(fieldName);
			return processPath(m, fd, path);
		}
	}

	/** Strips the leading two-digit-and-pipe prefix from the given identifier, if present. */
	private String cleanId(final String id) {
		return id.replaceFirst(ID_PREFIX_REGEX, "");
	}

	/** Returns the client for the dedup index collection, creating it on first use. */
	private IndexClient getIndexClient() throws IndexClientException, ISLookUpDocumentNotFoundException, ISLookUpException {
		if (indexClient == null) {
			indexClient = indexClientFactory.getClient(dedupIndexCollection);
		}
		return indexClient;
	}

	/**
	 * Lazily fetches, for each id, the single index record matching that id within the given action set.
	 *
	 * <p>
	 * Any failure (including a lookup that does not yield exactly one record) is logged and rethrown as a {@link RuntimeException} when
	 * the returned iterable is traversed.
	 * </p>
	 */
	private Iterable<String> queryIndex(final Iterable<String> ids, final String actionset) {
		return Iterables.transform(ids, id -> {
			try {
				final String cql = "objidentifier exact \"" + id + "\" and actionset exact \"" + actionset + "\"";
				final LookupResponse rsp = getIndexClient().lookup(cql, null, 0, 1);

				log.debug(String.format("query index for id '%s', found '%d'", id, rsp.getTotal()));

				return Iterables.getOnlyElement(rsp.getRecords());
			} catch (final Throwable e) {
				final String msg = "unable to query id: " + id;
				// pass the throwable as second argument so that the stack trace is logged too
				log.error(msg, e);
				throw new RuntimeException(msg, e);
			}
		});
	}

	/** Builds the given objects after flagging them as deleted by inference. */
	private List<Oaf> markDeleted(final Iterable<Oaf.Builder> builders) {
		return setDeletedByInference(builders, true);
	}

	/** Builds the given objects after flagging them as not deleted by inference. */
	private List<Oaf> markUnDeleted(final Iterable<Oaf.Builder> builders) {
		return setDeletedByInference(builders, false);
	}

	/** Sets the deleted-by-inference flag on every builder and returns the built objects. */
	private List<Oaf> setDeletedByInference(final Iterable<Oaf.Builder> builders, final boolean deleted) {
		return Lists.newArrayList(Iterables.transform(builders, builder -> {
			// TODO add more changes to the Oaf object here as needed.
			builder.getDataInfoBuilder().setDeletedbyinference(deleted);
			return builder.build();
		}));
	}

	/** Lazily wraps each Oaf object into a builder initialised from it. */
	private Iterable<Oaf.Builder> asOafBuilder(final Iterable<Oaf> oaf) {
		return Iterables.transform(oaf, Oaf::newBuilder);
	}

	/**
	 * Derives the identifier of the group representative: the smallest id of the group, with everything up to its last {@code ::} replaced
	 * by the {@code dedup_wf_001::} prefix.
	 */
	private String newRootId(final SimilarityGroup group) {
		return "dedup_wf_001::" + Collections.min(group.getGroup()).replaceFirst("^.*::", "");
	}

	/** Maps the given Oaf objects to index documents. */
	private List<SolrInputDocument> asIndexDocs(final Function<Oaf, SolrInputDocument> mapper, final Iterable<Oaf> protos) {
		return Lists.newArrayList(Iterables.transform(protos, mapper));
	}

	/**
	 * Provides the function mapping an Oaf object to an index document, using the date and action set of the given group. Mapping failures
	 * are rethrown as {@link IllegalArgumentException}.
	 */
	private Function<Oaf, SolrInputDocument> oaf2solr(final SimilarityGroup group, final SolrProtoMapper mapper) {
		return oaf -> {
			try {
				return mapper.map(oaf, group.getDate(), "", group.getActionSet());
			} catch (final Throwable e) {
				throw new IllegalArgumentException("unable to map proto to index document", e);
			}
		};
	}

	/** Returns the union of all the id sets held as values of the given map. */
	private Set<String> unique(final Map<String, Set<String>> map) {
		final Set<String> res = Sets.newHashSet();
		for (final Set<String> ids : map.values()) {
			res.addAll(ids);
		}

		return res;
	}

	/** Loads the default dedup configuration for the entity type and action set of the given group. */
	private DedupConfig getDedupConf(final SimilarityGroup group) throws IOException {
		final Map<String, String> config = Maps.newHashMap();
		config.put("entityType", group.getEntityType().getType());
		config.put("configurationId", group.getActionSet());
		return DedupConfig.loadDefault(config);
	}

}
