package eu.dnetlib.data.mapreduce.actions;

import java.io.*;
import java.util.List;
import java.util.zip.DataFormatException;
import java.util.zip.Inflater;

import com.google.common.collect.Lists;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.protobuf.InvalidProtocolBufferException;
import eu.dnetlib.actionmanager.actions.ActionFactory;
import eu.dnetlib.actionmanager.actions.AtomicAction;
import eu.dnetlib.actionmanager.common.Agent;
import eu.dnetlib.data.mapreduce.hbase.Reporter;
import eu.dnetlib.data.mapreduce.hbase.dataimport.DOIBoostToActions;
import eu.dnetlib.data.proto.OafProtos.Oaf;
import eu.dnetlib.data.transform.Column;
import eu.dnetlib.data.transform.Row;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.lang3.StringUtils;
import org.junit.Before;
import org.junit.Test;

public class DOIBoostToActionsTest  {
    private String setName;
    private Agent agent;
    private Reporter reporter;


    @Before
    public void setup() {
        setName = "DLI";
        agent= new Agent("agentId","agentName", Agent.AGENT_TYPE.service);
        reporter = (Reporter) (counterGroup, counterName, delta) -> System.out.println(String.format("COUNTER: %s - %s : %d", counterGroup, counterName, delta));
    }

    @Test
    public void testSingleDOIBoostAction() throws IOException {
        doTestSingleDOIBoostAction("/eu/dnetlib/data/mapreduce/actions/DOIBoostAction.json");
    }

    @Test
    public void testSingleDOIBoostActionFilter() throws IOException {
        doTestSingleDOIBoostAction("/eu/dnetlib/data/mapreduce/actions/DOIBoostAction_filterOut.json");
    }

    @Test
    public void testMultipleDOIBoostActionDiscardMany() throws IOException {
        doTestAllDOIBoostAction("/eu/dnetlib/data/mapreduce/actions/doiboost_discard_many.json");
    }


    @Test
    public void testDOIBoostActionToXML() throws Exception {
        doTestSingleDOIBoostActionToXML("/eu/dnetlib/data/mapreduce/actions/broken");
    }



    @Test
    public void testMultipleDOIBoostAction() throws IOException {
        doTestAllDOIBoostAction("/eu/dnetlib/data/mapreduce/actions/part-00070");
    }



    @Test
    public void testDecompression() throws DataFormatException {
        final String s ="eJxtVM1u20YQfpW5pHAAWXCCIECORazKQYAEcHrpcbUckgvs7jA7S8vsye/QU4H25fwk/WYpKUbQ\n" +
                "mygOZ775fuaOXawjeVeY+HHi3IU6F1Ya3QNTCcqZvGQNHRd3iAuFTHVkmpxWen76i1gn9sHF9dWe\n" +
                "S3J5sZJlbeFiYdctaO6ZO+7ozc0rkr414VwD5g5FVKmTxFqDp6lIN/tKV193H2/p7c3N+9db+h3l\n" +
                "yWECuqlk6qWgRcBn/MBRpoRehEdHXXHJWZ+OkwzFTSN++9Hlga21Z9Ut7bKUJLNS4i54F+3NgLXR\n" +
                "IHe2cA15tveV/ZglytCqQkLdA9sw3VCVgbFGoWMAhY4m0VADdlbxQa4ZbSS17/D/DEySN2RERzAS\n" +
                "8oAGGEdusN+TTHM81RwBebRtOtDjK6gFtDlWqwMFzhg72kOe0wHzQSdHCITCBn8skm2uaRKxG9oa\n" +
                "4pVFU5O/z5w9r4R5wAGZkB7/LQYqCsgqNIpOoboY/my4Vm7Oo71oVZpQViFJhbDeKW/pTo5QpGxI\n" +
                "5tKIYAzXtnaWSvAQt71jlOOq4PPTP9bM7BfBoOTnp3+t5KQCen7KWkHZ5n83HyBIySYJiHa1cprq\n" +
                "Os8aBh+Mv178rPadGH1J7G3f46XZZq7hvOPFmY9BG91jy0eLBzTATvDPln6D+/jRpSny5qXtz4wp\n" +
                "HRlf1AKdrW2TonBIh7koqDoslHmQGhpxh7mDkVZ6e7ZAlevOLRvkw2Pi6g9HuoCF1dpwoWl0gfyz\n" +
                "Uk3pwgjJiu/Nhw/vV7K9s8L23dqPjk5Jx9AbkipHVzrIWvi64z7kk6wNWgtPh+PQQG7pWzALGVsH\n" +
                "BsTc6O0tsO8otDCu+cbqB252Ly5rz6Wga8hmgjMGK7qoc17mB2Et3AdnzKHiNrghwyhK9xwbgfsi\n" +
                "86R0dXu/19cowcSru92Xzzv6hb59vLvf/fF1v2/AcEk+GdMhGT5AuMx4eZBCvr6AOIVndXkDUhGv\n" +
                "ebDA9+GRm2S4Ne10YgWVaBvLeibVshDqcvbVqdvz098WbuUWmV/b2WpRPwX9pe3s4tj1XT1dBc6g\n" +
                "NOM+NBcnmOjBrdqdjNxWCeWHXwkKX9xuN0LtVGmwKJqwBgxWSVz+Aw5ePDg=";
        byte[] byteArray = Base64.decodeBase64(s.getBytes());
        Inflater decompresser = new Inflater();
        decompresser.setInput(byteArray);
        ByteArrayOutputStream bos = new ByteArrayOutputStream(byteArray.length);
        byte[] buffer = new byte[8192];
        while (!decompresser.finished()) {
            int size = decompresser.inflate(buffer);
            bos.write(buffer, 0, size);
        }
        byte[] unzippeddata = bos.toByteArray();
        decompresser.end();

        System.out.println(new String(unzippeddata));

    }


