package eu.dnetlib.r2d2.thrift;

import java.io.IOException;
import java.io.StringReader;
import java.sql.Date;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import javax.management.RuntimeErrorException;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import javax.xml.ws.wsaddressing.W3CEndpointReference;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.thrift.TException;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.Node;
import org.springframework.beans.factory.annotation.Required;
import org.springframework.transaction.annotation.Transactional;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;
import org.xml.sax.SAXException;

import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

import eu.dnetlib.data.index.IIndexService;
import eu.dnetlib.data.index.IndexServiceException;
import eu.dnetlib.enabling.resultset.rmi.ResultSetException;
import eu.dnetlib.enabling.resultset.rmi.ResultSetService;
import eu.dnetlib.enabling.tools.ServiceLocator;
import eu.dnetlib.enabling.tools.ServiceResolver;
import eu.dnetlib.miscutils.collections.Filter;
import eu.dnetlib.miscutils.collections.FilteredCollection;
import static eu.dnetlib.miscutils.collections.FilteredCollection.listFilter;
import eu.dnetlib.miscutils.collections.MappedCollection;
import static eu.dnetlib.miscutils.collections.MappedCollection.listMap;
import eu.dnetlib.miscutils.coupling.Holder;
import eu.dnetlib.miscutils.datetime.DateUtils;
import eu.dnetlib.miscutils.functional.UnaryFunction;
import eu.dnetlib.r2d2.ReadOnly;
import eu.dnetlib.r2d2.accesstime.Toucher;
import eu.dnetlib.r2d2.neo4j.BeanDao;
import eu.dnetlib.r2d2.neo4j.Neo4jBean;
import eu.dnetlib.r2d2.neo4j.dao.AccessTimeDao;
import eu.dnetlib.r2d2.neo4j.dao.GroupDao;
import eu.dnetlib.r2d2.neo4j.dao.IdBroker;
import eu.dnetlib.r2d2.neo4j.dao.ItemDao;
import eu.dnetlib.r2d2.neo4j.dao.Neo4jAccessTimeDao;
import eu.dnetlib.r2d2.neo4j.dao.ProfileDao;
import eu.dnetlib.r2d2.neo4j.dao.RLEntryDao;
import eu.dnetlib.r2d2.neo4j.dao.ReadingListDao;
import eu.dnetlib.r2d2.neo4j.dao.TagDao;
import eu.dnetlib.r2d2.neo4j.domain.AccessTime;
import eu.dnetlib.r2d2.neo4j.domain.Neo4jGroup;
import eu.dnetlib.r2d2.neo4j.domain.Neo4jItem;
import eu.dnetlib.r2d2.neo4j.domain.Neo4jProfile;
import eu.dnetlib.r2d2.neo4j.domain.Neo4jRLEntry;
import eu.dnetlib.r2d2.neo4j.domain.Neo4jReadingList;
import eu.dnetlib.r2d2.neo4j.domain.Neo4jTag;
import eu.dnetlib.r2d2.neo4j.domain.Neo4jRLEntry.Kind;
import eu.dnetlib.r2d2.search.SearchConnector;

//@Transactional
public class ScholarlynkImpl implements Scholarlynk.Iface {

	/**
	 * Mapping function that reduces a neo4j tag bean to its textual label.
	 *
	 * @author marko
	 */
	private static class TagToString implements UnaryFunction<String, Neo4jTag> {
		/**
		 * @param tag
		 *            the tag bean to convert; dereferenced without a null check
		 * @return the text of the tag
		 */
		public String evaluate(final Neo4jTag tag) {
			final String text = tag.getText();
			return text;
		}
	}

	/**
	 * Mapping function turning a neo4j item bean into the thrift {@code Item} transport bean.
	 *
	 * <p>
	 * Each external author name becomes an {@code Author} without id, using the same string for both name arguments.
	 * The three trailing-but-one constructor arguments are always passed as {@code null, 0, 0}.
	 * </p>
	 *
	 * @author marko
	 */
	private class ItemToItem implements UnaryFunction<Item, Neo4jItem> {
		public Item evaluate(final Neo4jItem source) {
			// builds an id-less author out of a plain author name
			final UnaryFunction<Author, String> nameToAuthor = new UnaryFunction<Author, String>() {
				public Author evaluate(final String name) {
					return new Author(null, name, name);
				}
			};

			final List<Author> authorBeans;
			if (source.getExtAuthors() == null) {
				// no author information stored: hand out an empty list rather than null
				authorBeans = Lists.newArrayList();
			} else {
				authorBeans = listMap(source.getExtAuthors(), nameToAuthor);
			}

			return new Item(source.getId(), source.getTitle(), authorBeans, source.getIconUrl(), source.getItemUrl(), source.getDescription(),
					null, 0, 0, sharedWith(source));
		}
	}

	/**
	 * Convert a neo4j reading list to a thrift bean.
	 *
	 * <p>
	 * Besides the plain properties, the conversion looks up the access times, the number of entries, the owner, the
	 * sharing information and the rating stored in the entry of the current user's MVL which points to this reading
	 * list (0.0 when there is no such entry).
	 * </p>
	 *
	 * @author marko
	 */
	private class ReadingListToReadingList implements UnaryFunction<ReadingList, Neo4jReadingList> {
		private final UserToUserRef userToUser = new UserToUserRef();

		/**
		 * @param profile
		 *            the neo4j reading list to convert; must not be null
		 * @return the thrift representation of the reading list
		 * @throws IllegalStateException
		 *             if the current user has no MVL (raised by {@code getCurrentUserMVL()})
		 */
		public ReadingList evaluate(final Neo4jReadingList profile) {
			final eu.dnetlib.r2d2.thrift.AccessTime access = new AccessTimeToAccessTime().evaluate(toucher.getAccessTime(profile.getId()));

			final int size = Iterables.size(entryDao.getReadingListEntries(profile.getId()));

			final Neo4jRLEntry mvlEntry = entryDao.getEntry(getCurrentUserMVL(), profile);

			double userRating = 0.0;
			if (mvlEntry != null)
				userRating = mvlEntry.getRating();

			final Neo4jProfile owner = profileDao.getReadingListOwner(profile.getId());

			// computed once and reused below; previously the lookup was executed twice and the first result dropped
			final List<UserRef> sharedWithUsers = sharedWith(profile);

			final int sharedAmong = sharedAmong(profile).size();

			// NOTE(review): userRating is passed for two consecutive constructor arguments; presumably one of them is
			// meant to be an overall rating - confirm against the thrift definition.
			return new ReadingList(profile.getId(), profile.getName(), profile.getDescription(), userToUser.evaluate(owner), profile.getAvatarUrl(),
					sharedAmong, size, access, userRating, userRating, sharedWithUsers);
		}
	}

	/**
	 * Mapping function producing the lightweight thrift {@code UserRef} (id and name only) for a neo4j profile.
	 *
	 * @author marko
	 */
	private static class UserToUserRef implements UnaryFunction<UserRef, Neo4jProfile> {
		/**
		 * @param profile
		 *            the profile to convert, may be null
		 * @return the user reference, or null when no profile was given
		 */
		public UserRef evaluate(final Neo4jProfile profile) {
			return profile == null ? null : new UserRef(profile.getId(), profile.getName());
		}
	}

	/**
	 * Mapping function producing the full thrift {@code User} bean for a neo4j profile.
	 *
	 * <p>
	 * The fourth constructor argument is always the placeholder {@code "<private>"} and the last two arguments are
	 * always null.
	 * </p>
	 *
	 * @author marko
	 */
	private static class UserToUser implements UnaryFunction<User, Neo4jProfile> {
		/**
		 * @param profile
		 *            the profile to convert, may be null
		 * @return the user bean, or null when no profile was given
		 */
		public User evaluate(final Neo4jProfile profile) {
			if (profile != null) {
				return new User(profile.getId(), profile.getName(), profile.getUid(), "<private>", profile.getAvatarUrl(), null, null);
			}
			return null;
		}
	}

