/**
 * Copyright 2008-2010 DRIVER PROJECT (Bielefeld University)
 * Original author: Marek Imialek <marek.imialek at uni-bielefeld.de>
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package eu.dnetlib.data.utility.download.manager;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Observable;
import java.util.Observer;

import org.apache.log4j.Logger;

import eu.dnetlib.common.utils.xml.xpath.XPathParser;
import eu.dnetlib.common.ws.dataprov.DataProviderException;
import eu.dnetlib.common.ws.dataprov.IDataProviderExt;
import eu.dnetlib.data.utility.download.constants.DownloadServiceConstants;
import eu.dnetlib.data.utility.download.dataprov.IDownloadDataProvider;
import eu.dnetlib.data.utility.download.manager.thread.DownloadThread;
import eu.dnetlib.data.utility.download.rs.iterator.IRSIterator;
import eu.dnetlib.data.utility.download.utils.UrlUtils;

/**
 * Controls and runs a download process. A separate {@link DownloadThread} is
 * created for each URL read from the result set. The number of running
 * downloads per host is limited by {@code maxNumberDownloadsForHost}; further
 * downloads for that host are queued and started as earlier ones finish. When
 * all download jobs are ready, the observers of this process are notified
 * (once) so that the bulk data can be closed.
 *
 * <p>The per-host counters and queues, the list of pending download threads and
 * the {@code waiting}/{@code counter}/{@code ignored}/{@code duplication}
 * counters are shared between the thread executing {@link #run()} and the
 * {@link #update(Observable, Object)} callbacks of the observed download
 * threads, which may arrive on other threads. They are therefore only accessed
 * while holding this object's monitor.
 *
 * @author <a href="mailto:marek.imialek at uni-bielefeld.de">Marek Imialek</a>
 */
public class DownloadProcess extends Observable implements Runnable, Observer{

	/** The logger. */
	protected static final Logger log = Logger.getLogger(DownloadProcess.class);
	
	/** Source of the records (plain URLs, or XML records when urlXPath is set). */
	private IRSIterator objectsForDownloading;
	
	/** Identifier of this download process; also the name of its storage sub-directory. */
	private String downloadProcessId;
	
	/** Base directory under which the per-process storage directory is created. */
	private String mainStoringPath;
	
	/** XPath locating the URL inside an XML record, or null when records are plain URLs. */
	private String urlXPath;
	
	/** Download threads created by run() that have not yet reported back via update(). */
	List<DownloadThread> downloadThreads = new ArrayList<DownloadThread>();

	/** Number of objects processed so far; also used to number the stored files. */
	private int counter = 0;
	
	/** Number of downloads reported as duplicates. */
	private int duplication = 0;
	
	/** Number of records skipped (not valid XML, or an invalid URL without XPath). */
	private int ignored = 0;
	
	/**
	 * Number of records fetched (or being fetched) from the result set that are
	 * not yet registered as a download thread or otherwise handled. Completion is
	 * only reported while this is zero.
	 */
	private int waiting = 0;
	
	/** Set once observers have been told that the process is complete. */
	private boolean completionNotified = false;
	
	/** The data provider whose bulk data is updated for every processed object. */
	private IDownloadDataProvider dataProvider;
	
	/** Per host: download threads waiting for a free download slot. */
	private Map<String,List<DownloadThread>> hostsQueuetoBeDownloaded;

	/** Per host: number of downloads currently started and not yet reported back. */
	private Map<String,Integer> hostsCounter;
	
	/** Maximum number of concurrent downloads for a single host. */
	private int maxNumberDownloadsForHost = 10;

	/**
	 * Instantiates a new download process.
	 * 
	 * @param objectsForDownloading the objects for downloading
	 * @param downloadProcessId the download process id
	 * @param mainStoringPath the main storing path
	 * @param urlXPath the XPath of the URL inside an XML record, or null for plain URLs
	 */
	public DownloadProcess(IRSIterator objectsForDownloading,
			String downloadProcessId, String mainStoringPath, String urlXPath){
		this.objectsForDownloading = objectsForDownloading;
		this.downloadProcessId = downloadProcessId;
		this.mainStoringPath = mainStoringPath;
		this.urlXPath = urlXPath;
	}
	
