package eu.dnetlib.data.mapreduce.util;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import com.google.common.collect.Maps;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Mapper.Context;

import com.google.common.collect.Lists;
import com.google.protobuf.InvalidProtocolBufferException;

import eu.dnetlib.data.proto.OafProtos.Oaf;
import eu.dnetlib.data.transform.OafEntityMerger;

/**
 * Static helpers that rebuild an {@link Oaf} from a map holding a serialized body (stored under
 * {@code DedupUtils.BODY_S}) plus zero or more serialized updates (stored under keys starting with
 * {@code "update_"}).
 */
public class UpdateMerger {

	/** Counter group used when reporting the number of merged updates. */
	private static final String UPDATE_MERGER = "UPDATE_MERGE";

	/** Counter name used when reporting the number of merged updates. */
	private static final String N_MERGES = "N_MERGES";

	/** Prefix identifying the map keys that hold body updates. */
	private static final String UPDATE_PREFIX = "update_";

	/**
	 * Merges the body updates found in a map keyed by raw bytes.
	 *
	 * <p>Each key is converted with {@link Bytes#toString(byte[])} and the resulting String-keyed map is
	 * merged exactly as in {@link #mergeBodyUpdates(Map)}, with the given context used for counters.
	 *
	 * @param context
	 *            mapper context whose counter is incremented by the number of merged updates; may be
	 *            {@code null}, in which case no counter is touched
	 * @param map
	 *            byte-keyed map holding the serialized body and its updates
	 * @return the merged {@link Oaf}, or {@code null} when the map has no body entry
	 * @throws InvalidProtocolBufferException
	 *             if the body or one of the updates cannot be parsed
	 */
	@SuppressWarnings("rawtypes")
	public static Oaf mergeBodyUpdates(final Context context, final Map<byte[], byte[]> map) throws InvalidProtocolBufferException {
		final Map<String, byte[]> stringMap = Maps.newHashMap();
		for (final Entry<byte[], byte[]> e : map.entrySet()) {
			stringMap.put(Bytes.toString(e.getKey()), e.getValue());
		}
		return doMerge(context, stringMap);
	}

	/**
	 * Merges the body updates found in a String-keyed map, without reporting any counter.
	 *
	 * @param map
	 *            map holding the serialized body and its updates
	 * @return the merged {@link Oaf}, or {@code null} when the map has no body entry
	 * @throws InvalidProtocolBufferException
	 *             if the body or one of the updates cannot be parsed
	 */
	public static Oaf mergeBodyUpdates(final Map<String, byte[]> map) throws InvalidProtocolBufferException {
		return doMerge(null, map);
	}

	/**
	 * Parses the body, applies every update in ascending key order and returns the built result.
	 *
	 * <p>When at least one update is present, the builder is additionally passed through
	 * {@link OafEntityMerger#merge} and, if a context was supplied, the
	 * {@code UPDATE_MERGE / N_MERGES} counter is incremented by the number of updates.
	 *
	 * @param context
	 *            mapper context used for counters, or {@code null} to skip counting
	 * @param map
	 *            map holding the serialized body and its updates
	 * @return the merged {@link Oaf}, or {@code null} when the map has no body entry
	 * @throws InvalidProtocolBufferException
	 *             if the body or one of the updates cannot be parsed
	 */
	@SuppressWarnings("rawtypes")
	private static Oaf doMerge(final Context context, final Map<String, byte[]> map) throws InvalidProtocolBufferException {
		final byte[] value = map.get(DedupUtils.BODY_S);
		if (value == null) {
			return null;
		}

		Oaf.Builder builder = Oaf.newBuilder(Oaf.parseFrom(value));

		// collect the keys of all the body updates
		final List<String> keys = Lists.newArrayList();
		for (final String o : map.keySet()) {
			if (o.startsWith(UPDATE_PREFIX)) {
				keys.add(o);
			}
		}

		if (!keys.isEmpty()) {
			// updates are applied in the natural (lexicographic) order of their keys
			Collections.sort(keys);
			for (final String k : keys) {
				final Oaf update = Oaf.parseFrom(map.get(k));
				builder.mergeFrom(update);
			}

			builder = OafEntityMerger.merge(builder);

			if (context != null) {
				context.getCounter(UPDATE_MERGER, N_MERGES).increment(keys.size());
			}
		}

		return builder.build();
	}
}