	/**
	 * Mapping function turning a neo4j group into the thrift {@code Group} bean, including the member count.
	 *
	 * @author marko
	 */
	private class GroupToGroup implements UnaryFunction<Group, Neo4jGroup> {
		/**
		 * @param source
		 *            the neo4j group; must not be null
		 * @return the thrift group carrying the number of members returned by the profile dao
		 */
		public Group evaluate(final Neo4jGroup source) {
			// the members are only iterated in order to be counted
			final int memberCount = Iterables.size(profileDao.getGroupMembers(source.getId()));

			return new Group(source.getId(), source.getName(), source.getDescription(), memberCount, source.getAvatarUrl());
		}
	}

	/**
	 * Mapping function turning the neo4j access time record into the thrift bean, with the creation, last access and
	 * last modification timestamps rendered as ISO8601 strings.
	 */
	private class AccessTimeToAccessTime implements UnaryFunction<eu.dnetlib.r2d2.thrift.AccessTime, AccessTime> {
		@Override
		public eu.dnetlib.r2d2.thrift.AccessTime evaluate(final eu.dnetlib.r2d2.neo4j.domain.AccessTime access) {
			final String created = iso8601(access.getCreationDate());
			final String lastAccessed = iso8601(access.getLastAccessDate());
			final String lastModified = iso8601(access.getLastModificationDate());

			return new eu.dnetlib.r2d2.thrift.AccessTime(created, lastAccessed, lastModified);
		}

		/**
		 * Formats a timestamp through {@code DateUtils}.
		 */
		private String iso8601(final long timestamp) {
			return new DateUtils(new Date(timestamp)).getDateAsISO8601String();
		}
	}

	/** Class wide logger. */
	private static final Log log = LogFactory.getLog(ScholarlynkImpl.class); // NOPMD by marko on 11/24/08 5:02 PM

	/** Prefix to which the item id is appended in order to build the item url returned by uploadItem. */
	private String uploadUrl;

	/** Locator of the index service queried by the non materialized search. */
	private ServiceLocator<IIndexService> indexLocator;

	/** Used to fetch the source of external (non materialized) items by id. */
	@Resource
	private SearchConnector searchConnector;

	/**
	 * In-memory claims: item id -> two element list [claimed author name, user name of the claiming session].
	 * Written by claim() and consulted by search(). Plain HashMap, package visible.
	 * NOTE(review): not persisted and not synchronized - confirm this is only meant for demo purposes.
	 */
	Map<String, List<String>> claimed = new HashMap<String, List<String>>();

	// NOTE(review): not referenced in the visible part of the class; presumably used by the id generation helper.
	@Resource
	private IdBroker idBroker;

	// daos
	@Resource(name = "readingListDao")
	private ReadingListDao readingListDao;

	//	@Resource(name = "accessWrappingReadingListDao")
	//	private ReadingListDao readingListDaoA;

	@Resource
	private ProfileDao profileDao;

	@Resource
	private GroupDao groupDao;

	@Resource
	private ItemDao itemDao;

	@Resource
	private RLEntryDao entryDao;

	@Resource
	private TagDao tagDao;

	// NOTE(review): not referenced in the visible part of the class.
	@Resource
	private AccessTimeDao accessTimeDao;

	/** Source of the access time records attached to reading lists. */
	@Resource
	private Toucher toucher;

	// NOTE(review): not referenced in the visible part of the class.
	private GraphDatabaseService graphDB;

	/**
	 * service resolver.
	 */
	@Resource
	private ServiceResolver serviceResolver;

	/**
	 * Authentication token -> session, filled by authenticate() and read by login().
	 * NOTE(review): plain HashMap and entries are never removed in the visible code - confirm how many threads serve
	 * requests and whether sessions should expire.
	 */
	private final Map<String, Session> tokenSession = Maps.newHashMap();

	/**
	 * Session bound to the current thread. A thread which never called authenticate() or login() gets the hardcoded
	 * default session below.
	 * NOTE(review): the default makes unauthenticated calls act as user "u3" - presumably a development shortcut.
	 */
	private final ThreadLocal<Session> session = new ThreadLocal<Session>() {
		@Override
		protected Session initialValue() {
			return new Session("u3", "Marko");
		}
	};

	/**
	 * Diagnostic call: logs the users sharing one hardcoded information object.
	 */
	@Deprecated
	@Override
	public void test() {
		log.info("TESTING");

		final String sampleObjectId = "1-fd5279d6-6497-4ef0-9756-e8a214445318_UmVwb3NpdG9yeVNlcnZpY2VSZXNvdXJjZXMvUmVwb3NpdG9yeVNlcnZpY2VSZXNvdXJjZVR5cGU=::oai:eprints.lse.ac.uk:14764";
		final Iterable<Neo4jProfile> sharers = profileDao.getUsersSharingInformationObject(sampleObjectId);

		log.info("got res: " + sharers);

		final Iterator<Neo4jProfile> cursor = sharers.iterator();
		while (cursor.hasNext()) {
			log.info("r: " + cursor.next());
		}
	}

	public ScholarlynkImpl() {
		super();
	}

	/**
	 * Lifecycle callback, only announces the startup in the log.
	 */
	@PostConstruct
	public void init() {
		final String banner = "STARTING SCHOLARLYINK";
		log.info(banner);
	}

	/**
	 * Credential verification hook. This implementation accepts every username/password pair.
	 *
	 * @param username
	 *            user name, ignored
	 * @param password
	 *            password, ignored
	 * @return always true
	 */
	protected boolean checkCredentials(final String username, final String password) {
		final boolean accepted = true;
		return accepted;
	}

	/**
	 * Authenticates a user, binds a fresh session to the current thread and returns the token which can later be used
	 * with {@code login} to resume that session.
	 *
	 * @param userName
	 *            value matched against the uid and mail properties of the stored profiles
	 * @param password
	 *            password, handed to {@code checkCredentials}
	 * @return the new authentication token, or null if the user name is null, the credentials are rejected or no
	 *         profile matches
	 * @throws TException
	 *             declared by the thrift interface
	 */
	public String authenticate(final String userName, final String password) throws TException {
		log.info("trying to authenticate user: " + userName);

		if (userName == null)
			return null;

		// The credential hook used to be ignored altogether; consult it before touching any session state.
		if (!checkCredentials(userName, password)) {
			log.info("failing to authenticate because of rejected credentials for user: " + userName);
			return null;
		}

		final Neo4jProfile user = Iterables.getOnlyElement(profileDao.search(userName, Neo4jProfile.UID, Neo4jProfile.MAIL), null);

		if (user == null) {
			log.info("failing to authenticate because no such user: " + userName);
			return null;
		}

		final Session newSession = new Session();
		newSession.setUserId(user.getId());
		newSession.setUserName(user.getUid());

		// the token is only generated once we know the authentication succeeds
		final String token = UUID.randomUUID().toString();
		tokenSession.put(token, newSession);

		log.info("authenticating user: " + userName + " new session " + newSession);

		session.set(newSession);
		return token;
	}

	/**
	 * Resumes a previously authenticated session on the current thread.
	 *
	 * @param authToken
	 *            token handed out by {@code authenticate}
	 * @return true if the token is known and its session is now bound to the current thread, false otherwise
	 */
	@ReadOnly
	public boolean login(final String authToken) throws TException {
		final Session known = tokenSession.get(authToken);

		if (known != null) {
			log.info("resuming already authenticated session: " + known);

			session.set(known);
			return true;
		}

		log.info("cannot resume session because cannot find token " + authToken);
		return false;
	}

