package eu.dnetlib.data.transform;

import java.io.IOException;
import java.util.List;
import java.util.NavigableMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

import eu.dnetlib.data.graph.model.DNGFDecoder;
import eu.dnetlib.data.proto.WdsDatasetProtos.WdsDataset;
import eu.dnetlib.data.proto.WdsDatasetProtos.WdsDataset.GeoLocation;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.math.stat.descriptive.DescriptiveStatistics;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.*;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;

/**
 * Created by claudio on 05/09/16.
 */
public class HBaseReadWriteTest {

	private static final String TABLE_NAME = "db_dli";
	private Resource confIn = new ClassPathResource("eu/dnetlib/data/hadoop/config/hadoop-default.dm.cnr.properties");

	private HTable table;

	//@Before
	public void setUp() throws IOException {

		final Configuration conf = new Configuration();

		for(final String line : IOUtils.readLines(confIn.getInputStream())) {
			System.out.println("line = " + line);
			if (!line.trim().isEmpty() && !line.startsWith("#")) {
				final String[] split = line.split("=");
				conf.set(split[0].trim(), split[1].trim());
			}
		}

		table = new HTable(conf, Bytes.toBytes(TABLE_NAME));
	}

	@Ignore
	//@Test
	public void testReadGeoLocations() throws IOException {

		final Scan scan = new Scan();
		scan.addColumn(Bytes.toBytes("dataset"), Bytes.toBytes("body"));

		final ResultScanner rs = table.getScanner(scan);

		System.out.println("start iteration");

		final DescriptiveStatistics statN = new DescriptiveStatistics();
		final AtomicInteger invalid = new AtomicInteger(0);
		rs.forEach(r -> {
			final byte[] b = r.getValue(Bytes.toBytes("dataset"), Bytes.toBytes("body"));
			final DNGFDecoder d = DNGFDecoder.decode(b, WdsDataset.geolocation);
			final List<GeoLocation> geoList = d.getDNGF().getEntity().getDataset().getMetadata().getExtension(WdsDataset.geolocation);
			geoList.forEach(g -> g.getBoxList().forEach(box -> {
				if (StringUtils.isNotBlank(box)) {
					final String[] split = box.trim().split(" ");
					try {
						statN.addValue(split.length);
						Assert.assertTrue("bad number of coordinates", split.length == 4);

						// Rect(minX=-180.0,maxX=180.0,minY=-90.0,maxY=90.0)

						Assert.assertTrue("minX=-180", Double.parseDouble(split[1]) >= -180.0);
						Assert.assertTrue("maxX= 180", Double.parseDouble(split[3]) <=  180.0);
						Assert.assertTrue("minY= -90", Double.parseDouble(split[0]) >= -90.0);
						Assert.assertTrue("maxY=  90", Double.parseDouble(split[2]) <=  90.0);

						//maxY must be >= minY: 90.0 to -90.0
						Assert.assertTrue("maxY must be >= minY", Double.parseDouble(split[2]) >= Double.parseDouble(split[0]));

						//maxY must be >= minY: 90.0 to -90.0
						Assert.assertTrue("maxX must be >= minX", Double.parseDouble(split[3]) >= Double.parseDouble(split[1]));
					} catch (AssertionError e) {
						invalid.set(invalid.get() + 1);
						//System.err.println(String.format("document %s has %s coordinates: %s", d.getDNGF().getEntity().getId(), split.length, e.getMessage()));
						//throw e;
					}
				}
			}));
		});

		rs.close();

		System.out.println(String.format("stat N: %s", statN));
		System.out.println(String.format("invalid N: %s", invalid.get()));
	}

	//@Ignore
	//@Test
	public void testWriteVersions() throws IOException {

		final byte[] ROW_ID = "test".getBytes();
		IntStream.range(0, 10).forEach(
				i -> {
					//final long ts = System.nanoTime() + i;
					final Put put = new Put(ROW_ID);
					put.setWriteToWAL(true);
					put.add("rels".getBytes(), "pippo".getBytes(), String.valueOf(i).getBytes());

					System.out.println("writing i = " + i);

					try {
						table.put(put);
					} catch (IOException e) {
						e.printStackTrace();
					}
				}
		);

		System.out.println("done writing");

		final Scan scan = new Scan();
		scan.setMaxVersions(100);
		scan.addColumn("rels".getBytes(), "pippo".getBytes());

		final ResultScanner rs = table.getScanner(scan);

		System.out.println("start iteration");

		rs.forEach(r -> {
			final NavigableMap<Long, byte[]> versions = r.getMap().get("rels".getBytes()).get("pippo".getBytes());

			versions.entrySet().forEach(e -> {
				System.out.println("ts = " + e.getKey() + " value: " + new String(e.getValue()));
			});
		});

		rs.close();
	}

	//@After
	public void tearDown() throws IOException {
		table.close();
	}
}
