/**
 * Copyright 2009-2012 OpenAIRE PROJECT (Bielefeld University)
 * Original author: Marek Imialek <marek.imialek at uni-bielefeld.de>
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package eu.dnetlib.data.udm;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import javax.xml.ws.wsaddressing.W3CEndpointReference;

import org.apache.log4j.Logger;

import eu.dnetlib.data.udm.is.ISInteractions;
import eu.dnetlib.data.udm.mdstore.MDStoreInteractions;
import eu.dnetlib.data.udm.mdstore.MDStoreProperties;
import eu.dnetlib.data.udm.rs.ResultSetIterator;
import eu.dnetlib.data.udm.rs.ResultSetLoader;
import eu.dnetlib.enabling.resultset.rmi.ResultSetException;
import eu.dnetlib.enabling.resultset.rmi.ResultSetService;
import eu.dnetlib.enabling.tools.JaxwsServiceResolverImpl;

/**
 * The Class DeliverUsageData initializes delivering of records for each mdstore founded in
 * IS registry. Having ResultSet EPR, thread initializes ResultSet iterator
 * and start the ResultSet loader for each mdstore. Thread calls handelMDStoreRequestsQueue()
 * which decides if the new delivering process can be started now or maybe it have to wait 
 * sometime.
 * 
 * @author <a href="mailto:marek.imialek at uni-bielefeld.de">Marek Imialek</a>
 */
public class DeliverUsageData implements Runnable {

	/** The Constant LOGGER. */
	private final static Logger LOGGER = Logger.getLogger(DeliverUsageData.class);
	
	/** The repositories list. */
	private List<String> repositoriesList;
	
	/** The date from parameter. */
	private String dateFrom;
	
	/** The date until parameter. */
	private String dateUntil;
	
	/** The is interactions. */
	private ISInteractions isInteractions;

	/** The out resultset service. */
	private ResultSetService outResultSetService;

	/** The out resultset id. */
	private String outRsId;
	
	/** The threads list. */
	private List<Thread> threadsList;

	/** The delivery preferences. */
	private DeliveryPreferences deliveryPreferences;
	
	private boolean waiting = true;

