package eu.dnetlib.lbs.events.input;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import eu.dnetlib.lbs.elasticsearch.Event;
import eu.dnetlib.lbs.utils.LbsQueue;

/**
 * Runnable that connects to a RabbitMQ broker, subscribes to a queue and forwards the body of every
 * delivered message (decoded as UTF-8 text) to a local {@link LbsQueue}.
 *
 * <p>
 * {@link #run()} only performs the setup (connection, channel, queue declaration, subscription) and then
 * returns; on success the connection is intentionally left open so that deliveries keep being dispatched
 * to the registered consumer.
 * </p>
 */
public class RabbitMQConsumer implements Runnable {

	private static final Log log = LogFactory.getLog(RabbitMQConsumer.class);

	private final String queue;
	private final String host;
	private final int port;
	private final String username;
	private final String password;
	private final LbsQueue<String, Event> localQueue;

	/**
	 * Creates a consumer description; no connection is opened until {@link #run()} is invoked.
	 *
	 * @param queue
	 *            name of the RabbitMQ queue to declare and consume from
	 * @param host
	 *            broker host
	 * @param port
	 *            broker port
	 * @param username
	 *            broker username
	 * @param password
	 *            broker password
	 * @param localQueue
	 *            local queue that receives the decoded message bodies
	 */
	public RabbitMQConsumer(final String queue, final String host, final int port, final String username, final String password,
			final LbsQueue<String, Event> localQueue) {
		super();
		this.queue = queue;
		this.host = host;
		this.port = port;
		this.username = username;
		this.password = password;
		this.localQueue = localQueue;
	}

	/**
	 * Opens the connection to the broker, declares the queue and registers the message handler.
	 *
	 * <p>
	 * Any failure during the setup is logged and not propagated. If the connection was already opened when
	 * the failure happened, it is closed so that its resources are not leaked.
	 * </p>
	 */
	@Override
	public void run() {
		final ConnectionFactory factory = new ConnectionFactory();
		factory.setHost(this.host);
		factory.setPort(this.port);
		factory.setUsername(this.username);
		factory.setPassword(this.password);
		factory.setAutomaticRecoveryEnabled(true);

		log.info("Starting rabbitMQ consumer: " + Thread.currentThread().getName());

		Connection connection = null;
		try {
			connection = factory.newConnection();
			final Channel channel = connection.createChannel();
			final DefaultConsumer consumer = new DefaultConsumer(channel) {

				@Override
				public void handleDelivery(final String consumerTag,
						final Envelope envelope,
						final AMQP.BasicProperties properties,
						final byte[] body)
						throws IOException {

					try {
						getLocalQueue().offer(new String(body, StandardCharsets.UTF_8));
					} catch (final Throwable e) {
						log.error("Error processing event", e);
					} finally {
						// The message is acknowledged even when the hand-off failed, so a message that
						// cannot be processed is dropped (after being logged) instead of being redelivered.
						getChannel().basicAck(envelope.getDeliveryTag(), false);
					}
				}
			};

			// Arguments after the name: durable = true, exclusive = false, autoDelete = false, no extra arguments.
			channel.queueDeclare(this.queue, true, false, false, null);
			// autoAck = false: acknowledgements are sent explicitly by handleDelivery.
			channel.basicConsume(this.queue, false, consumer);
		} catch (final Exception e) {
			log.error("Error creating consumer", e);
			// The setup failed half-way: release the connection, otherwise it would stay open forever.
			closeQuietly(connection);
		}
	}

	/**
	 * Closes the given connection, logging (and not propagating) any error raised while closing.
	 *
	 * @param connection
	 *            the connection to close, may be {@code null} if it was never opened
	 */
	private static void closeQuietly(final Connection connection) {
		if (connection == null) { return; }
		try {
			connection.close();
		} catch (final Exception e) {
			log.warn("Error closing rabbitMQ connection", e);
		}
	}

	/**
	 * @return the name of the RabbitMQ queue this consumer reads from
	 */
	public String getQueue() {
		return this.queue;
	}

	/**
	 * @return the broker host
	 */
	public String getHost() {
		return this.host;
	}

	/**
	 * @return the broker port
	 */
	public int getPort() {
		return this.port;
	}

	/**
	 * @return the broker username
	 */
	public String getUsername() {
		return this.username;
	}

	/**
	 * @return the broker password
	 */
	public String getPassword() {
		return this.password;
	}

	/**
	 * @return the local queue that receives the decoded message bodies
	 */
	public LbsQueue<String, Event> getLocalQueue() {
		return this.localQueue;
	}

}
