package eu.dnetlib.r2d2.cassandra;

import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.cassandra.thrift.ColumnPath;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Required;

import eu.dnetlib.r2d2.FileStore;

import me.prettyprint.cassandra.service.CassandraClientPool;
import me.prettyprint.cassandra.service.CassandraClient;
import me.prettyprint.cassandra.service.Keyspace;
import me.prettyprint.cassandra.service.PoolExhaustedException;

/**
 * Cassandra-backed {@link FileStore}. A file is split into chunks of roughly
 * CHUNK_SIZE bytes; each chunk is written to its own row, keyed
 * {@code <id>_<chunkNumber>}, under the column {@code chunk}, and a small JSON
 * metadata column is stored under the plain {@code <id>}.
 * 
 * Not finished: only writes are supported and the class is untested.
 * 
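 * A minimal usage sketch (untested; assumes a configured Hector
 * {@link CassandraClientPool}, e.g. injected by Spring):
 * 
 * <pre>
 * CassandraFileStore store = new CassandraFileStore();
 * store.setCassandraClientPool(pool); // a me.prettyprint CassandraClientPool
 * store.setKeyspace("Scholarlynk");
 * store.write("some-file-id", new FileInputStream("/tmp/example.bin"));
 * </pre>
 * 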
 * @author marko
 * 
 */
public class CassandraFileStore implements FileStore {
	/** Size of the buffer used for each read from the input stream. */
	private static final int READ_SIZE = 10 * 8092;

	/** Approximate size, in bytes, of a chunk row written to Cassandra. */
	private static final int CHUNK_SIZE = 1 * 1024 * 1024;

	private static final Log log = LogFactory.getLog(CassandraFileStore.class); // NOPMD by marko on 11/24/08 5:02 PM

	private static final String CASSANDRA_KEYSPACE = "Scholarlynk";
	private static final String CF_NAME = "Chunks";

	private String keyspace = CASSANDRA_KEYSPACE;
	private String columnFamilyName = CF_NAME;

	/** ZERO: writes are fire-and-forget, the client does not wait for an acknowledgement. */
	private ConsistencyLevel consistencyLevel = ConsistencyLevel.ZERO;

	private CassandraClientPool cassandraClientPool;

	public void write(String id, InputStream input) throws IllegalStateException, PoolExhaustedException, Exception {
		log.info("WRITING file to cassandra " + id);

		CassandraClient client = cassandraClientPool.borrowClient();
		try {
			Keyspace ks = client.getKeyspace(keyspace, consistencyLevel);

			byte[] tmp = new byte[READ_SIZE];
			ByteArrayOutputStream buffer = new ByteArrayOutputStream(READ_SIZE);

			int chunk = 0;

			while (true) {

				int size = input.read(tmp);

				if (size < 0)
					log.info("finished reading");

				if (size > 0)
					buffer.write(tmp, 0, size);

				// Flush whenever the buffer has reached CHUNK_SIZE, or when the input
				// is exhausted and data is still pending. Each chunk is written to its
				// own row, keyed "<id>_<chunkNumber>", under the column "chunk".
				if ((size <= 0 || buffer.size() >= CHUNK_SIZE) && buffer.size() > 0) {
					ColumnPath cp = new ColumnPath(columnFamilyName);
					cp.setColumn("chunk".getBytes());
					ks.insert(id + "_" + Integer.toString(chunk), cp, buffer.toByteArray());
					// Alternative layout (not used): one row per file, one column per chunk:
					//   cp.setColumn(Integer.toString(chunk).getBytes());
					//   ks.insert(id, cp, buffer.toByteArray());

					chunk++;
					buffer.reset();
				}
				if (size <= 0)
					break;
			}

			// Finally, record a small JSON metadata row under the plain id.
			ColumnPath cp = new ColumnPath(columnFamilyName);
			cp.setColumn("metadata".getBytes());
			ks.insert(id, cp, "{type:'file'}".getBytes());

			log.info("Stored upload metadata in cassandra");
		} finally {
			cassandraClientPool.releaseClient(client);
		}

	}

	@Override
	public void read(String id, OutputStream output) throws Exception {
		throw new UnsupportedOperationException("read is not implemented yet");
	}
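
	/*
	 * Untested sketch of how read() could mirror the write path above: fetch the
	 * rows "<id>_0", "<id>_1", ... until one is missing and stream each "chunk"
	 * column value to the output. It assumes Hector's
	 * Keyspace.getColumn(String, ColumnPath) and the Thrift NotFoundException,
	 * and it is not yet wired into read().
	 */
	private void readChunks(String id, OutputStream output) throws Exception {
		CassandraClient client = cassandraClientPool.borrowClient();
		try {
			Keyspace ks = client.getKeyspace(keyspace, consistencyLevel);
			ColumnPath cp = new ColumnPath(columnFamilyName);
			cp.setColumn("chunk".getBytes());
			for (int chunk = 0;; chunk++) {
				try {
					// getColumn fails with NotFoundException when the chunk row does not exist
					output.write(ks.getColumn(id + "_" + Integer.toString(chunk), cp).getValue());
				} catch (org.apache.cassandra.thrift.NotFoundException e) {
					break; // no more chunks for this id
				}
			}
			output.flush();
		} finally {
			cassandraClientPool.releaseClient(client);
		}
	}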

	public CassandraClientPool getCassandraClientPool() {
		return cassandraClientPool;
	}

	@Required
	public void setCassandraClientPool(CassandraClientPool cassandraClientPool) {
		this.cassandraClientPool = cassandraClientPool;
	}

	public String getKeyspace() {
		return keyspace;
	}

	public void setKeyspace(String keyspace) {
		this.keyspace = keyspace;
	}

}