	/**
	 * Thrift entry point for the user properties, see {@code basicGetUserProperties}.
	 *
	 * @param userId
	 *            id of the user
	 * @return the property map built by {@code basicGetUserProperties}
	 */
	@ReadOnly
	public Map<String, String> getUserProperties(final String userId) throws TException {
		final Map<String, String> properties = basicGetUserProperties(userId);
		return properties;
	}

	/**
	 * Updates the user properties. Only the {@code "mvl.id"} property is evaluated: its value becomes the MVL of the
	 * user.
	 *
	 * @param userId
	 *            id of the user
	 * @param properties
	 *            properties to apply; must not be null
	 */
	public void setUserProperties(final String userId, final Map<String, String> properties) {
		// NOTE(review): a missing "mvl.id" key hands null to the dao - confirm this is intended.
		readingListDao.setUserMVL(userId, properties.get("mvl.id"));
	}

	/**
	 * Builds the properties of a user: a fixed {@code "default.privacy.level"} of {@code "shared"} plus, if the user
	 * has a MVL, its id under {@code "mvl.id"}.
	 *
	 * @param userId
	 *            id of the user
	 * @return a new mutable map with the properties
	 */
	public Map<String, String> basicGetUserProperties(final String userId) {
		final Map<String, String> properties = Maps.newHashMap();
		properties.put("default.privacy.level", "shared");

		final Neo4jReadingList userMvl = readingListDao.getUserMVL(userId);
		if (userMvl != null) {
			properties.put("mvl.id", userMvl.getId());
		}

		return properties;
	}

	/**
	 * Looks up a user by id.
	 *
	 * @param userId
	 *            id of the user
	 * @return the thrift user bean, or null when the dao returns no profile
	 */
	@ReadOnly
	public User getUser(final String userId) {
		final UserToUser converter = new UserToUser();
		return converter.evaluate(profileDao.getBean(userId));
	}

	/**
	 * Returns the user owning the session bound to the current thread.
	 *
	 * @return the thrift user bean, or null when the dao returns no profile for the session's user id
	 */
	@ReadOnly
	public User getCurrentUser() {
		final UserToUser converter = new UserToUser();
		return converter.evaluate(profileDao.getBean(session.get().getUserId()));
	}

	/**
	 * Searches reading lists; the pagination is delegated to the reading list dao.
	 *
	 * @param term
	 *            search terms
	 * @param range
	 *            requested page
	 * @param scope
	 *            currently ignored
	 * @return the total number of matches and the requested page of reading lists
	 */
	@ReadOnly
	public ReadingListResults readingListSearch(final String term, final Range range, final Scope scope) throws TException {
		// the message used to read "Searching users", a leftover from userSearch which made the logs misleading
		log.info("Searching reading lists: " + term + " range " + range + " count " + count(range));

		final Holder<Integer> count = new Holder<Integer>();
		final List<ReadingList> readingLists = paginate(readingListDao, term, range, count, new ReadingListToReadingList());

		return new ReadingListResults(count.getValue(), readingLists);
	}

	/**
	 * Searches users; the pagination is delegated to the profile dao.
	 *
	 * @param term
	 *            search terms
	 * @param range
	 *            requested page
	 * @param scope
	 *            currently ignored
	 * @return the total number of matches and the requested page of users
	 */
	@ReadOnly
	public UserResults userSearch(final String term, final Range range, final Scope scope) throws TException {
		log.info("Searching users: " + term + " range " + range + " count " + count(range));

		final Holder<Integer> total = new Holder<Integer>();

		// the converter is a UnaryFunction itself, no wrapper needed
		final List<User> page = paginate(profileDao, term, range, total, new UserToUser());

		return new UserResults(total.getValue(), page);
	}

	/**
	 * Searches groups; the pagination is delegated to the group dao.
	 *
	 * @param term
	 *            search terms
	 * @param range
	 *            requested page
	 * @param scope
	 *            currently ignored
	 * @return the total number of matches and the requested page of groups
	 */
	@ReadOnly
	public GroupResults groupSearch(final String term, final Range range, final Scope scope) throws TException {
		log.info("Searching groups: " + term + " range " + range + " count " + count(range));

		final GroupToGroup converter = new GroupToGroup();
		final Holder<Integer> total = new Holder<Integer>();

		final List<Group> page = paginate(groupDao, term, range, total, converter);
		return new GroupResults(total.getValue(), page);
	}

	/**
	 * Lists the members of a group.
	 *
	 * @param id
	 *            group id
	 * @param range
	 *            requested page
	 * @return the total number of members and the requested page of user references
	 */
	@Override
	public GroupMembers groupMembers(final String id, final Range range) throws TException {
		final UserToUserRef converter = new UserToUserRef();
		final Holder<Integer> total = new Holder<Integer>();

		final List<UserRef> page = paginate(profileDao.getGroupMembers(id), range, total, converter);

		return new GroupMembers(total.getValue(), page);
	}

	/**
	 * Lists the groups of a user.
	 *
	 * @param id
	 *            user id
	 * @param range
	 *            requested page
	 * @return the total number of groups and the requested page of groups
	 */
	@Override
	public UserGroups userGroups(final String id, final Range range) throws TException {
		final GroupToGroup converter = new GroupToGroup();
		final Holder<Integer> total = new Holder<Integer>();

		final List<Group> page = paginate(groupDao.getUserGroups(id), range, total, converter);

		return new UserGroups(total.getValue(), page);
	}

	/**
	 * {@inheritDoc}
	 *
	 * <p>
	 * Returns null when the dao does not know a reading list with the given id.
	 * </p>
	 *
	 * @see eu.dnetlib.r2d2.thrift.Scholarlynk.Iface#getReadingList(java.lang.String)
	 */
	@Override
	@ReadOnly
	public ReadingList getReadingList(final String id) {
		final Neo4jReadingList stored = readingListDao.getBean(id);

		if (stored != null) {
			return new ReadingListToReadingList().evaluate(stored);
		}
		return null;
	}

	/**
	 * {@inheritDoc}
	 *
	 * <p>
	 * A null filter is treated as {@code RLEntryFilter.ALL}. Any filter other than ALL and ITEMS_ONLY selects the
	 * reading list entries only.
	 * </p>
	 *
	 * @see eu.dnetlib.r2d2.thrift.Scholarlynk.Iface#getEntries(java.lang.String, eu.dnetlib.r2d2.thrift.Range)
	 */
	@Override
	@ReadOnly
	public RLEntryResults getEntries(final String id, final Range range, final RLEntryFilter filterParam) throws TException {
		final RLEntryFilter effectiveFilter = filterParam == null ? RLEntryFilter.ALL : filterParam;

		final Iterable<Neo4jRLEntry> candidates;
		if (effectiveFilter != RLEntryFilter.ALL) {
			final Neo4jRLEntry.Kind wanted;
			if (effectiveFilter == RLEntryFilter.ITEMS_ONLY) {
				wanted = Neo4jRLEntry.Kind.ITEM;
			} else {
				wanted = Neo4jRLEntry.Kind.READING_LIST;
			}
			candidates = entryDao.getReadingListEntries(id, wanted);
		} else {
			candidates = entryDao.getReadingListEntries(id);
		}

		// converts one entry; exactly one of item / reading list is filled depending on the kind of the entry
		final UnaryFunction<RLEntry, Neo4jRLEntry> toTransport = new UnaryFunction<RLEntry, Neo4jRLEntry>() {
			public RLEntry evaluate(final Neo4jRLEntry source) {
				final List<String> tagTexts = listMap(tagDao.getTagsOfBean(source.getId()), new TagToString());

				Item item = null;
				ReadingList nested = null;
				if (source.getKind() == Kind.ITEM) {
					item = new ItemToItem().evaluate(itemDao.getItemForEntry(source.getId()));
				} else {
					nested = new ReadingListToReadingList().evaluate(readingListDao.getReadingListForEntry(source.getId()));
				}

				return new RLEntry(source.getId(), item, nested, tagTexts, source.getRating());
			}
		};

		final Holder<Integer> total = new Holder<Integer>();
		final List<RLEntry> page = paginate(candidates, range, total, toTransport);

		return new RLEntryResults(total.getValue(), page);
	}

