package eu.dnetlib.data.tool.mapreduce; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.List; import org.apache.cassandra.avro.Column; import org.apache.cassandra.avro.ColumnOrSuperColumn; import org.apache.cassandra.avro.Mutation; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import com.google.common.collect.Lists; public class CassandraMapper extends Mapper> { public static long FUTURE_LIMIT = 100000000000000L; public final static String MDSTORE_SUFFIX = "/M"; public final static String RECORD_SUFFIX = "/R"; public final static String DATE_SUFFIX = "/D"; @Override protected void map(Text key, Text value, Context context) throws IOException, InterruptedException { String[] split = key.toString().split("\\|"); final String objId = split[0] + RECORD_SUFFIX; final String mdId = split[1] + MDSTORE_SUFFIX; final String date = split[1] + DATE_SUFFIX; String record = Text.decode(value.getBytes()); final String dateColumn = getDateColumn(objId); // addInsertion(objId, CF_BODY, HFactory.createStringColumn(mdId.getMdIdKey(), record)) context.write(ByteBufferUtil.bytes(objId), Lists.newArrayList(getMutation(mdId, record))); // addInsertion(mdId.getMdIdKey(), CF_BODY, HFactory.createStringColumn(objId, dateColumn)) context.write(ByteBufferUtil.bytes(mdId), Lists.newArrayList(getMutation(objId, dateColumn))); // addInsertion(mdId.getMdIdDate(), CF_BODY, HFactory.createStringColumn(dateColumn, objId)) context.write(ByteBufferUtil.bytes(date), Lists.newArrayList(getMutation(dateColumn, objId))); } protected Mutation getMutation(String columnName, String value) { Column c = new Column(); c.name = ByteBuffer.wrap(Arrays.copyOf(columnName.getBytes(), columnName.length())); c.value = ByteBuffer.wrap(value.getBytes()); c.timestamp = System.currentTimeMillis() * 1000; Mutation m = new Mutation(); m.column_or_supercolumn = new ColumnOrSuperColumn(); m.column_or_supercolumn.column = c; return m; } private String getDateColumn(String objId) { long time = System.currentTimeMillis(); return String.valueOf(FUTURE_LIMIT - time) + "/" + objId; } }