package eu.dnetlib.stat.zmq;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Required;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Socket;

import akka.actor.TypedActor;
import eu.dnetlib.stat.StatEmitterBackend;

@SuppressWarnings("unchecked")
public class ZmqEmitterActor extends TypedActor implements StatEmitterBackend, InitializingBean {
	private static final Log log = LogFactory.getLog(ZmqEmitterActor.class); // NOPMD by marko on 11/24/08 5:02 PM

	private ZMQ.Context zmqContext;

	private Socket socket;
	public static int current = 0;
	
	@Override
	public void afterPropertiesSet() {
		if(zmqContext == null)
			return;
		socket = zmqContext.socket(ZMQ.PUB);
		socket.setHWM(100);
	
		socket.bind("tcp://*:12345");
	}

	public void emit(final String module, final String instance, final String key, final String value) {
//		log.info("emitting: " + Thread.currentThread().toString() + ", " + module + ", " + instance + ", " + key + ": " + value);
		current++;
		send(Thread.currentThread().toString(), module, instance, key, value);
	}

	public void send(final String... str) {
		if(socket == null)
			return;
		
		for (int i = 0; i < str.length; i++)
			socket.send(str[i].getBytes(), i == str.length - 1 ? 0 : ZMQ.SNDMORE);
	}

	public ZMQ.Context getZmqContext() {
		return zmqContext;
	}

	@Required
	public void setZmqContext(final ZMQ.Context zmqContext) {
		this.zmqContext = zmqContext;
	}

}