	/**
	 * Searches information objects. With {@code Scope.MATERIALIZED} only the items stored in neo4j are searched,
	 * otherwise the query is sent to the index service and the DMF records of the requested page are parsed.
	 *
	 * <p>
	 * Details of the index search: an empty or null term is replaced by the query {@code "textual"}; a range count of
	 * 0 is treated as 10; records which cannot be parsed are skipped; author names of claimed items are replaced by
	 * the name of the claiming user.
	 * </p>
	 *
	 * TODO: cleanup this mess.
	 *
	 * {@inheritDoc}
	 *
	 * @param term
	 *            search terms, may be null
	 * @param rangeParam
	 *            requested page, null selects the default range
	 * @param scope
	 *            search scope, may be null
	 * @return the search results, or null if the index or the result set service failed (the failure is logged)
	 *
	 * @see eu.dnetlib.r2d2.thrift.Scholarlynk.Iface#search(java.lang.String, eu.dnetlib.r2d2.thrift.Range,
	 *      eu.dnetlib.r2d2.thrift.Scope)
	 */
	@ReadOnly
	public SearchResults search(final String term, final Range rangeParam, final Scope scope) throws TException {
		log.info("searching " + term + " for user " + session.get().getUserName() + " range " + rangeParam + " scope " + scope);

		final Range range = defaultRange(rangeParam);

		// comparing with == is null safe, no separate null check needed
		if (scope == Scope.MATERIALIZED)
			return materializedSearch(term, range, scope);

		try {
			String cqlTerm = term == null ? "" : term.trim();
			if (cqlTerm.isEmpty())
				cqlTerm = "textual";

			final W3CEndpointReference epr = indexLocator.getService().indexLookup("all", cqlTerm, "DMF", "index");
			final ResultSetService resultSet = serviceResolver.getService(ResultSetService.class, epr);
			final String rsId = serviceResolver.getResourceIdentifier(epr);

			final int from = range.getStart();

			final int total = resultSet.getNumberOfElements(rsId);

			final int count = range.getCount();

			int max = from + count;
			if (count == 0)
				max = from + 10;
			if (total < max)
				max = total;

			// the range is 0-based, the result set is addressed starting from 1
			final List<String> elements = resultSet.getResult(rsId, from + 1, max, "waiting");

			final List<Item> items = parseDMF(elements);

			final List<SearchResult> results = new ArrayList<SearchResult>();
			for (final Item item : items) {
				// parseDMF maps records it cannot parse to null; dereferencing them used to break the whole search
				if (item == null) {
					log.warn("skipping a search result which could not be parsed, term: " + term);
					continue;
				}

				final List<String> claim = claimed.get(item.getId());
				if (claim != null) {
					// claim = [claimed author name, name of the claiming user]
					for (final Author author : item.getAuthors()) {
						final String originalName = author.getOriginalName();
						if (originalName != null && originalName.equals(claim.get(0)))
							author.setName(claim.get(1));
					}
				}

				final SearchResult result = new SearchResult();
				result.setItem(item);
				findResultEntries(result);

				results.add(result);
			}

			return new SearchResults(total, results);
		} catch (final IndexServiceException e) {
			log.fatal("index lookup failed while searching for: " + term, e);
		} catch (final ResultSetException e) {
			log.fatal("cannot read the result set while searching for: " + term, e);
		}
		return null;
	}

	/**
	 * Searches only among the items stored in neo4j and decorates every hit of the requested page with the matching
	 * reading list entries of the current user.
	 *
	 * @param term
	 *            search terms
	 * @param range
	 *            requested page
	 * @param scope
	 *            not used
	 * @return total number of matching items and the requested page
	 */
	private SearchResults materializedSearch(final String term, final Range range, final Scope scope) {
		log.info("search only in materialized items");

		// wraps an item into a search result with an (initially) empty entry list
		final UnaryFunction<SearchResult, Neo4jItem> toSearchResult = new UnaryFunction<SearchResult, Neo4jItem>() {
			public SearchResult evaluate(final Neo4jItem item) {
				return new SearchResult(new ItemToItem().evaluate(item), null, new ArrayList<SearchResultEntry>());
			}
		};

		final Holder<Integer> total = new Holder<Integer>();
		final Iterable<Neo4jItem> matches = itemDao.search(term);
		final List<SearchResult> page = paginate(matches, range, total, toSearchResult);

		findResultEntries(page);

		return new SearchResults(total.getValue(), page);
	}

	/**
	 * Returns an information object, which is either a reading list or an item, wrapped in a search result.
	 *
	 * @param id
	 *            id of the information object
	 * @return a search result carrying either the reading list or the item
	 */
	@Override
	@ReadOnly
	public SearchResult getInformationObjectAsSearchResult(final String id) throws TException {
		final SearchResult wrapped;
		if (isReadingList(id)) {
			wrapped = getReadingListAsSearchResult(id);
		} else {
			wrapped = getItemAsSearchResult(id);
		}
		return wrapped;
	}

	/**
	 * Tells whether the node with the given id is a reading list.
	 *
	 * @param id
	 *            id of an information object
	 * @return true if the node type registered for the id is {@code Neo4jReadingList}; false for every other type and
	 *         for ids without a node type
	 */
	protected boolean isReadingList(final String id) {
		// getNodeType may return null (addEntry explicitly handles that case), so the comparison starts from the
		// constant side in order to avoid a NullPointerException for unknown ids
		return Neo4jReadingList.class.equals(itemDao.getNodeType(id));
	}

	/**
	 * Wraps a reading list into a search result without item and with an empty entry list.
	 *
	 * @param id
	 *            id of the reading list
	 * @return the search result; its reading list is null when {@code getReadingList} finds nothing
	 */
	public SearchResult getReadingListAsSearchResult(final String id) {
		final ReadingList found = getReadingList(id);
		final ArrayList<SearchResultEntry> noEntries = new ArrayList<SearchResultEntry>();

		return new SearchResult(null, found, noEntries);
	}

	/**
	 * Wraps an item into a search result and attaches the reading list entries of the current user which refer to it.
	 *
	 * @param id
	 *            id of the item
	 * @return the decorated search result
	 */
	public SearchResult getItemAsSearchResult(final String id) throws TException {
		final SearchResult wrapped = new SearchResult(getItem(id), null, new ArrayList<SearchResultEntry>());

		findResultEntries(wrapped);

		return wrapped;
	}

	/**
	 * Substitutes a missing range with {@code new Range(0, 10)}.
	 *
	 * @param rangeParam
	 *            range requested by the caller, may be null
	 * @return the given range if not null, the default range otherwise
	 */
	private Range defaultRange(final Range rangeParam) {
		return rangeParam != null ? rangeParam : new Range(0, 10);
	}

	/**
	 * Decorates every search result of the list, see the single result variant.
	 *
	 * @param searchResults
	 *            results to decorate in place
	 */
	private void findResultEntries(final List<SearchResult> searchResults) {
		final Iterator<SearchResult> cursor = searchResults.iterator();
		while (cursor.hasNext()) {
			findResultEntries(cursor.next());
		}
	}

