/**
 * Copyright 2009-2012 OpenAIRE PROJECT (Bielefeld University)
 * Original author: Marek Imialek <marek.imialek at uni-bielefeld.de>
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package eu.dnetlib.data.udm.rs;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import org.apache.log4j.Logger;

import eu.dnetlib.enabling.resultset.rmi.ResultSetException;
import eu.dnetlib.enabling.resultset.rmi.ResultSetService;

/**
 * ResultSet iterator. Retrieves metadata records from a ResultSet.
 * 
 * @author <a href="mailto:marek.imialek at uni-bielefeld.de">Marek Imialek</a>
 * @version 0.0.1
 */

public class ResultSetIterator implements Iterator<List<String>> {

	/** The Constant log. */
	protected static final Logger log = Logger
		.getLogger(ResultSetIterator.class);
	
	/** The Constant RESULT_SET_REQUEST_MODE_WAITING. */
	public static final String RESULT_SET_REQUEST_MODE_WAITING = "waiting";

	/** The Constant RESULT_SET_REQUEST_MODE_NON_WAITING. */
	public static final String RESULT_SET_REQUEST_MODE_NON_WAITING = "non-waiting";

	/**
	 * Content of the single-element package that the harvester puts on the
	 * queue when it finishes; clients are expected to ignore that package.
	 */
	public static final String IGNORED_RECORD_MARKER = "ignorethisobject";

	/** Message of the exception thrown by {@link #next()}. */
	private static final String QUEUE_ERROR_MESSAGE =
		"Problem occured when retrieving data from queue!";

	/** Pause (in milliseconds) between two polls of a ResultSet that is not closed yet. */
	private static final long POLL_INTERVAL_MILLIS = 3000;

	/** Queue for storing harvested data. */
	BlockingQueue<List<String>> queue;

	/** Harvester thread. */
	Harvester worker;

	/** Counter for debug purposes. */
	int counter = 0;
	
	/** Maximum timeout (in seconds) for retrieving result from queue. */
	long maxTimeout;

	/**
	 * Creates the iterator and immediately starts the harvester thread that
	 * fills the queue.
	 * 
	 * @param rsId ResultSet identifier
	 * @param resultSetService ResultSet service used to read the data
	 * @param queueSize capacity of the queue shared with the harvester
	 * @param packageSize size of the single RS call result
	 * @param maxTimeout maximum time (in seconds) {@link #next()} waits for a package
	 * @param rsClosingTimeout maximum time (in seconds) the harvester keeps
	 *            polling a ResultSet that is not closed
	 * @throws RuntimeException if the size of the ResultSet cannot be read
	 */
	public ResultSetIterator(String rsId, ResultSetService resultSetService, 
			int queueSize, int packageSize, long maxTimeout,
			long rsClosingTimeout) {

		this.maxTimeout = maxTimeout;
		
		queue = new LinkedBlockingQueue<List<String>>(queueSize);
		worker = new Harvester(resultSetService, rsId,
				queue, packageSize, rsClosingTimeout);
		worker.start();
	}

	/*
	 * (non-Javadoc)
	 * 
	 * @see java.util.Iterator#hasNext()
	 */
	public boolean hasNext() {
		if (Boolean.TRUE.equals(worker.getInterrupt())) {
			stopWorker();
			return false;
		}
		// Read the "still harvesting" flag BEFORE looking at the queue: every
		// data package is enqueued before the flag is cleared, so this order
		// cannot miss a package that arrives between the two reads.
		boolean stillHarvesting = worker.getProceed();
		return !queue.isEmpty() || stillHarvesting;
	}

	/*
	 * (non-Javadoc)
	 * 
	 * @see java.util.Iterator#next()
	 */
	public List<String> next() {
		counter++;

		// The failure is recorded only after the harvester's last put, so a
		// recorded failure plus an empty queue means nothing will ever arrive:
		// fail now instead of waiting for the timeout.
		Throwable harvestFailure = worker.getFailure();
		if (harvestFailure != null && queue.isEmpty()) {
			throw new RuntimeException(QUEUE_ERROR_MESSAGE, harvestFailure);
		}

		List<String> result;
		try {
			result = queue.poll(maxTimeout, TimeUnit.SECONDS);
		} catch (InterruptedException e) {
			// keep the interrupt visible to the caller's thread
			Thread.currentThread().interrupt();
			throw new RuntimeException(QUEUE_ERROR_MESSAGE, e);
		}
		if (result != null) {
			return result;
		}

		String errorContent = "Problem occured when retrieving "
			+ counter + " element of " + worker.getTotalRSSize() + "! "
			+ "Maximum timeout of " + maxTimeout
			+ " seconds passed!";
		log.error(errorContent);

		// Prefer the real harvesting error as cause; otherwise report the
		// timeout with the same cause type as before.
		Throwable cause = worker.getFailure();
		if (cause == null) {
			cause = new InterruptedException(errorContent);
		}
		throw new RuntimeException(QUEUE_ERROR_MESSAGE, cause);
	}

