Skip to content

Commit

Permalink
add TakeOrderedOperator (#2863)
Browse files Browse the repository at this point in the history
---------
Signed-off-by: Heng Qian <[email protected]>
  • Loading branch information
qianheng-aws authored Aug 5, 2024
1 parent 14a80a9 commit 0e70a50
Show file tree
Hide file tree
Showing 14 changed files with 883 additions and 60 deletions.
14 changes: 14 additions & 0 deletions core/src/main/java/org/opensearch/sql/executor/Explain.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.opensearch.sql.planner.physical.RemoveOperator;
import org.opensearch.sql.planner.physical.RenameOperator;
import org.opensearch.sql.planner.physical.SortOperator;
import org.opensearch.sql.planner.physical.TakeOrderedOperator;
import org.opensearch.sql.planner.physical.ValuesOperator;
import org.opensearch.sql.planner.physical.WindowOperator;
import org.opensearch.sql.storage.TableScanOperator;
Expand Down Expand Up @@ -73,6 +74,19 @@ public ExplainResponseNode visitSort(SortOperator node, Object context) {
ImmutableMap.of("sortList", describeSortList(node.getSortList()))));
}

@Override
public ExplainResponseNode visitTakeOrdered(TakeOrderedOperator node, Object context) {
return explain(
node,
context,
explainNode ->
explainNode.setDescription(
ImmutableMap.of(
"limit", node.getLimit(),
"offset", node.getOffset(),
"sortList", describeSortList(node.getSortList()))));
}

