package eu.dnetlib.enabling.dlm;

import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.zookeeper.server.NIOServerCnxn;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.apache.zookeeper.server.quorum.QuorumPeer;
import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
import org.springframework.beans.factory.annotation.Required;

import eu.dnetlib.enabling.dlm.rmi.DlmService;
import eu.dnetlib.enabling.tools.AbstractBaseService;

/**
 * Zookeeper server.
 * 
 * @author marko
 * 
 */
public class ZooKeeperDlmServiceImpl extends AbstractBaseService implements DlmService {

	/**
	 * logger.
	 */
	private static final Log log = LogFactory.getLog(ZooKeeperDlmServiceImpl.class); // NOPMD by marko on 11/24/08 5:02 PM

	/**
	 * Default zookeeper election algo.
	 */
	private static final int DEFAULT_ZK_ELECTION = 3;

	/**
	 * default max connections.
	 */
	private static final int DEFAULT_MAX_CNXNS = 10;

	/**
	 * zookeeper server peer.
	 */
	private QuorumPeer quorumPeer;

	/**
	 * client port.
	 */
	private int clientPort;

	/**
	 * data directory.
	 */
	private String dataDir;

	/**
	 * max client connections.
	 */
	private int maxClientCnxns = DEFAULT_MAX_CNXNS;

	/**
	 * tick time.
	 */
	private int tickTime;

	/**
	 * init limit.
	 */
	private int initLimit;

	/**
	 * sync limit.
	 */
	private int syncLimit;

	/**
	 * server id. each server needs a unique id.
	 */
	private long serverId;

	/**
	 * true if enabled
	 */
	private boolean enabled = true;

	/**
	 * global server map. Currently it has to be statically defined.
	 */
	private Map<Long, String> servers;

	/**
	 * {@inheritDoc}
	 * 
	 * @see eu.dnetlib.enabling.tools.AbstractBaseService#start()
	 */
	@Override
	public void start() {
		super.start();

		Thread thread = new Thread(new Runnable() {
			public void run() {
				try {
					final NIOServerCnxn.Factory cnxnFactory = new NIOServerCnxn.Factory(getClientPort(), getMaxClientCnxns());

					quorumPeer = new QuorumPeer();
					quorumPeer.setClientPort(getClientPort());
					quorumPeer.setTxnFactory(new FileTxnSnapLog(new File(getDataDir()), new File(getDataDir())));
					quorumPeer.setQuorumPeers(buildServerMap(getServers()));
					//			quorumPeer.setElectionType(config.getElectionAlg());

					quorumPeer.setElectionType(DEFAULT_ZK_ELECTION);
					quorumPeer.setMyid(getServerId());
					quorumPeer.setTickTime(getTickTime());
					quorumPeer.setInitLimit(getInitLimit());
					quorumPeer.setSyncLimit(getSyncLimit());
					//			quorumPeer.setQuorumVerifier(config.getQuorumVerifier());
					quorumPeer.setQuorumVerifier(new QuorumMaj(getServers().size()));
					quorumPeer.setCnxnFactory(cnxnFactory);

					new File(dataDir).mkdir();

					log.info("STARTING ZOOKEEPER: " + serverId);
					log.info("zookeeper servers" + servers);
					quorumPeer.start();
					quorumPeer.join();
				} catch (final InterruptedException e) {
					// warn, but generally this is ok
					log.warn("Quorum Peer interrupted", e);
				} catch (IOException e) {
					log.fatal("io exception in creating zookeeper server", e);
				}
			}
		});

		Thread simpleThread = new Thread(new Runnable() {
			public void run() {
				log.warn("Either no config or no quorum defined in config, running " + " in standalone mode");

				try {
					final NIOServerCnxn.Factory cnxnFactory = new NIOServerCnxn.Factory(getClientPort(), getMaxClientCnxns());

					ZooKeeperServer zkServer = new ZooKeeperServer();

					zkServer.setTxnLogFactory(new FileTxnSnapLog(new File(getDataDir()), new File(getDataDir())));
					zkServer.setTickTime(getTickTime());

					cnxnFactory.startup(zkServer);
					cnxnFactory.join();

				} catch (final InterruptedException e) {
					// warn, but generally this is ok
					log.warn("Quorum Peer interrupted", e);
				} catch (IOException e) {
					log.fatal("io exception in creating zookeeper server", e);
				}
			}
		});

		if (enabled) {
			if (servers.isEmpty() || servers.size() == 1)
				simpleThread.start();
			else
				thread.start();
		}
	}

	/**
	 * transform a long->string map in a long-> QuorumServer map.
	 * 
	 * @param arg
	 *            user friendly map
	 * @return zookeeper map
	 */
	private Map<Long, QuorumServer> buildServerMap(final Map<Long, String> arg) {
		final Map<Long, QuorumServer> res = new HashMap<Long, QuorumServer>();

		for (final Map.Entry<Long, String> entry : arg.entrySet()) {
			final String[] parts = entry.getValue().split(":");

			if (parts.length != (2 + 1))
				log.error(entry.getValue() + " does not have the form host:port or host:port:port");

			final InetSocketAddress addr = new InetSocketAddress(parts[0], Integer.parseInt(parts[1]));
			final InetSocketAddress electionAddr = new InetSocketAddress(parts[0], Integer.parseInt(parts[2]));

			res.put(entry.getKey(), new QuorumServer(entry.getKey(), addr, electionAddr));
		}
		return res;
	}

	/**
	 * {@inheritDoc}
	 * 
	 * @see eu.dnetlib.enabling.tools.AbstractBaseService#stop()
	 */
	@Override
	public void stop() {
		super.stop();

	}

	public QuorumPeer getQuorumPeer() {
		return quorumPeer;
	}

	public void setQuorumPeer(final QuorumPeer quorumPeer) {
		this.quorumPeer = quorumPeer;
	}

	public String getDataDir() {
		return dataDir;
	}

	@Required
	public void setDataDir(final String dataDir) {
		this.dataDir = dataDir;
	}

	public int getMaxClientCnxns() {
		return maxClientCnxns;
	}

	public void setMaxClientCnxns(final int maxClientCnxns) {
		this.maxClientCnxns = maxClientCnxns;
	}

	public int getClientPort() {
		return clientPort;
	}

	@Required
	public void setClientPort(final int clientPort) {
		this.clientPort = clientPort;
	}

	public int getTickTime() {
		return tickTime;
	}

	@Required
	public void setTickTime(final int tickTime) {
		this.tickTime = tickTime;
	}

	public int getInitLimit() {
		return initLimit;
	}

	@Required
	public void setInitLimit(final int initLimit) {
		this.initLimit = initLimit;
	}

	public int getSyncLimit() {
		return syncLimit;
	}

	@Required
	public void setSyncLimit(final int syncLimit) {
		this.syncLimit = syncLimit;
	}

	public long getServerId() {
		return serverId;
	}

	@Required
	public void setServerId(final long serverId) {
		this.serverId = serverId;
	}

	public Map<Long, String> getServers() {
		return servers;
	}

	@Required
	public void setServers(final Map<Long, String> servers) {
		this.servers = servers;
	}

	public boolean isEnabled() {
		return enabled;
	}

	public void setEnabled(boolean enabled) {
		this.enabled = enabled;
	}

}
