package eu.dnetlib.data.hadoop.hbase;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import com.google.common.base.Predicates;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import eu.dnetlib.data.hadoop.config.ConfigurationEnumerator;
import eu.dnetlib.data.transform.Row;
import eu.dnetlib.enabling.resultset.client.ResultSetClient;
import eu.dnetlib.rmi.common.ResultSet;
import eu.dnetlib.rmi.data.hadoop.ClusterName;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Mutation;
import org.springframework.beans.factory.annotation.Autowired;

/**
 * Base class for components that load the records of a result set into an HBase table using batched mutations.
 * <p>
 * Each record is turned into {@link Row}s by the transformer obtained from
 * {@code XsltRowTransformerFactory.newInstance(xsl)}. The concrete subclass translates every row into HBase
 * {@link Mutation}s ({@link #addOperation(List, Row)}), and the mutations are shipped to the table every
 * {@link #getBatchSize()} input rows.
 */
public abstract class HbaseTableFeeder {

	/**
	 * The class logger.
	 */
	private static final Log log = LogFactory.getLog(HbaseTableFeeder.class); // NOPMD by marko on 11/24/08 5:02 PM

	/**
	 * Resolves the Hadoop {@link Configuration} to use for a given cluster.
	 */
	@Autowired
	protected ConfigurationEnumerator configurationEnumerator;

	/**
	 * Client used to iterate over the records of the input result set.
	 */
	@Autowired
	private ResultSetClient resultSetClient;

	/**
	 * Number of input rows accumulated before a batch of mutations is sent to HBase. Always positive.
	 */
	private int batchSize = 100;

	/**
	 * Translates a row into HBase mutations and appends them to the given buffer.
	 *
	 * @param buffer the buffer collecting the mutations of the current batch
	 * @param row    the row to translate
	 */
	protected abstract void addOperation(final List<Mutation> buffer, final Row row);

	/**
	 * Reads the records of a result set, transforms them into rows and writes the resulting mutations to an HBase table.
	 *
	 * @param rsJson      the JSON serialization of the input result set
	 * @param xsl         the xsl used to build the record-to-rows transformer
	 * @param clusterName the cluster hosting the table
	 * @param tableName   the name of the target table
	 * @param simulation  when true, the input is only counted and logged; nothing is written
	 * @return the number of mutations written to the table (always 0 in simulation mode)
	 * @throws IOException          if the table cannot be opened or a batch cannot be written
	 * @throws InterruptedException if interrupted while a batch is being written
	 */
	public int feed(final String rsJson, final String xsl, final ClusterName clusterName, final String tableName, final boolean simulation)
			throws IOException, InterruptedException {
		return doWrite(asRows(rsJson, xsl), getConf(clusterName), tableName, simulation);
	}

	/**
	 * Writes the mutations derived from the given rows to an HBase table, one batch every {@link #getBatchSize()} rows.
	 * <p>
	 * The table is always closed, whether the write succeeds or fails. When reading the input or writing a batch
	 * fails, the exception propagates and the mutations still pending in the buffer are not sent.
	 *
	 * @param rows          the rows to write
	 * @param configuration the configuration used to connect to the cluster
	 * @param tableName     the name of the target table
	 * @param simulation    when true, the rows are only counted and logged
	 * @return the number of mutations written (0 in simulation mode)
	 * @throws IOException          if the table cannot be opened or a batch cannot be written
	 * @throws InterruptedException if interrupted while a batch is being written
	 */
	private int doWrite(final Iterable<Row> rows, final Configuration configuration, final String tableName, final boolean simulation)
			throws IOException, InterruptedException {
		if (simulation) {
			log.info("running in simulation mode ...");
			log.info(String.format("... simulated import of %d records", Iterables.size(rows)));
			// nothing is written in simulation mode, hence no mutations are reported
			return 0;
		}

		final int rowsPerBatch = getBatchSize();
		final List<Mutation> buffer = Lists.newArrayList();
		int count = 0;

		final HTable htable = new HTable(configuration, tableName);
		try {
			int i = 0;
			for (final Row row : rows) {
				addOperation(buffer, row);
				if ((++i % rowsPerBatch) == 0) {
					count += flush(tableName, buffer, htable);
				}
			}
			// ship the trailing, partially filled batch only when the whole input was consumed successfully
			count += flush(tableName, buffer, htable);
			htable.flushCommits();
		} finally {
			// must run even if a flush failed, otherwise the table would leak
			htable.close();
		}
		return count;
	}

	/**
	 * Sends the buffered mutations to the table as a single batch and clears the buffer on success.
	 *
	 * @param tableName the table name, used in the error message
	 * @param buffer    the pending mutations; left untouched when the batch fails
	 * @param htable    the target table
	 * @return the number of mutations sent (0 when the buffer is empty)
	 * @throws IOException          if at least one operation of the batch did not succeed
	 * @throws InterruptedException if interrupted while the batch is being written
	 */
	private int flush(final String tableName, final List<Mutation> buffer, final HTable htable) throws IOException, InterruptedException {
		if (buffer.isEmpty()) {
			return 0;
		}
		if (!allSucceeded(htable.batch(buffer))) {
			throw new IOException("unable to flush operation on HBase table: " + tableName);
		}
		final int flushed = buffer.size();
		buffer.clear();
		return flushed;
	}

	/**
	 * Checks the outcome of a batch call: a null entry marks an operation that did not succeed.
	 *
	 * @param results the per-operation results returned by {@link HTable#batch(List)}
	 * @return true when the array is present and contains no null entry
	 */
	private static boolean allSucceeded(final Object[] results) {
		return results != null && Iterables.all(Arrays.asList(results), Predicates.notNull());
	}

	/**
	 * Builds a lazy view over the rows obtained by transforming each record of the result set.
	 * The records are only fetched when the returned iterable is traversed.
	 *
	 * @param rsJson the JSON serialization of the input result set
	 * @param xsl    the xsl used to build the record-to-rows transformer
	 * @return the concatenation of the rows produced for every record
	 */
	protected Iterable<Row> asRows(final String rsJson, final String xsl) {
		// NOTE(review): the records are expected to be strings; the cast is presumably unchecked, so the
		// warning is suppressed on this declaration only
		@SuppressWarnings("unchecked")
		final ResultSet<String> resultSet = (ResultSet<String>) ResultSet.fromJson(rsJson);
		return Iterables.concat(Iterables.transform(resultSetClient.iter(resultSet, String.class),
				eu.dnetlib.data.transform.XsltRowTransformerFactory.newInstance(xsl)));
	}

	/**
	 * Returns the configuration of the given cluster.
	 *
	 * @param clusterName the cluster name
	 * @return the configuration provided by the {@link ConfigurationEnumerator}
	 */
	protected Configuration getConf(final ClusterName clusterName) {
		return configurationEnumerator.get(clusterName);
	}

	/**
	 * Returns the number of input rows accumulated before a batch is sent to HBase.
	 *
	 * @return the batch size
	 */
	public int getBatchSize() {
		return batchSize;
	}

	/**
	 * Sets the number of input rows accumulated before a batch is sent to HBase.
	 *
	 * @param batchSize the new batch size, must be positive
	 * @throws IllegalArgumentException if {@code batchSize} is lower than 1 (a zero size would otherwise fail with a
	 *                                  division by zero at write time)
	 */
	public void setBatchSize(final int batchSize) {
		if (batchSize < 1) {
			throw new IllegalArgumentException("batch size must be positive, got: " + batchSize);
		}
		this.batchSize = batchSize;
	}

}