	/**
	 * Reads every record from the result set and schedules a download for each
	 * valid URL. If no download is pending once the result set is exhausted,
	 * observers are notified with an empty string as argument.
	 *
	 * @throws RuntimeException if the storage directory cannot be created
	 */
	@Override
	public void run() {
		
		String objectDownloadDirectory = mainStoringPath + "/"+ downloadProcessId;
		File downloadDirectoryF = new File(objectDownloadDirectory);
		
		if (!downloadDirectoryF.isDirectory()) {
			// mkdirs() also creates missing parents; re-check in case the
			// directory was created concurrently or a plain file is in the way.
			downloadDirectoryF.mkdirs();
			if (!downloadDirectoryF.isDirectory())
				throw new RuntimeException("Could not create "
					+ "directory for storing objects, please check your "
					+ "access rights for the following location: "
					+ downloadDirectoryF.getAbsolutePath());
		}
	
		synchronized (this) {
			hostsQueuetoBeDownloaded = Collections.synchronizedMap(new HashMap<String,List<DownloadThread>>());
			hostsCounter = Collections.synchronizedMap(new HashMap<String,Integer>());
			duplication = 0;
			counter = 0;
			ignored = 0;
			completionNotified = false;
		}
		
		while (objectsForDownloading.hasNext()) {
			// Register the record before fetching it, so update() cannot report
			// completion while the record is still on its way from the result set.
			synchronized (this) {
				waiting++;
			}
			String record = objectsForDownloading.next();
			processRecord(record, objectDownloadDirectory);
		}
		
		boolean notifyNow;
		synchronized (this) {
			log.debug("All urls added to the queue, wait for finishing all jobs!");
			log.debug("Waiting queue::"+ waiting);
			log.debug("Download threads queue::"+ downloadThreads.size());
			log.debug("ResultSet iterator status::"+ objectsForDownloading.getRSStatus());
			//in case if all urls were not correct, close BD
			notifyNow = downloadThreads.isEmpty() && waiting == 0 && claimCompletion();
		}
		if (notifyNow) {
			log.debug("All jobs finished, sending status for closing BD!");
			setChanged();
			notifyObservers("");
		}
	}

	/**
	 * Handles one record taken from the result set. The record is skipped, or
	 * its URL is extracted (when urlXPath is set), and it is then either
	 * scheduled for download or handled as an invalid URL. Every path
	 * decrements {@code waiting} exactly once.
	 *
	 * @param record the raw record returned by the result set iterator
	 * @param objectDownloadDirectory directory where the objects of this process are stored
	 */
	private void processRecord(String record, String objectDownloadDirectory) {
		if ("ignorethisobject".equals(record)) {
			synchronized (this) {
				waiting--;
			}
			return;
		}
		
		String url = record;
		XPathParser xPathParser = null;
		if (urlXPath != null) {
			try {
				xPathParser = new XPathParser(url, false);
				url = xPathParser.getElementValue(urlXPath);
			} catch (Exception e) {
				log.warn("Object " +url+ " seems to be not a correct XML, ignoring it!");
				synchronized (this) {
					waiting--;
					ignored++;
				}
				return;
			}
		}
		
		URL verifiedUrl = UrlUtils.verifyUrl(url);
		if (verifiedUrl != null) {
			scheduleDownload(verifiedUrl, xPathParser, objectDownloadDirectory);
		} else {
			handleInvalidUrl(url, xPathParser, objectDownloadDirectory);
		}
	}

