package eu.dnetlib.data.mapreduce.hbase;

import java.math.BigInteger;
import java.nio.charset.Charset;
import java.util.List;
import java.util.Set;

import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;

import eu.dnetlib.data.proto.RelTypeProtos.RelType;
import eu.dnetlib.data.proto.TypeProtos.Type;

/**
 * Common static utility methods to manage the hbase tables
 * 
 * @author claudio
 * 
 */
public class HBaseTableUtils {

	public final static String lowerKey = "0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000";
	public final static String upperKey = "zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz";

	/**
	 * Names of the column families that this class reports as "volatile" through {@link #isVolatile(String)}.
	 */
	public enum VolatileColumnFamily {
		dedup, dedupPerson, instance; // instance is here to remove the old protos

		/**
		 * Tells whether the given name is exactly (case-sensitively) the name of one of the constants of this enum.
		 * 
		 * @param columnName
		 *            the column family name to check, may be null
		 * @return true if a constant with that name exists, false otherwise (including for a null name)
		 */
		public static boolean isVolatile(String columnName) {
			// Plain name comparison: no exception is used for control flow and no Error can be swallowed.
			for (VolatileColumnFamily family : values()) {
				if (family.name().equals(columnName)) {
					return true;
				}
			}
			return false;
		}
	}

	/** Maps a {@link Type} to the string returned by its {@code toString()}. Never reassigned, hence final. */
	private static final Function<Type, String> typeName = new Function<Type, String>() {
		@Override
		public String apply(Type type) {
			return type.toString();
		}
	};

	/** Maps a {@link RelType} to the string returned by its {@code toString()}. Never reassigned, hence final. */
	private static final Function<RelType, String> relTypeName = new Function<RelType, String>() {
		@Override
		public String apply(RelType type) {
			return type.toString();
		}
	};

	/** Maps a {@link VolatileColumnFamily} to the string returned by its {@code toString()}. Never reassigned, hence final. */
	private static final Function<VolatileColumnFamily, String> extraColumnFamilyName = new Function<VolatileColumnFamily, String>() {
		@Override
		public String apply(VolatileColumnFamily type) {
			return type.toString();
		}
	};

	/**
	 * Collects the string form of every {@link Type}, {@link RelType} and {@link VolatileColumnFamily} value.
	 * 
	 * @return a new set holding the {@code toString()} of all the values of the three enums
	 */
	public static Set<String> listColumns() {
		final Iterable<String> typeNames = Iterables.transform(Lists.newArrayList(Type.values()), typeName);
		final Iterable<String> relTypeNames = Iterables.transform(Lists.newArrayList(RelType.values()), relTypeName);
		final Iterable<String> volatileNames = Iterables.transform(Lists.newArrayList(VolatileColumnFamily.values()),
				extraColumnFamilyName);

		// The three views are chained and copied into a fresh hash set, which also removes duplicates.
		final Iterable<String> allNames = Iterables.concat(typeNames, relTypeNames, volatileNames);
		return Sets.newHashSet(allNames);
	}

	/**
	 * Converts the given keys to their UTF-8 byte representation, one array per key.
	 * <p>
	 * The last element of the list is not emitted: the result has {@code splitKeys.size() - 1} entries.
	 * NOTE(review): presumably the last key is the upper bound of the final region - confirm with the callers.
	 * 
	 * @param splitKeys
	 *            the keys to convert, must not be null
	 * @return the bytes of all the keys but the last one; an empty array when the list has fewer than two elements
	 */
	public static byte[][] getHexSplits(List<String> splitKeys) {
		// An empty list used to produce a negative array size; clamp to zero instead.
		final int count = Math.max(0, splitKeys.size() - 1);
		// Explicit charset so the produced bytes do not depend on the platform default encoding.
		final Charset utf8 = Charset.forName("UTF-8");

		final byte[][] splits = new byte[count][];
		for (int i = 0; i < count; i++) {
			splits[i] = splitKeys.get(i).getBytes(utf8);
		}
		return splits;
	}

	/**
	 * Computes the split keys for the given number of regions, using {@code lowerKey} and {@code upperKey} as bounds.
	 * <p>
	 * NOTE(review): {@code upperKey} is made of 'z' characters, which are not base-16 digits, while the
	 * three-argument overload parses both bounds with radix 16. As written this call looks like it fails with a
	 * NumberFormatException for any numRegions >= 1 - confirm the intended bounds.
	 * 
	 * @param numRegions
	 *            the number of regions to split the key space into
	 * @return whatever {@code getHexSplits(lowerKey, upperKey, numRegions)} returns
	 */
	public static byte[][] getHexSplits(int numRegions) {
		final String from = lowerKey;
		final String to = upperKey;
		return getHexSplits(from, to, numRegions);
	}

	/**
	 * Computes {@code numRegions - 1} evenly spaced split keys between two hexadecimal keys.
	 * <p>
	 * The increment is {@code (endKey - startKey) / numRegions} (integer division); the i-th split (0-based) is
	 * {@code startKey + (i + 1) * increment}, formatted with {@code "%016x"} and encoded as UTF-8 bytes.
	 * 
	 * @param startKey
	 *            lower bound of the key space, as a base-16 number
	 * @param endKey
	 *            upper bound of the key space, as a base-16 number
	 * @param numRegions
	 *            number of regions, must be at least 1
	 * @return the split keys; an empty array when numRegions is 1
	 * @throws IllegalArgumentException
	 *             if numRegions is lower than 1
	 * @throws NumberFormatException
	 *             if startKey or endKey is not a valid base-16 number
	 */
	public static byte[][] getHexSplits(String startKey, String endKey, int numRegions) {
		// Fail with a meaningful message instead of a NegativeArraySizeException.
		if (numRegions < 1) {
			throw new IllegalArgumentException("numRegions must be >= 1, got: " + numRegions);
		}
		final BigInteger lowestKey = new BigInteger(startKey, 16);
		final BigInteger highestKey = new BigInteger(endKey, 16);
		final BigInteger regionIncrement = highestKey.subtract(lowestKey).divide(BigInteger.valueOf(numRegions));
		// Explicit charset so the produced bytes do not depend on the platform default encoding.
		final Charset utf8 = Charset.forName("UTF-8");

		final byte[][] splits = new byte[numRegions - 1][];
		BigInteger key = lowestKey;
		for (int i = 0; i < splits.length; i++) {
			// Running sum: after this step key == lowestKey + (i + 1) * regionIncrement.
			key = key.add(regionIncrement);
			splits[i] = String.format("%016x", key).getBytes(utf8);
		}
		return splits;
	}

}
