package eu.dnetlib.functionality.notification.dao;

import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.sql.Types;
import java.util.Date;
import java.util.SortedSet;
import java.util.TreeSet;

import javax.sql.DataSource;

import org.apache.log4j.Logger;
import org.springframework.jdbc.datasource.DataSourceUtils;

import eu.dnetlib.domain.functionality.NotificationEvent;
import eu.dnetlib.domain.functionality.NotificationQuery;
import eu.dnetlib.domain.functionality.NotificationResult;
import eu.dnetlib.domain.functionality.NotificationSchedule;
import eu.dnetlib.domain.functionality.NotificationSubscription;

/**
 * Notification DAO that persists queries, schedules, results and subscriptions in an SQL database through JDBC.
 * The SQL database must support foreign keys and UTF-8, as well as the <code>ON DUPLICATE KEY UPDATE</code>,
 * <code>LIMIT ... OFFSET ...</code> and <code>ENGINE = 'InnoDB'</code> clauses used by the statements below.
 * <p>
 * Every operation obtains its connection through {@link DataSourceUtils} and always closes its statement and
 * releases the connection before returning, whether it succeeds or fails.
 * @author thanos@di.uoa.gr
 * @see NotificationDAO
 * @see eu.dnetlib.domain.functionality.NotificationQuery
 * @see eu.dnetlib.domain.functionality.NotificationEvent
 * @see NotificationDAOException
 *
 */
