package eu.dnetlib.lbs.events.output;

import java.util.concurrent.atomic.AtomicLong;

import javax.annotation.PostConstruct;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.annotation.Autowired;

import eu.dnetlib.lbs.elasticsearch.Event;
import eu.dnetlib.lbs.subscriptions.Subscription;
import eu.dnetlib.lbs.utils.LbsQueue;
import eu.dnetlib.lbs.utils.QueueManager;
import eu.dnetlib.lbs.utils.ThreadManager;

/**
 * Base class for notification dispatchers that decouple the preparation of a notification from its delivery.
 *
 * <p>
 * {@link #sendNotification(Subscription, Event...)} converts a subscription and its events into a message of type
 * {@code T} (via {@link #prepareAction(Subscription, Event...)}) and puts it on an internal queue. A worker, registered
 * with the {@link ThreadManager} in {@link #init()}, executes {@link #run()}, which takes messages from the queue and
 * delivers them through {@link #performAction(Object)}.
 * </p>
 *
 * <p>
 * A message whose delivery fails is put back on the queue, so it will be attempted again.
 * </p>
 *
 * @param <T>
 *            type of the prepared message stored in the queue
 */
public abstract class AbstractNotificationDispatcher<T> implements NotificationDispatcher, BeanNameAware, Runnable {

	/** Name of this dispatcher; falls back to the Spring bean name when not set explicitly (see {@link #setBeanName(String)}). */
	private String dispatcherName;

	@Autowired
	private QueueManager queueManager;

	@Autowired
	private ThreadManager threadManager;

	/** Queue of prepared messages waiting to be delivered; created in {@link #init()}. */
	private LbsQueue<T, T> queue;

	/** Number of messages delivered successfully since creation or since the last {@link #resetCount()}. */
	private final AtomicLong count = new AtomicLong(0);

	private static final Log log = LogFactory.getLog(AbstractNotificationDispatcher.class);

	/**
	 * Creates the queue (named {@code <dispatcherName>-queue}) and registers this dispatcher as a runnable with the
	 * thread manager under the dispatcher name.
	 */
	@PostConstruct
	public void init() {
		this.queue = this.queueManager.newQueue(this.dispatcherName + "-queue");
		this.threadManager.newThread(this.dispatcherName, this);
	}

	/**
	 * Worker loop: repeatedly takes the available messages from the queue and delivers each non-null one.
	 *
	 * <p>
	 * The loop ends when the executing thread is interrupted. The batch being processed at that moment is still iterated
	 * to its end, so every message of the batch is either delivered or put back on the queue.
	 * </p>
	 */
	@Override
	public void run() {
		while (!Thread.currentThread().isInterrupted()) {
			for (final T message : this.queue.takeList()) {
				if (message != null) {
					deliver(message);
				}
			}
		}
		log.info("Dispatcher " + this.dispatcherName + " interrupted: stopping");
	}

	/**
	 * Delivers a single message, putting it back on the queue when the delivery fails.
	 *
	 * @param message
	 *            the message to deliver, never {@code null}
	 */
	private void deliver(final T message) {
		try {
			performAction(message);
			this.count.incrementAndGet();
		} catch (final InterruptedException e) {
			// An interruption is a stop request, not a delivery error: keep the message for a later attempt and
			// restore the interrupt status (cleared when the exception was thrown) so that run() can terminate.
			try {
				this.queue.offer(message);
			} finally {
				Thread.currentThread().interrupt();
			}
		} catch (final Throwable e) {
			// Deliberately broad: a failing message must not terminate the worker.
			log.error("Error sending notification", e);
			this.queue.offer(message);
		}
	}

	/**
	 * Prepares a message for the given subscription and events and enqueues it for delivery by the worker.
	 *
	 * <p>
	 * Failures while preparing or enqueuing are logged and not propagated to the caller.
	 * </p>
	 *
	 * @param subscription
	 *            the subscription to notify
	 * @param events
	 *            the events to include in the notification
	 */
	@Override
	public void sendNotification(final Subscription subscription, final Event... events) {
		try {
			this.queue.offer(prepareAction(subscription, events));
		} catch (final Exception e) {
			log.error("Error sending notification", e);
		}
	}

	/**
	 * Builds the message to enqueue for the given subscription and events.
	 *
	 * @param subscription
	 *            the subscription to notify
	 * @param events
	 *            the events to include in the notification
	 * @return the message that will later be passed to {@link #performAction(Object)}
	 * @throws Exception
	 *             if the message cannot be prepared; the notification is then logged and dropped
	 */
	abstract protected T prepareAction(final Subscription subscription, final Event... events) throws Exception;

	/**
	 * Delivers a previously prepared message. Invoked by the worker loop.
	 *
	 * @param message
	 *            the message to deliver
	 * @throws Exception
	 *             if the delivery fails; the message is then put back on the queue
	 */
	abstract protected void performAction(final T message) throws Exception;

	/**
	 * @return the name of this dispatcher
	 */
	@Override
	public String getDispatcherName() {
		return this.dispatcherName;
	}

	/**
	 * @param dispatcherName
	 *            the name of this dispatcher
	 */
	public void setDispatcherName(final String dispatcherName) {
		this.dispatcherName = dispatcherName;
	}

	/**
	 * @return the number of messages delivered successfully since creation or since the last {@link #resetCount()}
	 */
	@Override
	public long count() {
		return this.count.get();
	}

	/**
	 * Resets the counter of delivered messages to zero.
	 */
	@Override
	public void resetCount() {
		this.count.set(0);
	}

	/**
	 * Uses the bean name as dispatcher name, unless a non-blank dispatcher name has already been set.
	 *
	 * @param name
	 *            the name of the bean in the factory
	 */
	@Override
	public void setBeanName(final String name) {
		if (StringUtils.isBlank(getDispatcherName())) {
			setDispatcherName(name);
		}
	}

}
