package eu.dnetlib.data.mapreduce.util;

import java.nio.ByteBuffer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import eu.dnetlib.data.mapreduce.Algorithms;
import eu.dnetlib.data.mapreduce.JobParams;
import eu.dnetlib.data.proto.DedupProtos.Dedup;
import eu.dnetlib.data.proto.DedupProtos.Dedup.RelName;
import eu.dnetlib.data.proto.DedupSimilarityProtos.DedupSimilarity;
import eu.dnetlib.data.proto.FieldTypeProtos.Qualifier;
import eu.dnetlib.data.proto.KindProtos.Kind;
import eu.dnetlib.data.proto.OafProtos.Oaf;
import eu.dnetlib.data.proto.OafProtos.OafRel;
import eu.dnetlib.data.proto.OafProtos.OafRel.Builder;
import eu.dnetlib.data.proto.OrganizationOrganizationProtos.OrganizationOrganization;
import eu.dnetlib.data.proto.PersonPersonProtos.PersonPerson;
import eu.dnetlib.data.proto.RelMetadataProtos.RelMetadata;
import eu.dnetlib.data.proto.RelTypeProtos.RelType;
import eu.dnetlib.data.proto.RelTypeProtos.SubRelType;
import eu.dnetlib.data.proto.ResultResultProtos.ResultResult;
import eu.dnetlib.data.proto.TypeProtos.Type;
import eu.dnetlib.data.transform.xml.AbstractDNetXsltFunctions;
import eu.dnetlib.pace.config.DedupConfig;

import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;

/**
 * Static helpers used by the deduplication jobs: rewriting identifiers into the dedup namespace, composing and
 * decoding column family names, and building the {@link OafRel} / {@link Oaf} builders for dedup relations.
 *
 * All byte[] &lt;-&gt; String conversions go through HBase {@link Bytes}, so they do not depend on the platform
 * default charset.
 */
public class DedupUtils {

	public static final String CF_SEPARATOR = "_";

	public static final String ROOT = "dedup_wf";

	public static final String BODY_S = "body";

	public static final byte[] BODY_B = Bytes.toBytes(BODY_S);

	/** Matches from the first '|' up to the LAST "::" of an identifier (the '.*' is greedy). */
	private static final Pattern ID_PREFIX = Pattern.compile("\\|.*\\:\\:");

	private static final String ORGANIZATION_SCHEME = "dnet:organization_organization_relations";

	private static final String PERSON_SCHEME = "dnet:person_person_relations";

	private static final String RESULT_SCHEME = "dnet:result_result_relations";

	/**
	 * Builds the identifier prefix used for the given dedup run.
	 *
	 * @param dedupRun
	 *            the dedup run label
	 * @return the string {@code "|" + ROOT + "_" + dedupRun + "::"}
	 */
	public static String dedupPrefix(final String dedupRun) {
		return "|" + ROOT + "_" + dedupRun + "::";
	}

	/**
	 * Replaces the first {@code |...::} section of the given identifier with the dedup prefix of the given run.
	 * When the identifier contains no such section it is returned unchanged.
	 *
	 * @param id
	 *            the identifier to rewrite
	 * @param dedupRun
	 *            the dedup run label, must be exactly 3 characters long
	 * @return the rewritten identifier
	 * @throws IllegalArgumentException
	 *             when dedupRun is null or its length is not 3
	 */
	public static String newId(final String id, final String dedupRun) {
		if ((dedupRun == null) || (dedupRun.length() != 3)) {
			throw new IllegalArgumentException("wrong dedupRun param");
		}
		// quoteReplacement keeps '$' and '\' in dedupRun literal instead of being parsed as group references
		return ID_PREFIX.matcher(id).replaceFirst(Matcher.quoteReplacement(dedupPrefix(dedupRun)));
	}

	/**
	 * Same as {@link #newId(String, String)}, returning the UTF-8 bytes of the result.
	 *
	 * @param s
	 *            the identifier to rewrite
	 * @param dedupRun
	 *            the dedup run label, must be exactly 3 characters long
	 * @return the UTF-8 encoded rewritten identifier
	 */
	public static byte[] newIdBytes(final String s, final String dedupRun) {
		return Bytes.toBytes(newId(s, dedupRun));
	}

