package eu.dnetlib.prodman.agent;

import java.io.IOException;

import javax.annotation.PostConstruct;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
import org.springframework.beans.factory.annotation.Required;

public class ProductionManagerAgent implements Watcher {
	private static final Log log = LogFactory.getLog(ProductionManagerAgent.class); // NOPMD by marko on 11/24/08 5:02 PM

	/**
	 * default zookeeper timeout, ok for WAN.
	 */
	private static final int DEFAULT_ZK_TIMEOUT = 300;

	/**
	 * zookeeper root container for dnet agents presence.
	 */
	private static final String DNET_AGENT = "/dnet-agent";

	/**
	 * server connect string, comma separated ip + optional port of servers.
	 */
	private String servers = "127.0.0.1";

	/**
	 * datacenter name.
	 */
	private String datacenterName;

	/**
	 * last ephemeral node name.
	 */
	private transient String ephemeralName;

	/**
	 * zoo keeper instance.
	 */
	private ZooKeeper zk;

	@PostConstruct
	public void init() throws IOException {
		log.info("AGENT STARTED");

		zk = new ZooKeeper(servers, DEFAULT_ZK_TIMEOUT, this);
	}

	public void process(final WatchedEvent event) {
		log.info("GOT ZOOKEEPER EVENT " + event + " on " + zk);

		try {
			if (event.getState() == KeeperState.SyncConnected)
				broadcastPresence();
			else if (event.getState() == KeeperState.Expired)
				recoverConnection();
		} catch (final KeeperException e) {
			log.fatal("got exception while processing zookeeper event", e);
		} catch (final InterruptedException e) {
			log.fatal("got exception while processing zookeeper event", e);
		}
	}

	private void recoverConnection() throws KeeperException, InterruptedException {
		while (true) {
			try {
				zk.close();
				zk = new ZooKeeper(servers, DEFAULT_ZK_TIMEOUT, this);
				log.info("ZK RECONNECTED");
				return;
			} catch (IOException e) {
				log.warn("cannot recover zookeeper connection, retrying", e);
			}
		}
	}

	/**
	 * tell all monitoring consoles that we are alive.
	 * 
	 * @throws InterruptedException
	 * @throws KeeperException
	 */
	private void broadcastPresence() throws KeeperException, InterruptedException {
		log.info("BROADCASTING PRESENCE THROUGH ZOOKEEPER NODE");
		ensureRootContainer();
		Stat s = null;
		if (getEphemeralName() != null) {
			log.info("CHECKING EPHEMERAL NODE " + getEphemeralName());
			s = zk.exists(getEphemeralName(), false);
		}
		log.info("EPHEMERAL NODE EXISTS? " + s);
		if (s == null) {
			log.info("CREATING EPHEMERAL NODE");
			ephemeralName = zk.create(getRootName() + "/agent_", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
			log.info("EPHEMERAL NODE CREATED: " + ephemeralName);
		}

	}

	/**
	 * ensure that the zookeep root exists.
	 * 
	 * @throws InterruptedException
	 * @throws KeeperException
	 */
	private void ensureRootContainer() throws KeeperException, InterruptedException {
		log.info("CHECKING DNET_AGENT root");
		final Stat s = zk.exists(DNET_AGENT, false);
		// TODO: guarantee that if somebody else creates it in the meantime that we are safe from crashes
		if (s == null) {
			log.info("creating DNET_AGENT root");
			zk.create(DNET_AGENT, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
		}

		log.info("CHECKING DATACENTER root: " + getRootName());
		final Stat ds = zk.exists(getRootName(), false);
		// TODO: guarantee that if somebody else creates it in the meantime that we are safe from crashes
		if (ds == null) {
			log.info("creating datacenter  root: " + getRootName());
			zk.create(getRootName(), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
		}
	}

	/**
	 * Datacenter name.
	 * 
	 * @return
	 */
	private String getRootName() {
		return DNET_AGENT + "/" + datacenterName;
	}

	public String getDatacenterName() {
		return datacenterName;
	}

	@Required
	public void setDatacenterName(final String datacenterName) {
		this.datacenterName = datacenterName;
	}

	public String getServers() {
		return servers;
	}

	public void setServers(final String servers) {
		this.servers = servers;
	}

	public String getEphemeralName() {
		return ephemeralName;
	}

	public void setEphemeralName(final String ephemeralName) {
		this.ephemeralName = ephemeralName;
	}
}
