/**
 * Copyright © 2008-2009 DRIVER PROJECT (ICM UW)
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package eu.dnetlib.resultset.impl.containers;

import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

import org.apache.log4j.Logger;

import eu.dnetlib.common.ws.dataprov.DataProviderException;
import eu.dnetlib.common.ws.dataprov.IDataProviderExt;
import eu.dnetlib.resultset.impl.IndexResultSetFaultMessage;
import eu.dnetlib.resultset.impl.PullResultSetObject;
import eu.dnetlib.resultset.impl.PushResultSetObject;
import eu.dnetlib.resultset.impl.ResultSetConstants;
import eu.dnetlib.resultset.impl.ResultSetObject;
import eu.dnetlib.resultset.impl.ResultSetUtils;
import eu.dnetlib.resultset.impl.ResultSetObject.ResultSetType;

/**
 * Memory based ResultSetObject container.
 * Should be considered as RSObjectContainer incubator, shouldn't be used in production environment.
 * @author Marek Horst
 * @version 0.01
 *
 */
public class ResultSetObjectMemoryContainer implements
		IResultSetObjectContainer {

	protected static final Logger log = Logger.getLogger(ResultSetObjectMemoryContainer.class);
	
	Map <String,ResultSetObject> rsData = Collections.synchronizedMap(
			new HashMap<String, ResultSetObject>());
	
	
	
	/**
	 * Internal index data provider.
	 */
	private IDataProviderExt indexDataProvider;
	
	/**
	 * Caching flag. If set to false no RS data will be stored in rsObjectContainer.
	 */
	private boolean cachingEnabled;
	
	/* (non-Javadoc)
	 * @see eu.dnetlib.resultset.impl.containers.IResultSetObjectContainer#get(java.lang.String)
	 */
	public ResultSetObject get(String rsId) {
		return rsData.get(rsId);
	}
	
	/* (non-Javadoc)
	 * @see eu.dnetlib.resultset.impl.containers.IResultSetObjectContainer#getWithStoredData(java.lang.String)
	 */
	public ResultSetObject getWithStoredData(String rsId) {
//		no difference for Memory based container
		return get(rsId);
	}

	/* (non-Javadoc)
	 * @see eu.dnetlib.resultset.impl.containers.IResultSetObjectContainer#getData(java.lang.String, int, int)
	 */
	public String[] getData(String rsId, int fromPosition, int toPosition) 
		throws IndexResultSetFaultMessage {
		ResultSetObject rsObject = get(rsId);
		if (rsObject==null)
			throw new IndexResultSetFaultMessage("ResultSet "+rsId+" doesn't exist!", 
					IndexResultSetFaultMessage.ERROR_INVALID_RS);
		
		if (fromPosition<1)
			fromPosition=1;
		
		if (rsObject.getResultSetType()==ResultSetType.PUSH_RS) {
//			PUSH RS
			PushResultSetObject pushRsObject = (PushResultSetObject) rsObject;
//			adapting fromPosition offset
			fromPosition--;
			if (toPosition!=ResultSetConstants.RESULT_SET_END_POSITION_UNSPECIFIED)
			toPosition--;
			
			String[] result = pushRsObject.getData();
			if (result==null ||	fromPosition > result.length)
				return new String[0];
			
			if (toPosition==ResultSetConstants.RESULT_SET_END_POSITION_UNSPECIFIED) {
				if (fromPosition==0) {
					return result;
				}else
					return ResultSetUtils.getSubArray(result,fromPosition,result.length-1);
			} else {
				if (toPosition<fromPosition)
					return new String[0];
				if (toPosition > result.length-1)
					toPosition = result.length-1;
				return ResultSetUtils.getSubArray(result,fromPosition,toPosition);
			}
		} else {
//			PULL RS
			PullResultSetObject pullRsObject = (PullResultSetObject) rsObject;
//			keepAliveTime support:
			long lastAccessTime = System.currentTimeMillis();
			pullRsObject.setLastAccessTime(lastAccessTime);
			store(pullRsObject);
			
			if (pullRsObject.getSize()==0)
				return new String[0];
			
			if (fromPosition>pullRsObject.getSize())
				return new String[0];
//				fromPosition=pullRsObject.getSize();
				
			if (toPosition>pullRsObject.getSize())
				toPosition=pullRsObject.getSize();

//			updating data from data provider if needed
			int totalElements = pullRsObject.getSize();
			int initialPageSize = pullRsObject.getInitialPageSize();
			int startPageNumber = (fromPosition-1)/initialPageSize + 1;
			log.debug("start page number: "+startPageNumber);
			int lastAvailaibleElementNumber = initialPageSize*startPageNumber;
			int currentPageNumber = startPageNumber;
			boolean stopWorking = false;
			boolean updatedData = false;
			
			while (!stopWorking) {
				log.debug("working on curentPageNumber: "+currentPageNumber);
				if (!pullRsObject.isPageRetreived(currentPageNumber)) {
					lastAvailaibleElementNumber = initialPageSize*currentPageNumber;
					int pckgStartElement = initialPageSize*(currentPageNumber-1) + 1;
					log.debug("pckgStartElement: "+pckgStartElement);
					if (lastAvailaibleElementNumber>=toPosition) {
						stopWorking = true;
					} 
					int pckgEndElement = lastAvailaibleElementNumber<totalElements?
							lastAvailaibleElementNumber:totalElements;
					log.debug("pckgEndElement: "+pckgEndElement);
					try {
						log.debug("getting bulkData for bdId: "+pullRsObject.getBdId()+
								", fromPosition: "+pckgStartElement+
								", toPosition: "+pckgEndElement);
						String[] pageData = indexDataProvider.getSimpleBulkData(pullRsObject.getBdId(), 
								pckgStartElement, pckgEndElement);
						pullRsObject.setPageRetrieved(currentPageNumber);
						pullRsObject.setPageData(currentPageNumber, pageData);
						updatedData = true;
					} catch (DataProviderException e) {
						log.error("Exception occured when getting results " +
								"from index data provider!",e);
							throw new IndexResultSetFaultMessage("Exception occured when getting results " +
								"from index data provider" + 
								"; internal error message: "+e.getMessage(), e, IndexResultSetFaultMessage.ERROR_UNKNOWN);
					}
				} else {
					lastAvailaibleElementNumber = initialPageSize*currentPageNumber;
					if (lastAvailaibleElementNumber>=toPosition) {
						stopWorking = true;
					} 
				}
				currentPageNumber = currentPageNumber+1;
			}
//			updating data in rsObjectContainer
			if (updatedData && isCachingEnabled())
				store(pullRsObject);
			return pullRsObject.getData(fromPosition, toPosition);
		}
	}
	
	/* (non-Javadoc)
	 * @see eu.dnetlib.resultset.impl.containers.IResultSetObjectContainer#store(eu.dnetlib.resultset.impl.ResultSetObject)
	 */
	public void store(ResultSetObject rsObject) {
		rsData.put(rsObject.getResultSetId(), rsObject);
	}
	
	/* (non-Javadoc)
	 * @see eu.dnetlib.resultset.impl.containers.IResultSetObjectContainer#remove(java.lang.String)
	 */
	public ResultSetObject remove(String rsId) {
		return rsData.remove(rsId);
	}

	/* (non-Javadoc)
	 * @see eu.dnetlib.resultset.impl.containers.IResultSetObjectContainer#cleanup()
	 */
	public int cleanup() {
		log.debug("starting cleanup operations...");
		int removedObjectsCount = 0;
		long currentTime = System.currentTimeMillis();
		synchronized(rsData) {
			Set<String> keySet = rsData.keySet();
			Iterator<String> keysIt = keySet.iterator();
			while (keysIt.hasNext()) {
				String currentKey = keysIt.next();
				ResultSetObject currentRsObj = rsData.get(currentKey);
				if (currentTime > currentRsObj.getExpirationTime()) {
					log.debug("removing rs: "+currentKey);
					keysIt.remove();
					removedObjectsCount++;
				}
			}
		}
		log.debug("cleanup operations finished");
		return removedObjectsCount;
	}

	/**
	 * Returns internal index data provider.
	 * @return internal index data provider
	 */
	public IDataProviderExt getIndexDataProvider() {
		return indexDataProvider;
	}

	/**
	 * Sets internal index data provider.
	 * @param indexDataProvider
	 */
	public void setIndexDataProvider(IDataProviderExt indexDataProvider) {
		this.indexDataProvider = indexDataProvider;
	}
	
	/**
	 * Returns caching flag.
	 * @return caching flag
	 */
	public boolean isCachingEnabled() {
		return cachingEnabled;
	}

	/**
	 * Sets caching flag.
	 * @param cachingEnabled
	 */
	public void setCachingEnabled(boolean cachingEnabled) {
		this.cachingEnabled = cachingEnabled;
	}
	
}