	/**
	 * Same as {@link #newIdBytes(String, String)}, reading the identifier from the bytes between the buffer's
	 * current position and its limit. The buffer itself is not modified.
	 *
	 * @param b
	 *            the buffer holding the UTF-8 encoded identifier
	 * @param dedupRun
	 *            the dedup run label, must be exactly 3 characters long
	 * @return the UTF-8 encoded rewritten identifier
	 */
	public static byte[] newIdBytes(final ByteBuffer b, final String dedupRun) {
		// b.array() would expose the whole backing array (ignoring position, limit and offset) and fails on
		// direct or read-only buffers; reading from a duplicate avoids both and leaves the caller's buffer untouched.
		final ByteBuffer view = b.duplicate();
		final byte[] raw = new byte[view.remaining()];
		view.get(raw);
		return newIdBytes(Bytes.toString(raw), dedupRun);
	}

	/**
	 * @param s
	 *            an identifier
	 * @return true when the identifier contains the {@link #ROOT} marker
	 */
	public static boolean isRoot(final String s) {
		return s.contains(ROOT);
	}

	/**
	 * @param s
	 *            an identifier wrapped in an ImmutableBytesWritable
	 * @return true when the identifier contains the {@link #ROOT} marker
	 */
	public static boolean isRoot(final ImmutableBytesWritable s) {
		return isRoot(s.copyBytes());
	}

	/**
	 * @param s
	 *            the UTF-8 bytes of an identifier
	 * @return true when the identifier contains the {@link #ROOT} marker
	 */
	public static boolean isRoot(final byte[] s) {
		return isRoot(Bytes.toString(s));
	}

	/**
	 * @param type
	 *            the entity type
	 * @return the column family name {@code <relType>_dedup_merges} for the given entity type
	 */
	public static String getDedupCF_merges(final Type type) {
		return getRelType(type) + CF_SEPARATOR + SubRelType.dedup + CF_SEPARATOR + Dedup.RelName.merges;
	}

	/** String variant of {@link #getDedupCF_merges(Type)}; the name is resolved with {@code Type.valueOf}. */
	public static String getDedupCF_merges(final String type) {
		return getDedupCF_merges(Type.valueOf(type));
	}

	/** Byte encoded variant of {@link #getDedupCF_merges(Type)}. */
	public static byte[] getDedupCF_mergesBytes(final Type type) {
		return Bytes.toBytes(getDedupCF_merges(type));
	}

	/** Byte encoded variant of {@link #getDedupCF_merges(String)}. */
	public static byte[] getDedupCF_mergesBytes(final String type) {
		return getDedupCF_mergesBytes(Type.valueOf(type));
	}

	/**
	 * @param type
	 *            the entity type
	 * @return the column family name {@code <relType>_dedup_isMergedIn} for the given entity type
	 */
	public static String getDedupCF_mergedIn(final Type type) {
		return getRelType(type) + CF_SEPARATOR + SubRelType.dedup + CF_SEPARATOR + Dedup.RelName.isMergedIn;
	}

	/** String variant of {@link #getDedupCF_mergedIn(Type)}; the name is resolved with {@code Type.valueOf}. */
	public static String getDedupCF_mergedIn(final String type) {
		return getDedupCF_mergedIn(Type.valueOf(type));
	}

	/** Byte encoded variant of {@link #getDedupCF_mergedIn(Type)}. */
	public static byte[] getDedupCF_mergedInBytes(final Type type) {
		return Bytes.toBytes(getDedupCF_mergedIn(type));
	}

	/** Byte encoded variant of {@link #getDedupCF_mergedIn(String)}. */
	public static byte[] getDedupCF_mergedInBytes(final String type) {
		return getDedupCF_mergedInBytes(Type.valueOf(type));
	}

	/**
	 * @param type
	 *            the entity type
	 * @return the column family name {@code <relType>_dedupSimilarity_isSimilarTo} for the given entity type
	 */
	public static String getSimilarityCF(final Type type) {
		return getRelType(type) + CF_SEPARATOR + SubRelType.dedupSimilarity + CF_SEPARATOR + DedupSimilarity.RelName.isSimilarTo;
	}

	/** String variant of {@link #getSimilarityCF(Type)}; the name is resolved with {@code Type.valueOf}. */
	public static String getSimilarityCF(final String type) {
		return getSimilarityCF(Type.valueOf(type));
	}

