package eu.dnetlib.espas.data.harvest;

import java.io.IOException;
import java.io.StringReader;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.concurrent.BlockingQueue;

import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;

import org.apache.log4j.Logger;
import org.w3c.dom.Document;
import org.xml.sax.InputSource;
import org.xml.sax.SAXException;

import eu.dnetlib.espas.data.harvest.csw.CSWGetRecordsRequest;
import eu.dnetlib.espas.data.harvest.csw.common.AssertionFailureException;
import eu.dnetlib.espas.data.harvest.csw.common.CSWConstants;
import eu.dnetlib.espas.data.harvest.csw.common.CSWOutputFormatEnum;
import eu.dnetlib.espas.data.harvest.csw.common.CSWOutputSchemaStrs;
import eu.dnetlib.espas.data.harvest.csw.common.CSWResultSetTypeEnum;

public class TestCSWTransImplHarvesterWithAssertions
{
   private static Logger logger = Logger.getLogger(TestCSWTransImplHarvesterWithAssertions.class);
   protected static BlockingQueue<String> expectedRecordsBlockingQueue;

   public static void run()
   {
      // Initialization phase.
      expectedRecordsBlockingQueue = LinkedBlockingQueueGenerator.generateLinkedBlockingQueue();

      // TODO : Unit test phase
      // Move the following two test cases to different classes,
      // each test case should be included in a separate class
      // with its own main function,
      // so that they may be run in isolation.
      testMultipleHarvestersUnderThreadJoin();
      testMultipleHarvestersUnderExecutorService();
      TestHarvesterstingProgramCSWRecords.testHarvesterstingProgramCSWRecordsMaxRecords3();
      TestHarvesterstingProgramCSWRecords.testHarvesterstingProgramCSWRecordsMaxRecords4();
      TestHarvesterstingProgramCSWRecords.testHarvesterstingProgramCSWRecordsMaxRecords10();
      TestHarvesterstingAcquisitionCSWRecords.testHarvesterstingAcquisitionCSWRecordsMaxRecords100();
      TestHarvesterstingAcquisitionCSWRecords.testHarvesterstingAcquisitionCSWRecordsMaxRecords200();
      TestHarvesterstingAcquisitionCSWRecords.testHarvesterstingAcquisitionCSWRecordsMaxRecords500();
      TestHarvesterstingAcquisitionCSWRecords.testHarvesterstingAcquisitionCSWRecordsMaxRecords1000();

      // Termination phase.
      expectedRecordsBlockingQueue.clear();
      expectedRecordsBlockingQueue = null;
   }

