package eu.dnetlib.data.mapreduce.util; import java.util.Collections; import java.util.List; import java.util.Map; import org.apache.hadoop.hbase.util.Bytes; import com.google.common.collect.Lists; import com.google.protobuf.InvalidProtocolBufferException; import eu.dnetlib.data.proto.OafProtos.Oaf; public class UpdateMerger { public static byte[] mergeBodyUpdates(final Map map) throws InvalidProtocolBufferException { byte[] value = map.get(Bytes.toBytes("body")); if (value == null) { return null; } final Oaf.Builder body = Oaf.newBuilder(Oaf.parseFrom(value)); final List keys = Lists.newArrayList(); // we fetch all the body updates for (byte[] o : map.keySet()) { final String sKey = Bytes.toString(o); if (sKey.startsWith("update_")) { keys.add(sKey); } } // we merge all the sorted updates with the body Collections.sort(keys); for (String k : keys) { body.mergeFrom(map.get(Bytes.toBytes(k))); } return body.build().toByteArray(); } }