public class JDBCNotificationDAO implements NotificationDAO {
	private static final Logger logger = Logger.getLogger(JDBCNotificationDAO.class);
	private static final String CREATE_QUERY = "CREATE TABLE IF NOT EXISTS NotificationQuery (queryId VARCHAR (36) NOT NULL PRIMARY KEY, queryLanguage VARCHAR (16) NOT NULL, sourceUri VARCHAR(255) NOT NULL, queryString " +
			"TEXT NOT NULL) DEFAULT CHARACTER SET = 'UTF8', ENGINE = 'InnoDB';";
	private static final String CREATE_SCHEDULE = "CREATE TABLE IF NOT EXISTS NotificationSchedule (queryId VARCHAR (36) NOT NULL PRIMARY KEY, triggerThreshold FLOAT DEFAULT NULL, percentileThreshold BOOLEAN NOT NULL, " +
			"executionPeriod BIGINT NOT NULL, enabled BOOLEAN NOT NULL, FOREIGN KEY (queryId) REFERENCES NotificationQuery (queryId) ON UPDATE CASCADE ON DELETE CASCADE) DEFAULT CHARACTER SET = 'UTF8', ENGINE = 'InnoDB'";
	private static final String CREATE_RESULT = "CREATE TABLE IF NOT EXISTS NotificationResult (queryId VARCHAR (36) NOT NULL, date TIMESTAMP NOT NULL, resultId VARCHAR (36) NOT NULL, name TINYTEXT NOT NULL, value INT NOT " +
			"NULL, PRIMARY KEY (queryId, date, resultId), FOREIGN KEY (queryId) REFERENCES NotificationQuery (queryId) ON UPDATE CASCADE ON DELETE CASCADE) DEFAULT CHARACTER SET = 'UTF8', ENGINE = 'InnoDB';";
	private static final String CREATE_SUBSCRIPTION = "CREATE TABLE IF NOT EXISTS NotificationSubscription (queryId VARCHAR (36) NOT NULL, alertService VARCHAR (255) NOT NULL, enabled BOOLEAN NOT NULL, PRIMARY KEY (queryId, " +
			"alertService), FOREIGN KEY (queryId) REFERENCES NotificationSchedule (queryId) ON UPDATE CASCADE ON DELETE CASCADE) DEFAULT CHARACTER SET = 'UTF8', ENGINE = 'InnoDB';";
	private static final String COUNT_QUERIES = "SELECT COUNT(*) FROM NotificationQuery;";
	private static final String GET_QUERIES = "SELECT queryId, queryLanguage, sourceUri, queryString FROM NotificationQuery ORDER BY queryId LIMIT ? OFFSET ?;";
	private static final String GET_QUERY = "SELECT queryLanguage, sourceUri, queryString FROM NotificationQuery WHERE queryId = ?;";
	private static final String SAVE_QUERY = "INSERT INTO NotificationQuery (queryId, queryLanguage, sourceUri, queryString) VALUES (?, ?, ?, ?) ON DUPLICATE KEY UPDATE queryLanguage = ?, sourceUri = ?, queryString = ?;";
	private static final String DELETE_QUERY = "DELETE FROM NotificationQuery WHERE queryId = ?;";
	private static final String COUNT_SCHEDULES = "SELECT COUNT(*) FROM NotificationSchedule;";
	private static final String GET_SCHEDULES = "SELECT queryId, triggerThreshold, percentileThreshold, executionPeriod, enabled FROM NotificationSchedule ORDER BY queryId LIMIT ? OFFSET ?;";
	private static final String GET_ENABLED_SCHEDULES = "SELECT queryId, triggerThreshold, percentileThreshold, executionPeriod FROM NotificationSchedule WHERE enabled ORDER BY queryId LIMIT ? OFFSET ?;";
	private static final String GET_SCHEDULE = "SELECT triggerThreshold, percentileThreshold, executionPeriod, enabled FROM NotificationSchedule WHERE queryId = ?;";
	private static final String SAVE_SCHEDULE = "INSERT INTO NotificationSchedule (queryId, triggerThreshold, percentileThreshold, executionPeriod, enabled) VALUES (?, ?, ?, ?, ?) ON DUPLICATE KEY UPDATE triggerThreshold = " +
			"?, percentileThreshold = ?, executionPeriod = ?, enabled = ?";
	private static final String DELETE_SCHEDULE = "DELETE FROM NotificationSchedule WHERE queryId = ?;";
	private static final String COUNT_EVENTS = "SELECT COUNT(DISTINCT queryId, date) FROM NotificationResult;";
	private static final String GET_EVENTS = "SELECT DISTINCT queryId, date FROM NotificationResult ORDER BY queryId, date LIMIT ? OFFSET ?;";
	private static final String COUNT_RESULTS = "SELECT COUNT(*) FROM NotificationResult;";
	private static final String GET_RESULTS = "SELECT queryId, date, resultId, name, value FROM NotificationResult ORDER BY queryId, date, resultId LIMIT ? OFFSET ?;";
	private static final String GET_RESULT = "SELECT name, value FROM NotificationResult WHERE queryId = ? AND date = ? AND resultId = ?;";
	private static final String GET_PREVIOUS_RESULT = "SELECT date, name, value FROM NotificationResult WHERE queryId = ? AND date < ? AND resultId = ? ORDER BY date DESC LIMIT 1;";
	private static final String GET_LAST_RESULT = "SELECT date, name, value FROM NotificationResult WHERE queryId = ? AND resultId = ? ORDER BY date DESC LIMIT 1;";
	private static final String SAVE_RESULT = "INSERT INTO NotificationResult (queryId, date, resultId, name, value) VALUES (?, ?, ?, ?, ?) ON DUPLICATE KEY UPDATE name = ?, value = ?;";
	private static final String COUNT_SUBSCRIPTIONS = "SELECT COUNT(*) FROM NotificationSubscription;";
	private static final String GET_SUBSCRIPTIONS = "SELECT queryId, alertService, enabled FROM NotificationSubscription ORDER BY queryId, alertService LIMIT ? OFFSET ?;";
	private static final String GET_ENABLED_SUBSCRIPTIONS = "SELECT alertService FROM NotificationSubscription WHERE queryId = ? AND enabled ORDER BY alertService LIMIT ? OFFSET ?;";
	private static final String GET_SUBSCRIPTION = "SELECT enabled FROM NotificationSubscription WHERE queryId = ? AND alertService = ?;";
	private static final String SAVE_SUBSCRIPTION = "INSERT INTO NotificationSubscription (queryId, alertService, enabled) VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE enabled = ?;";
	private static final String DELETE_SUBSCRIPTION = "DELETE FROM NotificationSubscription WHERE queryId = ? AND alertService = ?;";

	private DataSource dataSource;

	/**
	 * Set the data source that all operations of this DAO obtain their connections from.
	 * @param dataSource the data source to use
	 */
	public void setDataSource(final DataSource dataSource) {
		this.dataSource = dataSource;
	}

	/**
	 * Create necessary tables if they do not exist.
	 * @throws NotificationDAOException if any errors occur
	 */
	public void init() throws NotificationDAOException {
		final Connection connection = DataSourceUtils.getConnection(dataSource);
		Statement statement = null;
		try {
			statement = connection.createStatement();
			// Order matters: the later tables declare foreign keys on the earlier ones.
			statement.executeUpdate(CREATE_QUERY);
			statement.executeUpdate(CREATE_SCHEDULE);
			statement.executeUpdate(CREATE_RESULT);
			statement.executeUpdate(CREATE_SUBSCRIPTION);
		} catch (final SQLException e) {
			throw new NotificationDAOException("error initializing JDBC notification DAO", e);
		} finally {
			release(connection, statement, null);
		}
		logger.info("JDBC notification DAO initialization complete");
	}

