package eu.dnetlib.data.mapreduce.util;

import java.util.*;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import eu.dnetlib.data.proto.FieldTypeProtos.*;
import static eu.dnetlib.openaire.hadoop.utils.HBaseTableUtils.*;

import eu.dnetlib.data.proto.OafProtos;
import eu.dnetlib.data.proto.TypeProtos;
import eu.dnetlib.openaire.hadoop.utils.HBaseTableUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;

import com.google.common.base.Function;
import com.google.protobuf.InvalidProtocolBufferException;

import eu.dnetlib.data.proto.OafProtos.Oaf;
import eu.dnetlib.data.transform.OafUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;



public class OafHbaseUtils extends OafUtils {

	public static OafDecoder decode(final ImmutableBytesWritable oaf) {
		return new OafDecoder(oaf.copyBytes());
	}

	public static Function<ImmutableBytesWritable, OafDecoder> decoder() {
		return input -> OafDecoder.decode(input.copyBytes());
	}

	public static Iterable<Oaf> asOaf(final Iterable<ImmutableBytesWritable> in) {
		return Iterables.transform(in, oafDecoder());
	}

	public static Function<ImmutableBytesWritable, Oaf> oafDecoder() {
		return input -> parse(input);
	}

	public static Oaf parse(final ImmutableBytesWritable input) {
		try {
			return Oaf.parseFrom(input.copyBytes());
		} catch (final InvalidProtocolBufferException e) {
			throw new IllegalArgumentException(e);
		}
	}

	public static <T> String getValue(T t) {
		return mapValue(t);
	}

	public static <T> String getKey(T t) {
		return mapKey(t);
	}

	public static <T> String getValue(Iterable<T> ts) {
		return Iterables.getFirst(listValues(ts), "");
	}

	public static <T> Set<String> hashSetValues(Iterable<T> ts) {
		return Sets.newHashSet(Iterables.transform(ts, t -> mapValue(t)));
	}

	public static <T> List<String> listValues(Iterable<T> ts) {
		return Lists.newArrayList(Iterables.transform(ts, t -> mapValue(t)));
	}

	public static <T> List<String> listObjects(Iterable<T> ts) {
		return Lists.newArrayList(Iterables.transform(ts, t -> mapObject(t)));
	}

	public static <T> String getKey(Iterable<T> ts) {
		return Iterables.getFirst(listKeys(ts), "");
	}

	public static <T> List<String> listKeys(Iterable<T> ts) {
		return Lists.newArrayList(Iterables.transform(ts, t -> mapKey(t)));
	}

	public static <T> Set<String> hashSetKeys(Iterable<T> ts) {
		return Sets.newHashSet(Iterables.transform(ts, t -> mapKey(t)));
	}

	private static <T> String mapKey(final T t) {
		if (t instanceof KeyValue) return ((KeyValue) t).getKey();
		if (t instanceof String) return (String) t;
		if (t instanceof Qualifier) return ((Qualifier) t).getClassid();

		throw new IllegalArgumentException(String.format("type %s not mapped", t.getClass()));
	}

	public static <T> String mapValue(final T t) {
		if (t instanceof StructuredProperty) return ((StructuredProperty) t).getValue();
		if (t instanceof KeyValue) return ((KeyValue) t).getValue();
		if (t instanceof String) return (String) t;
		if (t instanceof StringField) return ((StringField) t).getValue();
		if (t instanceof Qualifier) return ((Qualifier) t).getClassname();
		if (t instanceof Author) {
			Author a = (Author) t;
			if (a.getPidCount() == 0) {
				return a.getFullname();
			} else {
				return a.getFullname() + " " + listObjects(a.getPidList());
			}
		}

		throw new IllegalArgumentException(String.format("type %s not mapped", t.getClass()));
	}

	public static <T> String mapObject(final T t) {
		if (t instanceof KeyValue) {
			KeyValue kv = (KeyValue) t;
			return kv.getKey() + ":" + kv.getValue();
		}
		if (t instanceof String) return (String) t;

		throw new IllegalArgumentException(String.format("type %s not mapped", t.getClass()));
	}

	public static List<String> getPropertyValues(final Reducer.Context context, final String name) {
		return doGetPropertyValues(context.getConfiguration().get(name, ""));
	}

	public static List<String> getPropertyValues(final Mapper.Context context, final String name) {
		return doGetPropertyValues(context.getConfiguration().get(name, ""));
	}

	private static List<String> doGetPropertyValues(final String s) {
		return Lists.newArrayList(Splitter.on(",").omitEmptyStrings().split(s));
	}

	public static List<Oaf> rel(final Result value) {
		return value.list().stream()
				.filter(kv -> {
					final String q = new String(kv.getQualifier());
					return q.matches(OafRowKeyDecoder.ID_REGEX);
				})
				.filter(kv -> kv.getValue() != null && kv.getValue().length > 0)
				.map(kv -> parseProto(kv.getValue()))
				.collect(Collectors.toList());
	}

	public static Oaf parseProto(final byte[] value) {
		final OafDecoder d = OafDecoder.decode(value);
		return d.getOaf();
	}

	public static Put asPut(final Oaf oaf) {
		switch (oaf.getKind()) {
			case entity:

				final Put entity = getPut(oaf.getEntity().getId());
				byte[] cf = Bytes.toBytes(oaf.getEntity().getType().toString());
				return entity.add(cf, DedupUtils.BODY_B, oaf.toByteArray());
			case relation:
				final OafProtos.OafRel rel = oaf.getRel();
				final Put putRel = getPut(rel.getSource());

				final OafRelDecoder relDecoder = OafRelDecoder.decode(rel);

				final byte[] cfRel = Bytes.toBytes(relDecoder.getCFQ());
				final byte[] qualifier = Bytes.toBytes(rel.getTarget());

				return putRel.add(cfRel, qualifier, oaf.toByteArray());
			default:
				throw new IllegalArgumentException("invalid kind");
		}
	}

	private static Put getPut(final String rowkey) {
		final Put put = new Put(Bytes.toBytes(rowkey));
		put.setDurability(Durability.USE_DEFAULT);
		return put;
	}

	public static byte[] getBodyB(final Result value, final TypeProtos.Type type) {
		return value.getValue(Bytes.toBytes(type.toString()), DedupUtils.BODY_B);
	}

	public static Oaf getBody(final Result value, final TypeProtos.Type type) throws InvalidProtocolBufferException {
		final byte[] body = getBodyB(value, type);
		return body != null ? parseProto(body) : null;
	}

}
