package eu.dnetlib.lbs.openaire;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.lucene.search.join.ScoreMode;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.nested.InternalNested;
import org.elasticsearch.search.aggregations.bucket.nested.Nested;
import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.elasticsearch.core.ElasticsearchTemplate;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
import org.springframework.data.elasticsearch.core.query.SearchQuery;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import com.google.gson.Gson;

import eu.dnetlib.broker.objects.OpenAireEventPayload;
import eu.dnetlib.lbs.LiteratureBrokerServiceConfiguration;
import eu.dnetlib.lbs.controllers.AbstractLbsController;
import eu.dnetlib.lbs.elasticsearch.Event;
import eu.dnetlib.lbs.elasticsearch.EventRepository;
import eu.dnetlib.lbs.elasticsearch.Notification;
import eu.dnetlib.lbs.elasticsearch.NotificationRepository;
import eu.dnetlib.lbs.properties.ElasticSearchProperties;
import eu.dnetlib.lbs.subscriptions.MapCondition;
import eu.dnetlib.lbs.subscriptions.Subscription;
import eu.dnetlib.lbs.subscriptions.SubscriptionRepository;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;

/**
 * REST endpoints of the OpenAIRE broker: browsing of events by datasource and topic,
 * registration and listing of subscriptions, and retrieval of notifications
 * (both paged and through Elasticsearch scrolls).
 */
@RestController
@RequestMapping("/api/openaireBroker")
@Api(tags = LiteratureBrokerServiceConfiguration.TAG_OPENAIRE)
public class OpenaireBrokerController extends AbstractLbsController {

	private static final int SCROLL_TIMEOUT_IN_MILLIS = 5 * 60 * 1000;

	private static final int SCROLL_PAGE_SIZE = 100;

	/** Path of the nested object that holds the indexed event properties. */
	private static final String MAP_PATH = "map";

	/** Name of the condition/property that carries the datasource name. */
	private static final String DATASOURCE_NAME_FIELD = "target_datasource_name";

	/** Fully qualified name of the datasource field inside the nested map. */
	private static final String MAP_DATASOURCE_NAME_FIELD = MAP_PATH + "." + DATASOURCE_NAME_FIELD;

	/** Shared JSON parser, reused instead of allocating a new one per scroll page. */
	private static final Gson GSON = new Gson();

	@Autowired
	private ElasticsearchTemplate elasticsearchTemplate;

	@Autowired
	private EventRepository eventRepository;

	@Autowired
	private NotificationRepository notificationRepository;

	@Autowired
	private SubscriptionRepository subscriptionRepo;

	@Autowired
	private ElasticSearchProperties props;

	private static final Log log = LogFactory.getLog(OpenaireBrokerController.class);

	/**
	 * Lists the datasource names found in the events index, each with its bucket count,
	 * using a terms aggregation (at most 1000 buckets) nested under the "map" path.
	 *
	 * @return one entry per datasource name
	 */
	@ApiOperation("Return the datasources having events")
	@RequestMapping(value = "/datasources", method = RequestMethod.GET)
	public List<BrowseEntry> findDatasourcesWithEvents() {

		final SearchQuery searchQuery = new NativeSearchQueryBuilder()
				.withQuery(QueryBuilders.matchAllQuery())
				.withSearchType(SearchType.DEFAULT)
				.withIndices(props.getEventsIndexName())
				.withTypes(props.getEventsIndexType())
				.addAggregation(AggregationBuilders.nested("nested", MAP_PATH)
						.subAggregation(AggregationBuilders.terms("by_map").field(MAP_DATASOURCE_NAME_FIELD).size(1000).minDocCount(1)))
				.build();

		final Aggregations aggregations = elasticsearchTemplate.query(searchQuery, SearchResponse::getAggregations);

		// Read the results through the aggregation interfaces rather than concrete classes,
		// so that any Terms implementation returned by the cluster is accepted.
		final Nested nested = aggregations.get("nested");
		final Terms byDatasource = nested.getAggregations().get("by_map");

		return toBrowseEntries(byDatasource);
	}