	/**
	 * Count the stored queries.
	 * @return the number of rows in the query table
	 * @throws NotificationDAOException if the count cannot be retrieved
	 */
	@Override
	public int countQueries() throws NotificationDAOException {
		return count(COUNT_QUERIES, "error counting queries");
	}

	/**
	 * Retrieve a page of queries ordered by query id.
	 * @param limit the maximum number of queries to return
	 * @param offset the number of queries to skip
	 * @return the queries of the requested page
	 * @throws NotificationDAOException if the database fails or a stored source URI is malformed
	 */
	@Override
	public SortedSet<NotificationQuery> getQueries(final int limit, final int offset) throws NotificationDAOException {
		final Connection connection = DataSourceUtils.getConnection(dataSource);
		PreparedStatement statement = null;
		ResultSet rows = null;
		try {
			statement = connection.prepareStatement(GET_QUERIES);
			statement.setInt(1, limit);
			statement.setInt(2, offset);
			rows = statement.executeQuery();
			final SortedSet<NotificationQuery> queries = new TreeSet<NotificationQuery>();
			while (rows.next()) {
				queries.add(new NotificationQuery(rows.getString(1), rows.getString(2), new URI(rows.getString(3)), rows.getString(4)));
			}
			return queries;
		} catch (final SQLException e) {
			throw new NotificationDAOException("error retrieving queries (limit: " + limit + ", offset: " + offset + ")", e);
		} catch (final URISyntaxException e) {
			throw new NotificationDAOException("error retrieving queries (limit: " + limit + ", offset: " + offset + ")", e);
		} finally {
			release(connection, statement, rows);
		}
	}

	/**
	 * Retrieve a single query.
	 * @param queryId the id of the query
	 * @return the query, or null if no query with this id is stored
	 * @throws NotificationDAOException if the database fails or the stored source URI is malformed
	 */
	@Override
	public NotificationQuery getQuery(final String queryId) throws NotificationDAOException {
		final Connection connection = DataSourceUtils.getConnection(dataSource);
		PreparedStatement statement = null;
		ResultSet rows = null;
		try {
			statement = connection.prepareStatement(GET_QUERY);
			statement.setString(1, queryId);
			rows = statement.executeQuery();
			if (!rows.next()) {
				return null;
			}
			return new NotificationQuery(queryId, rows.getString(1), new URI(rows.getString(2)), rows.getString(3));
		} catch (final SQLException e) {
			throw new NotificationDAOException("error retrieving query " + queryId, e);
		} catch (final URISyntaxException e) {
			throw new NotificationDAOException("error retrieving query " + queryId, e);
		} finally {
			release(connection, statement, rows);
		}
	}

	/**
	 * Insert a query, or update it if a query with the same id is already stored.
	 * @param query the query to save
	 * @throws NotificationDAOException if the query cannot be saved
	 */
	@Override
	public void saveQuery(final NotificationQuery query) throws NotificationDAOException {
		final Connection connection = DataSourceUtils.getConnection(dataSource);
		PreparedStatement statement = null;
		try {
			statement = connection.prepareStatement(SAVE_QUERY);
			final String sourceUri = query.getSourceUri().toString();
			// Parameters 2-4 feed the INSERT, parameters 5-7 repeat them for the ON DUPLICATE KEY UPDATE clause.
			statement.setString(1, query.getQueryId());
			statement.setString(2, query.getQueryLanguage());
			statement.setString(3, sourceUri);
			statement.setString(4, query.getQueryString());
			statement.setString(5, query.getQueryLanguage());
			statement.setString(6, sourceUri);
			statement.setString(7, query.getQueryString());
			statement.executeUpdate();
		} catch (final SQLException e) {
			throw new NotificationDAOException("error saving query " + query, e);
		} finally {
			release(connection, statement, null);
		}
	}

	/**
	 * Delete a query.
	 * @param queryId the id of the query to delete
	 * @throws NotificationDAOException if the query cannot be deleted
	 */
	@Override
	public void deleteQuery(final String queryId) throws NotificationDAOException {
		deleteByQueryId(DELETE_QUERY, queryId, "error deleting query " + queryId);
	}

	/**
	 * Count the stored schedules.
	 * @return the number of rows in the schedule table
	 * @throws NotificationDAOException if the count cannot be retrieved
	 */
	@Override
	public int countSchedules() throws NotificationDAOException {
		return count(COUNT_SCHEDULES, "error counting schedules");
	}