	/** Byte encoded variant of {@link #getSimilarityCF(Type)}. */
	public static byte[] getSimilarityCFBytes(final Type type) {
		return Bytes.toBytes(getSimilarityCF(type));
	}

	/** Byte encoded variant of {@link #getSimilarityCF(String)}. */
	public static byte[] getSimilarityCFBytes(final String type) {
		return getSimilarityCFBytes(Type.valueOf(type));
	}

	/**
	 * @param type
	 *            the entity type
	 * @return the string form of {@link #getRelType(Type)}
	 */
	public static String getRelTypeString(final Type type) {
		return getRelType(type).toString();
	}

	/**
	 * Maps an entity type to the relation type linking two entities of that type.
	 *
	 * @param type
	 *            the entity type
	 * @return the relation type for organization, person or result
	 * @throws IllegalArgumentException
	 *             for any other entity type
	 */
	public static RelType getRelType(final Type type) {
		switch (type) {
		case organization:
			return RelType.organizationOrganization;
		case person:
			return RelType.personPerson;
		case result:
			return RelType.resultResult;
		default:
			throw new IllegalArgumentException("Deduplication not supported for entity type: " + type);
		}
	}

	/**
	 * Decodes the first two {@link #CF_SEPARATOR} separated tokens of a column family name.
	 *
	 * @param b
	 *            the UTF-8 bytes of a column family name, e.g. the output of {@link #getDedupCF_mergesBytes(Type)}
	 * @return the relation type and sub relation type found in the name
	 * @throws IllegalArgumentException
	 *             when the name has fewer than two tokens, or a token is not a known enum constant
	 */
	public static ColumnFamily decodeCF(final byte[] b) {
		final String name = Bytes.toString(b);
		final String[] tokens = name.split(CF_SEPARATOR);
		if (tokens.length < 2) {
			throw new IllegalArgumentException("invalid column family name: " + name);
		}
		return new ColumnFamily(RelType.valueOf(tokens[0]), SubRelType.valueOf(tokens[1]));
	}

	/**
	 * Builds a dedup relation (sub relation type {@code dedup}) between two identifiers.
	 *
	 * @param dedupConf
	 *            the dedup configuration, its workflow entity type selects the relation type
	 * @param from
	 *            the source identifier
	 * @param to
	 *            the target identifier
	 * @param relClass
	 *            the dedup relation name, used as relation class and as semantics class id / name
	 * @return the relation builder
	 * @throws IllegalArgumentException
	 *             when the configured entity type is not organization, person or result
	 */
	public static OafRel.Builder getDedup(final DedupConfig dedupConf, final String from, final String to, final Dedup.RelName relClass) {
		final Type type = Type.valueOf(dedupConf.getWf().getEntityType());
		final String relName = relClass.name();
		final Builder rel = getRelBuilder(from, to, relName, getRelType(type), SubRelType.dedup);
		switch (type) {
		case organization:
			rel.setOrganizationOrganization(OrganizationOrganization.newBuilder().setDedup(dedup(relName, ORGANIZATION_SCHEME)));
			break;
		case person:
			rel.setPersonPerson(PersonPerson.newBuilder().setDedup(dedup(relName, PERSON_SCHEME)));
			break;
		case result:
			rel.setResultResult(ResultResult.newBuilder().setDedup(dedup(relName, RESULT_SCHEME)));
			break;
		default:
			throw new IllegalArgumentException("Deduplication not supported for entity type: " + dedupConf.getWf().getEntityType());
		}
		return rel;
	}

