package eu.dnetlib.data.hadoop.utils;

import java.util.Map.Entry;
import java.util.Set;

import eu.dnetlib.rmi.data.hadoop.ClusterName;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;

import com.google.common.base.Function;
import com.google.common.base.Predicate;

import eu.dnetlib.data.hadoop.HadoopJob;
import eu.dnetlib.rmi.data.hadoop.HadoopJobDescriptor;
import eu.dnetlib.rmi.data.hadoop.HadoopServiceException;

/**
 * Factory helpers producing Guava {@link Function} and {@link Predicate} instances for HBase descriptors
 * and {@link HadoopJob}s.
 */
public class HadoopUtils {

	/**
	 * @return a function mapping an {@link HTableDescriptor} to the value of
	 *         {@link HTableDescriptor#getNameAsString()}.
	 */
	public static Function<HTableDescriptor, String> tableName() {
		return HTableDescriptor::getNameAsString;
	}

	/**
	 * @return a function mapping an {@link HColumnDescriptor} to the value of
	 *         {@link HColumnDescriptor#getNameAsString()}.
	 */
	public static Function<HColumnDescriptor, String> columnName() {
		return HColumnDescriptor::getNameAsString;
	}

	/**
	 * Builds a predicate that accepts the strings which are <b>not</b> contained in the given set.
	 *
	 * @param cols
	 *            the set to test against; it is stored by reference (not copied), so later changes to it are
	 *            visible to the returned predicate.
	 * @return a predicate returning {@code true} for every string absent from {@code cols}.
	 */
	public static Predicate<String> columnPredicate(final Set<String> cols) {
		// SetPredicate is a static nested class, so no throwaway HadoopUtils instance is needed here.
		return new SetPredicate(cols);
	}

	/**
	 * Instance-level variant of {@link #columnPredicate(Set)}, kept for callers that already hold a
	 * {@code HadoopUtils} instance.
	 *
	 * @param set
	 *            the set to test against; stored by reference.
	 * @return a predicate returning {@code true} for every string absent from {@code set}.
	 */
	public SetPredicate getSetPredicate(final Set<String> set) {
		return new SetPredicate(set);
	}

	/**
	 * Predicate over strings that is satisfied when the string is NOT a member of the wrapped set.
	 * Declared static because it does not use any state of the enclosing class.
	 */
	static class SetPredicate implements Predicate<String> {

		private final Set<String> set;

		public SetPredicate(final Set<String> set) {
			this.set = set;
		}

		/**
		 * @param s
		 *            the string to test.
		 * @return {@code true} when {@code s} is not contained in the wrapped set, {@code false} otherwise.
		 */
		@Override
		public boolean apply(final String s) {
			// Note the negation: membership in the set means the string is rejected.
			return !set.contains(s);
		}
	}

	/**
	 * Builds a function converting a map entry into the descriptor of the {@link HadoopJob} it holds as value.
	 * The entry key is not used.
	 *
	 * @return a function returning the result of {@code asDescriptor()} on the entry value, or {@code null}
	 *         when that call throws a {@link HadoopServiceException}; consumers must be prepared to handle
	 *         {@code null} elements.
	 */
	public static Function<Entry<String, HadoopJob>, HadoopJobDescriptor> hadoopJobDescriptor() {
		return e -> {
			try {
				return e.getValue().asDescriptor();
			} catch (HadoopServiceException e1) {
				// Best effort: a job whose descriptor cannot be produced is mapped to null rather than
				// failing the whole transformation.
				return null;
			}
		};
	}

	/**
	 * Builds a predicate selecting the jobs that belong to the given cluster.
	 *
	 * @param clusterName
	 *            the cluster name to compare against.
	 * @return a predicate returning {@code true} when the job's cluster name equals {@code clusterName}. The
	 *         job's cluster name is dereferenced without a null check.
	 */
	public static Predicate<HadoopJob> filterByCluster(final ClusterName clusterName) {
		return job -> job.getClusterName().equals(clusterName);
	}

}