	/**
	 * Retrieve a page of schedules ordered by query id.
	 * @param limit the maximum number of schedules to return
	 * @param offset the number of schedules to skip
	 * @return the schedules of the requested page
	 * @throws NotificationDAOException if the schedules cannot be retrieved
	 */
	@Override
	public SortedSet<NotificationSchedule> getSchedules(final int limit, final int offset) throws NotificationDAOException {
		final Connection connection = DataSourceUtils.getConnection(dataSource);
		PreparedStatement statement = null;
		ResultSet rows = null;
		try {
			statement = connection.prepareStatement(GET_SCHEDULES);
			statement.setInt(1, limit);
			statement.setInt(2, offset);
			rows = statement.executeQuery();
			final SortedSet<NotificationSchedule> schedules = new TreeSet<NotificationSchedule>();
			while (rows.next()) {
				final Float triggerThreshold = readThreshold(rows, 2);
				schedules.add(new NotificationSchedule(rows.getString(1), triggerThreshold, rows.getBoolean(3), rows.getLong(4), rows.getBoolean(5)));
			}
			return schedules;
		} catch (final SQLException e) {
			throw new NotificationDAOException("error retrieving schedules (limit: " + limit + ", offset: " + offset + ")", e);
		} finally {
			release(connection, statement, rows);
		}
	}

	/**
	 * Retrieve a page of the enabled schedules ordered by query id.
	 * @param limit the maximum number of schedules to return
	 * @param offset the number of schedules to skip
	 * @return the enabled schedules of the requested page
	 * @throws NotificationDAOException if the schedules cannot be retrieved
	 */
	@Override
	public SortedSet<NotificationSchedule> getEnabledSchedules(final int limit, final int offset) throws NotificationDAOException {
		final Connection connection = DataSourceUtils.getConnection(dataSource);
		PreparedStatement statement = null;
		ResultSet rows = null;
		try {
			statement = connection.prepareStatement(GET_ENABLED_SCHEDULES);
			statement.setInt(1, limit);
			statement.setInt(2, offset);
			rows = statement.executeQuery();
			final SortedSet<NotificationSchedule> schedules = new TreeSet<NotificationSchedule>();
			while (rows.next()) {
				final Float triggerThreshold = readThreshold(rows, 2);
				// The statement selects enabled rows only, so the flag is not read back.
				schedules.add(new NotificationSchedule(rows.getString(1), triggerThreshold, rows.getBoolean(3), rows.getLong(4), true));
			}
			return schedules;
		} catch (final SQLException e) {
			throw new NotificationDAOException("error retrieving enabled schedules (limit: " + limit + ", offset: " + offset + ")", e);
		} finally {
			release(connection, statement, rows);
		}
	}

	/**
	 * Retrieve the schedule of a query.
	 * @param queryId the id of the query
	 * @return the schedule, or null if the query has no stored schedule
	 * @throws NotificationDAOException if the schedule cannot be retrieved
	 */
	@Override
	public NotificationSchedule getSchedule(final String queryId) throws NotificationDAOException {
		final Connection connection = DataSourceUtils.getConnection(dataSource);
		PreparedStatement statement = null;
		ResultSet rows = null;
		try {
			statement = connection.prepareStatement(GET_SCHEDULE);
			statement.setString(1, queryId);
			rows = statement.executeQuery();
			if (!rows.next()) {
				return null;
			}
			final Float triggerThreshold = readThreshold(rows, 1);
			return new NotificationSchedule(queryId, triggerThreshold, rows.getBoolean(2), rows.getLong(3), rows.getBoolean(4));
		} catch (final SQLException e) {
			throw new NotificationDAOException("error retrieving schedule " + queryId, e);
		} finally {
			release(connection, statement, rows);
		}
	}

	/**
	 * Insert a schedule, or update it if the query already has a stored schedule.
	 * @param schedule the schedule to save
	 * @throws NotificationDAOException if the schedule cannot be saved
	 */
	@Override
	public void saveSchedule(final NotificationSchedule schedule) throws NotificationDAOException {
		final Connection connection = DataSourceUtils.getConnection(dataSource);
		PreparedStatement statement = null;
		try {
			statement = connection.prepareStatement(SAVE_SCHEDULE);
			// Parameters 2-5 feed the INSERT, parameters 6-9 repeat them for the ON DUPLICATE KEY UPDATE clause.
			statement.setString(1, schedule.getQueryId());
			final Float triggerThreshold = schedule.getTriggerThreshold();
			if (triggerThreshold == null) {
				statement.setNull(2, Types.FLOAT);
				statement.setNull(6, Types.FLOAT);
			} else {
				statement.setFloat(2, triggerThreshold);
				statement.setFloat(6, triggerThreshold);
			}
			statement.setBoolean(3, schedule.isPercentileThreshold());
			statement.setBoolean(7, schedule.isPercentileThreshold());
			statement.setLong(4, schedule.getExecutionPeriod());
			statement.setLong(8, schedule.getExecutionPeriod());
			statement.setBoolean(5, schedule.isEnabled());
			statement.setBoolean(9, schedule.isEnabled());
			statement.executeUpdate();
		} catch (final SQLException e) {
			// Keep the SQL error as the cause so the reason of the failure is not lost.
			throw new NotificationDAOException("error saving schedule " + schedule, e);
		} finally {
			release(connection, statement, null);
		}
	}

