package eu.dnetlib.data.transform;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.googlecode.protobuf.format.JsonFormat;
import eu.dnetlib.data.graph.model.DNGFRowKeyDecoder;
import eu.dnetlib.data.mapreduce.util.dao.HBaseTableDAO;
import eu.dnetlib.data.proto.DNGFProtos.DNGF;
import eu.dnetlib.data.proto.TypeProtos.Type;

import eu.dnetlib.data.proto.dli.Scholix2ObjectProtos;
import eu.dnetlib.dli.proto.DNGF2Scholix2Converter;
import eu.dnetlib.dli.proto.DNGFDLISummaryConverter;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import static eu.dnetlib.data.graph.utils.RelDescriptor.QUALIFIER_SEPARATOR;
import static eu.dnetlib.data.proto.dli.Scholix2ObjectProtos.*;

/**
 * Created by sandro on 2/13/17.
 */
public class DLIConvertertTest extends AbstractTransformerTest {

	private static final Log log = LogFactory.getLog(DLIConvertertTest.class);

	private Ontologies ontologies;

    @Before
    public void setUp() throws Exception {
        factory = new XsltRowTransformerFactory();
        ontologies = OntologyLoader.loadOntologiesFromCp();
    }

    @Ignore
    public void testLinkPangaeaDLI() throws Exception {

        final List<Row> rows = Lists.newArrayList();
        rows.addAll(asRows(loadFromTransformationProfile("dmfdli2hbase.xml"), load("record_dli_dmf.xml")));

	    printAllSummary(mapAllSummary(buildTable(rows)));
    }

	@Test
	@Ignore
	public void testLinkPangaeaDLIScholix() throws Exception {

		final List<Row> rows = Lists.newArrayList();
		rows.addAll(asRows(loadFromTransformationProfile("dmfdli2hbase.xml"), load("record_dli_dmf2.xml")));
		rows.addAll(asRows(loadFromTransformationProfile("dmfdli2hbase.xml"), load("record_dli_dmf.xml")));
//		rows.forEach(row ->
//				row.getColumns().forEach(
//						result -> {
//							if (result != null) {
//
//
//								final DNGFDecoder decoder =
//										DNGFDecoder.decode(result.getValue(), DliFieldTypeProtos.completionStatus, DliProtos.completionStatus, DliProtos.resolvedfrom, DliProtos.typedIdentifier);
//								System.out.println(row.getKey());
//								Put put = HBaseTableDAO.asPutByCollectedFrom(decoder.getDNGF());
//								System.out.println("put.getTimeStamp() = " + put.getTimeStamp());
//
//								System.out.println("decoder.getDNGF().toString() = " + decoder.getDNGF().toString());
//							}
//						}
//				));




//		//rows.addAll(asRows(loadFromTransformationProfile("pmfdli2hbase.xml"), load("record_dli_pmf.xml")));
		reduceScholix(mapAllScholix(buildTable(rows)));
	}

	private void reduceScholix(final Map<String, List<Scholix>> scholix) {
    	scholix.forEach((id, list) -> {
			    final Iterator<Scholix> it = list.iterator();
			    final Scholix source = it.next();
			    if (source.hasSource()) {
				    it.forEachRemaining(s -> log.info(JsonFormat.printToString(
						    Scholix.newBuilder(s)
								    .setSource(source.getSource())
								    .build())));
			    } else {
			    	log.warn("missing source in scholix: " + id);
			    }
		    }
	    );
    }

	private void printAllSummary(final Map<String, DNGFDLISummaryConverter> converters) {
		converters.forEach((id, converter) -> log.info(JsonFormat.printToString(converter.convert())));
	}

	private void printAllScholix(final Map<String, List<Scholix>> converters) {
		converters.values().forEach(converter -> converter.forEach(s -> log.info(JsonFormat.printToString(s))));
	}

	private Map<String, DNGFDLISummaryConverter> mapAllSummary(final Map<String, Map<String, Map<String, byte[]>>> table) throws Exception {
		final Map<String, DNGFDLISummaryConverter> builders = Maps.newHashMap();
		for (final Entry<String, Map<String, Map<String, byte[]>>> e : table.entrySet()) {
				mapSummary(builders, e.getKey(), e.getValue());
	    }
	    return builders;
    }

