package eu.dnetlib.prodman;

import java.io.IOException;
import java.util.Collection;
import java.util.List;

import javax.annotation.PostConstruct;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
import org.springframework.beans.factory.annotation.Required;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;

import com.google.common.collect.Lists;

import eu.dnetlib.enabling.hcm.ZooKeeperDLMClient;
import eu.dnetlib.enabling.hcm.ZooKeeperEvent;
import eu.dnetlib.prodman.ServiceStatus.Status;

/**
 * Reports the status of a data center by tracking the agent nodes registered in ZooKeeper
 * under <code>/dnet-agent/&lt;datacenterName&gt;</code>.
 *
 * <p>
 * The list of agents is refreshed when the ZooKeeper client signals that it entered the cluster
 * and whenever the children watch set by {@link #watch()} reports a change. The data center is
 * considered failed when fewer than {@link #getMinAgents()} agents are registered.
 * </p>
 */
public class StatusManagerImpl implements StatusManager, Watcher, ApplicationListener {

	/**
	 * top level zookeeper node holding one child per data center.
	 */
	private static final String DNET_AGENT = "/dnet-agent";

	private static final Log log = LogFactory.getLog(StatusManagerImpl.class); // NOPMD by marko on 11/24/08 5:02 PM

	/**
	 * client used to find out whether we are part of the cluster and to obtain the zookeeper handle.
	 */
	private ZooKeeperDLMClient zooKeeperClient;

	/**
	 * zookeeper handle; taken from the client on connect, or injected through {@link #setZk(ZooKeeper)}.
	 */
	private ZooKeeper zk;

	private String datacenterName;

	/**
	 * current list of agents (zk node names).
	 *
	 * Volatile because it is replaced from {@link #process(WatchedEvent)} and read by
	 * {@link #getServiceStatuses()}, which are not synchronized with each other.
	 */
	private volatile List<String> agents = Lists.newArrayList();

	/**
	 * minimum number of agents.
	 */
	private int minAgents = 1;

	/**
	 * Loads the initial agent list if the zookeeper client is already part of the cluster.
	 *
	 * @throws KeeperException
	 *             if a zookeeper operation fails
	 * @throws InterruptedException
	 *             if the thread is interrupted while talking to zookeeper
	 * @throws IOException
	 *             declared for backward compatibility
	 */
	@PostConstruct
	public void init() throws KeeperException, InterruptedException, IOException {
		log.info("INITIALIZING statusmanager");
		obtainNewConnection();
	}

	/**
	 * Dispatches {@link ZooKeeperEvent}s; every other application event is ignored.
	 *
	 * @param aev
	 *            application event
	 */
	public void onApplicationEvent(ApplicationEvent aev) {
		if (aev instanceof ZooKeeperEvent) {
			onZooKeeperEvent((ZooKeeperEvent) aev);
		}
	}

	/**
	 * Reacts to cluster membership changes.
	 *
	 * @param event
	 *            zookeeper client event
	 */
	private void onZooKeeperEvent(ZooKeeperEvent event) {
		if (event.getEventType() == ZooKeeperEvent.EventType.EnterCluster) {
			onConnect();
		} else if (event.getEventType() == ZooKeeperEvent.EventType.ExitCluster) {
			onDisconnect();
		}
	}

	private void onDisconnect() {
		log.info("status manager: DISCONNECTED");
	}

	/**
	 * Picks up the current zookeeper handle from the client and reloads the agent list.
	 */
	private void onConnect() {
		log.info("status manager: CONNECTED");
		zk = zooKeeperClient.getZooKeeper();
		try {
			updateAndWatch();
		} catch (KeeperException e) {
			log.error("got exception while processing zookeeper event", e);
		} catch (InterruptedException e) {
			log.error("got exception while processing zookeeper event", e);
			// do not swallow the interruption: let the owner of this thread see it
			Thread.currentThread().interrupt();
		}
	}

	/**
	 * Registers the children watch and then refreshes the agent list.
	 *
	 * The watch is set <em>before</em> the list is read: a change that happens between the two calls
	 * is then either already contained in the list we read, or it fires the watch and causes another
	 * refresh. Reading first and watching afterwards would leave a window in which a change is lost.
	 *
	 * @throws KeeperException
	 *             if a zookeeper operation fails
	 * @throws InterruptedException
	 *             if the thread is interrupted while talking to zookeeper
	 */
	private void updateAndWatch() throws KeeperException, InterruptedException {
		watch();
		maintainAgentState();
	}

	/**
	 * Watcher callback: refreshes the agent list when the children of the data center node change.
	 *
	 * @param event
	 *            zookeeper event
	 */
	public void process(WatchedEvent event) {
		log.debug("processing zoo event: " + event);

		// ignore if expired
		if (event.getState() == KeeperState.Expired) {
			return;
		}

		if (event.getType() != EventType.NodeChildrenChanged) {
			return;
		}

		try {
			updateAndWatch();
		} catch (KeeperException e) {
			log.error("got exception while processing zookeeper event", e);
		} catch (InterruptedException e) {
			log.error("got exception while processing zookeeper event", e);
			// do not swallow the interruption: let the owner of this thread see it
			Thread.currentThread().interrupt();
		}
	}

