package eu.dnetlib.data.hadoop.hbase;

import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.Set;

import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import com.google.common.collect.Sets;

import eu.dnetlib.data.hadoop.HadoopServiceCore;
import eu.dnetlib.data.hadoop.config.ClusterName;
import eu.dnetlib.data.hadoop.config.ConfigurationEnumerator;
import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
import eu.dnetlib.miscutils.datetime.DateUtils;

@ActiveProfiles("test")
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = HBaseTestContextConfiguration.class)
public class HBaseTest {

	/** Name of the throwaway table created and dropped around each test. */
	protected static final String TEST_TABLE = "dnet_test_table";

	/** Number of distinct cell versions written by {@link #testReadWrite()}. */
	protected static final int NUM_VERSIONS = 10;

	@Autowired
	private HadoopServiceCore hadoopServiceCore;

	@Autowired
	private ConfigurationEnumerator configurationEnumerator;

	/**
	 * Waits for the hadoop clients to become available and ensures the test
	 * table does not exist before each test run.
	 */
	@Before
	public void setUp() throws HadoopServiceException, IOException, InterruptedException {
		assertNotNull(hadoopServiceCore);

		System.out.println("waiting for clients to be ready... timeout? " + !hadoopServiceCore.getClients().waitClients());

		ensureDropTable();
	}

	/** Drops the test table after each test run so no state leaks between tests. */
	@After
	public void tearDown() throws HadoopServiceException, IOException {
		ensureDropTable();
	}

	/**
	 * Writes {@link #NUM_VERSIONS} versions of a single cell, then reads them
	 * back and prints each (timestamp, value) pair.
	 */
	@Test
	@Ignore
	// TODO allow testing on a dev cluster instance
	public void testReadWrite() throws HadoopServiceException, IOException, InterruptedException {

		hadoopServiceCore.createTable(ClusterName.DM, TEST_TABLE, testSchema());
		assertTrue(hadoopServiceCore.existTable(ClusterName.DM, TEST_TABLE));

		final HTable htable = new HTable(configurationEnumerator.get(ClusterName.DM), TEST_TABLE);
		try {
			// Build a fresh Put on each iteration: reusing a single Put would
			// accumulate the previously added cells and re-send all of them on
			// every htable.put() call.
			for (int i = 0; i < NUM_VERSIONS; i++) {
				final Put put = new Put(Bytes.toBytes("1"));
				put.add(Bytes.toBytes("result"), Bytes.toBytes("body"), Bytes.toBytes(i + ""));
				htable.put(put);
				// Sleep so consecutive writes get distinct server-assigned
				// timestamps and therefore produce distinct cell versions.
				Thread.sleep(1000);
			}

			final Get get = new Get(Bytes.toBytes("1"));
			get.setMaxVersions(HBaseTestContextConfiguration.MAX_VERSIONS);

			final Result r = htable.get(get);

			// Result.getMap() is a Map<family, Map<qualifier, Map<timestamp, value>>>
			final NavigableMap<Long, byte[]> versions = r.getMap().get(Bytes.toBytes("result")).get(Bytes.toBytes("body"));

			// fail with a clear assertion error instead of a raw NPE when the
			// family/qualifier lookup finds nothing
			assertNotNull(versions);

			for (final Entry<Long, byte[]> e : versions.entrySet()) {
				System.out.println("t: " + DateUtils.calculate_ISO8601(e.getKey()) + ", v: " + Bytes.toString(e.getValue()));
			}
		} finally {
			// close the table also when the test body throws (the original
			// leaked the connection on any failure before close())
			htable.close();
		}
	}

	/** Drops {@link #TEST_TABLE} if it exists, leaving a clean state. */
	protected void ensureDropTable() throws HadoopServiceException, IOException {
		if (hadoopServiceCore.existTable(ClusterName.DM, TEST_TABLE)) {
			hadoopServiceCore.dropTable(ClusterName.DM, TEST_TABLE);
		}
	}

	/** @return the minimal table schema (set of column families) used by the test. */
	protected Set<String> testSchema() {
		final Set<String> schema = Sets.newHashSet();

		schema.add("result");

		return schema;
	}

}
