/**
 * Copyright 2008-2009 DRIVER PROJECT (ICM UW)
 * Original author: Marek Horst
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package eu.dnetlib.common.ws.harv;

import java.io.StringReader;
import java.security.InvalidParameterException;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import javax.xml.transform.stream.StreamSource;
import javax.xml.ws.wsaddressing.W3CEndpointReference;

import org.apache.log4j.Logger;

import eu.dnetlib.resultset.impl.builder.IResultSetBuilder;
import eu.dnetlib.common.ws.harv.parser.IDriverContentParser;
import eu.dnetlib.enabling.resultset.rmi.ResultSetException;
import eu.dnetlib.enabling.resultset.rmi.ResultSetService;
import eu.dnetlib.enabling.tools.JaxwsServiceResolverImpl;
import eu.dnetlib.enabling.tools.ServiceResolver;

/**
 * Abstract generic ResultSet iterator.
 * Should be used as a base for all ResultSet based iterators.
 * @author mhorst
 *
 */
public abstract class GenericResultSetIterator<FeedingDTO> implements Iterator<FeedingDTO> {
	
	protected static final Logger log = Logger.getLogger(GenericResultSetIterator.class);
	
	public static final String RESULT_SET_REQUEST_MODE_WAITING = "waiting";
	
	public static final String RESULT_SET_REQUEST_MODE_NON_WAITING = "non-waiting";
	
	/**
	 * Queue for storing harvested data.
	 */
	BlockingQueue<FeedingDTO> queue;
		
	/**
	 * Harvester thread.
	 */
	Harvester worker;
	
	/**
	 * Total RS size.
	 */
	int totalRSSize = 0;

	
	String rsStatus;	
	
	/**
	 * Maximum timeout (in seconds) for retrieving result from queue.
	 */
	long maxTimeout;
	
	/**
	 * Default constructor.
	 * Apparently resultSetBuilder is not used at the moment. 
	 * {@link JaxwsServiceResolverImpl} is used as resolver.
	 * This constructor should be dropped in near future.
	 * 
	 * @param resultSetEPR RS end point reference
	 * @param resultSetBuilder result set builder module
	 * @param contentParser content parser module
	 * @param queueSize size of the queue
	 * @param packageSize size of the single RS call result
	 * @param maxTimeout maximum timeout for receiving response from ResultSet
	 * @param rsClosingTimeout result set closing timeout
	 * 
	 */
	@Deprecated
	public GenericResultSetIterator(String resultSetEPR, IResultSetBuilder resultSetBuilder,
			IDriverContentParser<FeedingDTO> contentParser, 
			int queueSize, int packageSize, long maxTimeout, long rsClosingTimeout) {
		this(resultSetEPR, new JaxwsServiceResolverImpl(), 
				contentParser, queueSize, packageSize, maxTimeout, rsClosingTimeout);
	}
	
	/**
	 * Constructor taking ServiceResolver.
	 * 
	 * @param resultSetEPR RS end point reference
	 * @param serviceResolver result set service resolver module
	 * @param contentParser content parser module
	 * @param queueSize size of the queue
	 * @param packageSize size of the single RS call result
	 * @param maxTimeout maximum timeout for receiving response from ResultSet
	 * @param rsClosingTimeout result set closing timeout
	 * 
	 */
	public GenericResultSetIterator(String resultSetEPR, 
			ServiceResolver serviceResolver,
			IDriverContentParser<FeedingDTO> contentParser, 
			int queueSize, int packageSize, long maxTimeout, long rsClosingTimeout) {
		this.maxTimeout = maxTimeout;
		
		if (resultSetEPR == null )
			throw new InvalidParameterException(
					"The resultSetEPR content can not be null! ");
		
		 W3CEndpointReference epr = (W3CEndpointReference) 
		 	W3CEndpointReference.readFrom
		 		(new StreamSource(new StringReader(resultSetEPR)));

        final ResultSetService resultSetService = 
        	serviceResolver.getService(ResultSetService.class, epr);
        final String rsId = serviceResolver.getResourceIdentifier(epr);
		
        if (rsId == null )
			throw new InvalidParameterException(
					"The resultset identifier not extracted from EPR " +
					resultSetEPR.toString());
       
        queue = new LinkedBlockingQueue<FeedingDTO>(queueSize);
		worker = new Harvester(resultSetService, 
				rsId, queue, packageSize, contentParser, rsClosingTimeout);
		totalRSSize = worker.getTotalRSSize();
		rsStatus    = worker.getRSStatus();
		worker.start();
	}
	