	private Map<String, List<Scholix>> mapAllScholix(final Map<String, Map<String, Map<String, byte[]>>> table) throws Exception {
		final Map<String, List<Scholix>> builders = Maps.newHashMap();
		for (final Entry<String, Map<String, Map<String, byte[]>>> e : table.entrySet()) {
			mapScholix(builders, e.getKey(), e.getValue());
		}
		return builders;
	}

	private void mapScholix(final Map<String, List<Scholix>> builders, final String rowKey, final Map<String, Map<String, byte[]>> row) {

		final Type type = DNGFRowKeyDecoder.decode(rowKey).getType();
		final Map<String, byte[]> familyMap = row.get("metadata");

		if (familyMap == null) return;

		final byte[] bodyB = familyMap.get(type.toString());

		if (bodyB != null) {
			ensureScholixBuilder(builders, rowKey);
			final DNGF entity = HBaseTableDAO.parseProto(familyMap.get(type.toString()));

			final Scholix.Builder source = DNGF2Scholix2Converter.withSource(entity.getEntity());
			builders.get(rowKey).add(source.build());

			final Map<String, byte[]> rels = row.get("rels");
			rels.forEach((q, v) -> {
				final DNGF.Builder r = DNGF.newBuilder(HBaseTableDAO.parseProto(v));
                final Scholix.Builder target = DNGF2Scholix2Converter.withTarget(entity.getEntity(), r.getRel());
                ensureScholixBuilder(builders, r.getRel().getTarget());
				builders.get(r.getRel().getTarget()).add(target.build());
			});
		}
	}

	private void mapSummary(final Map<String, DNGFDLISummaryConverter> converters, final String rowKey, final Map<String, Map<String, byte[]>> row) throws Exception {

		final Type type = DNGFRowKeyDecoder.decode(rowKey).getType();

		final Map<String, byte[]> familyMap = row.get("metadata");

		if (familyMap == null) return;

		final byte[] bodyB = familyMap.get(type.toString());

		if (bodyB != null) {
			ensureSummaryBuilder(converters, rowKey);

			final DNGF mainEntity = HBaseTableDAO.parseProto(familyMap.get(type.toString()));
			converters.get(rowKey).setMainEntity(mainEntity);

			final Map<String, byte[]> rels = row.get("rels");
			if (rels!= null)
				rels.forEach((q, v) -> {

					final DNGF.Builder rel = DNGF.newBuilder(HBaseTableDAO.parseProto(v));
					rel.getRelBuilder().setCachedTarget(mainEntity.getEntity());

					final String source = rel.getRel().getSource();
					final Type sourceType = rel.getRel().getSourceType();

					rel.getRelBuilder().setSource(rel.getRel().getTarget());
					rel.getRelBuilder().setSourceType(rel.getRel().getTargetType());

					rel.getRelBuilder().setTarget(source);
					rel.getRelBuilder().setTargetType(sourceType);

					final Ontology o = ontologies.get(rel.getRel().getRelType().getSchemeid());
					final String inverse = o.inverseOf(rel.getRel().getRelType().getClassid());

					rel.getRelBuilder().getRelTypeBuilder().setClassid(inverse).setClassname(inverse);

					final String targetId = StringUtils.substringAfter(q, QUALIFIER_SEPARATOR);

					ensureSummaryBuilder(converters, targetId);
					converters.get(targetId).addRelation(rel.build());
				});
		}

	}

	private void ensureSummaryBuilder(final Map<String, DNGFDLISummaryConverter> builders, final String rowKey) {
		if (!builders.containsKey(rowKey)) {
			builders.put(rowKey, new DNGFDLISummaryConverter());
		}
	}

	private void ensureScholixBuilder(final Map<String, List<Scholix>> builders, final String rowKey) {
		if (!builders.containsKey(rowKey)) {
			builders.put(rowKey, Lists.newLinkedList());
		}
	}

}
