package eu.dnetlib.data.multimap;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;

import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Required;

import voldemort.client.ClientConfig;
import voldemort.client.SocketStoreClientFactory;
import voldemort.client.StoreClient;
import voldemort.client.StoreClientFactory;
import voldemort.server.VoldemortConfig;
import voldemort.server.VoldemortServer;
import voldemort.versioning.Versioned;

import com.google.gson.Gson;

import eu.dnetlib.data.multimap.rmi.MultiMapService;
import eu.dnetlib.enabling.tools.AbstractBaseService;

@Path("multimap")
public class MultiMapServiceImpl extends AbstractBaseService implements MultiMapService {

	private static final Log log = LogFactory.getLog(MultiMapServiceImpl.class);

	private String VOLDEMORT_HOME;
	private String VOLDEMORT_URL;
	private String VOLDEMORT_N_KEYS;
	private String VOLDEMORT_N_VALUES;
	
	private VoldemortServer server;
	private StoreClientFactory factory;

	private StoreClient<String, String> kkStore;
	private StoreClient<String, String> invertedKkStore;
	private StoreClient<String, String> kvStore;

	public String getVOLDEMORT_HOME() {
		return VOLDEMORT_HOME;
	}

	@Required
	public void setVOLDEMORT_HOME(String vOLDEMORT_HOME) {
		VOLDEMORT_HOME = vOLDEMORT_HOME;
	}
	
	public String getVOLDEMORT_URL() {
		return VOLDEMORT_URL;
	}

	@Required
	public void setVOLDEMORT_URL(String vOLDEMORT_URL) {
		VOLDEMORT_URL = vOLDEMORT_URL;
	}
	

	public String getVOLDEMORT_N_KEYS() {
		return VOLDEMORT_N_KEYS;
	}

	@Required
	public void setVOLDEMORT_N_KEYS(String vOLDEMORT_N_KEYS) {
		VOLDEMORT_N_KEYS = vOLDEMORT_N_KEYS;
	}

	public String getVOLDEMORT_N_VALUES() {
		return VOLDEMORT_N_VALUES;
	}

	@Required
	public void setVOLDEMORT_N_VALUES(String vOLDEMORT_N_VALUES) {
		VOLDEMORT_N_VALUES = vOLDEMORT_N_VALUES;
	}
	