   protected static void testMultipleHarvestersUnderThreadJoin()
   {
      final int numOfHarvesters = 1000;
      String baseURL = "http://localhost/xml-stax/csw.php";
      RecordsHarvester[] harvesters = new RecordsHarvester[numOfHarvesters];
      RecordsHarvesterConsumer[] consumers = new RecordsHarvesterConsumer[numOfHarvesters];
      Thread[] threads = new Thread[numOfHarvesters];

      for(int i = 0; i < harvesters.length; i++)
      {
    	 LinkedList<CSWGetRecordsRequest> cswGetRecordsRequests = new LinkedList<CSWGetRecordsRequest>();
         CSWGetRecordsRequest cswGetRecordsRequest = new CSWGetRecordsRequest();
         cswGetRecordsRequest.setBaseURL(baseURL);
         cswGetRecordsRequest.setService(CSWConstants.CSW_Service_NAME_STR);
         cswGetRecordsRequest.setVersion(CSWConstants.VERSION_202);
         cswGetRecordsRequest.setOutputFormat(CSWOutputFormatEnum.CSW_OUTPUT_FORMAT_APPLICATION_XML);
         cswGetRecordsRequest.setOutputSchema(CSWOutputSchemaStrs.CSW_OUTPUT_SCHEMA_CORE_DEFAULT);
         cswGetRecordsRequest.setResultType(CSWResultSetTypeEnum.RESULTS);
         cswGetRecordsRequest.setType("acquisition");
         cswGetRecordsRequest.setStartPosition(1);
         cswGetRecordsRequest.setMaxRecords(10);
         cswGetRecordsRequests.addLast(cswGetRecordsRequest);
         harvesters[i] = new RecordsHarvesterTransImpl(cswGetRecordsRequests);
      }

      CSWHarvester cswHarvester = new CSWHarvester();
      for(int i = 0; i < harvesters.length; i++)
      {  // Add the harvesters in the thread pool and execute them.
         cswHarvester.execute(harvesters[i]);
      }

      for(int i = 0; i < harvesters.length; i++)
      {  // Add the consumers in the thread pool and execute them.
         consumers[i] = new RecordsHarvesterConsumer(harvesters[i]);
         threads[i] = new Thread(consumers[i]);
         threads[i].start();
      }

      try
      {
         for(int i = 0; i < harvesters.length; i++)
         {
            threads[i].join();
         }
      }
      catch(InterruptedException e)
      {
         logger.error("Interrupted exception", e);
      }

      for(int i = 0; i < harvesters.length; i++)
      {
         BlockingQueue<String> actualRecordsBlockingQueue = consumers[i].getActualRecordsBlockingQueue();
         try
         {
            DocumentBuilderFactory documentBuilderFactory = DocumentBuilderFactory.newInstance();
            DocumentBuilder expectedDocumentBuilder = documentBuilderFactory.newDocumentBuilder();
            DocumentBuilder actualDocumentBuilder = documentBuilderFactory.newDocumentBuilder();
            Document expectedDocument = null;
            Document actualDocument = null;
            Iterator<String> expectedIt = expectedRecordsBlockingQueue.iterator();
            Iterator<String> actualIt = actualRecordsBlockingQueue.iterator();
            String expectedStr = null;
            String actualStr = null;
            while(actualIt.hasNext())
            {
               if(!expectedIt.hasNext())
               {
                  throw new AssertionFailureException("Expected records queue has no more elements");
               }
               // System.out.println(expectedStr);
               expectedStr = expectedIt.next();
               // System.out.println(actualStr + "\n");
               actualStr = actualIt.next();

               expectedDocument = expectedDocumentBuilder.parse(new InputSource(new StringReader(expectedStr)));
               actualDocument = actualDocumentBuilder.parse(new InputSource(new StringReader(actualStr)));

               // If the record elements are populated with corresponding identifiers,
               // check for their equality.
               DomUtilityForAssertions.testRecordIdentifierEquality(expectedDocument, actualDocument);

               // If the record elements are populated with corresponding types,
               // check for their equality.
               DomUtilityForAssertions.testRecordTypeEquality(expectedDocument, actualDocument);

               // If the record elements are populated with corresponding subjects,
               // check for their equality.
               DomUtilityForAssertions.testRecordSubjectEquality(expectedDocument, actualDocument);

               // If the record elements are populated with corresponding spatials,
               // check for their equality.
               DomUtilityForAssertions.testRecordSpatialEquality(expectedDocument, actualDocument);

               // If the record elements are populated with corresponding abstracts,
               // check for their equality.
               DomUtilityForAssertions.testRecordAbstractEquality(expectedDocument, actualDocument);

               // If the record elements are populated with corresponding dates,
               // check for their equality.
               DomUtilityForAssertions.testRecordDateEquality(expectedDocument, actualDocument);

               // If the record elements are populated with corresponding titles,
               // check for their equality.
               DomUtilityForAssertions.testRecordTitleEquality(expectedDocument, actualDocument);

               // If the record elements are populated with corresponding formats,
               // check for their equality.
               DomUtilityForAssertions.testRecordFormatEquality(expectedDocument, actualDocument);
            }
         }
         catch(IOException e)
         {
            logger.error("IO exception.", e);
            throw new AssertionFailureException(e.toString());
         }
         catch(ParserConfigurationException e)
         {
            logger.error("Parse configuration exception.", e);
            throw new AssertionFailureException(e.toString());
         }
         catch(SAXException e)
         {
            logger.error("SAX exception.", e);
            throw new AssertionFailureException(e.toString());
         }

         assert expectedRecordsBlockingQueue.size() == actualRecordsBlockingQueue.size() : new AssertionFailureException("Blocking queues with different sizes.");

         // Test Number of records matched.
         int expected = 12;
         int actual = Integer.parseInt(consumers[i].getRecordsHarvester().getCSWGetRecordsRequest().getFirst().getSearchResults().get(Record.SR_ATT_NUM_OF_RECORDS_MATCHED));
         assert expected == actual : new AssertionFailureException("Wrogng number of matched records");

         // Test Number of records returned.
         expected = 2;
         actual = Integer.parseInt(consumers[i].getRecordsHarvester().getCSWGetRecordsRequest().getFirst().getSearchResults().get(Record.SR_ATT_NUM_OF_RECORDS_RETUREND));
         assert expected == actual : new AssertionFailureException("Wrogng number of returned records");

         // Test Number of next record.
         expected = 0;
         actual = Integer.parseInt(consumers[i].getRecordsHarvester().getCSWGetRecordsRequest().getFirst().getSearchResults().get(Record.SR_ATT_NUM_OF_NEXT_RECORD));
         assert expected == actual : new AssertionFailureException("Wrogng number of next record");
      }
      cswHarvester.shutdownAndWaitTermination();
      System.out.printf("%-80s : %5s\n", "testMultipleHarvestersUnderThreadJoin", "passed");
   }

