package eu.dnetlib.lbs.events.input;

import javax.annotation.PostConstruct;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;

import eu.dnetlib.lbs.elasticsearch.Event;
import eu.dnetlib.lbs.events.manager.EventManager;
import eu.dnetlib.lbs.events.manager.EventManagerFactory;
import eu.dnetlib.lbs.properties.RabbitMQProperties;
import eu.dnetlib.lbs.topics.TopicTypeRepository;
import eu.dnetlib.lbs.utils.EventVerifier;
import eu.dnetlib.lbs.utils.JsonMessageToEventFunction;
import eu.dnetlib.lbs.utils.LbsQueue;
import eu.dnetlib.lbs.utils.QueueManager;
import eu.dnetlib.lbs.utils.ThreadManager;

@Component
@ConditionalOnProperty(prefix = "lbs.rabbit", name = "enabled")
public class RabbitMQConsumerLauncher {

	@Autowired
	private EventManagerFactory eventManagerFactory;
	@Autowired
	private RabbitMQConsumerFactory rabbitMQConsumerFactory;
	@Autowired
	private QueueManager queueManager;
	@Autowired
	private TopicTypeRepository topicTypeRepo;
	@Autowired
	private RabbitMQProperties props;
	@Autowired
	private ThreadManager threadManager;

	private static final Log log = LogFactory.getLog(RabbitMQConsumerLauncher.class);

	@PostConstruct
	public void init() {
		for (int i = 0; i < this.props.getNumberOfConsumers(); i++) {
			final LbsQueue<String, Event> queue =
					this.queueManager.newQueue("event-manager-" + i, new JsonMessageToEventFunction(), new EventVerifier(this.topicTypeRepo));
			final EventManager manager = this.eventManagerFactory.newEventManager(queue);
			final RabbitMQConsumer consumer = this.rabbitMQConsumerFactory.newConsumer(queue);

			log.info("Launching indexer and consumer threads: " + i);

			this.threadManager.newThread("rabbit-manager-" + i, manager);
			this.threadManager.newThread("rabbit-consumer-" + i, consumer);
		}
	}

}