@Override
public ExplainResponseNode visitTableScan(TableScanOperator node, Object context) {
return explain(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.opensearch.sql.planner.physical.RemoveOperator;
import org.opensearch.sql.planner.physical.RenameOperator;
import org.opensearch.sql.planner.physical.SortOperator;
import org.opensearch.sql.planner.physical.TakeOrderedOperator;
import org.opensearch.sql.planner.physical.ValuesOperator;
import org.opensearch.sql.planner.physical.WindowOperator;
import org.opensearch.sql.storage.read.TableScanBuilder;
Expand Down Expand Up @@ -129,7 +130,13 @@ public PhysicalPlan visitValues(LogicalValues node, C context) {

@Override
public PhysicalPlan visitLimit(LogicalLimit node, C context) {
return new LimitOperator(visitChild(node, context), node.getLimit(), node.getOffset());
PhysicalPlan child = visitChild(node, context);
// Optimize sort + limit to take ordered operator
if (child instanceof SortOperator sortChild) {
return new TakeOrderedOperator(
sortChild.getInput(), node.getLimit(), node.getOffset(), sortChild.getSortList());
}
return new LimitOperator(child, node.getLimit(), node.getOffset());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ public static SortOperator sort(PhysicalPlan input, Pair<SortOption, Expression>
return new SortOperator(input, Arrays.asList(sorts));
}

public static TakeOrderedOperator takeOrdered(
PhysicalPlan input, Integer limit, Integer offset, Pair<SortOption, Expression>... sorts) {
return new TakeOrderedOperator(input, limit, offset, Arrays.asList(sorts));
}

public static DedupeOperator dedupe(PhysicalPlan input, Expression... expressions) {
return new DedupeOperator(input, Arrays.asList(expressions));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ public R visitSort(SortOperator node, C context) {
return visitNode(node, context);
}

public R visitTakeOrdered(TakeOrderedOperator node, C context) {
return visitNode(node, context);
}

public R visitRareTopN(RareTopNOperator node, C context) {
return visitNode(node, context);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package org.opensearch.sql.planner.physical;

import static org.opensearch.sql.ast.tree.Sort.NullOrder.NULL_FIRST;
import static org.opensearch.sql.ast.tree.Sort.SortOrder.ASC;

import com.google.common.collect.Ordering;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import org.apache.commons.lang3.tuple.Pair;
import org.opensearch.sql.ast.tree.Sort.SortOption;
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.data.utils.ExprValueOrdering;
import org.opensearch.sql.expression.Expression;

public interface SortHelper {

/**
* Construct an expr comparator for sorting on ExprValue.
*
* @param sortList list of sort fields and their related sort options.
* @return A comparator for ExprValue
*/
static Comparator<ExprValue> constructExprComparator(
List<Pair<SortOption, Expression>> sortList) {
return (o1, o2) -> compareWithExpressions(o1, o2, constructComparator(sortList));
}

/**
* Construct an expr ordering for efficiently taking the top-k elements on ExprValue.
*
* @param sortList list of sort fields and their related sort options.
* @return An guava ordering for ExprValue
*/
static Ordering<ExprValue> constructExprOrdering(List<Pair<SortOption, Expression>> sortList) {
return Ordering.from(constructExprComparator(sortList));
}

private static List<Pair<Expression, Comparator<ExprValue>>> constructComparator(
List<Pair<SortOption, Expression>> sortList) {
List<Pair<Expression, Comparator<ExprValue>>> comparators = new ArrayList<>();
for (Pair<SortOption, Expression> pair : sortList) {
SortOption option = pair.getLeft();
ExprValueOrdering ordering =
ASC.equals(option.getSortOrder())
? ExprValueOrdering.natural()
: ExprValueOrdering.natural().reverse();
ordering =
NULL_FIRST.equals(option.getNullOrder()) ? ordering.nullsFirst() : ordering.nullsLast();
comparators.add(Pair.of(pair.getRight(), ordering));
}
return comparators;
}

private static int compareWithExpressions(
ExprValue o1, ExprValue o2, List<Pair<Expression, Comparator<ExprValue>>> comparators) {
for (Pair<Expression, Comparator<ExprValue>> comparator : comparators) {
Expression expression = comparator.getKey();
int result =
comparator
.getValue()
.compare(
expression.valueOf(o1.bindingTuples()), expression.valueOf(o2.bindingTuples()));
if (result != 0) {
return result;
}
}
return 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,18 @@

package org.opensearch.sql.planner.physical;

import static org.opensearch.sql.ast.tree.Sort.NullOrder.NULL_FIRST;
import static org.opensearch.sql.ast.tree.Sort.SortOrder.ASC;

import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Singular;
import lombok.ToString;
import org.apache.commons.lang3.tuple.Pair;
import org.opensearch.sql.ast.tree.Sort.SortOption;
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.data.utils.ExprValueOrdering;
import org.opensearch.sql.expression.Expression;
import org.opensearch.sql.planner.physical.SortOperator.Sorter.SorterBuilder;

/**
* Sort Operator.The input data is sorted by the sort fields in the {@link SortOperator#sortList}.
Expand All @@ -36,7 +29,7 @@ public class SortOperator extends PhysicalPlan {
@Getter private final PhysicalPlan input;

@Getter private final List<Pair<SortOption, Expression>> sortList;
@EqualsAndHashCode.Exclude private final Sorter sorter;
@EqualsAndHashCode.Exclude private final Comparator<ExprValue> sorter;
@EqualsAndHashCode.Exclude private Iterator<ExprValue> iterator;

/**
Expand All @@ -49,18 +42,7 @@ public class SortOperator extends PhysicalPlan {
public SortOperator(PhysicalPlan input, List<Pair<SortOption, Expression>> sortList) {
this.input = input;
this.sortList = sortList;
SorterBuilder sorterBuilder = Sorter.builder();
for (Pair<SortOption, Expression> pair : sortList) {
SortOption option = pair.getLeft();
ExprValueOrdering ordering =
ASC.equals(option.getSortOrder())
? ExprValueOrdering.natural()
: ExprValueOrdering.natural().reverse();
ordering =
NULL_FIRST.equals(option.getNullOrder()) ? ordering.nullsFirst() : ordering.nullsLast();
sorterBuilder.comparator(Pair.of(pair.getRight(), ordering));
}
this.sorter = sorterBuilder.build();
this.sorter = SortHelper.constructExprComparator(sortList);
}

@Override
Expand Down Expand Up @@ -94,27 +76,6 @@ public ExprValue next() {
return iterator.next();
}

@Builder
public static class Sorter implements Comparator<ExprValue> {
@Singular private final List<Pair<Expression, Comparator<ExprValue>>> comparators;

@Override
public int compare(ExprValue o1, ExprValue o2) {
for (Pair<Expression, Comparator<ExprValue>> comparator : comparators) {
Expression expression = comparator.getKey();
int result =
comparator
.getValue()
.compare(
expression.valueOf(o1.bindingTuples()), expression.valueOf(o2.bindingTuples()));
if (result != 0) {
return result;
}
}
return 0;
}
}

private Iterator<ExprValue> iterator(PriorityQueue<ExprValue> result) {
return new Iterator<ExprValue>() {
@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner.physical;

import com.google.common.collect.Ordering;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;
import org.apache.commons.lang3.tuple.Pair;
import org.opensearch.sql.ast.tree.Sort.SortOption;
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.expression.Expression;

/**
* TakeOrdered Operator. This operator will sort input data as the order of {@link this#sortList}
* specifies and return {@link this#limit} rows from the {@link this#offset} index.
*
* <p>Functionally, this operator is a combination of {@link SortOperator} and {@link
* LimitOperator}. But it can reduce the time complexity from O(nlogn) to O(n), and memory from O(n)
* to O(k) due to use guava {@link com.google.common.collect.Ordering}.
*
* <p>Overall, it's an optimization to replace `Limit(Sort)` in physical plan level since it's all
* about execution. Because most execution engine may not support this operator, it doesn't have a
* related logical operator.
*/
@ToString
@EqualsAndHashCode(callSuper = false)
public class TakeOrderedOperator extends PhysicalPlan {
@Getter private final PhysicalPlan input;

@Getter private final List<Pair<SortOption, Expression>> sortList;
@Getter private final Integer limit;
@Getter private final Integer offset;
@EqualsAndHashCode.Exclude private final Ordering<ExprValue> ordering;
@EqualsAndHashCode.Exclude private Iterator<ExprValue> iterator;

/**
* TakeOrdered Operator Constructor.
*
* @param input input {@link PhysicalPlan}
* @param limit the limit value from LimitOperator
* @param offset the offset value from LimitOperator
* @param sortList list of sort field from SortOperator
*/
public TakeOrderedOperator(
PhysicalPlan input,
Integer limit,
Integer offset,
List<Pair<SortOption, Expression>> sortList) {
this.input = input;
this.sortList = sortList;
this.limit = limit;
this.offset = offset;
this.ordering = SortHelper.constructExprOrdering(sortList);
}

@Override
public <R, C> R accept(PhysicalPlanNodeVisitor<R, C> visitor, C context) {
return visitor.visitTakeOrdered(this, context);
}

@Override
public void open() {
super.open();
iterator = ordering.leastOf(input, offset + limit).stream().skip(offset).iterator();
}

@Override
public List<PhysicalPlan> getChild() {
return Collections.singletonList(input);
}

@Override
public boolean hasNext() {
return iterator.hasNext();
}

@Override
public ExprValue next() {
return iterator.next();
}
}
21 changes: 21 additions & 0 deletions core/src/test/java/org/opensearch/sql/executor/ExplainTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import static org.opensearch.sql.planner.physical.PhysicalPlanDSL.remove;
import static org.opensearch.sql.planner.physical.PhysicalPlanDSL.rename;
import static org.opensearch.sql.planner.physical.PhysicalPlanDSL.sort;
import static org.opensearch.sql.planner.physical.PhysicalPlanDSL.takeOrdered;
import static org.opensearch.sql.planner.physical.PhysicalPlanDSL.values;
import static org.opensearch.sql.planner.physical.PhysicalPlanDSL.window;

Expand Down Expand Up @@ -220,6 +221,26 @@ void can_explain_limit() {
explain.apply(plan));
}

@Test
void can_explain_takeOrdered() {
Pair<Sort.SortOption, Expression> sort =
ImmutablePair.of(Sort.SortOption.DEFAULT_ASC, ref("a", INTEGER));
PhysicalPlan plan = takeOrdered(tableScan, 10, 5, sort);
assertEquals(
new ExplainResponse(
new ExplainResponseNode(
"TakeOrderedOperator",
Map.of(
"limit",
10,
"offset",
5,
"sortList",
Map.of("a", Map.of("sortOrder", "ASC", "nullOrder", "NULL_FIRST"))),
singletonList(tableScan.explainNode()))),
explain.apply(plan));
}

@Test
void can_explain_nested() {
Set<String> nestedOperatorArgs = Set.of("message.info", "message");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,4 +278,30 @@ public void visitPaginate_should_remove_it_from_tree() {
new ProjectOperator(new ValuesOperator(List.of(List.of())), List.of(), List.of());
assertEquals(physicalPlanTree, logicalPlanTree.accept(implementor, null));
}

@Test
public void visitLimit_support_return_takeOrdered() {
// replace SortOperator + LimitOperator with TakeOrderedOperator
Pair<Sort.SortOption, Expression> sort =
ImmutablePair.of(Sort.SortOption.DEFAULT_ASC, ref("a", INTEGER));
var logicalValues = values(emptyList());
var logicalSort = sort(logicalValues, sort);
var logicalLimit = limit(logicalSort, 10, 5);
PhysicalPlan physicalPlanTree =
PhysicalPlanDSL.takeOrdered(PhysicalPlanDSL.values(emptyList()), 10, 5, sort);
assertEquals(physicalPlanTree, logicalLimit.accept(implementor, null));

// don't replace if LimitOperator's child is not SortOperator
Pair<ReferenceExpression, Expression> newEvalField =
ImmutablePair.of(ref("name1", STRING), ref("name", STRING));
var logicalEval = eval(logicalSort, newEvalField);
logicalLimit = limit(logicalEval, 10, 5);
physicalPlanTree =
PhysicalPlanDSL.limit(
PhysicalPlanDSL.eval(
PhysicalPlanDSL.sort(PhysicalPlanDSL.values(emptyList()), sort), newEvalField),
10,
5);
assertEquals(physicalPlanTree, logicalLimit.accept(implementor, null));
}
}
Loading

0 comments on commit 0e70a50

Please sign in to comment.