	/**
	 * Builds an {@code isSimilarTo} relation (sub relation type {@code dedupSimilarity}) between two identifiers.
	 *
	 * @param dedupConf
	 *            the dedup configuration, its workflow entity type selects the relation type
	 * @param from
	 *            the source identifier
	 * @param to
	 *            the target identifier
	 * @return the relation builder
	 * @throws IllegalArgumentException
	 *             when the configured entity type is not organization, person or result
	 */
	public static OafRel.Builder getDedupSimilarity(final DedupConfig dedupConf, final String from, final String to) {
		final Type type = Type.valueOf(dedupConf.getWf().getEntityType());
		final String isSimilarTo = DedupSimilarity.RelName.isSimilarTo.name();
		// The sub relation type must agree with the DedupSimilarity payload set below and with getSimilarityCF();
		// it used to be hard-wired to SubRelType.dedup.
		// NOTE(review): confirm no downstream consumer relies on the previous 'dedup' value.
		final Builder rel = getRelBuilder(from, to, isSimilarTo, getRelType(type), SubRelType.dedupSimilarity);
		switch (type) {
		case organization:
			rel.setOrganizationOrganization(OrganizationOrganization.newBuilder().setDedupSimilarity(
					dedupSimilarity(isSimilarTo, ORGANIZATION_SCHEME)));
			break;
		case person:
			rel.setPersonPerson(PersonPerson.newBuilder().setDedupSimilarity(dedupSimilarity(isSimilarTo, PERSON_SCHEME)));
			break;
		case result:
			rel.setResultResult(ResultResult.newBuilder().setDedupSimilarity(dedupSimilarity(isSimilarTo, RESULT_SCHEME)));
			break;
		default:
			throw new IllegalArgumentException("Deduplication not supported for entity type: " + dedupConf.getWf().getEntityType());
		}
		return rel;
	}

	/** Creates a non-child relation builder carrying the given types, class, source and target. */
	private static Builder getRelBuilder(final String from,
			final String to,
			final String relClass,
			final RelType relType,
			final SubRelType subRelType) {
		return OafRel.newBuilder().setRelType(relType).setSubRelType(subRelType).setRelClass(relClass).setChild(false).setSource(from)
				.setTarget(to);
	}

	/** Creates a Dedup builder whose relation metadata carries the given class and scheme. */
	private static Dedup.Builder dedup(final String relClass, final String scheme) {
		return Dedup.newBuilder().setRelMetadata(getRelMetadata(relClass, scheme));
	}

	/** Creates a DedupSimilarity builder whose relation metadata carries the given class and scheme. */
	private static DedupSimilarity.Builder dedupSimilarity(final String relClass, final String scheme) {
		return DedupSimilarity.newBuilder().setRelMetadata(getRelMetadata(relClass, scheme));
	}

	/** Creates relation metadata whose semantics qualifier uses relClass as class id / name and scheme as scheme id / name. */
	private static RelMetadata.Builder getRelMetadata(final String relClass, final String scheme) {
		final Qualifier.Builder semantics = Qualifier.newBuilder().setClassid(relClass).setClassname(relClass).setSchemeid(scheme).setSchemename(scheme);
		return RelMetadata.newBuilder().setSemantics(semantics);
	}

	/**
	 * Wraps a relation into an Oaf builder of kind {@code relation}, stamped with the current time.
	 *
	 * @param dedupConf
	 *            the dedup configuration, its workflow configuration id is stored as inference provenance
	 * @param oafRel
	 *            the relation to wrap
	 * @param trust
	 *            the trust value, passed through {@code Algorithms.scale} with {@code JobParams.MAX_DEDUP_TRUST} before being stored
	 * @return the Oaf builder
	 */
	public static Oaf.Builder buildRel(final DedupConfig dedupConf, final OafRel.Builder oafRel, final double trust) {
		final double sTrust = Algorithms.scale(trust, JobParams.MAX_DEDUP_TRUST);
		return Oaf.newBuilder()
				.setKind(Kind.relation)
				.setLastupdatetimestamp(System.currentTimeMillis())
				.setDataInfo(
						AbstractDNetXsltFunctions.getDataInfo(null, "", String.valueOf(sTrust), false, true).setInferenceprovenance(
								dedupConf.getWf().getConfigurationId()))
				.setRel(oafRel);
	}

	/**
	 * The relation type / sub relation type pair decoded from a column family name.
	 *
	 * Declared static: it never reads state of the enclosing class, so no DedupUtils instance is needed to create it.
	 */
	static class ColumnFamily {

		private final RelType relType;
		private final SubRelType subRelType;

		public ColumnFamily(final RelType relType, final SubRelType subRelType) {
			this.relType = relType;
			this.subRelType = subRelType;
		}

		/** @return {@code <relType>_<subRelType>} */
		@Override
		public String toString() {
			return getRelType() + CF_SEPARATOR + getSubRelType();
		}

		public RelType getRelType() {
			return relType;
		}

		public SubRelType getSubRelType() {
			return subRelType;
		}

	}

}