	/**
	 * Lists the topics of the events that match the given datasource name,
	 * each with its number of events (at most 1000 buckets).
	 *
	 * @param ds
	 *            the datasource name to match
	 * @return one entry per topic
	 */
	@ApiOperation("Return the topics of the events of a datasource")
	@RequestMapping(value = "/topicsForDatasource", method = RequestMethod.GET)
	public List<BrowseEntry> findTopicsForDatasource(@RequestParam final String ds) {

		final SearchQuery searchQuery = new NativeSearchQueryBuilder()
				.withQuery(QueryBuilders.nestedQuery(MAP_PATH, QueryBuilders.matchQuery(MAP_DATASOURCE_NAME_FIELD, ds), ScoreMode.None))
				.withSearchType(SearchType.DEFAULT)
				.withIndices(props.getEventsIndexName())
				.withTypes(props.getEventsIndexType())
				.addAggregation(AggregationBuilders.terms("topic").field("topic").size(1000).minDocCount(1))
				.build();

		final Aggregations aggregations = elasticsearchTemplate.query(searchQuery, SearchResponse::getAggregations);
		final Terms byTopic = aggregations.get("topic");

		return toBrowseEntries(byTopic);
	}

	/**
	 * Returns a page of events matching a datasource name and a topic.
	 *
	 * @param ds
	 *            the datasource name to match
	 * @param topic
	 *            the topic to match
	 * @param nPage
	 *            the page index passed to {@code PageRequest.of}
	 * @param size
	 *            the page size
	 * @return the requested page of event payloads
	 */
	@ApiOperation("Return a page of events of a datasource (by topic)")
	@RequestMapping(value = "/events/{nPage}/{size}", method = RequestMethod.GET)
	public EventsPage showEvents(@RequestParam final String ds, @RequestParam final String topic, @PathVariable final int nPage, @PathVariable final int size) {
		return searchEvents(ds, topic, QueryBuilders.matchQuery(MAP_DATASOURCE_NAME_FIELD, ds), nPage, size);
	}

	/**
	 * Returns a page of events matching the topic and the map conditions
	 * (datasource, titles, authors, subjects, trust, dates) of the given query object.
	 *
	 * @param nPage
	 *            the page index passed to {@code PageRequest.of}
	 * @param size
	 *            the page size
	 * @param qObj
	 *            the advanced query
	 * @return the requested page of event payloads
	 */
	@ApiOperation("Return a page of events of a datasource (by query)")
	@RequestMapping(value = "/events/{nPage}/{size}", method = RequestMethod.POST)
	public EventsPage advancedShowEvents(@PathVariable final int nPage, @PathVariable final int size, @RequestBody final AdvQueryObject qObj) {

		final BoolQueryBuilder mapQuery = QueryBuilders.boolQuery();
		ElasticSearchQueryUtils.addMapCondition(mapQuery, MAP_DATASOURCE_NAME_FIELD, qObj.getDatasource());
		ElasticSearchQueryUtils.addMapCondition(mapQuery, "map.target_publication_title", qObj.getTitles());
		ElasticSearchQueryUtils.addMapCondition(mapQuery, "map.target_publication_author_list", qObj.getAuthors());
		ElasticSearchQueryUtils.addMapCondition(mapQuery, "map.target_publication_subject_list", qObj.getSubjects());
		ElasticSearchQueryUtils.addMapConditionForTrust(mapQuery, "map.trust", qObj.getTrust());
		ElasticSearchQueryUtils.addMapConditionForDates(mapQuery, "map.target_dateofacceptance", qObj.getDates());

		return searchEvents(qObj.getDatasource(), qObj.getTopic(), mapQuery, nPage, size);
	}

	/**
	 * Converts the given request into a subscription and saves it.
	 *
	 * @param oSub
	 *            the subscription request
	 * @return the subscription built from the request
	 */
	@ApiOperation("Perform a subscription")
	@RequestMapping(value = "/subscribe", method = RequestMethod.POST)
	public Subscription registerSubscription(@RequestBody final OpenaireSubscription oSub) {
		final Subscription sub = oSub.asSubscription();

		subscriptionRepo.save(sub);

		return sub;
	}

