package eu.dnetlib.data.hadoop.blackboard;

import java.io.IOException;

import eu.dnetlib.enabling.resultset.client.ResultSetClient;
import eu.dnetlib.rmi.common.ResultSet;
import eu.dnetlib.rmi.data.hadoop.ClusterName;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.springframework.beans.factory.annotation.Autowired;

import eu.dnetlib.data.hadoop.config.ConfigurationEnumerator;
import eu.dnetlib.data.hadoop.hdfs.SequenceFileWriterFactory;
import org.springframework.beans.factory.annotation.Value;

public class SequenceFileFeeder {

	private static final Log log = LogFactory.getLog(SequenceFileFeeder.class); // NOPMD by marko on 11/24/08 5:02 PM

	@Autowired
	protected ConfigurationEnumerator configurationEnumerator;

	@Autowired
	protected SequenceFileWriterFactory sequenceFileWriterFactory;

	@Autowired
	private ResultSetClient resultSetClient;

	// number of appended records between explicit hflush/hsync calls; a value <= 0 disables periodic flushing
	@Value("${services.hadoop.hdfs.writer.bulk.size}")
	private int bulkSize;

	/**
	 * Streams the string records of the given result set into a new sequence file on HDFS.
	 *
	 * @param resultSet   source of the records to write
	 * @param clusterName identifies the hadoop cluster configuration to use
	 * @param path        HDFS path of the sequence file to create
	 * @return the number of non-blank records written
	 * @throws IOException in case of HDFS write failures
	 */
	public int feed(final ResultSet<String> resultSet, final ClusterName clusterName, final String path) throws IOException {
		return doWrite(resultSet, clusterName, path);
	}

	private int doWrite(final ResultSet<String> resultSet, final ClusterName clusterName, final String path) throws IOException {
		// try-with-resources guarantees the writer is closed (and the file finalized) on every exit path
		try (final SequenceFile.Writer writer = sequenceFileWriterFactory.getSequenceFileWriter(Text.class, Text.class, getConf(clusterName), new Path(path))) {
			log.debug("Opened sequence file writer: " + writer);
			final Text idText = new Text();
			final Text bodyText = new Text();
			int count = 0;
			int blanks = 0;
			for (final String record : resultSetClient.iter(resultSet, String.class)) {
				if (StringUtils.isBlank(record)) {
					blanks++;
				} else {
					// the key of each entry is the 0-based position of the record among the non-blank ones
					idText.set(String.valueOf(count++));
					bodyText.set(record);
					writer.append(idText, bodyText);
					// guard against a misconfigured bulkSize (<= 0), which would otherwise throw ArithmeticException
					if ((bulkSize > 0) && (count % bulkSize == 0)) {
						writer.hflush();
						writer.hsync();
						log.debug(String.format("%s records so far %s", writer, count));
					}
				}
			}
			log.info("written " + count + " records in sequence file: " + path);
			if (blanks > 0) {
				log.warn("found " + blanks + " blank records in epr!");
			}
			return count;
		}
	}

	/**
	 * @param clusterName identifies the target cluster
	 * @return the hadoop {@link Configuration} registered for the given cluster name
	 */
	protected Configuration getConf(final ClusterName clusterName) {
		return configurationEnumerator.get(clusterName);
	}

}
