package eu.dnetlib.lbs.utils;

import java.util.Date;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import com.google.gson.Gson;

import eu.dnetlib.lbs.elasticsearch.Event;
import eu.dnetlib.lbs.events.input.EventMessage;

public class JsonMessageToEventFunction implements Function<String, Event> {

	private final Gson gson = new Gson();

	private static final Log log = LogFactory.getLog(JsonMessageToEventFunction.class);

	@Override
	public Event apply(final String json) {
		try {
			final EventMessage msg = gson.fromJson(json, EventMessage.class);
			if (msg == null || msg.hasNulls()) {
				log.warn("Message is null or some required field is null");
				return null;
			}

			final long now = new Date().getTime();

			final Event event = new Event();
			event.setEventId("event-" + DigestUtils.md5Hex(msg.getTopic()).substring(0, 6) + "-" + calculatePayloadHash(msg.getPayload()));
			event.setProducerId(msg.getProducerId());
			event.setPayload(msg.getPayload());
			event.setMap(msg.getMap()
					.entrySet()
					.stream()
					.filter(e -> e.getValue().asObject() != null)
					.collect(Collectors.toMap(
							e -> e.getKey(),
							e -> e.getValue().asObject())));
			event.setTopic(msg.getTopic());
			event.setCreationDate(now);
			event.setExpiryDate(calculateExpiryDate(now, msg.getTthDays()));
			event.setInstantMessage(msg.getTthDays() == 0);
			return event;
		} catch (final Throwable e) {
			log.warn("Error converting json: " + json, e);
			return null;
		}
	}

	private Long calculateExpiryDate(final long now, final int ttlDays) {
		if (ttlDays < 0) {
			return null;
		} else if (ttlDays == 0) {
			return now;
		} else {
			final long duration = ttlDays * 24 * 60 * 60 * 1000;
			return now + duration;
		}
	}

	private static String calculatePayloadHash(final String payload) {
		// TODO : the payload should be normalized before hashing it
		return DigestUtils.md5Hex(payload);
	}

}