   protected static void testMultipleHarvestersUnderExecutorService()
   {
      final int numOfHarvesters = 1000;
      String baseURL = "http://localhost/xml-stax/csw.php";
      RecordsHarvester[] harvesters = new RecordsHarvester[numOfHarvesters];
      RecordsHarvesterConsumer[] consumers = new RecordsHarvesterConsumer[numOfHarvesters];

      for(int i = 0; i < harvesters.length; i++)
      {
    	 LinkedList<CSWGetRecordsRequest> cswGetRecordsRequests = new LinkedList<CSWGetRecordsRequest>();
         CSWGetRecordsRequest cswGetRecordsRequest = new CSWGetRecordsRequest();
         cswGetRecordsRequest.setBaseURL(baseURL);
         cswGetRecordsRequest.setService(CSWConstants.CSW_Service_NAME_STR);
         cswGetRecordsRequest.setVersion(CSWConstants.VERSION_202);
         cswGetRecordsRequest.setOutputFormat(CSWOutputFormatEnum.CSW_OUTPUT_FORMAT_APPLICATION_XML);
         cswGetRecordsRequest.setOutputSchema(CSWOutputSchemaStrs.CSW_OUTPUT_SCHEMA_CORE_DEFAULT);
         cswGetRecordsRequest.setResultType(CSWResultSetTypeEnum.RESULTS);
         cswGetRecordsRequest.setType("acquisition");
         cswGetRecordsRequest.setStartPosition(1);
         cswGetRecordsRequest.setMaxRecords(10);
         cswGetRecordsRequests.addLast(cswGetRecordsRequest);
         harvesters[i] = new RecordsHarvesterTransImpl(cswGetRecordsRequests);
      }

      CSWHarvester cswHarvester = new CSWHarvester();
      for(int i = 0; i < harvesters.length; i++)
      {  // Add the harvesters in the thread pool and execute them.
         cswHarvester.execute(harvesters[i]);
      }

      for(int i = 0; i < consumers.length; i++)
      {  // Add the consumers in the thread pool and execute them.
         consumers[i] = new RecordsHarvesterConsumer(harvesters[i]);
         cswHarvester.getExecutorService().execute(consumers[i]);
      }

      cswHarvester.shutdown();

      try
      {
         while(!cswHarvester.getExecutorService().isTerminated())
         {
            /*
            System.out.println("Executor service has not been terminated yet");
            for(int i = 0; i < harvesters.length; i++)
            {
               if(!harvesters[i].isHarvestingCompleted())
               {
                  System.out.println("Harvester: " + i + " has not completed yet");
               }
            }
            for(int i = 0; i < consumers.length; i++)
            {
               if(!consumers[i].isConsumingCompleted())
               {
                  System.out.println("Consumer: " + i + " has not completed yet");
               }
            }
            */
            Thread.sleep(100);
         }
      }
      catch(InterruptedException e)
      {
         logger.error("Interrupted exception", e);
      }

      for(int i = 0; i < harvesters.length; i++)
      {
         BlockingQueue<String> actualRecordsBlockingQueue = consumers[i].getActualRecordsBlockingQueue();
         try
         {
            DocumentBuilderFactory documentBuilderFactory = DocumentBuilderFactory.newInstance();
            DocumentBuilder expectedDocumentBuilder = documentBuilderFactory.newDocumentBuilder();
            DocumentBuilder actualDocumentBuilder = documentBuilderFactory.newDocumentBuilder();
            Document expectedDocument = null;
            Document actualDocument = null;
            Iterator<String> expectedIt = expectedRecordsBlockingQueue.iterator();
            Iterator<String> actualIt = actualRecordsBlockingQueue.iterator();
            String expectedStr = null;
            String actualStr = null;
            while(actualIt.hasNext())
            {
               if(!expectedIt.hasNext())
               {
                  throw new AssertionFailureException("Expected records queue has no more elements");
               }
               // System.out.println(expectedStr);
               expectedStr = expectedIt.next();
               // System.out.println(actualStr + "\n");
               actualStr = actualIt.next();

               expectedDocument = expectedDocumentBuilder.parse(new InputSource(new StringReader(expectedStr)));
               actualDocument = actualDocumentBuilder.parse(new InputSource(new StringReader(actualStr)));

               // If the record elements are populated with corresponding identifiers,
               // check for their equality.
               DomUtilityForAssertions.testRecordIdentifierEquality(expectedDocument, actualDocument);

               // If the record elements are populated with corresponding types,
               // check for their equality.
               DomUtilityForAssertions.testRecordTypeEquality(expectedDocument, actualDocument);

               // If the record elements are populated with corresponding subjects,
               // check for their equality.
               DomUtilityForAssertions.testRecordSubjectEquality(expectedDocument, actualDocument);

               // If the record elements are populated with corresponding spatials,
               // check for their equality.
               DomUtilityForAssertions.testRecordSpatialEquality(expectedDocument, actualDocument);

               // If the record elements are populated with corresponding abstracts,
               // check for their equality.
               DomUtilityForAssertions.testRecordAbstractEquality(expectedDocument, actualDocument);

               // If the record elements are populated with corresponding dates,
               // check for their equality.
               DomUtilityForAssertions.testRecordDateEquality(expectedDocument, actualDocument);

               // If the record elements are populated with corresponding titles,
               // check for their equality.
               DomUtilityForAssertions.testRecordTitleEquality(expectedDocument, actualDocument);

               // If the record elements are populated with corresponding formats,
               // check for their equality.
               DomUtilityForAssertions.testRecordFormatEquality(expectedDocument, actualDocument);
            }
         }
         catch(IOException e)
         {
            logger.error("IO exception.", e);
            throw new AssertionFailureException(e.toString());
         }
         catch(ParserConfigurationException e)
         {
            logger.error("Parse configuration exception.", e);
            throw new AssertionFailureException(e.toString());
         }
         catch(SAXException e)
         {
            logger.error("SAX exception.", e);
            throw new AssertionFailureException(e.toString());
         }

         assert expectedRecordsBlockingQueue.size() == actualRecordsBlockingQueue.size() : new AssertionFailureException("Blocking queues with different sizes.");

         // Test Number of records matched.
         int expected = 12;
         int actual = Integer.parseInt(consumers[i].getRecordsHarvester().getCSWGetRecordsRequest().getFirst().getSearchResults().get(Record.SR_ATT_NUM_OF_RECORDS_MATCHED));
         assert expected == actual : new AssertionFailureException("Wrogng number of matched records");

         // Test Number of records returned.
         expected = 2;
         actual = Integer.parseInt(consumers[i].getRecordsHarvester().getCSWGetRecordsRequest().getFirst().getSearchResults().get(Record.SR_ATT_NUM_OF_RECORDS_RETUREND));
         assert expected == actual : new AssertionFailureException("Wrogng number of returned records");

         // Test Number of next record.
         expected = 0;
         actual = Integer.parseInt(consumers[i].getRecordsHarvester().getCSWGetRecordsRequest().getFirst().getSearchResults().get(Record.SR_ATT_NUM_OF_NEXT_RECORD));
         assert expected == actual : new AssertionFailureException("Wrong number of next record");
      }
      System.out.printf("%-80s : %5s\n", "testMultipleHarvestersUnderExecutorService", "passed");
   }
}