	/**
	 * Creates a download thread for the URL. It is started immediately if its
	 * host has a free slot, otherwise it is queued for that host.
	 *
	 * @param verifiedUrl the validated URL to download
	 * @param xPathParser parser of the originating XML record, or null for plain URLs
	 * @param objectDownloadDirectory directory where the objects of this process are stored
	 */
	private void scheduleDownload(URL verifiedUrl, XPathParser xPathParser,
			String objectDownloadDirectory) {
		String host = verifiedUrl.getHost();
		String objextStoringPath = objectDownloadDirectory + "/"
			+ host+"-"+verifiedUrl.getPath().hashCode()+"-"
			+ UrlUtils.getFileName(verifiedUrl);
		
		DownloadThread dt = new DownloadThread(verifiedUrl,
				downloadProcessId, objextStoringPath, urlXPath);
		dt.setXPathParser(xPathParser);
		dt.addObserver(this);
		
		boolean startNow;
		synchronized (this) {
			downloadThreads.add(dt);
			log.debug("Adding new download thread to the list: "+ dt 
					+ " NUMBER OF THREADS: "+downloadThreads.size() );
			// Registering the thread and releasing the "waiting" mark happen
			// atomically, so update() never sees both as empty in between.
			waiting--;
			
			Integer running = hostsCounter.get(host);
			// The first download for a host always starts, whatever the limit.
			startNow = running == null || running < maxNumberDownloadsForHost;
			if (startNow) {
				hostsCounter.put(host, running == null ? 1 : running + 1);
			} else {
				List<DownloadThread> ldt = hostsQueuetoBeDownloaded.get(host);
				if (ldt == null) {
					ldt = new ArrayList<DownloadThread>();
					hostsQueuetoBeDownloaded.put(host, ldt);
				}
				ldt.add(dt);
				log.warn("Thread "+ dt +" is waiting in queue, to many " +
						"request for host "+ host);
			}
		}
		// Started outside the monitor so run() never holds this object's lock
		// while calling into DownloadThread.
		if (startNow) {
			dt.download();
		}
	}

	/**
	 * Handles a record whose URL could not be verified. For XML records
	 * (urlXPath set) the record is still stored, without updating it, and the
	 * bulk data is updated. Plain URLs are counted as ignored.
	 *
	 * @param url the unverifiable URL (may be null)
	 * @param xPathParser parser of the originating XML record, or null for plain URLs
	 * @param objectDownloadDirectory directory where the objects of this process are stored
	 */
	private void handleInvalidUrl(String url, XPathParser xPathParser,
			String objectDownloadDirectory) {
		String errMsg = "Invalid Download URL: " + url;
		log.warn(errMsg);
		
		if (this.urlXPath == null) {
			// No XML record to keep for a plain URL: the object is dropped.
			synchronized (this) {
				waiting--;
				ignored++;
			}
			return;
		}
		
		// String.valueOf guards against a null URL extracted from the record.
		String objextStoringPath = objectDownloadDirectory + "/"
			+ String.valueOf(url).hashCode() + "--"+System.currentTimeMillis();
		final int cnt1;
		synchronized (this) {
			counter++;
			cnt1 = counter;
		}
		try {
			this.storeXMLwithDownloadPath(xPathParser, objextStoringPath, 
					urlXPath, false, cnt1);
			dataProvider.updateBulkData(downloadProcessId, IDataProviderExt.STATUS_OPEN);
		} catch (DataProviderException e) {
			log.error("Problem by updating Bulk Data status: "+ e, e);
		} catch (Exception e) {
			log.error(e, e);
		} finally {
			// Released only after storing, so completion is not reported before
			// this record has been written.
			synchronized (this) {
				waiting--;
			}
		}
	}

