package eu.dnetlib.data.tool; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import java.lang.reflect.Type; import java.net.URI; import java.net.URISyntaxException; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.SequenceFile.Writer; import org.apache.hadoop.io.Text; import com.google.common.collect.Maps; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; import eu.dnetlib.data.collective.harvest.provider.DataProvider; import eu.dnetlib.data.collective.harvest.provider.DataProvider.FileType; import eu.dnetlib.data.tool.parser.RecordParser; import eu.dnetlib.data.tool.parser.StreamingRecordParser; public class FileSequencer { public static final String KEY_SPLIT = "|"; private Map> repoToStoreId = Maps.newHashMap(); private Map> metadata = Maps.newHashMap(); private RecordParser parser = new StreamingRecordParser(); private Configuration conf = new Configuration(); private URI uriToDestFile; private URI uriToMetadataFile; private String source; public FileSequencer(String[] args) throws URISyntaxException, IOException { source = args[0]; uriToDestFile = new URI(args[1]); uriToMetadataFile = new URI(args[1] + "_metadata"); repoToStoreId = loadFromFile(args[2]); } public static void main(String[] args) { FileSequencer seq; try { seq = new FileSequencer(args); seq.run(); } catch (URISyntaxException e) { System.out.println("error parsing destination uri"); System.exit(1); } catch (IOException e) { System.out.println("IO error"); System.exit(1); } } private Map> loadFromFile(String path) throws IOException { System.out.println("\n loading mapping from " + path); Map> mapping = Maps.newHashMap(); @SuppressWarnings("unchecked") List lines = FileUtils.readLines(new File(path)); for (String line : lines) { String[] split = line.split("\\" + KEY_SPLIT); if (split.length == 5) { String format = split[0]; String layout = split[1]; String interp = split[2]; String repoId = split[3]; String mdId = split[4]; if (metadata.get(mdId) == null) { metadata.put(mdId, new HashMap()); metadata.get(mdId).put("format", format); metadata.get(mdId).put("layout", layout); metadata.get(mdId).put("interp", interp); } if (mapping.get(repoId) == null) mapping.put(repoId, new HashMap()); mapping.get(repoId).put(format, mdId); } else { System.out.println("\n malformed mapping file!"); } } return mapping; } public void run() throws IOException, URISyntaxException { writeDestination(); writeMetadata(); } private void writeMetadata() throws IOException { FileSystem fs = FileSystem.get(uriToMetadataFile, conf); Writer writer = SequenceFile.createWriter(fs, conf, new Path(uriToMetadataFile.getPath()), Text.class, Text.class); Gson gson = new Gson(); Type t = new TypeToken>() {}.getType(); for (Entry> entry : metadata.entrySet()) { String key = entry.getKey(); String value = gson.toJson(entry.getValue(), t); writer.append(new Text(key), new Text(value)); } writer.close(); } private void writeDestination() throws IOException, URISyntaxException, FileNotFoundException { FileSystem fs = FileSystem.get(uriToDestFile, conf); Writer writer = SequenceFile.createWriter(fs, conf, new Path(uriToDestFile.getPath()), Text.class, Text.class); DataProvider provider = new DataProvider(FileType.TEXT, source); for (String record : provider) { Map properties = getRecordProperties(record); if (properties.size() > 0) { String repoId = properties.get(StreamingRecordParser.REPOSITORY_ID); String objId = properties.get(StreamingRecordParser.OBJ_IDENTIFIER); String mdFormat = properties.get(StreamingRecordParser.MD_FORMAT); if (repoId == null || objId == null) System.out.println("incomplete record: " + properties); else { Map mdIds = repoToStoreId.get(repoId); if (mdIds != null && mdIds.containsKey(mdFormat)) { String mdId = mdIds.get(mdFormat); writer.append(new Text(objId + KEY_SPLIT + mdId), new Text(record)); } else { System.out.println("couldn't find mdId for repo: " + repoId); } } } } writer.close(); } private Map getRecordProperties(final String record) { Map properties = Maps.newHashMap(); try { properties = parser.parseRecord(record); } catch (Throwable e) { System.out.println("unable to parse: " + record); } return properties; } }