Skip to content

Commit

Permalink
add ordering
Browse files Browse the repository at this point in the history
  • Loading branch information
kasakrisz committed Oct 14, 2024
1 parent 32f6870 commit cd816a0
Showing 1 changed file with 13 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,25 +23,16 @@

import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.rel.RelDistribution.Type;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.model.MPartition;
import org.apache.calcite.rel.RelFieldCollation;
import org.apache.hadoop.hive.ql.exec.ColumnInfo;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.io.AcidUtils.Operation;
import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveSortExchange;
import org.apache.hadoop.hive.ql.optimizer.calcite.translator.ASTConverter;
import org.apache.hadoop.hive.ql.optimizer.calcite.translator.opconventer.HiveOpConverter.OpAttr;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.SelectDesc;

import static java.util.Arrays.asList;
import org.apache.hadoop.hive.ql.util.DirectionUtils;
import org.apache.hadoop.hive.ql.util.NullOrdering;

class HiveSortExchangeVisitor extends HiveRelNodeVisitor<HiveSortExchange> {
HiveSortExchangeVisitor(HiveOpConverter hiveOpConverter) {
Expand Down Expand Up @@ -70,38 +61,33 @@ OpAttr visit(HiveSortExchange exchangeRel) throws SemanticException {
exchangeRel.getCluster().getRexBuilder().makeInputRef(exchangeRel.getInput(), index),
exchangeRel.getInput(), inputOpAf.tabAlias, inputOpAf.vcolsInCalcite));
}
// for (int i = 0; i < distribution.getKeys().size(); i++) {
// int key = distribution.getKeys().get(i);
// ColumnInfo colInfo = inputOpAf.inputs.get(0).getSchema().getSignature().get(key);
// ExprNodeDesc column = new ExprNodeColumnDesc(colInfo);
// distributeKeys[i] = column;
// }
} else if (distribution.getType() != Type.ANY) {
throw new SemanticException("Only hash distribution supported for HiveSortExchange");
} else {
partitionKeyList = new ArrayList<>(0);
}
ExprNodeDesc[] expressions = new ExprNodeDesc[exchangeRel.getKeys().size()];
for (int index = 0; index < exchangeRel.getKeys().size(); index++) {
StringBuilder order = new StringBuilder();
StringBuilder nullOrder = new StringBuilder();
for (int index = 0; index < exchangeRel.getCollation().getFieldCollations().size(); index++) {
RelFieldCollation fieldCollation = exchangeRel.getCollation().getFieldCollations().get(index);
expressions[index] = HiveOpConverterUtils.convertToExprNode(exchangeRel.getKeys().get(index),
exchangeRel.getInput(), inputOpAf.tabAlias, inputOpAf.vcolsInCalcite);

order.append(DirectionUtils.codeToSign(DirectionUtils.directionToCode(fieldCollation.getDirection())));
nullOrder.append(NullOrdering.fromDirection(fieldCollation.nullDirection).getSign());
}
exchangeRel.setKeyExpressions(expressions);

Operator<?> inputOp = inputOpAf.inputs.get(0);
List<String> keepColumns = new ArrayList<>();
// final ImmutableBitSet sortColsPos = sortColsPosBuilder.build();
// final ImmutableBitSet sortOutputColsPos = sortOutputColsPosBuilder.build();
final List<ColumnInfo> inputSchema = inputOp.getSchema().getSignature();
for (int pos=0; pos<inputSchema.size(); pos++) {
// if ((sortColsPos.get(pos) && sortOutputColsPos.get(pos)) ||
// (!sortColsPos.get(pos) && !sortOutputColsPos.get(pos))) {
keepColumns.add(inputSchema.get(pos).getInternalName());
// }
for (ColumnInfo columnInfo : inputSchema) {
keepColumns.add(columnInfo.getInternalName());
}

Operator<?> resultOp = HiveOpConverterUtils.genReduceSinkAndBacktrackSelect(
inputOpAf.inputs.get(0), expressions, -1, partitionKeyList, "", "",
inputOpAf.inputs.get(0), expressions, -1, partitionKeyList, order.toString(), nullOrder.toString(),
-1, Operation.NOT_ACID, hiveOpConverter.getHiveConf(), keepColumns);

return new OpAttr(tabAlias, inputOpAf.vcolsInCalcite, resultOp);
Expand Down

0 comments on commit cd816a0

Please sign in to comment.