	/**
	 * Returns the subscriptions of a subscriber, grouped by datasource name.
	 * Subscriptions without a datasource condition are grouped under the empty string.
	 *
	 * @param email
	 *            the subscriber
	 * @return subscription descriptions keyed by datasource name
	 */
	@ApiOperation("Return the subscriptions of an user (by email)")
	@RequestMapping(value = "/subscriptions", method = RequestMethod.GET)
	public Map<String, List<SimpleSubscriptionDesc>> subscriptions(@RequestParam final String email) {
		final Iterable<Subscription> found = subscriptionRepo.findBySubscriber(email);
		return StreamSupport.stream(found.spliterator(), false)
				.map(this::subscriptionDesc)
				.collect(Collectors.groupingBy(SimpleSubscriptionDesc::getDatasource));
	}

	/**
	 * Returns a page of the notifications of a subscription.
	 *
	 * @param subscrId
	 *            the subscription identifier
	 * @param nPage
	 *            the page index passed to {@code PageRequest.of}
	 * @param size
	 *            the page size
	 * @return the requested page, or an empty page when the subscription does not exist
	 */
	@ApiOperation("Return a page of notifications")
	@RequestMapping(value = "/notifications/{subscrId}/{nPage}/{size}", method = RequestMethod.GET)
	public EventsPage notifications(@PathVariable final String subscrId, @PathVariable final int nPage, @PathVariable final int size) {

		final Optional<Subscription> optSub = subscriptionRepo.findById(subscrId);

		if (!optSub.isPresent()) {
			log.warn("Invalid subscription: " + subscrId);
			return new EventsPage("", "", nPage, 0, 0, new ArrayList<>());
		}

		final Subscription sub = optSub.get();

		final SearchQuery searchQuery = new NativeSearchQueryBuilder()
				.withQuery(QueryBuilders.matchQuery("subscriptionId", subscrId))
				.withSearchType(SearchType.DEFAULT)
				.withIndices(props.getNotificationsIndexName())
				.withTypes(props.getNotificationsIndexType())
				.withFields("payload")
				.withPageable(PageRequest.of(nPage, size))
				.build();

		final Page<Notification> page = notificationRepository.search(searchQuery);

		final List<OpenAireEventPayload> list = page.getContent().stream()
				.map(Notification::getPayload)
				.map(OpenAireEventPayload::fromJSON)
				.collect(Collectors.toList());

		return new EventsPage(extractDatasource(sub), sub.getTopic(), nPage, overrideGetTotalPage(page, size), page.getTotalElements(), list);
	}

	/**
	 * Opens a scroll over the notifications of a subscription and returns its first page
	 * (up to {@code SCROLL_PAGE_SIZE} payloads).
	 *
	 * @param subscrId
	 *            the subscription identifier
	 * @return the first scroll page, or an empty {@code ScrollPage} when the subscription does not exist
	 */
	@ApiOperation("Returns notifications using scrolls (first page)")
	@RequestMapping(value = "/scroll/notifications/start/{subscrId}", method = RequestMethod.GET)
	public ScrollPage prepareScrollNotifications(@PathVariable final String subscrId) {

		if (!subscriptionRepo.findById(subscrId).isPresent()) {
			log.warn("Invalid subscription: " + subscrId);
			return new ScrollPage();
		}

		// Only the query itself is handed to the low-level client below; the search is
		// scoped by the notifications index name passed to prepareSearch.
		final QueryBuilder query = QueryBuilders.matchQuery("subscriptionId", subscrId);

		final SearchResponse scrollResp = elasticsearchTemplate.getClient()
				.prepareSearch(props.getNotificationsIndexName())
				.setScroll(new TimeValue(SCROLL_TIMEOUT_IN_MILLIS)) // 5 minutes
				.setQuery(query)
				.setSize(SCROLL_PAGE_SIZE)
				.get();

		return toScrollPage(scrollResp);
	}

	/**
	 * Returns the next page of a previously opened scroll, renewing its keep-alive.
	 *
	 * @param scrollId
	 *            the scroll identifier returned by the previous page
	 * @return the next scroll page
	 */
	@ApiOperation("Returns notifications using scrolls (other pages)")
	@RequestMapping(value = "/scroll/notifications/{scrollId}", method = RequestMethod.GET)
	public ScrollPage scrollNotifications(@PathVariable final String scrollId) {
		final SearchResponse scrollResp = elasticsearchTemplate.getClient()
				.prepareSearchScroll(scrollId)
				.setScroll(new TimeValue(SCROLL_TIMEOUT_IN_MILLIS))
				.execute()
				.actionGet();

		return toScrollPage(scrollResp);
	}

