package eu.dnetlib.data.collective.harvest.provider;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.commons.io.DirectoryWalker.CancelException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * DataProvider allows to iterate over the content of the files found under a directory structure (or inside a single
 * archive file) located at a given path.
 *
 * Instead of loading everything up front, a walker is submitted to a single-thread executor and is handed a bounded
 * queue (capacity 20); the caller reads from a {@link BlockingStream} built on top of that same queue. This is meant
 * to keep memory usage low when handling well populated and deep directory structures.
 *
 * Note: every stream returned by one DataProvider instance is backed by the same queue.
 *
 * @author claudio
 */
public class DataProvider implements Iterable {

	/**
	 * logger.
	 */
	private static final Log log = LogFactory.getLog(DataProvider.class);

	/**
	 * Handled file types
	 *
	 * @author claudio
	 */
	public enum FileType {
		TEXT, PDF, DOC
	}

	/**
	 * Handled protocols
	 *
	 * @author claudio
	 */
	public enum Protocols {
		file,
	}

	/**
	 * executor for the producer thread.
	 */
	private final ExecutorService producer;

	/**
	 * shared queue, handed to the walkers and wrapped by the returned BlockingStream.
	 */
	private final BlockingQueue queue;

	/**
	 * default data source, used by {@link #iterator()}; null when the provider was built without a baseUrl.
	 */
	private File source = null;

	/**
	 * specifies the file type;
	 */
	private final FileType type;

	/**
	 * Builds a DataProvider without a default source.
	 *
	 * An instance built this way must be used by calling the readFilesUnder method; {@link #iterator()} has nothing to
	 * iterate over and fails with an IllegalStateException.
	 *
	 * @param type
	 *            the file type passed on to the walkers
	 */
	public DataProvider(final FileType type) {
		this.producer = Executors.newSingleThreadExecutor();
		this.queue = new ArrayBlockingQueue(20);
		this.type = type;
	}

	/**
	 * Builds a DataProvider bound to a default source.
	 *
	 * Can be used as iterator or by calling readFilesUnder method.
	 *
	 * @param type
	 *            the file type passed on to the walkers
	 * @param baseUrl
	 *            URI whose path component locates the data source on the filesystem
	 * @throws URISyntaxException
	 *             if baseUrl does not conform to URI syntax
	 * @throws FileNotFoundException
	 *             if baseUrl does not locate an existing file or directory on the filesystem
	 */
	public DataProvider(final FileType type, final String baseUrl) throws URISyntaxException, FileNotFoundException {
		this(type);
		this.source = getSource(baseUrl);
	}

	/**
	 * Starts reading the default source given at construction time.
	 *
	 * @return the BlockingStream over the content found under the default source
	 * @throws IllegalStateException
	 *             if this provider was built without a baseUrl
	 */
	@Override
	public Iterator iterator() {
		if (source == null) {
			// Iterable must never hand out a null iterator: fail with an explicit message instead.
			throw new IllegalStateException("no source configured: build the DataProvider with a baseUrl or call readFilesUnder");
		}
		return doReadFilesUnder(source);
	}

	/**
	 * Wrapper method that resolves baseUrl and calls doReadFilesUnder on it.
	 *
	 * @param baseUrl
	 *            URI whose path component locates the data source on the filesystem
	 * @return a BlockingStream over the files content
	 * @throws URISyntaxException
	 *             if baseUrl does not conform to URI syntax
	 * @throws FileNotFoundException
	 *             if baseUrl does not locate an existing file or directory on the filesystem
	 */
	public BlockingStream readFilesUnder(final String baseUrl) throws FileNotFoundException, URISyntaxException {
		return doReadFilesUnder(getSource(baseUrl));
	}

	/**
	 * Resolves the path component of the given URI to an existing file or directory.
	 *
	 * @param baseUrl
	 *            URI whose path component locates the data source on the filesystem
	 * @return the File the URI path points to
	 * @throws URISyntaxException
	 *             if baseUrl does not conform to URI syntax
	 * @throws FileNotFoundException
	 *             if the URI carries no path or the path does not exist on the filesystem
	 */
	private File getSource(final String baseUrl) throws FileNotFoundException, URISyntaxException {
		// new URI(..) reports malformed input through the declared checked exception,
		// whereas URI.create(..) would wrap it into an IllegalArgumentException.
		final String path = new URI(baseUrl).getPath();
		// opaque URIs (e.g. "mailto:x") have no path component at all
		if (path == null) {
			throw new FileNotFoundException("file " + baseUrl + " doesn't exist or is not a directory");
		}
		final File file = new File(path);
		if (!file.exists()) {
			throw new FileNotFoundException("file " + baseUrl + " doesn't exist or is not a directory");
		}
		return file;
	}

	/**
	 * Method actually performs the job by starting a producer task with a walker that is handed the shared queue.
	 *
	 * A directory is handled by a FileWalker, anything else by an ArchiveWalker.
	 *
	 * @param root
	 *            data source to read from
	 * @return a BlockingStream over the shared queue
	 */
	private BlockingStream doReadFilesUnder(final File root) {
		log.info("reading files under " + root.getAbsolutePath());
		// The requested root is passed down explicitly: the helpers must not fall back to the
		// 'source' field, which is null (or points elsewhere) when coming from readFilesUnder.
		if (root.isDirectory()) {
			doReadFromDirectory(root, queue);
		} else {
			doReadFromArchive(root, queue);
		}
		return new BlockingStream(queue);
	}

	/**
	 * Submits an ArchiveWalker over the given file to the producer executor.
	 *
	 * @param archive
	 *            the (non directory) file to walk
	 * @param target
	 *            the queue handed to the walker
	 */
	private void doReadFromArchive(final File archive, final BlockingQueue target) {
		final ArchiveWalker walker = new ArchiveWalker(target, type, archive);
		producer.execute(new Runnable() {

			@Override
			public void run() {
				try {
					walker.doWalk();
				} catch (IOException e) {
					throw new IllegalStateException(e);
				}
				log.info("finished to iterate under " + archive.getAbsolutePath());
			}
		});
	}

	/**
	 * Performs a synchronous trial walk over the directory, then submits the real FileWalker to the producer executor.
	 *
	 * The trial walk runs in the calling thread against a throw-away queue, so that an IOException is reported to the
	 * caller rather than being raised inside the producer thread.
	 *
	 * @param directory
	 *            the directory to walk
	 * @param target
	 *            the queue handed to the real walker
	 * @throws IllegalStateException
	 *             if the trial walk fails with an IOException other than CancelException
	 */
	private void doReadFromDirectory(final File directory, final BlockingQueue target) {
		final LinkedBlockingQueue dummyQueue = new LinkedBlockingQueue();
		final FileWalker tryWalker = new TryFileWalker(dummyQueue, type, directory);
		try {
			tryWalker.doWalk();
		} catch (CancelException e) {
			// expected
		} catch (IOException e) {
			// if the walker cannot read, percolate the exception!
			log.info("got an exceptionn while trying to read one file from FileWalker, percolating", e);
			throw new IllegalStateException(e);
		}

		final FileWalker walker = new FileWalker(target, type, directory);
		producer.execute(new Runnable() {

			@Override
			public void run() {
				try {
					walker.doWalk();
				} catch (IOException e) {
					throw new IllegalStateException(e);
				}
				log.info("finished to iterate under " + directory.getAbsolutePath());
			}
		});
	}
}