	/**
	 * This method will search if a given search result matches one or more entries in some reading list and populate
	 * the search result record accordingly, with references to the reading list and tags etc.
	 *
	 * <p>
	 * Only entries contained in reading lists owned by the user of the current session are reported; reading lists
	 * without owner are skipped.
	 * </p>
	 *
	 * @param result
	 *            search result carrying an item; its entry list is replaced
	 * @throws IllegalStateException
	 *             if an entry has no reading list, or if an entry of one of the user's reading lists has no item
	 */
	private void findResultEntries(final SearchResult result) {
		final String itemId = result.getItem().getId();

		final ArrayList<SearchResultEntry> resultEntries = new ArrayList<SearchResultEntry>();

		for (final Neo4jRLEntry entry : entryDao.getEntriesForItem(itemId)) {
			final Neo4jReadingList rl = readingListDao.getReadingListOfEntry(entry.getId());

			// must be verified before rl is used for the owner lookup; the check used to come too late
			if (rl == null)
				throw new IllegalStateException("cannot find reading list for entry " + entry.getId());

			// skip if the reading list is not mine
			final Neo4jProfile owner = profileDao.getReadingListOwner(rl.getId());
			if (owner == null || !owner.getId().equals(session.get().getUserId()))
				continue;

			final Neo4jItem item = itemDao.getItemForEntry(entry.getId());
			if (item == null)
				throw new IllegalStateException("cannot find item for entry " + entry.getId());

			final List<String> tags = listMap(tagDao.getTagsOfBean(entry.getId()), new TagToString());

			resultEntries.add(new SearchResultEntry(new RLEntryDescription(entry.getId(), rl.getId(), item.getId(), entry.getRating()),
					new ReadingListToReadingList().evaluate(rl), tags));
		}

		result.setEntries(resultEntries);
	}

	/**
	 * Parses a list of DMF records into items.
	 *
	 * @param elements
	 *            xml sources of the records, may be null or empty
	 * @return one entry per record; a record which cannot be parsed yields a null entry (the failure is logged). An
	 *         empty list is returned for null or empty input.
	 */
	private List<Item> parseDMF(final List<String> elements) {
		if (elements == null || elements.isEmpty())
			return new ArrayList<Item>(); // hack, mapped collection has a bug?

		return listMap(elements, new UnaryFunction<Item, String>() {
			public Item evaluate(final String arg) {
				// A broken record must not break the whole page, but it must not vanish silently either.
				try {
					return parseDMF(arg);
				} catch (final ParserConfigurationException e) {
					log.warn("cannot parse DMF record", e);
					return null;
				} catch (final SAXException e) {
					log.warn("cannot parse DMF record", e);
					return null;
				} catch (final IOException e) {
					log.warn("cannot parse DMF record", e);
					return null;
				} catch (final XPathExpressionException e) {
					log.warn("cannot parse DMF record", e);
					return null;
				}
			}
		});
	}

	/**
	 * Parses a single DMF record. Elements are matched by local name anywhere in the document: {@code objIdentifier}
	 * gives the id, {@code title} the title, {@code description} the description and every {@code creator} an author
	 * (without id, same string for both name arguments).
	 *
	 * <p>
	 * NOTE(review): the parser is created with the factory defaults, so DTDs and external entities are not disabled -
	 * confirm the records come from a trusted source.
	 * </p>
	 *
	 * @param dmf
	 *            xml source of the record
	 * @return the parsed item; only id, title, description and authors are set
	 */
	private Item parseDMF(final String dmf) throws ParserConfigurationException, SAXException, IOException, XPathExpressionException {
		final DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
		final InputSource source = new InputSource(new StringReader(dmf));
		final Document record = factory.newDocumentBuilder().parse(source);

		final Item parsed = new Item();
		final XPath query = XPathFactory.newInstance().newXPath();

		parsed.setId(query.evaluate("//*[local-name() = 'objIdentifier']", record));
		parsed.setTitle(query.evaluate("//*[local-name() = 'title']", record));
		parsed.setDescription(query.evaluate("//*[local-name() = 'description']", record));

		final NodeList creators = (NodeList) query.evaluate("//*[local-name() = 'creator']", record, XPathConstants.NODESET);
		final int creatorCount = creators.getLength();

		final List<Author> authorBeans = new ArrayList<Author>();
		int index = 0;
		while (index < creatorCount) {
			final Element creator = (Element) creators.item(index);
			final String creatorName = creator.getTextContent();
			authorBeans.add(new Author(null, creatorName, creatorName));
			index++;
		}
		parsed.setAuthors(authorBeans);

		return parsed;
	}

	/**
	 * Records that the user of the current session claims to be the given author of an item. The claim is kept in
	 * memory only and replaces any previous claim on the same item.
	 *
	 * @param itemId
	 *            id of the claimed item
	 * @param authorName
	 *            name of the author as it appears in the item
	 */
	public void claim(final String itemId, final String authorName) throws TException {
		log.info("claiming " + itemId + " " + authorName);

		// [claimed author name, user name of the claimant]
		final List<String> claimRecord = Lists.newArrayList(authorName, session.get().getUserName());
		claimed.put(itemId, claimRecord);
	}

	/**
	 * Attaches a tag to an object. The tag bean found by searching for the tag text is reused; if there is none, a
	 * new tag bean is created and saved first.
	 *
	 * TODO: should we create a new object for each tag, or reuse the tag objects? if reusing we should be sure that the
	 * search is exact and that the transactions work for real.
	 *
	 * {@inheritDoc}
	 *
	 * @see eu.dnetlib.r2d2.thrift.Scholarlynk.Iface#addTag(java.lang.String, java.lang.String)
	 */
	@Override
	public void addTag(final String objectId, final String tag) throws TException {
		final Neo4jTag existing = Iterables.getOnlyElement(tagDao.search(tag), null);

		final Neo4jTag target;
		if (existing != null) {
			target = existing;
		} else {
			target = tagDao.newBean();
			target.setId(autoId(null));
			target.setText(tag);
			tagDao.saveBean(target);
		}

		tagDao.addTagToBean(objectId, target.getId());
	}

	/**
	 * Detaches a tag from an object. Removing a tag which does not exist is a no-op.
	 *
	 * @param objectId
	 *            id of the tagged object
	 * @param tag
	 *            text of the tag
	 */
	@Override
	public void removeTag(final String objectId, final String tag) throws TException {
		// Same lookup as in addTag; without the null default an unknown tag raised a NoSuchElementException.
		final Neo4jTag bean = Iterables.getOnlyElement(tagDao.search(tag), null);
		if (bean == null) {
			log.info("cannot remove unknown tag " + tag + " from " + objectId);
			return;
		}

		tagDao.removeTagFromBean(objectId, bean.getId());
	}

	/**
	 * Notification that an upload has been aborted; currently only logged.
	 *
	 * @param id
	 *            id of the upload
	 */
	public void abortUpload(final String id) throws TException {
		// used to log an empty message, which gave no clue about what happened to which upload
		log.info("aborted upload " + id);
	}

	/**
	 * Notification that an upload has been completed; currently only logged.
	 *
	 * @param id
	 *            id of the upload
	 */
	public void commitUpload(final String id) throws TException {
		final String message = "confirmed upload " + id;
		log.info(message);
	}