	/*
	 * (non-Javadoc)
	 * 
	 * @see java.util.Iterator#hasNext()
	 */
	public boolean hasNext() {
		
		if (worker.getInterrupt()) {
			stopWorker();
			return false;
		}
		else if (!queue.isEmpty()) {
			return true;
		}
		else if (worker.getProceed()) {
			return true;
		}	
		else {
			return false;
		}
	}

	/* (non-Javadoc)
	 * @see java.util.Iterator#next()
	 */
	public FeedingDTO next() {
		try {
			FeedingDTO result = queue.poll(maxTimeout, TimeUnit.SECONDS);
			if (result==null) {
				String errorContent = "Queue maximum timeout of " + maxTimeout
					+ " seconds passed!";
				log.error(errorContent);
				throw new DataRetrievingTimeoutException(errorContent);
			} else
				return result;
		} catch (InterruptedException e) {
			throw new RuntimeException("Problem occured when retrieving data from queue!"
					,e);
		}
	}

	/* (non-Javadoc)
	 * @see java.util.Iterator#remove()
	 */
	public void remove() {
		throw new UnsupportedOperationException("This operation is unsupported on ResultSet data!");
	}

	/**
	 * Stops worker thread.
	 */
	public void stopWorker() {
		if (worker!=null)
			worker.setInterrupt(true);
//		for unblocking worker if stuck on queue.put(fields);
		queue.clear();
	}

	/**
	 * Creates object markup which would suggest this object 
	 * should be ignored at the feeding phase.
	 * @return FeedingDTO which should be ignored
	 */
	protected abstract FeedingDTO buildIgnoredObjectMarkup();
	
	/**
	 * Thread harvesting xml data from ResultSet and processing
	 * content into FeedingDTO object.
	 * 
	 * @author mhorst
	 *
	 */
	class Harvester extends Thread {

		protected final Logger log = Logger.getLogger(Harvester.class);

		/**
		 * XML content parser.
		 */
		IDriverContentParser<FeedingDTO> contentParser;

		/**
		 * ResultSet service.
		 */
		ResultSetService resultSetService;

		/**
		 * ResultSet id.
		 */
		String rsId;

		/**
		 * Queue shared with iterator.
		 */
		BlockingQueue<FeedingDTO> queue;

		/**
		 * Getting results from RS range.
		 */
		int harvestPackageSize;

		/**
		 * Total ResultSet size.
		 */
		int totalRSSize = 0;
		
		/** The rs status. */
		private String rsStatus = "open";

		/** The interrupt. */
		private Boolean interrupt = false;
		
		/** The rs closing timeout. */
		private long rsClosingTimeout;
		
		/** The proceed reading RS process. */
		boolean proceed = true;

		/**
		 * Default harvester constructor.
		 * @param resultSetService resultset service port type
		 * @param rsId resultset identifier
		 * @param queue queue buffer
		 * @param harvestPackageSize single package size
		 * @param contentParser content parser module
		 * @param rsClosingTimeout
		 */
		public Harvester(ResultSetService resultSetService, String rsId,
				BlockingQueue<FeedingDTO> queue, int harvestPackageSize, 
				IDriverContentParser<FeedingDTO> contentParser, long rsClosingTimeout) {
			this.contentParser = contentParser;
			this.resultSetService = resultSetService;
			this.rsId = rsId;
			this.queue = queue;
			this.harvestPackageSize = harvestPackageSize;
			this.rsClosingTimeout = rsClosingTimeout;

			try {
				log.info("[resultset] Records number in ResultSet: " + 
						resultSetService.getNumberOfElements(rsId));
				this.setRSStatus(this.resultSetService.getRSStatus(rsId));
				log.debug("[resultset] Initial ResultSet Status: " + this.getRSStatus());
				//setTotalRSSize(resultSetService.getNumberOfElements(rsId));
			} catch (ResultSetException e) {
				throw new RuntimeException("Exception occured when cheching total RS size.",e);
			}
		}