	/** Runs the paged event search (topic + nested map query) shared by both "events" endpoints. */
	private EventsPage searchEvents(final String ds, final String topic, final QueryBuilder mapQuery, final int nPage, final int size) {

		final SearchQuery searchQuery = new NativeSearchQueryBuilder()
				.withQuery(QueryBuilders.boolQuery()
						.must(QueryBuilders.matchQuery("topic", topic))
						.must(QueryBuilders.nestedQuery(MAP_PATH, mapQuery, ScoreMode.None)))
				.withSearchType(SearchType.DEFAULT)
				.withIndices(props.getEventsIndexName())
				.withTypes(props.getEventsIndexType())
				.withFields("payload")
				.withPageable(PageRequest.of(nPage, size))
				.build();

		final Page<Event> page = eventRepository.search(searchQuery);

		final List<OpenAireEventPayload> list = page.getContent().stream()
				.map(Event::getPayload)
				.map(OpenAireEventPayload::fromJSON)
				.collect(Collectors.toList());

		return new EventsPage(ds, topic, nPage, overrideGetTotalPage(page, size), page.getTotalElements(), list);
	}

	/** Maps each bucket of a terms aggregation to a (key, document count) browse entry. */
	private List<BrowseEntry> toBrowseEntries(final Terms terms) {
		return terms.getBuckets()
				.stream()
				.map(b -> new BrowseEntry(b.getKeyAsString(), b.getDocCount()))
				.collect(Collectors.toList());
	}

	/**
	 * Builds the scroll page for a scroll response. The page is flagged as completed when it
	 * carries no payloads or no scroll id; in the former case the scroll is released.
	 */
	private ScrollPage toScrollPage(final SearchResponse scrollResp) {
		final List<OpenAireEventPayload> values = calculateEventPayloads(scrollResp);
		final String scrollId = scrollResp.getScrollId();
		final boolean completed = values.isEmpty() || scrollId == null;

		if (completed && scrollId != null) {
			// Nothing more to read: free the search context now instead of waiting for the keep-alive to expire.
			releaseScroll(scrollId);
		}

		return new ScrollPage(scrollId, completed, values);
	}

	/** Clears a scroll; failures are only logged so that they never break the response being returned. */
	private void releaseScroll(final String scrollId) {
		try {
			elasticsearchTemplate.getClient().prepareClearScroll().addScrollId(scrollId).get();
		} catch (final RuntimeException e) {
			log.warn("Unable to clear scroll: " + scrollId, e);
		}
	}

	/** Parses the "payload" source field of every hit of the response (empty list when there are no hits). */
	private List<OpenAireEventPayload> calculateEventPayloads(final SearchResponse scrollResp) {
		return Arrays.stream(scrollResp.getHits().getHits())
				.map(hit -> hit.getSource().get("payload").toString())
				.map(json -> GSON.fromJson(json, OpenAireEventPayload.class))
				.collect(Collectors.toList());
	}

	/** Builds the summary of a subscription, including the number of its notifications. */
	private SimpleSubscriptionDesc subscriptionDesc(final Subscription s) {
		final String subscriptionId = s.getSubscriptionId();
		return new SimpleSubscriptionDesc(subscriptionId, extractDatasource(s), s.getTopic(), s.getCreationDate(), s.getLastNotificationDate(),
				notificationRepository.countBySubscriptionId(subscriptionId));
	}

	/**
	 * Returns the first non-null value of the first "target_datasource_name" condition that has
	 * parameters, or the empty string when the subscription has no such condition.
	 */
	private String extractDatasource(final Subscription sub) {
		return sub.getConditionsAsList().stream()
				.filter(c -> DATASOURCE_NAME_FIELD.equals(c.getField()))
				.map(MapCondition::getListParams)
				.filter(params -> params != null && !params.isEmpty())
				.map(params -> params.get(0).getValue())
				.filter(value -> value != null)
				.findFirst()
				.orElse("");
	}

	/** Computes the number of pages as the ceiling of totalElements / size. */
	private long overrideGetTotalPage(final Page<?> page, final int size) {
		return (page.getTotalElements() + size - 1) / size;
	}

}