	/**
	 * Registers a new uploaded item in neo4j.
	 *
	 * @param item
	 *            description of the item; the id may be null, in which case one is generated
	 * @return the url of the item, i.e. the upload url prefix followed by the id stored in neo4j
	 * @throws InvalidOperation
	 *             if an item with the same id already exists
	 */
	public String uploadItem(final Item item) throws TException, InvalidOperation {

		if (item.getId() != null && itemDao.getBean(item.getId()) != null)
			throw new InvalidOperation(1, "cannot upload same item twice");

		// Resolve the id first, so that the url and the stored bean agree even when the caller supplied no id
		// (the url used to end with "null" in that case).
		// NOTE(review): assumes autoId returns a non null argument unchanged - confirm.
		final String id = autoId(item.getId());

		final String itemUrl = uploadUrl + id;
		log.info("Uploading item: " + itemUrl);

		final Neo4jItem bean = itemDao.newBean();
		bean.setId(id);
		bean.setTitle(item.getTitle());
		bean.setDescription(item.getDescription());
		bean.setIconUrl(item.getIconUrl());
		bean.setItemUrl(itemUrl);

		itemDao.saveBean(bean);

		return itemUrl;
	}

	/**
	 * {@inheritDoc}
	 *
	 * <p>
	 * Besides the profile, a reading list named "MVL" is created, registered as the MVL of the new user and owned by
	 * the new user.
	 * </p>
	 *
	 * @return the id of the new user
	 *
	 * @see eu.dnetlib.r2d2.thrift.Scholarlynk.Iface#createUser(eu.dnetlib.r2d2.thrift.User)
	 */
	@Override
	public String createUser(final User user) throws TException {
		// 1. the profile itself
		final Neo4jProfile profile = profileDao.newBean();
		profile.setId(autoId(user.getId()));
		profile.setName(user.getName());
		profile.setAvatarUrl(user.getAvatarUrl());
		profile.setUid(user.getUid());
		profile.setMail(user.getMail());
		profileDao.saveBean(profile);

		// 2. the virtual library of the user
		final Neo4jReadingList library = readingListDao.newBean();
		library.setId(deriveMvlId(profile.getId()));
		library.setName("MVL");
		library.setDescription(profile.getName() + "'s Virtual Library");
		readingListDao.saveBean(library);

		// 3. link the library to the user
		readingListDao.setUserMVL(profile.getId(), library.getId());
		profileDao.setReadingListOwner(library.getId(), profile.getId());

		return profile.getId();
	}

	/**
	 * Derives the id of the MVL from the id of its user. This is useful during the first testing phase, when we want
	 * the MVL to have a nice and intuitive id, since we are going to poke around it manually.
	 *
	 * @param id
	 *            id of the user, may be null
	 * @return for user ids shorter than 4 characters and starting with "u" the id with the leading "u" replaced by
	 *         "r"; a generated id in every other case
	 */
	private String deriveMvlId(final String id) {
		final boolean shortUserId = id != null && id.length() < 4 && id.startsWith("u");

		if (shortUserId) {
			return "r" + id.substring(1);
		}
		return autoId(null);
	}

	/**
	 * Creates a group from its thrift description.
	 *
	 * @param group
	 *            name, description and avatar of the group; the id is handed to {@code autoId}
	 * @return the id of the new group
	 */
	@Override
	public String createGroup(final Group group) throws TException {
		final Neo4jGroup created = groupDao.newBean();

		created.setId(autoId(group.getId()));
		created.setName(group.getName());
		created.setDescription(group.getDescription());
		created.setAvatarUrl(group.getAvatarUrl());
		groupDao.saveBean(created);

		final String newId = created.getId();
		return newId;
	}

	/**
	 * Creates a reading list from its thrift description and assigns it to the owner named in the description.
	 *
	 * @param readingList
	 *            description of the reading list; its owner must not be null
	 * @return the id of the new reading list
	 */
	@Override
	public String createReadingList(final ReadingList readingList) throws TException {
		final Neo4jReadingList created = readingListDao.newBean();

		created.setId(autoId(readingList.getId()));
		created.setName(readingList.getName());
		created.setDescription(readingList.getDescription());
		created.setAvatarUrl(readingList.getIconUrl());

		log.info("READING LIST DAO : " + readingListDao + " " + this);
		readingListDao.saveBean(created);

		final String newId = created.getId();
		profileDao.setReadingListOwner(newId, readingList.getOwner().getId());

		return created.getId();
	}

	/**
	 * {@inheritDoc}
	 *
	 * <p>
	 * It prevents an entry to be inserted more than once: if the reading list already contains an entry for the same
	 * object, the id of that entry is returned. Objects unknown to neo4j are treated as external items and are
	 * materialized before being linked. Every entry added to a reading list other than a MVL is also added to the MVL
	 * of the current user.
	 * </p>
	 *
	 * @param entry
	 *            reading list id and object id of the entry to add
	 * @return the id of the new (or already existing) entry
	 *
	 * @see eu.dnetlib.r2d2.thrift.Scholarlynk.Iface#addEntry(eu.dnetlib.r2d2.thrift.RLEntryDescription)
	 */
	@Override
	public String addEntry(final RLEntryDescription entry) throws TException {

		final String oldEntry = checkDuplicateEntry(entry);
		if (oldEntry != null)
			return oldEntry;

		final String itemId = entry.getItemId();

		final Class<? extends Neo4jBean> clazz = itemDao.getNodeType(itemId);

		// no node type means the object is not in neo4j yet, i.e. an external item to be materialized
		Neo4jRLEntry.Kind kind = null;
		if (clazz == null || clazz.equals(Neo4jItem.class))
			kind = Neo4jRLEntry.Kind.ITEM;
		else if (clazz.equals(Neo4jReadingList.class))
			kind = Neo4jRLEntry.Kind.READING_LIST;

		final Neo4jRLEntry bean = entryDao.newBean();
		bean.setId(autoId(entry.getId()));
		bean.setKind(kind);

		// The two branches used to be swapped: items were linked through the reading list dao, and reading lists were
		// "materialized" as items. getEntries reads ITEM entries through itemDao and everything else through
		// readingListDao, so the links must be written accordingly.
		if (kind == Neo4jRLEntry.Kind.READING_LIST) {
			log.info("inserting reading list " + itemId + " as child of reading list " + entry.getReadingListId());
			readingListDao.setReadingListForEntry(itemId, bean.getId());
		} else {
			log.info("inserting item " + itemId + " as child of reading list " + entry.getReadingListId());

			final Neo4jItem item = itemDao.getBean(itemId);
			if (item == null)
				materializeItem(itemId); // TODO: better error reporting

			itemDao.setItemForEntry(itemId, bean.getId());
		}

		entryDao.addEntryToReadingList(bean.getId(), entry.getReadingListId());

		// we also have to make sure each added entry is also added to the MVL. Recursive application
		// of this method will check if the entry is already in the MVL.
		if (!isMvl(entry.getReadingListId()))
			addEntry(new RLEntryDescription(null, getCurrentUserMVL().getId(), itemId, 0));

		return bean.getId();
	}

	/**
	 * If the entry description to be added describes an already existing entry, return the id of the existing entry.
	 *
	 * <p>
	 * An entry matches if it belongs to the described reading list and refers to the described object. Entries of kind
	 * READING_LIST are compared through their reading list, all other entries through their item; entries whose
	 * target cannot be found never match.
	 * </p>
	 *
	 * @param entryDesc
	 *            description of the entry about to be added
	 * @return the id of the matching entry, or null if there is none
	 */
	private String checkDuplicateEntry(final RLEntryDescription entryDesc) {
		final String targetId = entryDesc.getItemId();

		for (final Neo4jRLEntry entry : entryDao.getReadingListEntries(entryDesc.getReadingListId())) {
			if (entry.getKind() == Kind.READING_LIST) {
				final Neo4jReadingList nested = readingListDao.getReadingListForEntry(entry.getId());
				if (nested != null && targetId.equals(nested.getId()))
					return entry.getId();
			} else {
				// the item lookup can return null; it used to be dereferenced unconditionally
				final Neo4jItem item = itemDao.getItemForEntry(entry.getId());
				if (item != null && targetId.equals(item.getId()))
					return entry.getId();
			}
		}

		return null;
	}