	/**
	 * Delete the schedule of a query.
	 * @param queryId the id of the query whose schedule is deleted
	 * @throws NotificationDAOException if the schedule cannot be deleted
	 */
	@Override
	public void deleteSchedule(final String queryId) throws NotificationDAOException {
		deleteByQueryId(DELETE_SCHEDULE, queryId, "error deleting schedule " + queryId);
	}

	/**
	 * Count the stored events, i.e. the distinct (query id, date) pairs of the result table.
	 * @return the number of events
	 * @throws NotificationDAOException if the count cannot be retrieved
	 */
	@Override
	public int countEvents() throws NotificationDAOException {
		return count(COUNT_EVENTS, "error counting events");
	}

	/**
	 * Retrieve a page of events ordered by query id and date.
	 * @param limit the maximum number of events to return
	 * @param offset the number of events to skip
	 * @return the events of the requested page
	 * @throws NotificationDAOException if the events cannot be retrieved
	 */
	@Override
	public SortedSet<NotificationEvent> getEvents(final int limit, final int offset) throws NotificationDAOException {
		final Connection connection = DataSourceUtils.getConnection(dataSource);
		PreparedStatement statement = null;
		ResultSet rows = null;
		try {
			statement = connection.prepareStatement(GET_EVENTS);
			statement.setInt(1, limit);
			statement.setInt(2, offset);
			rows = statement.executeQuery();
			final SortedSet<NotificationEvent> events = new TreeSet<NotificationEvent>();
			while (rows.next()) {
				events.add(new NotificationEvent(rows.getString(1), toDate(rows.getTimestamp(2))));
			}
			return events;
		} catch (final SQLException e) {
			throw new NotificationDAOException("error retrieving events (limit: " + limit + ", offset: " + offset + ")", e);
		} finally {
			release(connection, statement, rows);
		}
	}

	/**
	 * Count the stored results.
	 * @return the number of rows in the result table
	 * @throws NotificationDAOException if the count cannot be retrieved
	 */
	@Override
	public int countResults() throws NotificationDAOException {
		return count(COUNT_RESULTS, "error counting results");
	}

	/**
	 * Retrieve a page of results ordered by query id, date and result id.
	 * @param limit the maximum number of results to return
	 * @param offset the number of results to skip
	 * @return the results of the requested page
	 * @throws NotificationDAOException if the results cannot be retrieved
	 */
	@Override
	public SortedSet<NotificationResult> getResults(final int limit, final int offset) throws NotificationDAOException {
		final Connection connection = DataSourceUtils.getConnection(dataSource);
		PreparedStatement statement = null;
		ResultSet rows = null;
		try {
			statement = connection.prepareStatement(GET_RESULTS);
			statement.setInt(1, limit);
			statement.setInt(2, offset);
			rows = statement.executeQuery();
			final SortedSet<NotificationResult> results = new TreeSet<NotificationResult>();
			while (rows.next()) {
				results.add(new NotificationResult(rows.getString(1), toDate(rows.getTimestamp(2)), rows.getString(3), rows.getString(4), rows.getInt(5)));
			}
			return results;
		} catch (final SQLException e) {
			throw new NotificationDAOException("error retrieving results (limit: " + limit + ", offset: " + offset + ")", e);
		} finally {
			release(connection, statement, rows);
		}
	}

	/**
	 * Retrieve a result of a specific event of a query.
	 * @param queryId the id of the query
	 * @param date the date of the event
	 * @param resultId the id of the result
	 * @return the result, or null if no such result is stored
	 * @throws NotificationDAOException if the result cannot be retrieved
	 */
	@Override
	public NotificationResult getResult(final String queryId, final Date date, final String resultId) throws NotificationDAOException {
		final Connection connection = DataSourceUtils.getConnection(dataSource);
		PreparedStatement statement = null;
		ResultSet rows = null;
		try {
			statement = connection.prepareStatement(GET_RESULT);
			statement.setString(1, queryId);
			statement.setTimestamp(2, new Timestamp(date.getTime()));
			statement.setString(3, resultId);
			rows = statement.executeQuery();
			if (!rows.next()) {
				return null;
			}
			return new NotificationResult(queryId, date, resultId, rows.getString(1), rows.getInt(2));
		} catch (final SQLException e) {
			throw new NotificationDAOException("error retrieving result " + resultId + " of event " + date + " of query " + queryId, e);
		} finally {
			release(connection, statement, rows);
		}
	}