	@Override
	public void start() {
		File dir = new File(VOLDEMORT_HOME + "/config");
		if (!dir.exists()) {
			dir.mkdirs();
			try {
				FileWriter fw = new FileWriter(VOLDEMORT_HOME + "/config/cluster.xml");
				IOUtils.copy(getClass().getResourceAsStream("/eu/dnetlib/data/multimap/voldemort/config/cluster.xml"), fw);
				fw.flush();
				fw.close();

				fw = new FileWriter(VOLDEMORT_HOME + "/config/server.properties");
				IOUtils.copy(getClass().getResourceAsStream("/eu/dnetlib/data/multimap/voldemort/config/server.properties"), fw);
				fw.flush();
				fw.close();
				
				fw = new FileWriter(VOLDEMORT_HOME + "/config/stores.xml");
				IOUtils.copy(getClass().getResourceAsStream("/eu/dnetlib/data/multimap/voldemort/config/stores.xml"), fw);
				fw.flush();
				fw.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
		VoldemortConfig config = VoldemortConfig.loadFromVoldemortHome(VOLDEMORT_HOME);
		server = new VoldemortServer(config);
		server.start();
		log.info("Voldemort server has just started");
		
		factory = new SocketStoreClientFactory(new ClientConfig().setBootstrapUrls(getVOLDEMORT_URL()));
		kkStore = factory.getStoreClient("kk-store");
		invertedKkStore = factory.getStoreClient("ikk-store");
		kvStore = factory.getStoreClient("kv-store");
	}

	@Override
	public void stop() {
		server.stop();
	}

	@GET
	@Path("echo/{param}")
	public String echo(@PathParam("param") String param) {
		return param;
	}
	
	/*-----------KEYS RESOURCE-------------*/
	@Override
	@GET
	@Path("keys/{key}")
	public String listKeys(@PathParam("key") String key) {
		log.info("Key aliases: " + key);
		String internalKey;
		
		Versioned<String> versioned = kkStore.get(key);
		if (versioned == null) {
			return null;
		} else {
			internalKey = versioned.getValue();
			versioned = invertedKkStore.get(internalKey);
			return versioned.getValue();
		}
	}

	@Override
	@POST
	@Path("keys/{key}/{alias}")
	public Boolean putKeyAlias(@PathParam("key") String key, @PathParam("alias") String alias) {
		log.info(alias + " @ " + key);
		Versioned<String> versioned = kkStore.get(alias);
		if (versioned != null){
			//aliases are unique
			return false;
		}
		
		versioned = kkStore.get(key);
		if (versioned == null) {
			return false;
		} else {
			// add entry in key-key store
			String internalKey = versioned.getValue();
			kkStore.put(alias, internalKey);
			
			// update inverted key-key store
			versioned = invertedKkStore.get(internalKey);
			Set<String> set = new Gson().fromJson(versioned.getValue(), HashSet.class);
			set.add(alias);
			invertedKkStore.put(internalKey, new Gson().toJson(set));
			return true;
		}
	}

	@Override
	@DELETE
	@Path("keys/{key}")
	public Boolean removeKey(@PathParam("key") String key) {
		log.info("Remove key: " + key);
		
		Versioned<String> versioned = kkStore.get(key);
		if (versioned == null) {
			return false;
		} else {
			// delete entry from key-key store
			String internalKey = versioned.getValue();
			kkStore.delete(key);
			
			// update/delete entry from inverted key-key store
			versioned = invertedKkStore.get(internalKey);
			Set<String> set = new Gson().fromJson(versioned.getValue(), HashSet.class);
			if (set.size() == 1) {
				invertedKkStore.delete(internalKey);
				kvStore.delete(internalKey);
			} else {
				set.remove(key);
				invertedKkStore.put(internalKey, new Gson().toJson(set));
			}
			return true;
		}
	}
	
	/*-----------VALUES RESOURCE-------------*/
	@Override
	@GET
	@Path("values/{key}")
	public String listValues(@PathParam("key") String key) {
		log.info("Get: " + key);
		String internalKey;
		
		Versioned<String> versioned = kkStore.get(key);
		if (versioned == null) {
			return "";
		} else {
			internalKey = versioned.getValue();
			versioned = kvStore.get(internalKey);
			return versioned.getValue();
		}
	}
	
	@Override
	@GET
	@Path("values/{key}/{position}")
	public String getValue(@PathParam("key")String key, @PathParam("position") int position) {
		log.info("Get i=" + position + " : " + key);
		String internalKey;
		
		Versioned<String> versioned = kkStore.get(key);
		if (versioned == null) {
			return "";
		} else {
			internalKey = versioned.getValue();
			versioned = kvStore.get(internalKey);
			List<String> list = new Gson().fromJson(versioned.getValue(), ArrayList.class);
			return list.get(position);
		}
	}

	@Override
	@POST
	@Path("values/{key}")
	public Boolean putValue(@PathParam("key") String key, String value) {
		log.info(key + " + " + value);
		String internalKey;

		Versioned<String> versioned = kkStore.get(key);
		if (versioned == null) {
			// new key (no alias)
			internalKey = String.valueOf(System.currentTimeMillis());
			kkStore.put(key, internalKey);
			Set<String> set = new HashSet<String>();
			set.add(key);
			invertedKkStore.put(internalKey, new Gson().toJson(set));
			
			set = new HashSet<String>();
			set.add(value);
			kvStore.put(internalKey, new Gson().toJson(set));
		} else {
			// key present
			internalKey = versioned.getValue();
			
			versioned = kvStore.get(internalKey);
			HashSet<String> set = new Gson().fromJson(versioned.getValue(), HashSet.class);
			set.add(value);
			kvStore.put(internalKey, new Gson().toJson(set));
		}
		return true;
	}
	
	@Override
	@DELETE
	@Path("values/{key}")
	public Boolean flushValues(@PathParam("key") String key) {
		log.info("Flush values: " + key);
		
		Versioned<String> versioned = kkStore.get(key);
		if (versioned == null) {
			return false;
		} else {
			String internalKey = versioned.getValue();
			Set<String> set = new HashSet<String>();
			kvStore.put(internalKey, new Gson().toJson(set));
			return true;
		}
	}

	/*-----------ROWS RESOURCE-------------*/
	@Override
	@DELETE
	@Path("rows/{key}")
	public Boolean deleteRow(@PathParam("key") String key) {
		// TODO Auto-generated method stub
		return false;
	}

	/*-----------OTHERS-------------*/
	@Override
	public String intersectValues(String key1, String key2) {
		// TODO Auto-generated method stub
		return null;
	}

	@Override
	public String unionValues(String key1, String key2) {
		// TODO Auto-generated method stub
		return null;
	}

}