	/**
	 * Not supported yet.
	 *
	 * @param entry
	 *            ignored
	 * @throws TException
	 *             always
	 */
	@Override
	public void modifyEntry(final RLEntryDescription entry) throws TException {
		final TException unsupported = new TException("not yet implemented");
		throw unsupported;
	}

	/**
	 * Stores the rating the current user gives to an information object. The rating is kept in the entry of the
	 * user's MVL which refers to the object.
	 *
	 * @param ioId
	 *            id of the information object
	 * @param rating
	 *            the rating
	 * @throws TException
	 *             if the object is not contained in the MVL of the current user
	 */
	@Override
	public void rateInformationObject(final String ioId, final double rating) throws TException {
		final Neo4jReadingList library = getCurrentUserMVL();
		final Neo4jRLEntry libraryEntry = entryDao.getEntry(library.getId(), ioId);

		if (libraryEntry != null) {
			libraryEntry.setRating(rating);
			entryDao.saveBean(libraryEntry);
			return;
		}

		throw new TException("the currently logged user (" + session.get().getUserName() + ") cannot rate object " + ioId
				+ " because it's not contained in his virtual library");
	}

	/**
	 * Returns true if the reading list is a MVL reading list, i.e. if it is named "MVL".
	 *
	 * TODO: ugly
	 *
	 * @param rlId
	 *            id of the reading list
	 * @return true if a reading list with the given id exists and its name is "MVL", false otherwise
	 */
	protected boolean isMvl(final String rlId) {
		//		return getCurrentUserMVL().getId().equals(rlId);
		// getBean can return null (see getReadingList) and the name may be unset: both mean "not a MVL" rather than
		// a NullPointerException
		final Neo4jReadingList readingList = readingListDao.getBean(rlId);
		return readingList != null && "MVL".equals(readingList.getName());
	}

	/**
	 * Returns the MVL of the user owning the session bound to the current thread.
	 *
	 * @return the MVL, never null
	 * @throws IllegalStateException
	 *             if that user has no MVL
	 */
	protected Neo4jReadingList getCurrentUserMVL() {
		final Neo4jReadingList library = readingListDao.getUserMVL(session.get().getUserId());

		if (library != null) {
			return library;
		}
		throw new IllegalStateException("currently logged user doesn't have a MVL");
	}

	/**
	 * Makes a user member of a group; plain delegation to the group dao.
	 *
	 * @param groupId
	 *            id of the group
	 * @param userId
	 *            id of the user
	 */
	@Override
	public void addUserToGroup(final String groupId, final String userId) {
		// argument order of the dao call: group first, then user
		groupDao.addUserToGroup(groupId, userId);
	}

	/**
	 * Runs a paginated search on a dao and converts the resulting page.
	 *
	 * @param dao
	 *            dao to search in
	 * @param terms
	 *            search terms
	 * @param range
	 *            requested page, handed to the {@code start} and {@code count} helpers
	 * @param countHolder
	 *            receives the total number of matches
	 * @param mapper
	 *            converts the beans of the page
	 * @return the converted page
	 */
	private <T, S extends Neo4jBean> List<T> paginate(
			final BeanDao<S> dao,
			final String terms,
			final Range range,
			final Holder<Integer> countHolder,
			final UnaryFunction<T, S> mapper) {
		final eu.dnetlib.r2d2.neo4j.SearchResults<S> found = dao.search(start(range), count(range), terms);
		return paginate(found, countHolder, mapper);
	}

	/**
	 * Unpacks the results of a search already paginated by the dao.
	 *
	 * @param results
	 *            dao level search results
	 * @param countHolder
	 *            receives the count reported by the results
	 * @param mapper
	 *            converts the beans of the results
	 * @return the converted results
	 */
	private <T, S extends Neo4jBean> List<T> paginate(
			final eu.dnetlib.r2d2.neo4j.SearchResults<S> results,
			final Holder<Integer> countHolder,
			final UnaryFunction<T, S> mapper) {
		final Integer total = results.getCount();
		countHolder.setValue(total);

		final List<T> converted = listMap(results.getResults(), mapper);
		return converted;
	}

	/**
	 * This is useful for all those dao functions which return an iterator but don't do pagination in the dao, like
	 * group getMembers etc.
	 *
	 * <p>
	 * New paginate(dao, term,....) delegate the search pagination to the dao itself
	 * </p>
	 *
	 * <p>
	 * The iterator is consumed completely, since the total count has to be determined.
	 * </p>
	 *
	 * @param <T>
	 *            target type
	 * @param <S>
	 *            source type
	 * @param results
	 *            lowlevel results iterator
	 * @param range
	 *            user requested result range (page)
	 * @param countHolder
	 *            used to return the total count
	 * @param mapper
	 *            function which transforms source records to target records, only invoked for the objects within the
	 *            range
	 * @return list of max range.getCount() elements
	 */
	private <T, S> List<T> paginate(final Iterator<S> results, final Range range, final Holder<Integer> countHolder, final UnaryFunction<T, S> mapper) {
		// Passing the iterator straight on resolved to this very overload and recursed until the stack overflowed.
		// Wrapping it in a single-use Iterable selects the Iterable based implementation instead.
		final Iterable<S> oneShot = new Iterable<S>() {
			public Iterator<S> iterator() {
				return results;
			}
		};
		return paginate(oneShot, range, countHolder, mapper);
	}

	/**
	 * Paginates in memory: walks through all the results, converts only those inside the requested window and counts
	 * all of them.
	 *
	 * @param results
	 *            all the results
	 * @param range
	 *            requested page, handed to the {@code start} and {@code count} helpers
	 * @param countHolder
	 *            receives the total number of results
	 * @param mapper
	 *            converter, only invoked for the results inside the window
	 * @return the converted results of the window
	 */
	private <T, S> List<T> paginate(final Iterable<S> results, final Range range, final Holder<Integer> countHolder, final UnaryFunction<T, S> mapper) {
		final List<T> page = new ArrayList<T>();
		final int first = start(range);
		final int end = first + count(range);

		int position = 0;
		final Iterator<S> cursor = results.iterator();
		while (cursor.hasNext()) {
			final S current = cursor.next();

			final boolean insideWindow = position >= first && position < end;
			if (insideWindow) {
				page.add(mapper.evaluate(current));
			}
			position++;
		}

		// after the loop the position equals the number of results seen
		countHolder.setValue(position);
		return page;
	}

	/**
	 * Some items are held externally, for example in the driver information space. However, when an entry is created
	 * and added to a reading list, the corresponding item has to exist inside neo4j. We materialize thus an external
	 * item when needed.
	 * 
	 * <p>
	 * The external item is fetched, then the neo4j bean with the same id is looked up (a new bean is created when
	 * none exists), its id, title, description, urls and author names are overwritten with the external values and
	 * the bean is saved.
	 * </p>
	 * 
	 * @param id
	 *            identifier of the external item to materialize
	 */
	public void materializeItem(final String id) {
		log.info("materializing item: " + id);

		final Item external = getExternalItem(id);

		final Neo4jItem stored = itemDao.getBean(id);
		final Neo4jItem target = (stored != null) ? stored : itemDao.newBean();

		// only the author names are kept on the neo4j bean
		final UnaryFunction<String, Author> authorName = new UnaryFunction<String, Author>() {
			public String evaluate(final Author author) {
				return author.getName();
			}
		};

		target.setId(external.getId());
		target.setTitle(external.getTitle());
		target.setDescription(external.getDescription());
		target.setIconUrl(external.getIconUrl());
		target.setItemUrl(external.getItemUrl());
		target.setExtAuthors(listMap(external.getAuthors(), authorName));

		itemDao.saveBean(target);
	}