	/**
	 * Retrieve a result of the most recent event of a query that is strictly older than the given date.
	 * @param queryId the id of the query
	 * @param date the date of the event whose predecessor is requested
	 * @param resultId the id of the result
	 * @return the result, or null if no such result is stored
	 * @throws NotificationDAOException if the result cannot be retrieved
	 */
	@Override
	public NotificationResult getPreviousResult(final String queryId, final Date date, final String resultId) throws NotificationDAOException {
		final Connection connection = DataSourceUtils.getConnection(dataSource);
		PreparedStatement statement = null;
		ResultSet rows = null;
		try {
			statement = connection.prepareStatement(GET_PREVIOUS_RESULT);
			statement.setString(1, queryId);
			statement.setTimestamp(2, new Timestamp(date.getTime()));
			statement.setString(3, resultId);
			rows = statement.executeQuery();
			if (!rows.next()) {
				return null;
			}
			// Hand out a plain Date like getResults() does, not the Timestamp returned by the driver.
			return new NotificationResult(queryId, toDate(rows.getTimestamp(1)), resultId, rows.getString(2), rows.getInt(3));
		} catch (final SQLException e) {
			throw new NotificationDAOException("error retrieving result " + resultId + " of previous event of event " + date + " of query " + queryId, e);
		} finally {
			release(connection, statement, rows);
		}
	}

	/**
	 * Retrieve a result of the most recent event of a query.
	 * @param queryId the id of the query
	 * @param resultId the id of the result
	 * @return the result, or null if no such result is stored
	 * @throws NotificationDAOException if the result cannot be retrieved
	 */
	@Override
	public NotificationResult getLastResult(final String queryId, final String resultId) throws NotificationDAOException {
		final Connection connection = DataSourceUtils.getConnection(dataSource);
		PreparedStatement statement = null;
		ResultSet rows = null;
		try {
			statement = connection.prepareStatement(GET_LAST_RESULT);
			statement.setString(1, queryId);
			statement.setString(2, resultId);
			rows = statement.executeQuery();
			if (!rows.next()) {
				return null;
			}
			// Hand out a plain Date like getResults() does, not the Timestamp returned by the driver.
			return new NotificationResult(queryId, toDate(rows.getTimestamp(1)), resultId, rows.getString(2), rows.getInt(3));
		} catch (final SQLException e) {
			throw new NotificationDAOException("error retrieving result " + resultId + " of the last event of query " + queryId, e);
		} finally {
			release(connection, statement, rows);
		}
	}

	/**
	 * Insert a result, or update its name and value if the same (query id, date, result id) is already stored.
	 * @param result the result to save
	 * @throws NotificationDAOException if the result cannot be saved
	 */
	@Override
	public void saveResult(final NotificationResult result) throws NotificationDAOException {
		final Connection connection = DataSourceUtils.getConnection(dataSource);
		PreparedStatement statement = null;
		try {
			statement = connection.prepareStatement(SAVE_RESULT);
			// Parameters 4-5 feed the INSERT, parameters 6-7 repeat them for the ON DUPLICATE KEY UPDATE clause.
			statement.setString(1, result.getQueryId());
			statement.setTimestamp(2, new Timestamp(result.getDate().getTime()));
			statement.setString(3, result.getResultId());
			statement.setString(4, result.getName());
			statement.setInt(5, result.getValue());
			statement.setString(6, result.getName());
			statement.setInt(7, result.getValue());
			statement.executeUpdate();
		} catch (final SQLException e) {
			throw new NotificationDAOException("error saving result " + result, e);
		} finally {
			release(connection, statement, null);
		}
	}

	/**
	 * Count the stored subscriptions.
	 * @return the number of rows in the subscription table
	 * @throws NotificationDAOException if the count cannot be retrieved
	 */
	@Override
	public int countSubscriptions() throws NotificationDAOException {
		return count(COUNT_SUBSCRIPTIONS, "error counting subscriptions");
	}

