Skip to content

Commit

Permalink
Generating proper aggregation sort when there are multiple pivots.
Browse files Browse the repository at this point in the history
  • Loading branch information
dennisoelkers committed Nov 13, 2024
1 parent 473e747 commit 37a3716
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@
import org.graylog.plugins.views.search.searchtypes.pivot.SortSpec;
import org.graylog.shaded.elasticsearch7.org.elasticsearch.search.aggregations.Aggregation;
import org.graylog.shaded.elasticsearch7.org.elasticsearch.search.aggregations.AggregationBuilder;
import org.graylog.shaded.elasticsearch7.org.elasticsearch.search.aggregations.AggregationBuilders;
import org.graylog.shaded.elasticsearch7.org.elasticsearch.search.aggregations.BucketOrder;
import org.graylog.storage.elasticsearch7.views.ESGeneratedQueryContext;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
Expand All @@ -46,12 +48,24 @@ protected void record(ESGeneratedQueryContext queryContext, Pivot pivot, PivotSp
aggTypes(queryContext, pivot).record(spec, name, aggregationClass);
}

protected List<BucketOrder> orderListForPivot(Pivot pivot, ESGeneratedQueryContext esGeneratedQueryContext, BucketOrder defaultOrder) {
/**
 * Result of {@code orderListForPivot}: the bucket orders to apply to an aggregation, together with
 * the helper sub-aggregations that were created for those orders.
 *
 * @param orders          the bucket orders to set on the aggregation builder
 * @param subAggregations the sub-aggregations callers have to attach to the same aggregation builder
 */
public record SortOrders(List<BucketOrder> orders, List<AggregationBuilder> subAggregations) {}

protected SortOrders orderListForPivot(Pivot pivot, ESGeneratedQueryContext esGeneratedQueryContext, BucketOrder defaultOrder) {
final List<AggregationBuilder> subaggregations = new ArrayList<>();
final List<BucketOrder> ordering = pivot.sort()
.stream()
.map(sortSpec -> {
if (sortSpec instanceof PivotSort) {
return BucketOrder.key(sortSpec.direction().equals(SortSpec.Direction.Ascending));
final var isAscending = sortSpec.direction().equals(SortSpec.Direction.Ascending);
if (sortSpec instanceof PivotSort pivotSort) {
if (isSortOnNumericPivotField(pivot, pivotSort, esGeneratedQueryContext)) {
/* When we sort on a numeric pivot field, we create a metric sub-aggregation for that field, which returns
the numeric value of it, so that we can sort on it numerically. Any metric aggregation (min/max/avg) will work. */
final var aggregationName = "sort_helper" + pivotSort.field();
subaggregations.add(AggregationBuilders.max(aggregationName).field(pivotSort.field()));
return BucketOrder.aggregation(aggregationName, isAscending);
} else {
return BucketOrder.key(isAscending);
}
}
if (sortSpec instanceof SeriesSort) {
final Optional<SeriesSpec> matchingSeriesSpec = pivot.series()
Expand All @@ -61,14 +75,14 @@ protected List<BucketOrder> orderListForPivot(Pivot pivot, ESGeneratedQueryConte
return matchingSeriesSpec
.map(seriesSpec -> {
if (seriesSpec.literal().equals("count()")) {
return BucketOrder.count(sortSpec.direction().equals(SortSpec.Direction.Ascending));
return BucketOrder.count(isAscending);
}

String orderPath = seriesSpec.statsSubfieldName()
.map(subField -> esGeneratedQueryContext.seriesName(seriesSpec, pivot) + "." + subField)
.orElse(esGeneratedQueryContext.seriesName(seriesSpec, pivot));

return BucketOrder.aggregation(orderPath, sortSpec.direction().equals(SortSpec.Direction.Ascending));
return BucketOrder.aggregation(orderPath, isAscending);
})
.orElse(null);
}
Expand All @@ -77,7 +91,19 @@ protected List<BucketOrder> orderListForPivot(Pivot pivot, ESGeneratedQueryConte
})
.filter(Objects::nonNull)
.collect(Collectors.toList());
return ordering.isEmpty() ? List.of(defaultOrder) : ordering;
return ordering.isEmpty()
? new SortOrders(List.of(defaultOrder), List.of())
: new SortOrders(ordering, List.copyOf(subaggregations));
}

/**
 * Checks whether the given pivot sort targets a field whose type is numeric.
 *
 * @param pivot        the pivot whose effective streams are used for the field type lookup
 * @param pivotSort    the sort whose field is inspected
 * @param queryContext the query context providing the field type lookup
 * @return {@code true} if a type is known for the field and {@code isNumericFieldType} accepts it
 */
private boolean isSortOnNumericPivotField(Pivot pivot, PivotSort pivotSort, ESGeneratedQueryContext queryContext) {
    final var sortFieldType = queryContext.fieldType(pivot.effectiveStreams(), pivotSort.field());
    // An unknown field type is treated as "not numeric".
    return sortFieldType.map(this::isNumericFieldType).orElse(false);
}

/**
 * Tells whether a field type name is one of the numeric types recognised here.
 *
 * @param fieldType the field type name to check
 * @return {@code true} for "long", "double" or "float", {@code false} for any other name
 */
private boolean isNumericFieldType(String fieldType) {
    return switch (fieldType) {
        case "long", "double", "float" -> true;
        default -> false;
    };
}

public abstract Stream<PivotBucket> extractBuckets(Pivot pivot, BucketSpec bucketSpec, PivotBucket initialBucket);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@
import org.graylog.plugins.views.search.searchtypes.pivot.SortSpec;
import org.graylog.shaded.opensearch2.org.opensearch.search.aggregations.Aggregation;
import org.graylog.shaded.opensearch2.org.opensearch.search.aggregations.AggregationBuilder;
import org.graylog.shaded.opensearch2.org.opensearch.search.aggregations.AggregationBuilders;
import org.graylog.shaded.opensearch2.org.opensearch.search.aggregations.BucketOrder;
import org.graylog.storage.opensearch2.views.OSGeneratedQueryContext;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
Expand All @@ -46,12 +48,24 @@ protected void record(OSGeneratedQueryContext queryContext, Pivot pivot, PivotSp
aggTypes(queryContext, pivot).record(spec, name, aggregationClass);
}

protected List<BucketOrder> orderListForPivot(Pivot pivot, OSGeneratedQueryContext queryContext, BucketOrder defaultOrder) {
/**
 * Result of {@code orderListForPivot}: the bucket orders to apply to an aggregation, together with
 * the helper sub-aggregations that were created for those orders.
 *
 * @param orders          the bucket orders to set on the aggregation builder
 * @param subAggregations the sub-aggregations callers have to attach to the same aggregation builder
 */
public record SortOrders(List<BucketOrder> orders, List<AggregationBuilder> subAggregations) {}

protected SortOrders orderListForPivot(Pivot pivot, OSGeneratedQueryContext queryContext, BucketOrder defaultOrder) {
final List<AggregationBuilder> subaggregations = new ArrayList<>();
final List<BucketOrder> ordering = pivot.sort()
.stream()
.map(sortSpec -> {
if (sortSpec instanceof PivotSort) {
return BucketOrder.key(sortSpec.direction().equals(SortSpec.Direction.Ascending));
final var isAscending = sortSpec.direction().equals(SortSpec.Direction.Ascending);
if (sortSpec instanceof PivotSort pivotSort) {
if (isSortOnNumericPivotField(pivot, pivotSort, queryContext)) {
/* When we sort on a numeric pivot field, we create a metric sub-aggregation for that field, which returns
the numeric value of it, so that we can sort on it numerically. Any metric aggregation (min/max/avg) will work. */
final var aggregationName = "sort_helper" + pivotSort.field();
subaggregations.add(AggregationBuilders.max(aggregationName).field(pivotSort.field()));
return BucketOrder.aggregation(aggregationName, isAscending);
} else {
return BucketOrder.key(isAscending);
}
}
if (sortSpec instanceof SeriesSort) {
final Optional<SeriesSpec> matchingSeriesSpec = pivot.series()
Expand All @@ -61,13 +75,13 @@ protected List<BucketOrder> orderListForPivot(Pivot pivot, OSGeneratedQueryConte
return matchingSeriesSpec
.map(seriesSpec -> {
if (seriesSpec.literal().equals("count()")) {
return BucketOrder.count(sortSpec.direction().equals(SortSpec.Direction.Ascending));
return BucketOrder.count(isAscending);
}
String orderPath = seriesSpec.statsSubfieldName()
.map(subField -> queryContext.seriesName(seriesSpec, pivot) + "." + subField)
.orElse(queryContext.seriesName(seriesSpec, pivot));

return BucketOrder.aggregation(orderPath, sortSpec.direction().equals(SortSpec.Direction.Ascending));
return BucketOrder.aggregation(orderPath, isAscending);
})
.orElse(null);
}
Expand All @@ -76,7 +90,19 @@ protected List<BucketOrder> orderListForPivot(Pivot pivot, OSGeneratedQueryConte
})
.filter(Objects::nonNull)
.collect(Collectors.toList());
return ordering.isEmpty() ? List.of(defaultOrder) : ordering;
return ordering.isEmpty()
? new SortOrders(List.of(defaultOrder), List.of())
: new SortOrders(ordering, List.copyOf(subaggregations));
}

/**
 * Checks whether the given pivot sort targets a field whose type is numeric.
 *
 * @param pivot        the pivot whose effective streams are used for the field type lookup
 * @param pivotSort    the sort whose field is inspected
 * @param queryContext the query context providing the field type lookup
 * @return {@code true} if a type is known for the field and {@code isNumericFieldType} accepts it
 */
private boolean isSortOnNumericPivotField(Pivot pivot, PivotSort pivotSort, OSGeneratedQueryContext queryContext) {
    final var sortFieldType = queryContext.fieldType(pivot.effectiveStreams(), pivotSort.field());
    // An unknown field type is treated as "not numeric".
    return sortFieldType.map(this::isNumericFieldType).orElse(false);
}

/**
 * Tells whether a field type name is one of the numeric types recognised here.
 *
 * @param fieldType the field type name to check
 * @return {@code true} for "long", "double" or "float", {@code false} for any other name
 */
private boolean isNumericFieldType(String fieldType) {
    return switch (fieldType) {
        case "long", "double", "float" -> true;
        default -> false;
    };
}

public abstract Stream<PivotBucket> extractBuckets(Pivot pivot, BucketSpec bucketSpecs, PivotBucket previousBucket);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.graylog2.plugin.indexer.searches.timeranges.TimeRange;

import javax.annotation.Nonnull;
import java.util.List;
import java.util.stream.Stream;

public class OSTimeHandler extends OSPivotBucketSpecHandler<Time> {
Expand Down Expand Up @@ -72,12 +71,14 @@ && isAllMessages(timerange)) {
} else {
for (String timeField : timeSpec.fields()) {
final DateHistogramInterval dateHistogramInterval = new DateHistogramInterval(interval.toDateInterval(query.effectiveTimeRange(pivot)).toString());
final List<BucketOrder> ordering = orderListForPivot(pivot, queryContext, defaultOrder);
final var ordering = orderListForPivot(pivot, queryContext, defaultOrder);
final DateHistogramAggregationBuilder builder = AggregationBuilders.dateHistogram(name)
.field(timeField)
.order(ordering)
.order(ordering.orders())
.format(DATE_TIME_FORMAT);

ordering.subAggregations().forEach(builder::subAggregation);

setInterval(builder, dateHistogramInterval);

if (root == null && leaf == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
import org.graylog.plugins.views.search.aggregations.MissingBucketConstants;
import org.graylog.plugins.views.search.searchtypes.pivot.BucketSpec;
import org.graylog.plugins.views.search.searchtypes.pivot.Pivot;
import org.graylog.plugins.views.search.searchtypes.pivot.PivotSort;
import org.graylog.plugins.views.search.searchtypes.pivot.SortSpec;
import org.graylog.plugins.views.search.searchtypes.pivot.buckets.Values;
import org.graylog.plugins.views.search.searchtypes.pivot.buckets.ValuesBucketOrdering;
import org.graylog.shaded.opensearch2.org.opensearch.index.query.BoolQueryBuilder;
Expand All @@ -46,7 +44,6 @@
import javax.annotation.Nonnull;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand All @@ -61,12 +58,14 @@ public class OSValuesHandler extends OSPivotBucketSpecHandler<Values> {
@Nonnull
@Override
public CreatedAggregations<AggregationBuilder> doCreateAggregation(Direction direction, String name, Pivot pivot, Values bucketSpec, OSGeneratedQueryContext queryContext, Query query) {
final List<BucketOrder> ordering = orderListForPivot(pivot, queryContext, DEFAULT_ORDER);
final var ordering = orderListForPivot(pivot, queryContext, DEFAULT_ORDER);
final int limit = bucketSpec.limit();
final List<String> orderedBuckets = ValuesBucketOrdering.orderFields(bucketSpec.fields(), pivot.sort());
final var termsAggregation = createTerms(orderedBuckets, limit);

applyOrdering(pivot, termsAggregation, ordering, queryContext);
//applyOrdering(pivot, termsAggregation, ordering, queryContext);
termsAggregation.order(ordering.orders());
ordering.subAggregations().forEach(termsAggregation::subAggregation);

final FiltersAggregationBuilder filterAggregation = createFilter(name, orderedBuckets, bucketSpec.skipEmptyValues())
.subAggregation(termsAggregation);
Expand Down Expand Up @@ -105,32 +104,6 @@ private Script scriptForPivots(Collection<String> pivots) {
return new Script(scriptSource);
}

/**
 * Sets the bucket order on the given terms aggregation.
 *
 * @param pivot        the pivot whose sort configuration is inspected
 * @param terms        the terms aggregation builder to configure
 * @param ordering     the orders to use when no numeric pivot sort applies; {@code DEFAULT_ORDER}
 *                     is used instead if this list is empty
 * @param queryContext the query context used to look up field types
 * @return the builder returned by the final {@code order(...)} call on {@code terms}
 */
private TermsAggregationBuilder applyOrdering(Pivot pivot, TermsAggregationBuilder terms, List<BucketOrder> ordering, OSGeneratedQueryContext queryContext) {
    final Optional<PivotSort> numericPivotSort = sortsOnNumericPivotField(pivot, queryContext);
    if (numericPivotSort.isEmpty()) {
        return terms.order(ordering.isEmpty() ? List.of(DEFAULT_ORDER) : ordering);
    }

    /* When we sort on a numeric pivot field, we create a metric sub-aggregation for that field, which returns
       the numeric value of it, so that we can sort on it numerically. Any metric aggregation (min/max/avg) will work. */
    final PivotSort pivotSort = numericPivotSort.get();
    final boolean isAscending = SortSpec.Direction.Ascending.equals(pivotSort.direction());
    return terms
            .subAggregation(AggregationBuilders.max(SORT_HELPER).field(pivotSort.field()))
            .order(BucketOrder.aggregation(SORT_HELPER, isAscending));
}

/**
 * Finds the pivot sort to order numerically by, if there is one.
 *
 * @param pivot        the pivot whose sort list and effective streams are inspected
 * @param queryContext the query context used to look up the type of the sort field
 * @return the pivot sort, if the pivot has exactly one sort, that sort is a {@link PivotSort} and
 *         its field has a numeric type; otherwise an empty {@link Optional}
 */
private Optional<PivotSort> sortsOnNumericPivotField(Pivot pivot, OSGeneratedQueryContext queryContext) {
    final var sorts = pivot.sort();
    if (sorts == null || sorts.size() != 1) {
        return Optional.empty();
    }
    if (!(sorts.get(0) instanceof PivotSort pivotSort)) {
        return Optional.empty();
    }

    final boolean isNumeric = queryContext.fieldType(pivot.effectiveStreams(), pivotSort.field())
            .filter(this::isNumericFieldType)
            .isPresent();
    return isNumeric ? Optional.of(pivotSort) : Optional.empty();
}

/**
 * Tells whether a field type name is one of the numeric types recognised here.
 *
 * @param fieldType the field type name to check
 * @return {@code true} for "long", "double" or "float", {@code false} for any other name
 */
private boolean isNumericFieldType(String fieldType) {
    return switch (fieldType) {
        case "long", "double", "float" -> true;
        default -> false;
    };
}

@Override
public Stream<PivotBucket> extractBuckets(Pivot pivot, BucketSpec bucketSpecs, PivotBucket initialBucket) {
var values = (Values) bucketSpecs;
Expand Down

0 comments on commit 37a3716

Please sign in to comment.