package eu.dnetlib.lbs.utils;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Predicate;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class LbsQueue<T, K> {

	private final BlockingQueue<K> innerQueue = new LinkedBlockingQueue<>();
	private final AtomicLong lostRecords = new AtomicLong(0);
	private final AtomicLong skippedRecords = new AtomicLong(0);
	private final AtomicLong invalidRecords = new AtomicLong(0);

	private final String name;
	private final Predicate<K> predicate;
	private final Function<T, K> func;
	private final int maxElements;

	private static final Log log = LogFactory.getLog(LbsQueue.class);

	public LbsQueue(final String name, final Function<T, K> func, final Predicate<K> predicate, final int maxElements) {
		this.name = name;
		this.func = func;
		this.predicate = predicate;
		this.maxElements = maxElements;
	}

	public List<K> takeList() {
		try {
			final List<K> list = new ArrayList<>();
			list.add(this.innerQueue.take());
			if (this.maxElements > 1) {
				this.innerQueue.drainTo(list, this.maxElements - 1);
			}
			return list;
		} catch (final Throwable e) {
			log.error("Error indexing record", e);
			return null;
		}
	}

	public K takeOne() {
		try {
			return this.innerQueue.take();
		} catch (final Throwable e) {
			log.error("Error indexing record", e);
			return null;
		}
	}

	public boolean offer(final T obj) {
		try {
			final K newObj = this.func.apply(obj);
			if (newObj == null) {
				log.warn("I received a NULL object");
				this.invalidRecords.incrementAndGet();
			} else if (this.predicate.test(newObj)) {
				if (this.innerQueue.offer(newObj)) {
					return true;
				} else {
					this.lostRecords.incrementAndGet();
				}
			} else {
				log.debug("Skipping object: " + obj);
				this.skippedRecords.incrementAndGet();
			}
		} catch (final Throwable e) {
			log.warn("I received an invalid object: " + e);
			this.invalidRecords.incrementAndGet();
		}
		return false;
	}

	public long size() {
		return this.innerQueue.size();
	}

	public long getLostRecords() {
		return this.lostRecords.get();
	}

	public long getSkippedRecords() {
		return this.skippedRecords.get();
	}

	public long getInvalidRecords() {
		return this.invalidRecords.get();
	}

	public void resetCounters() {
		this.lostRecords.set(0);
		this.skippedRecords.set(0);
		this.invalidRecords.set(0);
	}

	public String getName() {
		return this.name;
	}

	public Predicate<K> getPredicate() {
		return this.predicate;
	}

	public Function<T, K> getFunc() {
		return this.func;
	}

}