	/**
	 * Instantiates a new deliver usage data.
	 * 
	 * @param dateFrom the date from
	 * @param dateUntil the date until
	 * @param isInteractions the is interactions
	 * @param outRSEpr the out rs epr
	 * @param deliveryPreferences the delivery preferences
	 * 
	 * @throws Exception the exception
	 */
	public DeliverUsageData (String dateFrom, String dateUntil, 
			ISInteractions isInteractions, W3CEndpointReference outRSEpr,
			DeliveryPreferences deliveryPreferences) throws Exception {
		
		JaxwsServiceResolverImpl outRsServiceResolver = new JaxwsServiceResolverImpl();	
		this.outResultSetService = outRsServiceResolver.getService(
				ResultSetService.class, outRSEpr);
		this.outRsId = outRsServiceResolver.getResourceIdentifier(outRSEpr);
		
		repositoriesList = isInteractions.getRepositories();
		if (repositoriesList == null)
			throw new Exception ("No one repository identifier found in IS registry");
		
		this.deliveryPreferences = deliveryPreferences;
		this.isInteractions = isInteractions;
		this.dateFrom = dateFrom;
		this.dateUntil = dateUntil;
		this.threadsList = new ArrayList<Thread>();
		
	}
	
	
	/* (non-Javadoc)
	 * @see java.lang.Runnable#run()
	 */
	@Override
	public void run() {
		
		Iterator<String> repositoriesIter = repositoriesList.iterator();
		LOGGER.debug("Number of MDStores to be iterated: " + repositoriesList.size());
		int repoCounter = 1;
		
		while (repositoriesIter.hasNext()){
			
			handelMDStoreRequestsQueue();
			
			LOGGER.debug("("+repoCounter+") Processing MDStore "+ 
					repoCounter + " from "+ repositoriesList.size());
			MDStoreProperties mdstoreProperties;
			try {
				mdstoreProperties = MDStoreInteractions.deliverMDStoreEPR(
						repositoriesIter.next(), dateFrom, dateUntil, null, isInteractions);
			} catch (Exception e) {
				LOGGER.error("("+repoCounter+")Ignoring MDStore, "+ e);
				continue;
			}
			
			JaxwsServiceResolverImpl inRsServiceResolver = new JaxwsServiceResolverImpl();	
			ResultSetService inResultSetService = inRsServiceResolver.getService(
					ResultSetService.class, mdstoreProperties.getMDStoreResultSetEpr());
			
			String inRsId = inRsServiceResolver.getResourceIdentifier(
					mdstoreProperties.getMDStoreResultSetEpr());
				
			try {	
				int recordsNumber = inResultSetService.getNumberOfElements(inRsId);
				LOGGER.debug("("+repoCounter+") Number of elements stored in RS:" + recordsNumber);
				
			} catch (ResultSetException e2) {
				String err = "("+repoCounter+") Ignoring MDStore, can not get" +
						" number of results from MDStore ResultSet :"+e2.getMessage();
				LOGGER.error(err);
				continue;
			}

			if (inRsId == null) {
				String errorContent = "("+repoCounter+") Ignoring MDStore, the resultset identifier not "
					+ "extracted from EPR " + mdstoreProperties.getMDStoreResultSetEpr().toString();
				LOGGER.error(errorContent);
				continue;
			}	
			
			ResultSetIterator inRsIterator = new ResultSetIterator(
					inRsId, inResultSetService, 
					this.deliveryPreferences.getRsIteratorQueueSize(), 
					this.deliveryPreferences.getRsPackageSize(), 
					this.deliveryPreferences.getMaxQueueTimeout(), 
					this.deliveryPreferences.getRsClosingTimeout());

			ResultSetLoader resultsetLoader = new ResultSetLoader(inRsIterator, 
					this.outResultSetService, outRsId,
					repoCounter, repositoriesList.size(), mdstoreProperties.getMdstoreIdentifier());
			
			Thread resultsetLoaderThread = new Thread(resultsetLoader);
			resultsetLoaderThread.start();
			threadsList.add(resultsetLoaderThread);
			repoCounter++;
		}
		
		closeResultset();			
	}
	
	
	/**
	 * Function waits for closing all jobs and then closees Resultset.
	 */
	private void closeResultset() {

		while (threadsList.size() > 0) {
			
			if (threadsList.size() <= deliveryPreferences.getMaxMdstoreRequestThreads()) {
		
				for (int i=0; i< threadsList.size(); i++) {
					Thread workingThread = threadsList.get(i);
					if (!workingThread.isAlive()) {
						threadsList.remove(workingThread);
						LOGGER.info("The mdstore delivering-thread '"+workingThread.getName()+"' finished, " +
								"removing it from the list of running threads.");
					}	
					workingThread = null;
					if (threadsList.size() == 0) break;
				}
			} 
			
			if (threadsList.size() == 0) break;
			
			try {
				Thread.sleep(10000);
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
			
		}	
		
		LOGGER.info("The process of transfering data from all MDStores completed, " +
				"closing Resultset.");
		this.outResultSetService.closeRS(outRsId);
		try {
			int number = this.outResultSetService.getNumberOfElements(outRsId);
			LOGGER.info("Number of records transfered to the Resultset: "+number );
		} catch (ResultSetException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		
	}


	/**
	 * Function handelMDStoreRequestsQueue helps to handle the queue of single 
	 * mdstore deliver threads. The number of running threads can not be 
	 * more then the DeliveryPreferences.getMaxMdstoreRequestThreads().
	 * If the queue is already full then the next thread is blocked for
	 * DeliveryPreferences.getWaitForClosingThreads() milliseconds.
	 * If the total thread waiting time is more then 
	 * deliveryPreferences.getMaxWaitForClosingThreads() then 
	 * the task is broken. 
	 *  
	 */
	@SuppressWarnings("static-access")
	private void handelMDStoreRequestsQueue(){

		if (threadsList != null && threadsList.size() > 0){
			
			long startTime = 0;
			
			while (waiting) {
				if (startTime == 0)
					startTime = System.currentTimeMillis()/1000;
				
				long currentTime = System.currentTimeMillis()/1000;
				long timeDiff = currentTime-startTime;
	
				if ( timeDiff > deliveryPreferences.getMaxWaitForClosingThreads()) {
					String msg = "TIMEOUT MAX_WAIT_FOR_CLOSING_THREADS:" +
							"The process of delivering data from further " +
							"mdstores has been blocked by some threads, can not wait any more " +
							"for finishing running jobs.";
					LOGGER.error(msg);
					throw new RuntimeException(msg);
				}
				
				Iterator<Thread> threadsIter = threadsList.iterator();
				while (threadsIter.hasNext()) {
					Thread thread = threadsIter.next();
					if (!thread.isAlive()) {
						threadsList.remove(thread);
						LOGGER.info("The mdstore delivering-thread '"+thread.getName()+"' finished, " +
								"removing it from the list of running threads.");
					}	
					thread = null;
					if (threadsList.size() == 0) break;
				}
				if (threadsList.size() >= deliveryPreferences.getMaxMdstoreRequestThreads()) {
					LOGGER.warn("Can not start delivering of records for the next mdstore at the moment. " +
							"Waiting for closing one of the running delivery threads  " +
							"(threads limit: "+ deliveryPreferences.getMaxMdstoreRequestThreads() + ").");
					
					try {
						Thread.currentThread().sleep(deliveryPreferences.getWaitForClosingThreads());
					} catch (InterruptedException e) {}
				} else
					waiting = false;
			}
		}
	}

}