	/*
	 * (non-Javadoc)
	 * 
	 * @see java.util.Iterator#remove()
	 */
	public void remove() {
		throw new UnsupportedOperationException(
		"This operation is unsupported on ResultSet data!");
	}

	/**
	 * Asks the worker thread to stop and discards the packages that are
	 * still waiting in the queue. After this call {@link #hasNext()}
	 * returns {@code false}.
	 */
	public void stopWorker() {
		if (worker != null) {
			worker.setInterrupt(true);
			// wake the worker up if it is sleeping or blocked on queue.put
			worker.interrupt();
		}
		// for unblocking worker if stuck on queue.put(fields);
		queue.clear();
	}

	/**
	 * Background thread that reads the ResultSet package by package and puts
	 * every non-empty package on the queue shared with the iterator.
	 */
	class Harvester extends Thread {

		/** The log. */
		protected final Logger log = Logger.getLogger(Harvester.class);

		/** ResultSet service. */
		ResultSetService resultSetService;

		/** ResultSet id. */
		String rsId;

		/** Queue shared with iterator. */
		BlockingQueue<List<String>> queue;

		/** Getting results from RS range. */
		int harvestPackageSize;

		/** Number of records harvested so far; written by this thread, read by the iterator. */
		volatile int totalRSSize;

		/** Stop request flag, set through {@link #setInterrupt(Boolean)}. */
		private volatile Boolean interrupt = Boolean.FALSE;

		/** The rs status. */
		private String rsStatus = "open";

		/** Maximum time (in seconds) to keep polling a ResultSet that is not closed. */
		private long rsClosingTimeout;
		
		/** True while harvesting is in progress; cleared when the harvester finishes normally. */
		volatile boolean proceed = true;

		/** Error that terminated harvesting, or null when there was none. */
		private volatile Throwable failure;

		
		/**
		 * Default harvester constructor. Reads the current number of
		 * elements of the ResultSet once (for the debug log).
		 * 
		 * @param resultSetService resultset service port type
		 * @param rsId resultset identifier
		 * @param queue queue buffer
		 * @param harvestPackageSize single package size
		 * @param rsClosingTimeout maximum time (in seconds) to keep polling
		 *            a ResultSet that is not closed
		 * @throws RuntimeException if the number of elements cannot be read
		 */
		public Harvester(ResultSetService resultSetService, String rsId,
				BlockingQueue<List<String>> queue, int harvestPackageSize,
				long rsClosingTimeout) {
			this.resultSetService = resultSetService;
			this.rsId = rsId;
			this.queue = queue;
			this.harvestPackageSize = harvestPackageSize;
			this.rsClosingTimeout = rsClosingTimeout;
			
			try {
				log.debug("[harv] Records number in ResultSet: " + resultSetService.getNumberOfElements(rsId));
			} catch (ResultSetException e) {
				throw new RuntimeException(
						"Exception occured when cheching total RS size.", e);
			}
		}

