package eu.dnetlib.data.actionmanager.blackboard;

import java.util.Date;

import com.google.common.collect.Iterables;

import eu.dnetlib.data.hadoop.HadoopIsClient;
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
import eu.dnetlib.enabling.tools.blackboard.BlackboardServerAction;
import eu.dnetlib.enabling.tools.blackboard.BlackboardServerHandler;
import eu.dnetlib.rmi.data.hadoop.ClusterName;
import eu.dnetlib.rmi.data.hadoop.HadoopService;
import eu.dnetlib.rmi.data.hadoop.actionmanager.ActionManagerSet;
import eu.dnetlib.rmi.data.hadoop.actionmanager.RawSet;
import eu.dnetlib.rmi.enabling.ISRegistryService;
import org.apache.commons.lang3.time.DateUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;

public class GarbageActionManagerActionFromHDFS extends AbstractActionManagerAction implements BlackboardServerAction<ActionManagerActions> {

	private static final Log log = LogFactory.getLog(GarbageActionManagerActionFromHDFS.class);

	private final static String UPDATE_ACTION_PROFILE = "for $x in "
			+ "/RESOURCE_PROFILE[.//RESOURCE_TYPE/@value = 'ActionManagerSetDSResourceType' and .//SET/@id = '%s'] "
			+ "return update delete $x//RAW_SETS/EXPIRED[@id = '%s']";

	@Autowired
	private UniqueServiceLocator serviceLocator;

	@Autowired
	private HadoopIsClient isClient;

	@Override
	public void execute(final BlackboardServerHandler handler, final BlackboardJob job) {
		try {
			final String basePath = isClient.getBasePathHDFS();
			final Integer garbageRetain = Integer.valueOf(isClient.getGarbageRetainThreshold());
			final Long dateLimit =  new Date().getTime() - Long.valueOf(isClient.getGarbageTimeMargin());

			final HadoopService hadoopService = serviceLocator.getService(HadoopService.class);
			final ISRegistryService isRegistry = serviceLocator.getService(ISRegistryService.class);

			for (ActionManagerSet set : isClient.listValidSets()) {
				for (RawSet expired : Iterables.limit(set.getExpired(), garbageRetain)) {
					final Date d = DateUtils.parseDate(expired.getLastUpdate(), ActionManagerSet.DATE_PATTERNS);
					if (d.getTime() < dateLimit) {

						log.info(String.format("removing raw action set %s/%s", set.getId(), expired.getId()));
						hadoopService.deleteHdfsPath(ClusterName.DM.toString(), basePath + "/" + set.getDirectory() + "/" + expired.getId());

						final String xUpdate = String.format(UPDATE_ACTION_PROFILE, set.getId(), expired.getId());
						log.info(String.format("updating ActionSet profile: %s", xUpdate));
						isRegistry.executeXUpdate(xUpdate);
					}
				}
			}

			handler.done(job);
		} catch (Throwable e) {
			handler.failed(job, e);
		}
	}
}
