package eu.dnetlib.data.mapreduce.util; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Map.Entry; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.protobuf.InvalidProtocolBufferException; import eu.dnetlib.data.graph.model.DNGFDecoder; import eu.dnetlib.data.proto.DNGFProtos.DNGF; import eu.dnetlib.data.proto.TypeProtos.Type; import eu.dnetlib.data.proto.WdsDatasetProtos.WdsDataset; import eu.dnetlib.data.transform.DNGFEntityMerger; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.mapreduce.Mapper.Context; public class UpdateMerger { private static final String UPDATE_MERGER = "UPDATE_MERGE"; private static final String N_MERGES = "N_MERGES"; @SuppressWarnings("rawtypes") public static DNGF mergeBodyUpdates(final Context context, final Map map, final Type type) throws InvalidProtocolBufferException { final Map stringMap = Maps.newHashMap(); for(Entry e : map.entrySet()) { stringMap.put(Bytes.toString(e.getKey()), e.getValue()); } return doMerge(context, stringMap, type); } public static DNGF mergeBodyUpdates(final Map map, final Type type) throws InvalidProtocolBufferException { return doMerge(null, map, type); } private static DNGF doMerge(final Context context, final Map map, final Type type) throws InvalidProtocolBufferException { final byte[] value = map.get(type.toString()); if (value == null) return null; DNGF.Builder builder = DNGF.newBuilder(DNGFDecoder.decode(value, WdsDataset.geolocation).getDNGF()); final List keys = Lists.newArrayList(); // we fetch all the body updates for (final String o : map.keySet()) { if (o.startsWith("update_")) { keys.add(o); } } if (!keys.isEmpty()) { // we merge all the sorted updates with the body Collections.sort(keys); for (final String k : keys) { final DNGF update = DNGF.parseFrom(map.get(k)); // System.out.println("\n\nBODY: \n" + body.build().toString()); // System.out.println("UPDATE: \n" + update.toString()); builder.mergeFrom(update); // System.out.println("UDPATED BODY: \n" + body.build().toString() + "\n\n"); } builder = DNGFEntityMerger.merge(builder); if (context != null) { context.getCounter(UPDATE_MERGER, N_MERGES).increment(keys.size()); } } return builder.build(); } }