	/**
	 * Called by a {@link DownloadThread} when its download has finished. It frees
	 * the host slot (starting the next queued download for that host, if any) and
	 * records the result. It notifies this process's observers, passing
	 * {@code arg}, once all downloads are done and the result set is closed.
	 *
	 * @param o the finished download thread
	 * @param arg the download status; expected to be an Integer equal to
	 *        {@code DownloadServiceConstants.STATUS_COMPLETE} or {@code STATUS_DUPLICATION}
	 */
	@Override
	public synchronized void update(Observable o, Object arg) {
		final DownloadThread downloadThread = ((DownloadThread) o);
		
		synchronized(downloadThread){
			try {
				log.debug("Download Thread is ready: "+ downloadThread);
				downloadThreads.remove(downloadThread);
				releaseHostSlot(downloadThread.getURL().getHost());
				recordResult(downloadThread, arg);
			} catch (DataProviderException e) {
				log.error("Problem by updating Bulk Data status: "+ e, e);
			} catch (Exception e) {
				log.error("Problem by storing XML file" + e, e);
			}	
	
			log.debug("Waiting queue:"+ waiting);
			log.debug("Download threads queue:"+ downloadThreads.size());
			log.debug("ResultSet iterator status:"+ objectsForDownloading.getRSStatus());
			
			if (downloadThreads.isEmpty() && waiting == 0
					&& IDataProviderExt.STATUS_CLOSED.equalsIgnoreCase(objectsForDownloading.getRSStatus())
					&& claimCompletion()) {
				log.info("All jobs finished, " +counter+ " objects processed.");
		
				if (duplication != 0)
					log.warn("Omited "+ duplication+" object(s) pointing on the same url." );
		
				if (ignored != 0)
					log.warn("Ignored "+ ignored+" object(s)." );
		
				setChanged();
				notifyObservers(arg);
			}
		}	
	}

	/**
	 * Frees the download slot held by a finished download for {@code host}. If
	 * downloads are queued for the host, the slot is handed to the next one;
	 * otherwise the host's running-download counter is decremented. Must be
	 * called while holding this object's monitor.
	 *
	 * @param host the host of the finished download
	 */
	private void releaseHostSlot(String host) {
		List<DownloadThread> threadsToBedownloaded = (hostsQueuetoBeDownloaded == null)
				? null : hostsQueuetoBeDownloaded.get(host);
		Integer running = hostsCounter.get(host);
		int numberOfProcessesforHost = (running == null) ? 0 : running;
		
		if (threadsToBedownloaded != null && !threadsToBedownloaded.isEmpty()) {
			log.debug("Number of non-active threads for host '"
					+ host+"' waiting for downloading: "
					+threadsToBedownloaded.size());
			
			log.debug("Activating one of the waiting threads");
			// Dequeue before starting, so the same thread can never be started twice.
			DownloadThread dthread = threadsToBedownloaded.remove(0);
			if (threadsToBedownloaded.isEmpty()) {
				hostsQueuetoBeDownloaded.remove(host);
			}
			// The freed slot is handed over, so the host counter stays unchanged.
			dthread.download();
		} else if (numberOfProcessesforHost <= 1) {
			hostsCounter.remove(host);
			numberOfProcessesforHost = 0;
		} else {
			numberOfProcessesforHost = numberOfProcessesforHost - 1;
			hostsCounter.put(host, numberOfProcessesforHost);
		}
		
		log.debug("Number of active downloads threads for host '"
				+ host+"': "
				+ numberOfProcessesforHost);
	}

	/**
	 * Records the outcome of a finished download. For a completed download the
	 * XML record is stored (when urlXPath is set), the file is renamed, and the
	 * bulk data is updated if the rename succeeded. Duplicates are counted.
	 * Must be called while holding this object's monitor.
	 *
	 * @param downloadThread the finished download thread
	 * @param arg the status reported by the thread
	 * @throws Exception if storing the XML record or updating the bulk data fails
	 */
	private void recordResult(DownloadThread downloadThread, Object arg) throws Exception {
		if (!(arg instanceof Integer)) {
			log.error("Unknown status ("+arg+") for "+downloadThread.getUrl());
			return;
		}
		int status = (Integer) arg;
		if (status == DownloadServiceConstants.STATUS_COMPLETE) {
			counter++;
			final int cnt = counter;
			
			if (urlXPath != null) {
				this.storeXMLwithDownloadPath(downloadThread.getXPathParser(), 
					downloadThread.getFilePath(), urlXPath, true, cnt);
			}

			if (downloadThread.renameFile(cnt)) {
				dataProvider.updateBulkData(downloadProcessId, IDataProviderExt.STATUS_OPEN);
			}
		}
		else if (status == DownloadServiceConstants.STATUS_DUPLICATION) {
			duplication++;
		}
		else {
			log.error("Unknown status ("+status+") for "+downloadThread.getUrl());
		}
	}

