package eu.dnetlib.enabling.is.sn;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import javax.xml.ws.wsaddressing.W3CEndpointReference;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Asynchronous but in-order delivery of notifications.
 * 
 * @author marko
 * 
 */
public class AsynchronousNotificationSenderImpl extends AbstractNotificationSender implements Runnable { // NOPMD
	/**
	 * logger.
	 */
	private static final Log log = LogFactory.getLog(AsynchronousNotificationSenderImpl.class); // NOPMD by marko on 11/24/08 5:02 PM

	/**
	 * Encapsulates an notification job.
	 * 
	 * @author marko
	 *
	 */
	class NotificationJob {
		/**
		 * notification destination.
		 */
		private final transient W3CEndpointReference destination;
		
		/**
		 * notification message. 
		 */
		private final transient NotificationMessage message;

		/**
		 * construct a new notification job.
		 * 
		 * @param destination destination
		 * @param message message
		 */
		public NotificationJob(final W3CEndpointReference destination, final NotificationMessage message) {
			super();
			this.destination = destination;
			this.message = message;
		}

		public W3CEndpointReference getDestination() {
			return destination;
		}

		public NotificationMessage getMessage() {
			return message;
		}

	}

	/**
	 * job queue.
	 */
	private BlockingQueue<NotificationJob> jobQueue = new LinkedBlockingQueue<NotificationJob>();

	/**
	 * {@inheritDoc}
	 * 
	 * @see eu.dnetlib.enabling.is.sn.NotificationSender#send(javax.xml.ws.wsaddressing.W3CEndpointReference,
	 *      eu.dnetlib.enabling.is.sn.NotificationMessage)
	 */
	@Override
	public void send(final W3CEndpointReference destination, final NotificationMessage message) {
		log.debug("queuing asynchronous notification");
		try {
			jobQueue.put(new NotificationJob(destination, message));
		} catch (InterruptedException e) {
			log.warn("possibly lost notification", e);
		}
	}

	/**
	 * start this notification sender (called by spring lifecycle).
	 */
	void start() {
		new Thread(this).start(); // NOPMD
	}

	/** 
	 * {@inheritDoc}
	 * @see java.lang.Runnable#run()
	 */
	@Override
	public void run() {
		while (true) {
			try {
				final NotificationJob job = jobQueue.take();

				try {
					getInvoker().send(job.getDestination(), job.getMessage(), 0);
				} catch (javax.xml.ws.soap.SOAPFaultException t) {
					log.fatal("error sending notification to " + job.getDestination().toString(), t);
				}
			} catch (InterruptedException e) {
				log.warn("possibly lost notification", e);
			}
		}
	}

	public BlockingQueue<NotificationJob> getJobQueue() {
		return jobQueue;
	}

	public void setJobQueue(final BlockingQueue<NotificationJob> jobQueue) {
		this.jobQueue = jobQueue;
	}

}
