package eu.dnetlib.data.mapreduce.util;

import java.util.List;
import java.util.Set;

import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import eu.dnetlib.data.proto.FieldTypeProtos.KeyValue;
import eu.dnetlib.data.proto.FieldTypeProtos.Qualifier;
import eu.dnetlib.data.proto.FieldTypeProtos.StringField;
import eu.dnetlib.data.proto.FieldTypeProtos.StructuredProperty;
import eu.dnetlib.data.proto.PersonProtos.Person;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;

import com.google.common.base.Function;
import com.google.protobuf.InvalidProtocolBufferException;

import eu.dnetlib.data.proto.OafProtos.Oaf;
import eu.dnetlib.data.transform.OafUtils;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Static helpers for turning HBase {@link ImmutableBytesWritable} cells into {@link Oaf} / {@link OafDecoder}
 * instances, for extracting string keys and values out of a handful of supported element types, and for reading
 * comma separated properties from a map/reduce job configuration.
 */
public class OafHbaseUtils extends OafUtils {

	/**
	 * Builds an {@link OafDecoder} from the bytes returned by {@code oaf.copyBytes()}.
	 *
	 * @param oaf
	 *            the writable holding the serialized record
	 * @return a decoder constructed on a copy of the writable's bytes
	 */
	public static OafDecoder decode(final ImmutableBytesWritable oaf) {
		return new OafDecoder(oaf.copyBytes());
	}

	/**
	 * Function form of the decoding step, suitable for Guava transformations.
	 *
	 * @return a function applying {@code OafDecoder.decode(...)} to the bytes copied from each input writable
	 */
	public static Function<ImmutableBytesWritable, OafDecoder> decoder() {
		return new Function<ImmutableBytesWritable, OafDecoder>() {

			@Override
			public OafDecoder apply(final ImmutableBytesWritable input) {
				return OafDecoder.decode(input.copyBytes());
			}
		};
	}

	/**
	 * Views the given writables as {@link Oaf} messages by applying {@link #oafDecoder()} through
	 * {@code Iterables.transform}.
	 *
	 * @param in
	 *            the serialized records
	 * @return an iterable of the parsed messages; a record that cannot be parsed surfaces as an
	 *         {@link IllegalArgumentException} (see {@link #parse(ImmutableBytesWritable)})
	 */
	public static Iterable<Oaf> asOaf(final Iterable<ImmutableBytesWritable> in) {
		return Iterables.transform(in, oafDecoder());
	}

	/**
	 * Function form of {@link #parse(ImmutableBytesWritable)}.
	 *
	 * @return a function parsing each input writable into an {@link Oaf}
	 */
	public static Function<ImmutableBytesWritable, Oaf> oafDecoder() {
		return new Function<ImmutableBytesWritable, Oaf>() {

			@Override
			public Oaf apply(final ImmutableBytesWritable input) {
				return parse(input);
			}
		};
	}

	/**
	 * Parses the bytes copied from the given writable as an {@link Oaf} message.
	 *
	 * @param input
	 *            the writable holding the serialized message
	 * @return the parsed message
	 * @throws IllegalArgumentException
	 *             wrapping the {@link InvalidProtocolBufferException} raised when the bytes cannot be parsed
	 */
	public static Oaf parse(final ImmutableBytesWritable input) {
		try {
			return Oaf.parseFrom(input.copyBytes());
		} catch (final InvalidProtocolBufferException e) {
			// keep the original exception as the cause so the parsing failure is not lost
			throw new IllegalArgumentException(e);
		}
	}

	/**
	 * Extracts the string value of a single element.
	 *
	 * @param t
	 *            a StructuredProperty, KeyValue, String, Person, StringField or Qualifier
	 * @return the value mapped from the element
	 * @throws IllegalArgumentException
	 *             when the element is {@code null} or of an unsupported type
	 */
	public static <T> String getValue(T t) {
		return mapValue(t);
	}

	/**
	 * Extracts the string key of a single element.
	 *
	 * @param t
	 *            a KeyValue, String or Qualifier
	 * @return the key mapped from the element
	 * @throws IllegalArgumentException
	 *             when the element is {@code null} or of an unsupported type
	 */
	public static <T> String getKey(T t) {
		return mapKey(t);
	}

	/**
	 * Returns the value of the first element, or the empty string when there are no elements.
	 * All elements are mapped before the first one is picked, so an unsupported element anywhere in the
	 * input is reported.
	 *
	 * @param ts
	 *            the elements to map
	 * @return the first mapped value, or {@code ""}
	 * @throws IllegalArgumentException
	 *             when any element is {@code null} or of an unsupported type
	 */
	public static <T> String getValue(Iterable<T> ts) {
		return Iterables.getFirst(listValues(ts), "");
	}

	/**
	 * Collects the distinct values of the given elements.
	 *
	 * @param ts
	 *            the elements to map
	 * @return a new hash set with the mapped values
	 * @throws IllegalArgumentException
	 *             when any element is {@code null} or of an unsupported type
	 */
	public static <T> Set<String> hashSetValues(Iterable<T> ts) {
		return Sets.newHashSet(Iterables.transform(ts, OafHbaseUtils.<T> valueMapper()));
	}

