package eu.dnetlib.lbs.elasticsearch;

import java.util.List;
import java.util.stream.Collectors;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.elasticsearch.core.ElasticsearchTemplate;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
import org.springframework.data.elasticsearch.core.query.SearchQuery;
import org.springframework.stereotype.Component;

import eu.dnetlib.lbs.properties.ElasticSearchProperties;

@Component
public class EventStatsManager {

	@Autowired
	private ElasticsearchTemplate elasticsearchTemplate;

	@Autowired
	private ElasticSearchProperties elasticSearchProperties;

	private static final Log log = LogFactory.getLog(EventStatsManager.class);

	public class BrowseEntry {

		private final String value;
		private final long count;

		public BrowseEntry(final String value, final long count) {
			this.value = value;
			this.count = count;
		}

		public String getValue() {
			return this.value;
		}

		public long getCount() {
			return this.count;
		}

	}

	public List<BrowseEntry> browseTopics() {

		final String term = "topic";

		final SearchQuery searchQuery = new NativeSearchQueryBuilder()
				.withQuery(QueryBuilders.matchAllQuery())
				.withSearchType(SearchType.DEFAULT)
				.withIndices(this.elasticSearchProperties.getEventsIndexName())
				.withTypes(this.elasticSearchProperties.getEventsIndexType())
				.addAggregation(AggregationBuilders.terms(term).field(term).size(1000).minDocCount(1))
				.build();

		final Aggregations aggregations =
				this.elasticsearchTemplate.query(searchQuery, SearchResponse::getAggregations);

		return ((StringTerms) aggregations.asMap().get(term)).getBuckets()
				.stream()
				.map(b -> new BrowseEntry(b.getKeyAsString(), b.getDocCount()))
				.collect(Collectors.toList());

	}

}