	/**
	 * Retrieve a page of subscriptions ordered by query id and alert service.
	 * @param limit the maximum number of subscriptions to return
	 * @param offset the number of subscriptions to skip
	 * @return the subscriptions of the requested page
	 * @throws NotificationDAOException if the database fails or a stored alert service URL is malformed
	 */
	@Override
	public SortedSet<NotificationSubscription> getSubscriptions(final int limit, final int offset) throws NotificationDAOException {
		final Connection connection = DataSourceUtils.getConnection(dataSource);
		PreparedStatement statement = null;
		ResultSet rows = null;
		try {
			statement = connection.prepareStatement(GET_SUBSCRIPTIONS);
			statement.setInt(1, limit);
			statement.setInt(2, offset);
			rows = statement.executeQuery();
			final SortedSet<NotificationSubscription> subscriptions = new TreeSet<NotificationSubscription>();
			while (rows.next()) {
				subscriptions.add(new NotificationSubscription(rows.getString(1), new URL(rows.getString(2)), rows.getBoolean(3)));
			}
			return subscriptions;
		} catch (final MalformedURLException e) {
			throw new NotificationDAOException("error retrieving subscriptions (limit: " + limit + ", offset: " + offset + ")", e);
		} catch (final SQLException e) {
			throw new NotificationDAOException("error retrieving subscriptions (limit: " + limit + ", offset: " + offset + ")", e);
		} finally {
			release(connection, statement, rows);
		}
	}

	/**
	 * Retrieve a page of the enabled subscriptions of a query ordered by alert service.
	 * @param queryId the id of the query
	 * @param limit the maximum number of subscriptions to return
	 * @param offset the number of subscriptions to skip
	 * @return the enabled subscriptions of the requested page
	 * @throws NotificationDAOException if the database fails or a stored alert service URL is malformed
	 */
	@Override
	public SortedSet<NotificationSubscription> getEnabledSubscriptions(final String queryId, final int limit, final int offset) throws NotificationDAOException {
		final Connection connection = DataSourceUtils.getConnection(dataSource);
		PreparedStatement statement = null;
		ResultSet rows = null;
		try {
			statement = connection.prepareStatement(GET_ENABLED_SUBSCRIPTIONS);
			statement.setString(1, queryId);
			statement.setInt(2, limit);
			statement.setInt(3, offset);
			rows = statement.executeQuery();
			final SortedSet<NotificationSubscription> subscriptions = new TreeSet<NotificationSubscription>();
			while (rows.next()) {
				// The statement selects enabled rows only, so the flag is not read back.
				subscriptions.add(new NotificationSubscription(queryId, new URL(rows.getString(1)), true));
			}
			return subscriptions;
		} catch (final MalformedURLException e) {
			throw new NotificationDAOException("error retrieving enabled subscriptions of query " + queryId + "(limit: " + limit + ", offset: " + offset + ")", e);
		} catch (final SQLException e) {
			throw new NotificationDAOException("error retrieving enabled subscriptions of query " + queryId + "(limit: " + limit + ", offset: " + offset + ")", e);
		} finally {
			release(connection, statement, rows);
		}
	}

	/**
	 * Retrieve the subscription of an alert service to a query.
	 * @param queryId the id of the query
	 * @param alertService the URL of the alert service
	 * @return the subscription, or null if no such subscription is stored
	 * @throws NotificationDAOException if the subscription cannot be retrieved
	 */
	public NotificationSubscription getSubscription(final String queryId, final URL alertService) throws NotificationDAOException {
		final Connection connection = DataSourceUtils.getConnection(dataSource);
		PreparedStatement statement = null;
		ResultSet rows = null;
		try {
			statement = connection.prepareStatement(GET_SUBSCRIPTION);
			statement.setString(1, queryId);
			statement.setString(2, alertService.toString());
			rows = statement.executeQuery();
			if (!rows.next()) {
				return null;
			}
			return new NotificationSubscription(queryId, alertService, rows.getBoolean(1));
		} catch (final SQLException e) {
			throw new NotificationDAOException("error retrieving subscription (query: " + queryId + ", alert service: " + alertService + ")", e);
		} finally {
			release(connection, statement, rows);
		}
	}

	/**
	 * Insert a subscription, or update its enabled flag if it is already stored.
	 * @param subscription the subscription to save
	 * @throws NotificationDAOException if the subscription cannot be saved
	 */
	public void saveSubscription(final NotificationSubscription subscription) throws NotificationDAOException {
		final Connection connection = DataSourceUtils.getConnection(dataSource);
		PreparedStatement statement = null;
		try {
			statement = connection.prepareStatement(SAVE_SUBSCRIPTION);
			// Parameter 3 feeds the INSERT, parameter 4 repeats it for the ON DUPLICATE KEY UPDATE clause.
			statement.setString(1, subscription.getQueryId());
			statement.setString(2, subscription.getAlertService().toString());
			statement.setBoolean(3, subscription.isEnabled());
			statement.setBoolean(4, subscription.isEnabled());
			statement.executeUpdate();
		} catch (final SQLException e) {
			throw new NotificationDAOException("error saving subscription " + subscription, e);
		} finally {
			release(connection, statement, null);
		}
	}