	/**
	 * Marks the process as complete. Must be called while holding this
	 * object's monitor.
	 *
	 * @return true the first time it is called after run() started, false afterwards
	 */
	private boolean claimCompletion() {
		if (completionNotified) {
			return false;
		}
		completionNotified = true;
		return true;
	}
	
	/**
	 * Stores the XML record next to the downloaded object. The file is written to
	 * {@code <dir of storingPath>/<cnt1>.<name of storingPath>} followed by
	 * {@code DownloadServiceConstants.DOWNLOAD_XML_FILE_SUFFIX}. When
	 * {@code updateXML} is true, the element selected by {@code urlXPath} is
	 * first set to the absolute path of the numbered object file.
	 * 
	 * @param xPathParser parser holding the XML record
	 * @param storingPath path of the (not yet numbered) object file
	 * @param urlXPath XPath of the URL element; its last step names the field to update
	 * @param updateXML whether to replace the URL element's value with the storing path
	 * @param cnt1 sequence number used as the file name prefix
	 * 
	 * @throws Exception if the field name cannot be derived from the XPath, the
	 *         XML cannot be updated, or the file cannot be written
	 */
	private void storeXMLwithDownloadPath(XPathParser xPathParser, String storingPath, 
			String urlXPath, boolean updateXML, int cnt1) 
		throws Exception {

		int fieldIndexFrom = urlXPath.lastIndexOf("/");
		if (fieldIndexFrom == -1) {
			throw new Exception("Can not extract field name from XPath: "+urlXPath);
		}
		String fieldName = urlXPath.substring(fieldIndexFrom + 1);

		// Namespace prefix of the field: the part before the last ':' (if any).
		String schema = null;
		int schemaIndexTo = fieldName.lastIndexOf(":");
		if (schemaIndexTo != -1) {
			schema = fieldName.substring(0, schemaIndexTo);
		}
		
		String schemaDef = null; 
		HashMap<String, String> schemas = xPathParser.extractSchemas();
		if (schemas != null) schemaDef = schemas.get(schema);
	
		File tmpStoringPath = new File(storingPath);
		File newStoringPath = new File(tmpStoringPath.getParent()+"/"
				+cnt1+"."+tmpStoringPath.getName());
		
		boolean response = true;
		if (updateXML) {
			response = xPathParser.updateElementValue(urlXPath, 
					fieldName, newStoringPath.getAbsolutePath(), schema, schemaDef);
		}
		
		if (!response) {
			throw new Exception("XML not updated with an object downloading path");
		}
		
		String modifiedRecord = xPathParser.getModifiedDocument();
		String pathToWrite = newStoringPath.getAbsolutePath() +
			DownloadServiceConstants.DOWNLOAD_XML_FILE_SUFFIX;
		
		log.debug("Storing XML in: "+ pathToWrite);
		BufferedWriter out = new BufferedWriter(new FileWriter(pathToWrite));
		try {
			out.write(modifiedRecord);
		} finally {
			// Always release the file handle, even if writing fails.
			out.close();
		}
	}
	
	/**
	 * Sets the data provider.
	 * 
	 * @param dataProvider the new data provider
	 */
	public void setDataProvider(IDownloadDataProvider dataProvider) {
		this.dataProvider = dataProvider;
	}
	
	/**
	 * Sets the max number downloads for host. Synchronized because the value is
	 * read under this object's monitor by the scheduling code.
	 * 
	 * @param maxNumberDownloadsForHost the new max number downloads for host
	 */
	public synchronized void setMaxNumberDownloadsForHost(int maxNumberDownloadsForHost) {
		this.maxNumberDownloadsForHost = maxNumberDownloadsForHost;
	}
	

}
