package eu.dnetlib.oai.init.hdfs;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;

import com.google.common.base.Function;
import com.google.common.collect.Iterables;

import eu.dnetlib.data.hadoop.hdfs.SequenceFileUtils;
import eu.dnetlib.data.information.oai.publisher.info.MDFInfo;
import eu.dnetlib.data.oai.store.mongo.MongoPublisherStore;
import eu.dnetlib.data.oai.store.mongo.MongoPublisherStoreDAO;
import eu.dnetlib.data.oai.store.sync.OAIStoreInitializer;
import eu.dnetlib.miscutils.collections.Pair;

/**
 * Schedulable class for the automatic configuration of the OAI publisher.
 * <p>
 * It reads the configuration profile and initializes all the required stores, together with their related indexes, with content read from an HDFS file.
 * </p>
 * 
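 * A minimal construction sketch; how {@code mdfInfo} and {@code storeDao} are obtained, and how the initializer is
 * actually scheduled and triggered, depends on the hosting service and is assumed here (the HDFS path is hypothetical):
 * 
 * <pre>
 * {@code
 * Configuration conf = new Configuration();               // Hadoop configuration
 * Path seqFile = new Path("/user/dnet/oai/records.seq");  // hypothetical HDFS sequence file
 * PublisherInitializerHDFS initializer =
 *         new PublisherInitializerHDFS(mdfInfo, storeDao, true, seqFile, conf, -1);
 * }
 * </pre>
 * 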
 * @author alessia
 * 
 */
public class PublisherInitializerHDFS extends OAIStoreInitializer {

	private static final Log log = LogFactory.getLog(PublisherInitializerHDFS.class); // NOPMD by marko on 11/24/08 5:02 PM

	private Path path;
	private Configuration conf;
	private int limitRead = -1;

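	/**
	 * Feeds the given store with the records read from the configured HDFS sequence file.
	 *
	 * @param store
	 *            the mongo publisher store to feed.
	 */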
	@Override
	protected void synchronizeContent(final MongoPublisherStore store) {
		PublisherInitializerHDFS.log.info("Synchronizing content for stores: \n\tformat: " + getMdfInfo().getSourceFormatName() + " \n\tinterpretation: "
				+ getMdfInfo().getSourceFormatInterpretation() + " \n\tlayout: " + getMdfInfo().getSourceFormatLayout());

		try {
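			// Read the <key, value> pairs from the HDFS sequence file; limitRead is passed through from the
			// constructor (-1 by default, presumably meaning "read everything").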
			Iterable<Pair<Text, Text>> records = SequenceFileUtils.read(path, conf, limitRead);

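			// Keep only the record payloads: each pair's value is the record body, the keys are ignored here.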
			Iterable<String> recordsIterable = Iterables.transform(records, new Function<Pair<Text, Text>, String>() {

				@Override
				public String apply(final Pair<Text, Text> pair) {
					return pair.getValue().toString();
				}
			});
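			// Bulk feed the store with the record bodies read above.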
			store.feed(recordsIterable, "");
		} catch (Exception e) {
			PublisherInitializerHDFS.log.fatal("Can't synchronize content for stores: \n\tformat: " + getMdfInfo().getSourceFormatName()
					+ " \n\tinterpretation: " + getMdfInfo().getSourceFormatInterpretation() + " \n\tlayout: " + getMdfInfo().getSourceFormatLayout(), e);
			throw new RuntimeException(e);
		}
	}

	public PublisherInitializerHDFS() {
		super();
	}

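	/**
	 * Convenience constructor: same as the full constructor below, with {@code limitRead} set to -1 (the default).
	 */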
	public PublisherInitializerHDFS(final MDFInfo mdfInfo, final MongoPublisherStoreDAO oaiStoreDao, final boolean forceContentSynchronization, final Path path,
			final Configuration conf) {
		this(mdfInfo, oaiStoreDao, forceContentSynchronization, path, conf, -1);
	}

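	/**
	 * @param mdfInfo
	 *            info about the metadata format to publish.
	 * @param oaiStoreDao
	 *            DAO used to access the mongo publisher stores.
	 * @param forceContentSynchronization
	 *            whether to force the content synchronization.
	 * @param path
	 *            HDFS path of the sequence file to read.
	 * @param conf
	 *            Hadoop configuration.
	 * @param limitRead
	 *            maximum number of records to read from the file; -1 by default, presumably meaning no limit.
	 */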
	public PublisherInitializerHDFS(final MDFInfo mdfInfo, final MongoPublisherStoreDAO oaiStoreDao, final boolean forceContentSynchronization, final Path path,
			final Configuration conf, final int limitRead) {
		super(mdfInfo, oaiStoreDao, forceContentSynchronization);
		this.path = path;
		this.conf = conf;
		this.limitRead = limitRead;
	}

	public Path getPath() {
		return path;
	}

	public void setPath(final Path path) {
		this.path = path;
	}

	public Configuration getConf() {
		return conf;
	}

	public void setConf(final Configuration conf) {
		this.conf = conf;
	}

	public int getLimitRead() {
		return limitRead;
	}

	public void setLimitRead(final int limitRead) {
		this.limitRead = limitRead;
	}
}