	/**
	 * Logs the request and deletes the item bean with the given id through the item dao.
	 * 
	 * @param id
	 *            identifier of the item to delete
	 */
	public void deleteItem(final String id) {
		final String message = "deleting item " + id;
		log.info(message);

		itemDao.deleteBean(id);
	}

	/**
	 * Returns the item with the given id, preferring the copy stored in neo4j and falling back to the external
	 * source when neo4j has none.
	 * 
	 * <p>
	 * For a neo4j item the transport bean is built from the stored id, title, icon url, item url and description
	 * plus the users returned by sharedBy(id); the remaining constructor arguments are passed as null / 0.
	 * </p>
	 * 
	 * @param id
	 *            item identifier
	 * @return the item built from the neo4j bean, or the result of getExternalItem(id)
	 */
	@ReadOnly
	public Item getItem(final String id) {
		log.info("getting item: " + id);

		final Neo4jItem stored = itemDao.getBean(id);

		if (stored == null) {
			log.info("item is not found inside neo4j, try fetch item from external source");
			return getExternalItem(id);
		}

		return new Item(stored.getId(), stored.getTitle(), null, stored.getIconUrl(), stored.getItemUrl(), stored.getDescription(), null, 0, 0, sharedBy(id));
	}

	/**
	 * Fetches the document with the given id through the search connector and parses it with parseDMF.
	 * 
	 * @param id
	 *            item identifier
	 * @return the parsed item
	 * @throws IllegalStateException
	 *             wrapping any XPath, parser configuration, SAX or IO failure raised while parsing
	 */
	public Item getExternalItem(final String id) {
		final String document = searchConnector.documentById(id);

		try {
			final Item parsed = parseDMF(document);
			return parsed;
		} catch (final IOException ioFailure) {
			throw new IllegalStateException(ioFailure);
		} catch (final SAXException saxFailure) {
			throw new IllegalStateException(saxFailure);
		} catch (final ParserConfigurationException parserFailure) {
			throw new IllegalStateException(parserFailure);
		} catch (final XPathExpressionException xpathFailure) {
			throw new IllegalStateException(xpathFailure);
		}
	}

	/**
	 * Null-safe accessor for the number of elements requested by a range, so callers do not have to check for a
	 * missing range themselves.
	 * 
	 * @param range
	 *            requested range, may be null
	 * @return range.getCount(), or 10 when no range is given
	 */
	protected int count(final Range range) {
		return range == null ? 10 : range.getCount();
	}

	/**
	 * Null-safe accessor for the start offset of a range, so callers do not have to check for a missing range
	 * themselves.
	 * 
	 * @param range
	 *            requested range, may be null
	 * @return range.getStart(), or 0 when no range is given
	 */
	protected int start(final Range range) {
		return range == null ? 0 : range.getStart();
	}

	/**
	 * If the id is not provided, allocate a new one.
	 * <p>
	 * This method can be changed in future, to enforce that the id is allocated only by the server, however during
	 * early stage of development it's easier to create objects with known ids.
	 * </p>
	 * 
	 * @param id
	 *            id supplied by the caller, may be null or empty
	 * @return the given id when it is neither null nor empty, otherwise an id generated by the id broker
	 */
	protected String autoId(final String id) {
		final boolean provided = id != null && id.length() > 0;
		return provided ? id : idBroker.generateId();
	}

	/**
	 * Convenience variant of sharedBy(String) taking the bean instead of its id.
	 * 
	 * @param bean
	 *            bean whose id is looked up
	 * @return the result of sharedBy for the bean's id
	 */
	protected List<UserRef> sharedWith(final Neo4jBean bean) {
		final String beanId = bean.getId();
		return sharedBy(beanId);
	}

	/**
	 * Returns the users which share this object, leaving out the user of the current session.
	 * 
	 * @param id
	 *            identifier of the shared object
	 * @return the users returned by sharedAmong(id) whose id differs from the current session's user id
	 */
	protected List<UserRef> sharedBy(final String id) {
		final String currentUserId = session.get().getUserId();
		final List<UserRef> everybody = sharedAmong(id);

		// keeps everyone except the current user
		final Filter<UserRef> notCurrentUser = new Filter<UserRef>() {
			public Boolean evaluate(final UserRef candidate) {
				if (currentUserId.equals(candidate.getId())) {
					return Boolean.FALSE;
				}
				return Boolean.TRUE;
			}
		};

		return listFilter(everybody, notCurrentUser);
	}

	/**
	 * Convenience variant of sharedAmong(String) taking the bean instead of its id.
	 * 
	 * @param bean
	 *            bean whose id is looked up
	 * @return the result of sharedAmong for the bean's id
	 */
	protected List<UserRef> sharedAmong(final Neo4jBean bean) {
		final String beanId = bean.getId();
		return sharedAmong(beanId);
	}

	/**
	 * Return the whole list of people who share this item.
	 * 
	 * <p>
	 * Every profile returned by the profile dao is converted, no user is filtered out and the session is not
	 * consulted.
	 * </p>
	 * 
	 * @param id
	 *            identifier of the shared information object
	 * @return one user reference per profile sharing the object
	 */
	protected List<UserRef> sharedAmong(final String id) {
		final Iterable<Neo4jProfile> sharedProfiles = profileDao.getUsersSharingInformationObject(id);
		return listMap(sharedProfiles, new UserToUserRef());
	}

	/**
	 * @return the current value of the uploadUrl property
	 */
	public String getUploadUrl() {
		return this.uploadUrl;
	}

	/**
	 * @return the service locator currently set for the index service
	 */
	public ServiceLocator<IIndexService> getIndexLocator() {
		return this.indexLocator;
	}

	/**
	 * Sets the service locator for the index service. Marked as a required property.
	 * 
	 * @param indexLocator
	 *            locator to store
	 */
	@Required
	public void setIndexLocator(final ServiceLocator<IIndexService> indexLocator) {
		this.indexLocator = indexLocator;
	}

	/**
	 * Sets the uploadUrl property. Marked as a required property.
	 * 
	 * @param uploadUrl
	 *            value to store
	 */
	@Required
	public void setUploadUrl(final String uploadUrl) {
		this.uploadUrl = uploadUrl;
	}

	/**
	 * @return the reading list dao currently set
	 */
	public ReadingListDao getReadingListDao() {
		return this.readingListDao;
	}

	/**
	 * Sets the reading list dao.
	 * 
	 * @param readingListDao
	 *            dao to store
	 */
	public void setReadingListDao(final ReadingListDao readingListDao) {
		this.readingListDao = readingListDao;
	}

	/**
	 * @return the graph database service currently set
	 */
	public GraphDatabaseService getGraphDB() {
		return this.graphDB;
	}

	/**
	 * Sets the graph database service. Marked as a required property.
	 * 
	 * @param graphDB
	 *            graph database service to store
	 */
	@Required
	public void setGraphDB(final GraphDatabaseService graphDB) {
		this.graphDB = graphDB;
	}

	/**
	 * @return the profile dao currently set
	 */
	public ProfileDao getProfileDao() {
		return this.profileDao;
	}

	/**
	 * Sets the profile dao.
	 * 
	 * @param profileDao
	 *            dao to store
	 */
	public void setProfileDao(final ProfileDao profileDao) {
		this.profileDao = profileDao;
	}

	/**
	 * @return the group dao currently set
	 */
	public GroupDao getGroupDao() {
		return this.groupDao;
	}

	/**
	 * Sets the group dao.
	 * 
	 * @param groupDao
	 *            dao to store
	 */
	public void setGroupDao(final GroupDao groupDao) {
		this.groupDao = groupDao;
	}

}
