package eu.dnetlib.data.hadoop.action;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Required;

import eu.dnetlib.data.hadoop.config.ClusterName;
import eu.dnetlib.data.hadoop.config.ConfigurationEnumerator;
import eu.dnetlib.data.hadoop.hdfs.SequenceFileWriterFactory;
import eu.dnetlib.enabling.resultset.client.ResultSetClientFactory;

/**
 * Copies the records of a result set, addressed by an EPR, into a Hadoop sequence file.
 *
 * <p>
 * The sequence file is opened with {@link Text} keys and {@link Text} values. Each non-blank record becomes one
 * entry whose value is the record itself and whose key is the zero-based position of the record among the written
 * ones, rendered as a decimal string. Blank records are skipped and only counted for reporting.
 * </p>
 */
public class SequenceFileFeeder {

	private static final Log log = LogFactory.getLog(SequenceFileFeeder.class); // NOPMD by marko on 11/24/08 5:02 PM

	/** Factory used to obtain an iterable client over the records behind an EPR. */
	private ResultSetClientFactory resultSetClientFactory;

	/** Resolves the Hadoop configuration associated with a cluster name. */
	@Autowired
	protected ConfigurationEnumerator configurationEnumerator;

	/** Creates the sequence file writers used by {@link #feed(String, ClusterName, String)}. */
	@Autowired
	protected SequenceFileWriterFactory sequenceFileWriterFactory;

	/**
	 * Writes every non-blank record of the given result set to a sequence file.
	 *
	 * @param epr
	 *            the EPR handed to the result set client factory to obtain the records
	 * @param clusterName
	 *            the cluster whose configuration is used to open the writer
	 * @param path
	 *            the location of the sequence file to write
	 * @return the number of records appended to the sequence file (blank records are not counted)
	 * @throws IOException
	 *             propagated from the underlying sequence file writer
	 */
	public int feed(final String epr, final ClusterName clusterName, final String path) throws IOException {
		return doWrite(epr, clusterName, path);
	}

	/**
	 * Opens the writer, streams the records into it and closes it again.
	 *
	 * @param epr
	 *            the EPR handed to the result set client factory to obtain the records
	 * @param clusterName
	 *            the cluster whose configuration is used to open the writer
	 * @param path
	 *            the location of the sequence file to write
	 * @return the number of records appended to the sequence file
	 * @throws IOException
	 *             propagated from the underlying sequence file writer
	 */
	private int doWrite(final String epr, final ClusterName clusterName, final String path) throws IOException {
		final SequenceFile.Writer writer = sequenceFileWriterFactory.getSequenceFileWriter(Text.class, Text.class, getConf(clusterName), new Path(path));
		log.info("Opened sequence file writer: " + writer.toString());

		try {
			// The two Text instances are reused for every record; only their content changes between appends.
			final Text idText = new Text();
			final Text bodyText = new Text();
			int count = 0;
			int nulls = 0;
			for (String record : getResultSetClientFactory().getClient(epr)) {
				if (StringUtils.isBlank(record)) {
					// Blank records are not written; they are only tallied for the warning below.
					nulls++;
				} else {
					// The key is the running index of written records, so keys stay contiguous even when blanks are skipped.
					idText.set(String.valueOf(count++));
					bodyText.set(record);
					writer.append(idText, bodyText);
				}
			}
			log.info("written " + count + " records in sequence file: " + path);
			if (nulls > 0) {
				// State explicitly that these are the skipped blank records, not the total number of records.
				log.warn("found " + nulls + " blank records in epr, they were skipped!");
			}
			return count;
		} finally {
			// Always release the writer, whether the loop completed or failed.
			writer.close();
		}
	}

	/**
	 * Looks up the Hadoop configuration registered for the given cluster.
	 *
	 * @param clusterName
	 *            the cluster to look up
	 * @return whatever the configuration enumerator returns for that cluster
	 */
	protected Configuration getConf(final ClusterName clusterName) {
		return configurationEnumerator.get(clusterName);
	}

	/**
	 * @return the factory used to obtain result set clients
	 */
	public ResultSetClientFactory getResultSetClientFactory() {
		return resultSetClientFactory;
	}

	/**
	 * @param resultSetClientFactory
	 *            the factory used to obtain result set clients; mandatory
	 */
	@Required
	public void setResultSetClientFactory(final ResultSetClientFactory resultSetClientFactory) {
		this.resultSetClientFactory = resultSetClientFactory;
	}

}
