package eu.dnetlib.data.mapreduce.es;

import java.io.IOException;

// NOTE: the packages of DNGFRowKeyDecoder, UpdateMerger and TypeProtos.Type are
// assumed from the surrounding dnet-mapreduce codebase and may need adjusting.
import com.google.protobuf.InvalidProtocolBufferException;
import com.googlecode.protobuf.format.JsonFormat;
import eu.dnetlib.data.mapreduce.util.DNGFRowKeyDecoder;
import eu.dnetlib.data.mapreduce.util.UpdateMerger;
import eu.dnetlib.data.proto.DNGFProtos.DNGF;
import eu.dnetlib.data.proto.TypeProtos.Type;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;

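/**
 * Maps HBase rows holding DNGF protocol buffers to JSON blobs suitable for
 * feeding an Elasticsearch index: each row's body and pending updates are
 * merged and, when the result is a complete record, emitted as a single
 * serialized JSON document.
 */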
public class ElasticsearchFeedMapper extends TableMapper<NullWritable, BytesWritable> {

	@Override
	protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException {

		// Decode the row key to find out which entity type this row holds.
		final DNGFRowKeyDecoder keyDecoder = DNGFRowKeyDecoder.decode(keyIn.copyBytes());

		// Merge the row body with any pending updates into a single proto.
		final DNGF dngf = mergeUpdates(value, context, keyDecoder.getType(), keyDecoder);

		// Emit only complete records, serialized as JSON for the ES feed.
		if (isValid(dngf)) {
			context.write(NullWritable.get(), new BytesWritable(Bytes.toBytes(new JsonFormat().printToString(dngf))));
		}
	}

	private DNGF mergeUpdates(final Result value, final Context context, final Type type, final DNGFRowKeyDecoder keyDecoder)
			throws InvalidProtocolBufferException {
		try {
			// The column family is named after the entity type; UpdateMerger
			// collapses the body and its updates into one protobuf message.
			return UpdateMerger.mergeBodyUpdates(context, value.getFamilyMap(Bytes.toBytes(type.toString())));
		} catch (final InvalidProtocolBufferException e) {
			System.err.println(String.format("Unable to parse proto (Type: %s) in row: %s", type, keyDecoder.getKey()));
			throw e;
		}
	}

	private boolean isValid(final DNGF dngf) {
		// A record is emittable only when it exists and all of its required
		// protobuf fields are set.
		return (dngf != null) && dngf.isInitialized();
	}

}
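
/*
 * A minimal driver sketch (an assumption, not part of this module) showing how
 * this mapper could be wired to push the emitted JSON into Elasticsearch via
 * elasticsearch-hadoop's EsOutputFormat. The source table ("db_openaire"), the
 * index name ("dnet/dngf"), the node address and the job class itself are
 * hypothetical placeholders.
 *
 * import org.apache.hadoop.conf.Configuration;
 * import org.apache.hadoop.hbase.client.Scan;
 * import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 * import org.apache.hadoop.io.BytesWritable;
 * import org.apache.hadoop.io.NullWritable;
 * import org.apache.hadoop.mapreduce.Job;
 * import org.elasticsearch.hadoop.mr.EsOutputFormat;
 *
 * public class ElasticsearchFeedJob {
 *
 * 	public static void main(final String[] args) throws Exception {
 * 		final Configuration conf = new Configuration();
 * 		conf.set("es.nodes", "localhost:9200");   // hypothetical ES endpoint
 * 		conf.set("es.resource", "dnet/dngf");     // hypothetical target index
 * 		conf.set("es.input.json", "true");        // values are already JSON
 *
 * 		final Job job = Job.getInstance(conf, "es-feed");
 * 		job.setJarByClass(ElasticsearchFeedMapper.class);
 *
 * 		// Full-table scan over the source HBase table.
 * 		TableMapReduceUtil.initTableMapperJob(
 * 			"db_openaire", new Scan(), ElasticsearchFeedMapper.class,
 * 			NullWritable.class, BytesWritable.class, job);
 *
 * 		job.setOutputFormatClass(EsOutputFormat.class);
 * 		job.setNumReduceTasks(0); // map-only feed, no reduce phase needed
 *
 * 		System.exit(job.waitForCompletion(true) ? 0 : 1);
 * 	}
 * }
 */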
