Skip to content
This repository has been archived by the owner on Jan 3, 2023. It is now read-only.

[SQL-DS-CACHE-124] [POAE7-1133] Fix failed query in TPCDS #125

Merged
merged 6 commits into from
Jun 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,13 @@ private static JsonNode constructTree(Expression expr, JsonNode rootNode) {
((ObjectNode) tmpNode).put("dataType", tmpExpr.dataType().toString());
((ObjectNode) tmpNode).put("value", tmpExpr.value().toString());
return tmpNode;
} else if (expr instanceof UnscaledValue) {
// TODO: cast to Long?
return constructTree(exprs.get(0), tmpNode);

} else {
// TODO: should other expression types be handled here?
throw new UnsupportedOperationException("should not reach here.");
throw new UnsupportedOperationException("should not reach here. Expr: " + expr.toString());

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,9 @@ object AggUtils {
case filter: Filter =>
filter.child match {
case l@LogicalRelation (fsRelation: HadoopFsRelation, _, _, _) =>
if (!fsRelation.resultExpr.isEmpty) {

if (!fsRelation.resultExpr.isEmpty && fsRelation.resultExpr.get.size != 0) {

// Check whether all filters can be pushed down here; if not, we will NOT
// do aggregate pushdown (don't ignore partial agg).
val filterExpressions = splitConjunctivePredicates(filter.condition)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,7 @@ object DataSourceStrategy {
*/
protected[sql] def translateAggregate(groupingExpressions: Seq[Expression],
aggregateExpressions: Seq[AggregateExpression]): String = {
if (groupingExpressions.size > 0 && aggregateExpressions.size == 0) return ""
AggregateConvertor.toJsonString(
scala.collection.JavaConverters.seqAsJavaList(groupingExpressions),
scala.collection.JavaConverters.seqAsJavaList(aggregateExpressions))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,15 @@ object FileSourceStrategy extends Strategy with Logging {
// but will keep agg info in fsRelation. Ideally, following filter/ projection
// will apply FileSourceStrategy very soon.
if (SparkSession.getActiveSession.get.conf.get(APE_AGGREGATION_PUSHDOWN_ENABLED, false)) {
fsRelation.groupExpr = Some(groupingExpr)
fsRelation.resultExpr = Some(aggExpr
.map(expr => expr.asInstanceOf[AggregateExpression]))

var withoutDistict = true
aggExpr.map(expr => if (expr.isDistinct) withoutDistict = false)

if (withoutDistict) {
fsRelation.groupExpr = Some(groupingExpr)
fsRelation.resultExpr = Some(aggExpr)
}

}
Seq()

Expand Down Expand Up @@ -241,16 +247,23 @@ object FileSourceStrategy extends Strategy with Logging {
val partialResultExpressions =
groupingAttributes ++
partialAggregateExpressions.flatMap(_.aggregateFunction.inputAggBufferAttributes)
val aggregateExpression =

val aggregateExpression = {
if (afterScanFilters.isEmpty) DataSourceStrategy.translateAggregate(gExpression, aggExpressions)
else ""
}
logInfo("agg pd info: " + gExpression.mkString +
" " + aggExpressions.mkString)

// Reset to None so that later nodes/plans are not affected.
fsRelation.groupExpr = None
fsRelation.resultExpr = None
val onlyGroup = (aggExpressions.size == 0) && (gExpression.size > 0)

val outAttributes: Seq[Attribute] =
if (!partialResultExpressions.isEmpty && afterScanFilters.isEmpty) partialResultExpressions
else outputAttributes
if (!onlyGroup && !partialResultExpressions.isEmpty && afterScanFilters.isEmpty) {
partialResultExpressions
} else outputAttributes

val schema = outAttributes.toStructType
logInfo(s"Output Data Schema after agg pd: ${schema.simpleString}")
Expand Down
8 changes: 4 additions & 4 deletions oap-ape/ape-native/src/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,10 @@ int Reader::readBatch(int32_t batchSize, int64_t* buffersPtr_, int64_t* nullsPtr
int rowsAfterFilter = doFilter(rowsToRead, buffersPtr, nullsPtr);
ARROW_LOG(DEBUG) << "after filter " << rowsAfterFilter;

rowsRet = doAggregation(rowsAfterFilter, map, keys, results, buffersPtr, nullsPtr);
int tmp = doAggregation(rowsAfterFilter, map, keys, results, buffersPtr, nullsPtr);
// If the last batch is empty after filtering, doAggregation returns 0 regardless of
// the number of groups, so only overwrite rowsRet with a non-zero result.
if (tmp != 0) rowsRet = tmp;
}

if (aggExprs.size()) {
Expand Down Expand Up @@ -370,9 +373,6 @@ int Reader::doAggregation(int batchSize, ApeHashMap& map, std::vector<Key>& keys
std::dynamic_pointer_cast<RootAggExpression>(agg)->getResult(
results[i], keys.size(), indexes);
} else {
if (results[i].nullVector->size() == 0) {
results[i].nullVector->resize(1);
}
std::dynamic_pointer_cast<RootAggExpression>(agg)->getResult(results[i]);
}
} else {
Expand Down
9 changes: 5 additions & 4 deletions oap-ape/ape-native/src/utils/AggExpression.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,10 @@ int AggExpression::ExecuteWithParam(int batchSize,

void Count::getResultInternal(DecimalVector& result) {
result.type = ResultType::LongType;
if (result.nullVector->size() == 0) {
result.nullVector->resize(1);
result.nullVector->at(0) = 1;
result.nullVector->resize(1);
result.nullVector->at(0) = 1;
if (result.data.size() == 0) {
result.data.push_back(arrow::BasicDecimal128(0));
}
if (typeid(*child) == typeid(LiteralExpression)) { // for count(*) or count(1)
result.data[0] += arrow::BasicDecimal128(batchSize_);
Expand All @@ -84,7 +85,7 @@ void Count::getResultInternal(DecimalVector& result) {
auto tmp = DecimalVector();
child->getResult(tmp);
for (int i = 0; i < tmp.data.size(); i++) {
if (tmp.nullVector->at(i)) result.data[0] += 1;
if (tmp.nullVector->at(i)) result.data[0] += arrow::BasicDecimal128(1);
}
}
}
Expand Down
1 change: 1 addition & 0 deletions oap-ape/ape-native/src/utils/ApeDecimal.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include <arrow/type.h>
#include <arrow/util/basic_decimal.h>
#include <arrow/util/decimal.h>
#include <cstdint>

namespace ape {
Expand Down
5 changes: 4 additions & 1 deletion oap-ape/ape-native/src/utils/DumpUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,10 @@ class DumpUtils {
break;
}
case ResultType::DoubleType: {
// TODO: convert
// TODO: this only handles the UnscaledValue case; it will not work if the data
// type is actually Double.
arrow::Decimal128 tmp(result.data[i]);
*((double*)bufferAddr + i) = tmp.ToDouble(0);
break;
}
case ResultType::Decimal64Type: {
Expand Down