package eu.dnetlib.actionmanager.hbase;

import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

import javax.annotation.Resource;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.springframework.beans.factory.annotation.Required;

import com.google.common.base.Function;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.hash.Hashing;
import com.google.gson.Gson;
import com.google.protobuf.InvalidProtocolBufferException;

import eu.dnetlib.actionmanager.ActionManagerConstants;
import eu.dnetlib.actionmanager.ActionManagerConstants.ACTION_TYPE;
import eu.dnetlib.actionmanager.ActionManagerConstants.COLUMN_FAMILIES;
import eu.dnetlib.actionmanager.rmi.ActionManagerException;
import eu.dnetlib.data.hadoop.config.ClusterName;
import eu.dnetlib.data.hadoop.config.ConfigurationEnumerator;
import eu.dnetlib.data.proto.OafProtos.Oaf;

public class HBaseClient {
	private String tableName;

	@Resource
	private ConfigurationEnumerator configurationEnumerator;

	private Configuration config;

	private static final Log log = LogFactory.getLog(HBaseClient.class); // NOPMD by marko on 11/24/08 5:02 PM

	private final Executor executor = Executors.newSingleThreadExecutor();

	private final Function<Result, Map<String, String>> transformRow = new Function<Result, Map<String, String>>() {
		@Override
		public Map<String, String> apply(Result r) {
			Map<String, String> map = Maps.newHashMap();
			map.put("rowId", Bytes.toString(r.getRow()));

			for (Entry<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> e1 : r.getMap().entrySet()) {
				String cf = Bytes.toString(e1.getKey());
				for (Entry<byte[], NavigableMap<Long, byte[]>> e2 : e1.getValue().entrySet()) {
					byte[] bb = e2.getValue().get(Collections.max(e2.getValue().keySet()));
					String key = cf + ":" + Bytes.toString(e2.getKey());
					String value = "";
					try {
						value = (key.equals("target:content")) ? Oaf.parseFrom(bb).toString() : Bytes.toString(bb);
					} catch (InvalidProtocolBufferException e) {
						log.error("Problem parsing protobuf !!!");
					}
					map.put(key, value);
				}
			}
			return map;
		}
	};

	public void init() {
		performInit(false);
	}

	public void initWithEmptyTable() {
		performInit(true);
	}

	private void performInit(final boolean delete) {
		executor.execute(new Runnable() {
			@Override
			public void run() {
				try {
					log.info("Initializing Action Manager");
					loadConfiguration();
					prepareTable(delete);
					log.info("ActionManager is ready");
				} catch (IOException e) {
					log.error("Error initializing action manager", e);
				}
			}
		});
	}

	private void prepareTable(boolean delete) throws IOException {

		final HBaseAdmin admin = new HBaseAdmin(config);
		try {
			if (delete && admin.tableExists(tableName)) {
				log.info("Deleting existing hbase table: " + tableName);
				admin.disableTable(tableName);
				admin.deleteTable(tableName);
			}

			if (!admin.tableExists(tableName)) {
				log.info("Creating missing hbase table: " + tableName);
				admin.createTable(new HTableDescriptor(tableName));
			}

			final HTableDescriptor desc = admin.getTableDescriptor(Bytes.toBytes(getTableName()));

			Set<String> currents = Sets.newHashSet();
			for (HColumnDescriptor hcd : desc.getColumnFamilies()) {
				currents.add(hcd.getNameAsString());
			}

			Set<String> missing = Sets.newHashSet();
			for (COLUMN_FAMILIES cf : ActionManagerConstants.COLUMN_FAMILIES.values()) {
				if (!currents.contains(cf.toString())) {
					missing.add(cf.toString());
				}
			}

			if (!missing.isEmpty()) {
				if (admin.isTableEnabled(getTableName())) {
					admin.disableTable(getTableName());
				}

				for (String column : missing) {
					log.info("hbase table: '" + getTableName() + "', adding columnFamily: " + column);
					admin.addColumn(getTableName(), new HColumnDescriptor(column));
				}

				admin.enableTable(getTableName());
			}
		} finally {
			admin.close();
		}
	}