		public void run() {
			try {
				
				int startElement 	= 1;
				int endElement		= 0;
				int currentRSSize	= 0;
				long startTime		= 0;
				
				while (proceed) {
					currentRSSize = resultSetService.getNumberOfElements(rsId);
					log.debug("[resultset] Current size:" + currentRSSize);
					log.debug("[resultset] Total size:" + totalRSSize);
					if (currentRSSize != 0 && currentRSSize != totalRSSize) {
					
						endElement = startElement + harvestPackageSize - 1;
					
						if (endElement > currentRSSize)
							endElement = currentRSSize;
					
					
						log.debug("[resultset] getting results for range: "+
							startElement + " - " + endElement);
						List<String> results = resultSetService.getResult(
								rsId, startElement, endElement,
								GenericResultSetIterator.RESULT_SET_REQUEST_MODE_WAITING);
						
						if (results != null && results.size() != 0) {
							log.debug("[resultset] Got " +results.size() + " records, processing");
							prepareAndAddResultsToQueue(results);
						
							totalRSSize = totalRSSize + results.size();
						
							int resDiff = endElement - startElement + 1;
							if (resDiff != results.size())
								startElement = totalRSSize + 1;
							else
								startElement = endElement + 1;
						}
					}
					
					this.setRSStatus(this.resultSetService.getRSStatus(rsId));
					//log.debug("STATUS: " +  this.resultSetService.getRSStatus(rsId) );
					if ("closed".equals(this.getRSStatus())){
						log.debug("[resultset] ResultSet is closed.");
						int rsSize = resultSetService.getNumberOfElements(rsId);
						
						if (rsSize != totalRSSize ) { 
							int diff = rsSize - totalRSSize;
							log.debug("[resultset] still need to get "+ diff +" records " +
									" from ResultSet.");
						} else {
							proceed = false;
							queue.put(buildIgnoredObjectMarkup());//closing queue with empty 
													  //record that should be ignored by the client
						}	
					} else {
						
						if (startTime == 0) {
							startTime = System.currentTimeMillis()/1000;
						}
						
						long currentTime = System.currentTimeMillis()/1000;
						long timeDiff = currentTime-startTime;
							
						if ( timeDiff > this.rsClosingTimeout) {
							log.warn("[resultset] Closing ResultSet Timeout " +
									"after "+timeDiff+"[sec], can not wait any more " +
									"for closing ResultSet"); 
							proceed = false;
							queue.put(buildIgnoredObjectMarkup());//closing queue with empty 
													  //record that should be ignored by the client
						} else {
							Harvester.sleep(2000);
						}
							
					}
				}
				log.info("Finished work after processing "+ totalRSSize + " results");

			} catch (ResultSetException e1) {
				log.error("Exception occured when retrieving data from ResultSet",e1);
			} catch (InterruptedException e2) {
				log.error("Exception occured when storing data to the queue", e2);
			}
		}
		
		/**
		 * Parses results and add them to queue.
		 * @param results
		 * @throws InterruptedException
		 */
		void prepareAndAddResultsToQueue(List<String> results) throws InterruptedException {
			if (results==null)
				return;
			Iterator<String> itRes = results.iterator();
			int counter = 0;
			while (itRes.hasNext()) {
				if (getInterrupt()) {
					log.debug("got interrupt call after insterting "+
							counter+" elements from current package");
					return;
				}
				String currentResult = itRes.next();
				FeedingDTO fields = contentParser.parse(currentResult);
				if (fields!=null) {
					queue.put(fields);
				}
			}
		}

		/**
		 * Returns total ResultSet size.
		 * @return total ResultSet size
		 */
		public synchronized int getTotalRSSize() {
			return totalRSSize;
		}

		public synchronized void setTotalRSSize(int totalRSSize) {
			this.totalRSSize = totalRSSize;
		}
		
		/**
		 * Returns ResultSet status.
		 * 
		 * @return ResultSet status
		 */
		public synchronized String getRSStatus() {
			return this.rsStatus;
		}

		/**
		 * Sets the rs status.
		 * 
		 * @param rsStatus the rs status
		 */
		public synchronized void setRSStatus(String rsStatus) {
			this.rsStatus = rsStatus;
		}

		/**
		 * Returns true if thread should be stopped.
		 * @return true if thread should be stopped.
		 */
		public synchronized Boolean getInterrupt() {
			return interrupt;
		}

		/**
		 * Sets interrupt flag.
		 * @param interrupt
		 */
		public synchronized void setInterrupt(Boolean interrupt) {
			this.interrupt = interrupt;
		}
		
		/**
		 * Gets the proceed.
		 * 
		 * @return the proceed
		 */
		public synchronized Boolean getProceed() {
			return this.proceed;
		}

		/**
		 * Sets the proceed.
		 * 
		 * @param proceed the new proceed
		 */
		public synchronized void setProceed(Boolean proceed) {
			this.proceed = proceed;
		}

	}
}