	/**
	 * Replaces the cached agent list with the current children of the data center node.
	 *
	 * @throws KeeperException
	 *             if a zookeeper operation fails
	 * @throws InterruptedException
	 *             if the thread is interrupted while talking to zookeeper
	 */
	private void maintainAgentState() throws KeeperException, InterruptedException {
		ensureRootContainer();
		log.debug("MAINTAINING AGENTS");
		agents = zk.getChildren(getRootName(), false);

		log.info("NOW AGENTS ARE: " + agents);
	}

	private void obtainNewConnection() throws InterruptedException, KeeperException, IOException {
		afterNewConnection();
	}

	/**
	 * Loads the agent list if the client is already clustered; otherwise waits for the
	 * EnterCluster event.
	 *
	 * @throws KeeperException
	 *             if a zookeeper operation fails
	 * @throws InterruptedException
	 *             if the thread is interrupted while talking to zookeeper
	 */
	private void afterNewConnection() throws KeeperException, InterruptedException {
		if (!zooKeeperClient.isClustered()) {
			return;
		}
		// The handle is normally assigned in onConnect(); at init time it is still unset unless it has
		// been injected through setZk(), so take it from the client instead of failing with a NPE.
		if (zk == null) {
			zk = zooKeeperClient.getZooKeeper();
		}
		updateAndWatch();
	}

	/**
	 * Registers this instance as (one-shot) watcher on the children of the data center node.
	 *
	 * @throws KeeperException
	 *             if a zookeeper operation fails
	 * @throws InterruptedException
	 *             if the thread is interrupted while talking to zookeeper
	 */
	protected void watch() throws KeeperException, InterruptedException {
		ensureRootContainer();
		zk.getChildren(getRootName(), this);
	}

	/**
	 * Returns the status of the data center (derived from the agent count) and of the portal
	 * (always reported as Ok).
	 *
	 * @return service statuses
	 */
	public Collection<ServiceStatus> getServiceStatuses() {
		return Lists.newArrayList(new ServiceStatus("Data center", getDatacenterStatus()), new ServiceStatus("Portal", Status.Ok));
	}

	/**
	 * The data center is failed when fewer than {@link #minAgents} agents are known.
	 *
	 * @return data center status
	 */
	private Status getDatacenterStatus() {
		// read the field once: it may be replaced concurrently by the watcher callback
		final List<String> current = agents;
		// a null list (possible through setAgents) counts as "no agents"
		final int known = current == null ? 0 : current.size();
		return known < minAgents ? Status.Failure : Status.Ok;
	}

	/**
	 * ensure that the zookeeper root and the data center node below it exist.
	 *
	 * @throws InterruptedException
	 *             if the thread is interrupted while talking to zookeeper
	 * @throws KeeperException
	 *             if a zookeeper operation fails
	 */
	private void ensureRootContainer() throws KeeperException, InterruptedException {
		ensureNodeExists(DNET_AGENT);
		ensureNodeExists(getRootName());
	}

	/**
	 * Creates an empty persistent node at the given path unless it already exists.
	 *
	 * Somebody else may create the node between our existence check and our create call; in that case
	 * zookeeper answers with a NodeExistsException, which simply means the goal is already reached.
	 *
	 * @param path
	 *            absolute zookeeper path
	 * @throws InterruptedException
	 *             if the thread is interrupted while talking to zookeeper
	 * @throws KeeperException
	 *             if a zookeeper operation fails for any reason other than the node already existing
	 */
	private void ensureNodeExists(final String path) throws KeeperException, InterruptedException {
		final Stat stat = zk.exists(path, false);
		if (stat != null) {
			return;
		}
		try {
			zk.create(path, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
		} catch (KeeperException.NodeExistsException e) {
			log.debug("node " + path + " has been created concurrently");
		}
	}

	/**
	 * Path of the zookeeper node whose children are the agents of this data center.
	 *
	 * @return zookeeper path of the data center node
	 */
	private String getRootName() {
		return DNET_AGENT + "/" + datacenterName;
	}

	public ZooKeeper getZk() {
		return zk;
	}

	public void setZk(final ZooKeeper zk) {
		this.zk = zk;
	}

	public String getDatacenterName() {
		return datacenterName;
	}

	@Required
	public void setDatacenterName(String datacenterName) {
		this.datacenterName = datacenterName;
	}

	public ZooKeeperDLMClient getZooKeeperClient() {
		return zooKeeperClient;
	}

	@Required
	public void setZooKeeperClient(ZooKeeperDLMClient zooKeeperClient) {
		this.zooKeeperClient = zooKeeperClient;
	}

	public List<String> getAgents() {
		return agents;
	}

	public void setAgents(List<String> agents) {
		this.agents = agents;
	}

	public int getMinAgents() {
		return minAgents;
	}

	public void setMinAgents(int minAgents) {
		this.minAgents = minAgents;
	}

	/**
	 * The reported status can only be trusted while the zookeeper client is part of the cluster.
	 *
	 * @return true if the zookeeper client is clustered
	 */
	public boolean isReliable() {
		return zooKeeperClient.isClustered();
	}

}