	private void loadConfiguration() throws IOException {
		config = configurationEnumerator.get(ClusterName.DM);
		log.info(config.toString());
	}

	public Configuration getConfig() {
		return config;
	}

	public void write(List<Put> puts) throws ActionManagerException {
		try {
			final HTable table = new HTable(config, tableName);
			try {
				log.info("Adding " + puts.size() + " action(s) to " + tableName);
				table.put(puts);
			} finally {
				table.flushCommits();
				table.close();
			}
		} catch (IOException e) {
			throw new ActionManagerException(e);
		}
	}

	public void write(List<Put> puts, HTable table) throws ActionManagerException {
		try {
			log.info("Adding " + puts.size() + " action(s) to " + tableName);
			table.put(puts);
		} catch (IOException e) {
			throw new ActionManagerException(e);
		}
	}

	public void delete(List<Delete> deletes) throws ActionManagerException {
		try {
			final HTable table = new HTable(config, tableName);
			try {
				log.info("Deleting " + deletes.size() + " action(s) from " + tableName);
				table.delete(deletes);
			} finally {
				table.flushCommits();
				table.close();
			}
		} catch (IOException e) {
			throw new ActionManagerException(e);
		}
	}

	public Map<String, String> getRow(byte[] id) throws ActionManagerException {
		try {
			final HTable table = new HTable(config, tableName);
			try {
				Result result = table.get(new Get(id));
				return transformRow.apply(result);
			} finally {
				table.close();
			}
		} catch (IOException e) {
			throw new ActionManagerException(e);
		}
	}

	public List<Map<String, String>> retrieveRows(String prefix, String start, int limit) throws ActionManagerException {
		List<Map<String, String>> list = Lists.newArrayList();

		try {
			final HTable table = new HTable(config, tableName);
			try {
				Filter filter = (prefix == null || prefix.isEmpty()) ? new PageFilter(limit) : new FilterList(FilterList.Operator.MUST_PASS_ALL,
						new PageFilter(limit), new PrefixFilter(Bytes.toBytes(prefix)));
				Scan scan = new Scan();
				if (start != null) {
					scan.setStartRow(Bytes.toBytes(start));
				}
				scan.setFilter(filter);

				ResultScanner rs = table.getScanner(scan);
				try {
					Iterator<Map<String, String>> iter = Iterators.transform(rs.iterator(), transformRow);
					while (iter.hasNext()) {
						list.add(iter.next());
					}
				} finally {
					rs.close();
				}
			} finally {
				table.close();
			}
		} catch (IOException e) {
			throw new ActionManagerException(e);
		}
		return list;
	}

	public List<String> retrieveActionsByAgent(String agentId, int limit) throws ActionManagerException {
		List<String> list = Lists.newArrayList();

		String prefix = ACTION_TYPE.pkg + "|" + Hashing.md5().hashString(agentId) + "|";

		try {
			final HTable table = new HTable(config, tableName);
			try {

				FilterList filter = new FilterList(FilterList.Operator.MUST_PASS_ALL);
				filter.addFilter(new PageFilter(limit));
				filter.addFilter(new PrefixFilter(Bytes.toBytes(prefix)));

				Scan scan = new Scan();
				scan.setFilter(filter);

				ResultScanner rs = table.getScanner(scan);
				Iterator<Result> iter = rs.iterator();
				while (iter.hasNext()) {
					Result r = iter.next();
					Map<String, String> map = Maps.newHashMap();
					map.put("id", Bytes.toString(r.getRow()));
					map.put("content", Bytes.toString(r.getValue(ActionManagerConstants.ACTION_COLFAMILY, Bytes.toBytes(ACTION_TYPE.pkg.toString()))));
					list.add(new Gson().toJson(map));
				}
			} finally {
				table.close();
			}
		} catch (IOException e) {
			throw new ActionManagerException(e);
		}
		return list;
	}

	public String getTableName() {
		return tableName;
	}

	@Required
	public void setTableName(String tableName) {
		this.tableName = tableName;
	}

}
