package eu.dnetlib.data.collective.harvest.provider;

import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * BlockingStream fetches elements from a shared queue, which must be populated by a producer.
 * <p>
 * The stream ends when the marker object {@code FileWalker.done} is taken from the queue. Until an
 * element or that marker arrives, {@link #hasNext()} keeps polling the queue and therefore blocks
 * the calling thread.
 *
 * @author claudio
 *
 * @param <T>
 *            Type returned by the next() method
 */
public class BlockingStream<T> implements Iterator<T> {

	/**
	 * Logger.
	 */
	private static final Log log = LogFactory.getLog(BlockingStream.class);

	/**
	 * How long a single poll on the queue waits before the loop retries, in seconds.
	 */
	private static final long POLL_TIMEOUT_SECONDS = 2;

	/**
	 * shared queue.
	 */
	private final BlockingQueue<T> queue;

	/**
	 * next element to be returned; null when nothing has been fetched yet.
	 */
	private T nextReturn;

	/**
	 * flag: turns false once the end marker has been taken from the queue.
	 */
	private boolean hasNext = true;

	/**
	 * Builds a BlockingStream.
	 *
	 * @param queue
	 *            the shared queue.
	 * @throws NullPointerException
	 *             if queue is null
	 */
	public BlockingStream(BlockingQueue<T> queue) {
		this.queue = Objects.requireNonNull(queue, "queue");
	}

	/**
	 * Fetches the next element from the queue if none is buffered yet, polling repeatedly until
	 * something arrives.
	 *
	 * @return true if an element is buffered and ready for {@link #next()}; false if the end marker
	 *         {@code FileWalker.done} was taken from the queue, or if the calling thread was
	 *         interrupted while waiting (the interrupt status is restored in that case)
	 */
	@Override
	public boolean hasNext() {
		while (nextReturn == null) {
			try {
				// poll returns null when the timeout elapses; the loop then simply tries again
				nextReturn = queue.poll(POLL_TIMEOUT_SECONDS, TimeUnit.SECONDS);
			} catch (InterruptedException e) {
				// restore the interrupt status so that callers can react to the interruption
				Thread.currentThread().interrupt();
				log.error("interrupted while waiting for the next queue element", e);
				return false;
			}
			// identity comparison on purpose: the end marker is one specific shared object
			if (FileWalker.done == (Object) nextReturn) {
				hasNext = false;
			}
		}
		return hasNext;
	}

	/**
	 * Returns the buffered element and clears the buffer so the following call fetches a new one.
	 *
	 * @return the next element taken from the queue
	 * @throws NoSuchElementException
	 *             if {@link #hasNext()} returns false
	 */
	@Override
	public T next() {
		if (!hasNext()) {
			throw new NoSuchElementException();
		}
		T retVal = nextReturn;
		nextReturn = null;
		return retVal;
	}

	/**
	 * Not supported.
	 *
	 * @throws UnsupportedOperationException
	 *             always
	 */
	@Override
	public void remove() {
		throw new UnsupportedOperationException("Not available.");
	}
}