		/**
		 * Harvesting loop. Runs until the ResultSet is closed and completely
		 * read, the closing timeout passes, a stop is requested or an error
		 * occurs. On normal termination a package containing only
		 * {@link ResultSetIterator#IGNORED_RECORD_MARKER} is put on the queue.
		 * 
		 * @see java.lang.Thread#run()
		 */
		@Override
		public void run() {
			try {
				
				int startElement 	= 1;
				int endElement		= 0;
				int currentRSSize	= 0;
				long startTime		= 0;
				
				while (proceed && !isStopRequested()) {
					currentRSSize = resultSetService.getNumberOfElements(rsId);
					log.debug("current size:" + currentRSSize);
					log.debug("total size:" + totalRSSize);
					
					if (currentRSSize != 0 && currentRSSize != totalRSSize ) {
					
						endElement = startElement + harvestPackageSize - 1;
					
						if (endElement > currentRSSize)
							endElement = currentRSSize;
					
						log.debug("[harv] getting results for range: "+
							startElement + " - " + endElement);
						List<String> results = resultSetService.getResult(
								rsId, startElement, endElement,
								ResultSetIterator.RESULT_SET_REQUEST_MODE_NON_WAITING);
						
						if (results != null && results.size() != 0) {
							log.debug("PROCESSING: " +results.size());
							queue.put(results);
						
							totalRSSize = totalRSSize + results.size();
						
							// a short package means fewer records were delivered
							// than requested: continue right after the last
							// record actually received
							int resDiff = endElement - startElement + 1;
							if (resDiff != results.size())
								startElement = totalRSSize + 1;
							else
								startElement = endElement + 1;
						}
					}
					
					// ask the service once and reuse the answer for the log
					String status = this.resultSetService.getRSStatus(rsId);
					this.setRSStatus(status);
					log.debug("STATUS: " +  status );
					if ("closed".equals(status)){
						log.debug("[harv] ResultSet is closed.");
						int rsSize = resultSetService.getNumberOfElements(rsId);
						
						if (rsSize != totalRSSize ) { 
							int diff = rsSize - totalRSSize;
							log.debug("[harv] still need to get "+ diff +" records " +
									" from ResultSet.");
						} else {
							proceed = false;
							// closing queue with a record that should be
							// ignored by the client
							queue.put(endOfDataMarker());
						}	
					} else {
						
						long currentTime = System.currentTimeMillis()/1000;
						// the clock starts the first time the ResultSet is
						// seen in a non-closed state and is never reset
						if (startTime == 0) {
							startTime = currentTime;
						}
						long timeDiff = currentTime-startTime;
							
						if ( timeDiff > this.rsClosingTimeout) {
							log.warn("[harv] : !!!! CLOSING ResultSet TIMEOUT " +
									"after "+timeDiff+"[sec], can not wait any more " +
									"for closing ResultSet"); 
							proceed = false;
							// closing queue with a record that should be
							// ignored by the client
							queue.put(endOfDataMarker());
						} else {
							Thread.sleep(POLL_INTERVAL_MILLIS);
						}
							
					}
				}
				log.debug("Finished work after processing "+ totalRSSize + " results");

			} catch (ResultSetException e1) {
				failure = e1;
				log.error("Exception occured when retrieving data from ResultSet",e1);
			} catch (InterruptedException e2) {
				if (isStopRequested()) {
					// expected: stopWorker() interrupts a sleeping/blocked worker
					log.debug("[harv] Harvesting stopped on request.");
				} else {
					failure = e2;
					log.error("Exception occured when storing data to the queue", e2);
				}
				Thread.currentThread().interrupt();
			} catch (RuntimeException e3) {
				// top of the thread: record the error so the iterator can report it
				failure = e3;
				log.error("Unexpected exception occured when harvesting ResultSet", e3);
			}
		}

		/**
		 * Creates the single-element package that marks the end of the data.
		 * 
		 * @return new mutable list containing only the marker string
		 */
		private List<String> endOfDataMarker() {
			return new ArrayList<String>(
					Arrays.asList(ResultSetIterator.IGNORED_RECORD_MARKER));
		}

		/**
		 * Tells whether a stop has been requested.
		 * 
		 * @return true if the interrupt flag is set to true (null counts as false)
		 */
		private boolean isStopRequested() {
			return Boolean.TRUE.equals(interrupt);
		}

		/**
		 * Returns the error that terminated harvesting.
		 * 
		 * @return the recorded error, or null if harvesting has not failed
		 */
		public Throwable getFailure() {
			return this.failure;
		}

		/**
		 * Returns the number of records harvested so far.
		 * 
		 * @return number of records harvested so far
		 */
		public synchronized int getTotalRSSize() {
			return this.totalRSSize;
		}

		/**
		 * Sets the total rs size.
		 * 
		 * @param totalRSSize the new total rs size
		 */
		public synchronized void setTotalRSSize(int totalRSSize) {
			this.totalRSSize = totalRSSize;
		}

		/**
		 * Returns ResultSet status.
		 * 
		 * @return ResultSet status
		 */
		public synchronized String getRSStatus() {
			return this.rsStatus;
		}

		/**
		 * Sets the rs status.
		 * 
		 * @param rsStatus the rs status
		 */
		public synchronized void setRSStatus(String rsStatus) {
			this.rsStatus = rsStatus;
		}

		/**
		 * Returns true if thread should be stopped.
		 * 
		 * @return true if thread should be stopped.
		 */
		public synchronized Boolean getInterrupt() {
			return interrupt;
		}

		/**
		 * Sets interrupt flag. The harvesting loop checks the flag at the
		 * start of every iteration.
		 * 
		 * @param interrupt the interrupt
		 */
		public synchronized void setInterrupt(Boolean interrupt) {
			this.interrupt = interrupt;
		}
		
		/**
		 * Gets the proceed.
		 * 
		 * @return the proceed
		 */
		public synchronized Boolean getProceed() {
			return this.proceed;
		}

		/**
		 * Sets the proceed.
		 * 
		 * @param proceed the new proceed
		 */
		public synchronized void setProceed(Boolean proceed) {
			this.proceed = proceed;
		}

	}
}