package eu.dnetlib.data.collective.harvest.provider; import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.StringWriter; import java.util.Collection; import java.util.concurrent.BlockingQueue; import org.apache.commons.io.DirectoryWalker; import org.apache.commons.io.IOUtils; import org.apache.commons.io.filefilter.FileFilterUtils; import org.apache.commons.io.filefilter.HiddenFileFilter; import org.apache.commons.io.filefilter.IOFileFilter; import org.apache.commons.io.filefilter.SuffixFileFilter; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.xml.sax.SAXException; import com.google.common.collect.Lists; import eu.dnetlib.data.collective.harvest.provider.DataProvider.FileType; /** * FileWalker runs recursively under a directory structure starting from a given path and for each file reads it's * content and puts it in a shared queue. * * Acts as a producer. * * @author claudio * * @param * Type of expected content to extract from files NB. Generally strings. */ public class FileWalker extends DirectoryWalker { /** * Logger. */ private static final Log log = LogFactory.getLog(FileWalker.class); /** * Shared queue. */ private BlockingQueue queue; /** * Reference to the starting directory */ private File source; /** * specifies the file type; */ private FileType type; /** * a flag used to inform that source has no more elements. */ public final static Object done = new Object(); public static String IGNORE_PREFIX = "."; public static String IGNORE_SUFFIX = "~"; static IOFileFilter fileFilter = FileFilterUtils.notFileFilter(FileFilterUtils.orFileFilter(new SuffixFileFilter("~"), HiddenFileFilter.HIDDEN)); /** * Builds a FileWalker. * * @param sourcePath * the * @param queue */ public FileWalker(BlockingQueue queue, final FileType type, final File source) { super(fileFilter, -1); this.source = source; this.type = type; this.queue = queue; } /** * Wrapper method, starts the walk and when it's done adds the flag to the queue. */ @SuppressWarnings("unchecked") public void doWalk() throws IOException { log.info("starting to iterate " + type.toString() + " files under " + source.getAbsolutePath()); walk(source, queue); enqueue(queue, (T) done); } @Override @SuppressWarnings({ "unchecked", "rawtypes" }) protected void handleFile(File file, int depth, Collection results) throws IOException { enqueue((BlockingQueue) results, (T) readFile(file)); } @Override @SuppressWarnings("rawtypes") protected boolean handleDirectory(File directory, int depth, Collection results) throws IOException { if (directory.getName().startsWith(IGNORE_PREFIX)) return false; return super.handleDirectory(directory, depth, results); } ///////////////// helpers /** * Adds the element to the queue */ private void enqueue(BlockingQueue queue, T element) { try { queue.put(element); } catch (InterruptedException e) { log.warn("ops... ", e); } } /** * given a file, return its content as a string * * @param file * the source * @return the file content as a single string * @throws IOException * @throws TikaException * @throws SAXException */ private String readFile(final File file) throws IOException { FileInputStream fis = new FileInputStream(file); String fileContent = null; switch (type) { case TEXT: final StringWriter sw = new StringWriter(); IOUtils.copy(fis, sw); sw.flush(); fileContent = sw.toString(); break; default: throw new UnsupportedOperationException("FileType should be one of: " + Lists.newArrayList(FileType.values())); } fis.close(); return fileContent; } }