package eu.dnetlib.data.tool.mapreduce; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collections; import java.util.List; import org.apache.cassandra.avro.Column; import org.apache.cassandra.avro.ColumnOrSuperColumn; import org.apache.cassandra.avro.Mutation; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import com.google.common.collect.Iterables; public class ReduceToCassandra extends Reducer> { public final static String MDSTORE_PREFIX = "M/"; public final static String RECORD_PREFIX = "R/"; @Override public void reduce(Text key, Iterable record, Context context) throws IOException, InterruptedException { String[] split = key.toString().split("\\|"); final String objId = RECORD_PREFIX + split[0]; final String mdId = MDSTORE_PREFIX + split[1]; final String value = Text.decode(Iterables.getOnlyElement(record).getBytes()); context.write(ByteBufferUtil.bytes(objId), Collections.singletonList(getMutation(mdId, value))); context.write(ByteBufferUtil.bytes(mdId), Collections.singletonList(getMutation(objId, ""))); } private Mutation getMutation(String columnName, String value) { Column c = new Column(); c.name = ByteBuffer.wrap(Arrays.copyOf(columnName.getBytes(), columnName.length())); c.value = ByteBuffer.wrap(value.getBytes()); c.timestamp = System.currentTimeMillis() * 1000; Mutation m = new Mutation(); m.column_or_supercolumn = new ColumnOrSuperColumn(); m.column_or_supercolumn.column = c; return m; } }