	/**
	 * Collects the values of the given elements, preserving iteration order.
	 *
	 * @param ts
	 *            the elements to map
	 * @return a new list with the mapped values
	 * @throws IllegalArgumentException
	 *             when any element is {@code null} or of an unsupported type
	 */
	public static <T> List<String> listValues(Iterable<T> ts) {
		return Lists.newArrayList(Iterables.transform(ts, OafHbaseUtils.<T> valueMapper()));
	}

	/**
	 * Returns the key of the first element, or the empty string when there are no elements.
	 * All elements are mapped before the first one is picked, so an unsupported element anywhere in the
	 * input is reported.
	 *
	 * @param ts
	 *            the elements to map
	 * @return the first mapped key, or {@code ""}
	 * @throws IllegalArgumentException
	 *             when any element is {@code null} or of an unsupported type
	 */
	public static <T> String getKey(Iterable<T> ts) {
		return Iterables.getFirst(listKeys(ts), "");
	}

	/**
	 * Collects the keys of the given elements, preserving iteration order.
	 *
	 * @param ts
	 *            the elements to map
	 * @return a new list with the mapped keys
	 * @throws IllegalArgumentException
	 *             when any element is {@code null} or of an unsupported type
	 */
	public static <T> List<String> listKeys(Iterable<T> ts) {
		return Lists.newArrayList(Iterables.transform(ts, OafHbaseUtils.<T> keyMapper()));
	}

	/**
	 * Collects the distinct keys of the given elements.
	 *
	 * @param ts
	 *            the elements to map
	 * @return a new hash set with the mapped keys
	 * @throws IllegalArgumentException
	 *             when any element is {@code null} or of an unsupported type
	 */
	public static <T> Set<String> hashSetKeys(Iterable<T> ts) {
		return Sets.newHashSet(Iterables.transform(ts, OafHbaseUtils.<T> keyMapper()));
	}

	/** Function wrapper around {@link #mapValue(Object)}, shared by the value collecting methods. */
	private static <T> Function<T, String> valueMapper() {
		return new Function<T, String>() {

			@Override
			public String apply(final T t) {
				return mapValue(t);
			}
		};
	}

	/** Function wrapper around {@link #mapKey(Object)}, shared by the key collecting methods. */
	private static <T> Function<T, String> keyMapper() {
		return new Function<T, String>() {

			@Override
			public String apply(final T t) {
				return mapKey(t);
			}
		};
	}

	/**
	 * Maps an element to its key: KeyValue -> key, String -> itself, Qualifier -> classid.
	 *
	 * @throws IllegalArgumentException
	 *             when the element is {@code null} or of any other type
	 */
	private static <T> String mapKey(final T t) {
		if (t instanceof KeyValue) {
			return ((KeyValue) t).getKey();
		}
		if (t instanceof String) {
			return (String) t;
		}
		if (t instanceof Qualifier) {
			return ((Qualifier) t).getClassid();
		}

		throw new IllegalArgumentException(String.format("type %s not mapped", describeType(t)));
	}

	/**
	 * Maps an element to its value: StructuredProperty / KeyValue / StringField -> value, String -> itself,
	 * Person -> metadata full name value, Qualifier -> classname.
	 *
	 * @throws IllegalArgumentException
	 *             when the element is {@code null} or of any other type
	 */
	private static <T> String mapValue(final T t) {
		if (t instanceof StructuredProperty) {
			return ((StructuredProperty) t).getValue();
		}
		if (t instanceof KeyValue) {
			return ((KeyValue) t).getValue();
		}
		if (t instanceof String) {
			return (String) t;
		}
		if (t instanceof Person) {
			return ((Person) t).getMetadata().getFullname().getValue();
		}
		if (t instanceof StringField) {
			return ((StringField) t).getValue();
		}
		if (t instanceof Qualifier) {
			return ((Qualifier) t).getClassname();
		}

		throw new IllegalArgumentException(String.format("type %s not mapped", describeType(t)));
	}

	/**
	 * Describes the runtime type of an element for error messages. A null element fails every instanceof
	 * test above, so it must be handled here: calling getClass() on it would raise a NullPointerException
	 * and mask the intended IllegalArgumentException.
	 */
	private static String describeType(final Object o) {
		return o == null ? "null" : o.getClass().toString();
	}

	/**
	 * Reads a comma separated property from the reducer's job configuration.
	 *
	 * @param context
	 *            the reducer context giving access to the configuration
	 * @param name
	 *            the property name
	 * @return the non-empty entries of the property, in order; an empty list when the property is not set.
	 *         Entries are not trimmed.
	 */
	public static List<String> getPropertyValues(final Reducer.Context context, final String name) {
		return doGetPropertyValues(context.getConfiguration().get(name, ""));
	}

	/**
	 * Reads a comma separated property from the mapper's job configuration.
	 *
	 * @param context
	 *            the mapper context giving access to the configuration
	 * @param name
	 *            the property name
	 * @return the non-empty entries of the property, in order; an empty list when the property is not set.
	 *         Entries are not trimmed.
	 */
	public static List<String> getPropertyValues(final Mapper.Context context, final String name) {
		return doGetPropertyValues(context.getConfiguration().get(name, ""));
	}

	/** Splits the given string on commas, dropping empty entries. */
	private static List<String> doGetPropertyValues(final String s) {
		return Lists.newArrayList(Splitter.on(",").omitEmptyStrings().split(s));
	}

}
