package eu.dnetlib.lbs.events.manager;

import java.util.List;
import java.util.stream.StreamSupport;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import eu.dnetlib.lbs.elasticsearch.Event;
import eu.dnetlib.lbs.elasticsearch.EventRepository;
import eu.dnetlib.lbs.elasticsearch.Notification;
import eu.dnetlib.lbs.elasticsearch.NotificationRepository;
import eu.dnetlib.lbs.events.output.DispatcherManager;
import eu.dnetlib.lbs.subscriptions.NotificationFrequency;
import eu.dnetlib.lbs.subscriptions.Subscription;
import eu.dnetlib.lbs.subscriptions.SubscriptionRepository;
import eu.dnetlib.lbs.utils.LbsQueue;

public class EventManager implements Runnable {

	private final EventRepository eventRepository;
	private final NotificationRepository notificationRepository;
	private final SubscriptionRepository subscriptionRepo;
	private final DispatcherManager dispatcherManager;
	private final LbsQueue<String, Event> queue;

	private static final Log log = LogFactory.getLog(EventManager.class);

	public EventManager(final EventRepository eventRepository, final NotificationRepository notificationRepository,
			final SubscriptionRepository subscriptionRepo,
			final DispatcherManager dispatcherManager,
			final LbsQueue<String, Event> queue) {
		this.eventRepository = eventRepository;
		this.notificationRepository = notificationRepository;
		this.subscriptionRepo = subscriptionRepo;
		this.dispatcherManager = dispatcherManager;
		this.queue = queue;
	}

	public boolean add(final String s) {
		return this.queue.offer(s);
	}

	@Override
	public void run() {
		log.info("Event indexer started: " + Thread.currentThread().getName());
		while (true) {
			final List<Event> list = this.queue.takeList();
			this.eventRepository.saveAll(list);

			// TODO: cache of Subscription with NotificationFrequency.realtime
			list.stream().filter(Event::isInstantMessage).forEach(e -> {
				final Iterable<Subscription> iter = this.subscriptionRepo.findByTopic(e.getTopic());
				StreamSupport.stream(iter.spliterator(), false)
						.filter(s -> s.verifyEventConditions(e))
						.filter(s -> s.getFrequency() == NotificationFrequency.realtime)
						.forEach(s -> {
							final Notification n = new Notification(s, e);
							this.notificationRepository.save(n);
							this.dispatcherManager.dispatch(s, e);
						});
			});
		}
	}

	public LbsQueue<String, Event> getQueue() {
		return this.queue;
	}

}