    private void doTestSingleDOIBoostActionToXML(final String filePath) throws Exception {
        final List<Row> rows = Lists.newArrayList();
        final InputStream is = this.getClass().getResourceAsStream(filePath);
        final BufferedReader in = new BufferedReader(new InputStreamReader(is));

        String line = in.readLine();

        final JsonParser parser = new JsonParser();
        JsonObject root = parser.parse(line).getAsJsonObject();
        List<AtomicAction> actions = DOIBoostToActions.generatePublicationActionsFromDump(root, new ActionFactory(), setName, agent, false, false,reporter);

        if (actions!= null) {
            actions.forEach(action-> {
                if (action.getTargetColumn().equals("body") && action.getTargetColumnFamily().equals("result"))
                {
                    Column<String, byte[]> col = new Column<>("body" , action.getTargetValue());
                    rows.add(new Row("result",action.getTargetRowKey() , Lists.newArrayList(col)));
                }

            });



        }



    }

    private void doTestSingleDOIBoostAction(final String filePath) throws IOException {
        final InputStream is = this.getClass().getResourceAsStream(filePath);
        final BufferedReader in = new BufferedReader(new InputStreamReader(is));

        String line = in.readLine();

        final JsonParser parser = new JsonParser();
        JsonObject root = parser.parse(line).getAsJsonObject();
        List<AtomicAction> actions = DOIBoostToActions.generatePublicationActionsFromDump(root, new ActionFactory(), setName, agent, false, false, reporter);
        if (actions!= null) {
            actions.forEach(it -> {
                try {
                    System.out.println(
                            String.format(" RowKey:%s TargetColumnFamily:%s   TargetColumn: %s\n value:\n%s", it.getTargetRowKey(), it.getTargetColumnFamily(),
                                    it.getTargetColumn(),
                                    Oaf.parseFrom(it.getTargetValue())));
                } catch (InvalidProtocolBufferException e) {
                    e.printStackTrace();
                }
            });
        }
    }

    private void doTestAllDOIBoostAction(final String filePath) throws IOException {
        final InputStream is = this.getClass().getResourceAsStream(filePath);
        final BufferedReader in = new BufferedReader(new InputStreamReader(is));

        String line = in.readLine();
        int i = 0;
        int cnt = 0;
        while(StringUtils.isNotBlank(line)) {
            cnt ++;

            final JsonParser parser = new JsonParser();
            JsonObject root = parser.parse(line).getAsJsonObject();
            try {
                List<AtomicAction> atomicActions = DOIBoostToActions.generatePublicationActionsFromDump(root, new ActionFactory(), setName, agent, false, false, reporter);
                if (atomicActions!= null)
                {
                    i ++;
                }
//                    atomicActions.forEach(it -> System.out.println(String.format(" RowKey:%s TargetColumnFamily:%s   TargetColumn: %s", it.getTargetRowKey(), it.getTargetColumnFamily(), it.getTargetColumn())));
                else{
                    System.out.println("SKIPPED Type "+ root.get("type").getAsString());
                }

            } catch (Throwable e) {
                System.out.println(line);
                throw new RuntimeException(e);
            }
            line= in.readLine();
        }

        System.out.println("total "+i+" / "+cnt);
    }



}