	/**
	 * Delete the subscription of an alert service to a query.
	 * @param queryId the id of the query
	 * @param alertService the URL of the alert service
	 * @throws NotificationDAOException if the subscription cannot be deleted
	 */
	public void deleteSubscription(final String queryId, final URL alertService) throws NotificationDAOException {
		final Connection connection = DataSourceUtils.getConnection(dataSource);
		PreparedStatement statement = null;
		try {
			statement = connection.prepareStatement(DELETE_SUBSCRIPTION);
			statement.setString(1, queryId);
			statement.setString(2, alertService.toString());
			statement.executeUpdate();
		} catch (final SQLException e) {
			throw new NotificationDAOException("error deleting subscription (query: " + queryId + ", alert service: " + alertService + ")", e);
		} finally {
			release(connection, statement, null);
		}
	}

	/**
	 * Run a parameterless statement that yields a single integer and return that integer.
	 * @param sql the counting statement to execute
	 * @param errorMessage the message of the exception thrown on failure
	 * @return the value of the first column of the first row
	 * @throws NotificationDAOException if the statement fails or yields no row
	 */
	private int count(final String sql, final String errorMessage) throws NotificationDAOException {
		final Connection connection = DataSourceUtils.getConnection(dataSource);
		PreparedStatement statement = null;
		ResultSet rows = null;
		try {
			statement = connection.prepareStatement(sql);
			rows = statement.executeQuery();
			if (!rows.next()) {
				throw new NotificationDAOException(errorMessage);
			}
			return rows.getInt(1);
		} catch (final SQLException e) {
			throw new NotificationDAOException(errorMessage, e);
		} finally {
			release(connection, statement, rows);
		}
	}

	/**
	 * Run a delete statement whose only parameter is a query id.
	 * @param sql the delete statement to execute
	 * @param queryId the query id bound to the statement
	 * @param errorMessage the message of the exception thrown on failure
	 * @throws NotificationDAOException if the statement fails
	 */
	private void deleteByQueryId(final String sql, final String queryId, final String errorMessage) throws NotificationDAOException {
		final Connection connection = DataSourceUtils.getConnection(dataSource);
		PreparedStatement statement = null;
		try {
			statement = connection.prepareStatement(sql);
			statement.setString(1, queryId);
			statement.executeUpdate();
		} catch (final SQLException e) {
			throw new NotificationDAOException(errorMessage, e);
		} finally {
			release(connection, statement, null);
		}
	}

	/**
	 * Read a nullable FLOAT column.
	 * @param rows the result set positioned on the row to read
	 * @param column the 1-based index of the column
	 * @return the column value, or null if the column holds SQL NULL
	 * @throws SQLException if the column cannot be read
	 */
	private static Float readThreshold(final ResultSet rows, final int column) throws SQLException {
		final float value = rows.getFloat(column);
		// getFloat() reports SQL NULL as 0, so wasNull() is the only way to tell the two apart.
		if (rows.wasNull()) {
			return null;
		}
		return value;
	}

	/**
	 * Convert a timestamp read from the database to a plain date with the same millisecond value.
	 * @param timestamp the timestamp to convert
	 * @return a date representing the same instant
	 */
	private static Date toDate(final Timestamp timestamp) {
		return new Date(timestamp.getTime());
	}

	/**
	 * Close the given JDBC resources and hand the connection back to the data source. Failures to close are
	 * logged and not propagated, so that they neither hide the exception that is already being thrown nor fail
	 * an operation that has already completed.
	 * @param connection the connection to release
	 * @param statement the statement to close, or null if none was created
	 * @param resultSet the result set to close, or null if none was created
	 */
	private void release(final Connection connection, final Statement statement, final ResultSet resultSet) {
		if (resultSet != null) {
			try {
				resultSet.close();
			} catch (final SQLException e) {
				logger.warn("error closing JDBC result set", e);
			}
		}
		if (statement != null) {
			try {
				statement.close();
			} catch (final SQLException e) {
				logger.warn("error closing JDBC statement", e);
			}
		}
		DataSourceUtils.releaseConnection(connection, dataSource);
	}
}
