Skip to content

Commit

Permalink
feature(hashjoin): Add fast path to list join result
Browse files Browse the repository at this point in the history
  • Loading branch information
tanjialiang committed Nov 17, 2024
1 parent 99979c4 commit 9ed88ab
Show file tree
Hide file tree
Showing 11 changed files with 572 additions and 94 deletions.
2 changes: 2 additions & 0 deletions velox/exec/GroupingSet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1064,6 +1064,7 @@ bool GroupingSet::getOutputWithSpill(
false,
false,
false,
false,
&pool_);

initializeAggregates(aggregates_, *mergeRows_, false);
Expand Down Expand Up @@ -1282,6 +1283,7 @@ void GroupingSet::abandonPartialAggregation() {
false,
false,
false,
false,
&pool_);
initializeAggregates(aggregates_, *intermediateRows_, true);
table_.reset();
Expand Down
64 changes: 62 additions & 2 deletions velox/exec/HashProbe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,41 @@ void HashProbe::maybeSetupSpillInputReader(
inputSpillPartitionSet_.erase(iter);
}

namespace {
// Decides whether one per-row byte estimate can stand in for the exact size of
// the projected build-side columns.
//
// The average and maximum row sizes are computed by adding
// 'totalFixedColumnsBytes' to the sum of avgBytes() and maxBytes() over
// 'varColumnsStats'. When the max/avg ratio is strictly below
// 'toleranceRatio', the estimate is written to 'estimatedRowSize' and true is
// returned. Otherwise false is returned and 'estimatedRowSize' is left
// untouched.
//
// @param estimatedRowSize Output; only written when the function returns true.
// @param varColumnsStats Stats of the variable-size projected columns.
// @param totalFixedColumnsBytes Per-row bytes of the fixed-size projected
// columns.
// @param toleranceRatio Exclusive upper bound on the max/avg row size ratio.
bool useEstimatedRowSize(
    uint64_t& estimatedRowSize,
    const std::vector<RowColumnStats>& varColumnsStats,
    uint64_t totalFixedColumnsBytes,
    double toleranceRatio) {
  uint64_t totalAvgBytes{totalFixedColumnsBytes};
  uint64_t totalMaxBytes{totalFixedColumnsBytes};
  for (const auto& stats : varColumnsStats) {
    totalAvgBytes += stats.avgBytes();
    totalMaxBytes += stats.maxBytes();
  }

  if (totalAvgBytes == 0) {
    // Guard the division below: an average of zero would otherwise be an
    // integer division by zero. If the maximum is zero as well, every row
    // projects exactly zero bytes, so the size is known rather than estimated.
    // A non-zero maximum with a zero average means extreme skew, which must not
    // be approximated.
    if (totalMaxBytes != 0) {
      return false;
    }
    estimatedRowSize = 0;
    return true;
  }

  // Compare in floating point so that a non-integral 'toleranceRatio' is
  // honored; an integer quotient would be truncated before the comparison.
  const double maxToAvgRatio = static_cast<double>(totalMaxBytes) /
      static_cast<double>(totalAvgBytes);
  if (maxToAvgRatio < toleranceRatio) {
    estimatedRowSize = std::min(totalMaxBytes / 2, totalAvgBytes);
    return true;
  }
  return false;
}
} // namespace

std::optional<RowColumnStats> HashProbe::columnStats(
int32_t columnIndex) const {
std::vector<RowColumnStats> columnStats;
const auto rowContainers = table_->allRows();
for (const auto* rowContainer : rowContainers) {
VELOX_CHECK_NOT_NULL(rowContainer);
auto statsOpt = rowContainer->columnStats(columnIndex);
if (!statsOpt.has_value()) {
return std::nullopt;
}
columnStats.push_back(statsOpt.value());
}
return RowColumnStats::mergeStats(columnStats);
}

void HashProbe::initializeResultIter() {
VELOX_CHECK_NOT_NULL(table_);
if (resultIter_ != nullptr) {
Expand All @@ -312,8 +347,34 @@ void HashProbe::initializeResultIter() {
varSizeListColumns.push_back(column);
}
}

// Gather the stats of every variable-size projected column. The result is
// all-or-nothing: if any column has no stats, drop what was collected so far
// and mark the stats as invalid.
bool columnStatsValid{true};
std::vector<RowColumnStats> varSizeListColumnsStats;
varSizeListColumnsStats.reserve(varSizeListColumns.size());
for (uint32_t i = 0; i < varSizeListColumns.size(); i++) {
auto statsOpt = columnStats(varSizeListColumns[i]);
if (!statsOpt.has_value()) {
columnStatsValid = false;
varSizeListColumnsStats.clear();
break;
}
varSizeListColumnsStats.push_back(statsOpt.value());
}

// Only attempt a row size estimate when stats were available for all
// variable-size columns. 'estimatedRowSize' stays 0 and the flag stays false
// otherwise.
bool useEstimatedRowSizeFlag{false};
uint64_t estimatedRowSize{0};
if (columnStatsValid) {
// The trailing literal 5 is the 'toleranceRatio' argument of
// useEstimatedRowSize().
useEstimatedRowSizeFlag = useEstimatedRowSize(
estimatedRowSize,
varSizeListColumnsStats,
fixedSizeListColumnsSizeSum,
5);
}

resultIter_ = std::make_unique<BaseHashTable::JoinResultIterator>(
std::move(varSizeListColumns), fixedSizeListColumnsSizeSum);
std::move(varSizeListColumns),
fixedSizeListColumnsSizeSum,
useEstimatedRowSizeFlag ? std::optional(estimatedRowSize) : std::nullopt);
}

void HashProbe::asyncWaitForHashTable() {
Expand Down Expand Up @@ -1942,5 +2003,4 @@ void HashProbe::clearBuffers() {
operatorCtx_->execCtx()->vectorPool()->clear();
filter_->clearCache();
}

} // namespace facebook::velox::exec
5 changes: 5 additions & 0 deletions velox/exec/HashProbe.h
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,11 @@ class HashProbe : public Operator {
// memory reclamation or operator close.
void clearBuffers();

// Returns the aggregated column stats at 'columnIndex' of 'table_'. Returns
// nullopt if the column stats are invalidated.
// NOTE: Calling this on a fixed size column will also return nullopt.
std::optional<RowColumnStats> columnStats(int32_t columnIndex) const;

// TODO: Define batch size as bytes based on RowContainer row sizes.
const vector_size_t outputBatchSize_;

Expand Down
33 changes: 20 additions & 13 deletions velox/exec/HashTable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ HashTable<ignoreNullKeys>::HashTable(
isJoinBuild,
hasProbedFlag,
hashMode_ != HashMode::kHash,
// TODO(jtan6): Change the bool to true for join case.
false,
pool);
nextOffset_ = rows_->nextOffset();
}
Expand Down Expand Up @@ -1826,10 +1828,9 @@ int32_t HashTable<ignoreNullKeys>::listJoinResults(
uint64_t maxBytes) {
VELOX_CHECK_LE(inputRows.size(), hits.size());

if (iter.varSizeListColumns.empty() && !hasDuplicates_) {
// When there is no duplicates, and no variable length columns are selected
// to be projected, we are able to calculate fixed length columns total size
// directly and go through fast path.
if (iter.estimatedRowSize.has_value() && !hasDuplicates_) {
// When there are no duplicates and the row size is estimable, we are able
// to go through the fast path.
return listJoinResultsFastPath(
iter, includeMisses, inputRows, hits, maxBytes);
}
Expand Down Expand Up @@ -1859,9 +1860,10 @@ int32_t HashTable<ignoreNullKeys>::listJoinResults(
hits[numOut] = hit;
numOut++;
iter.lastRowIndex++;
totalBytes +=
(joinProjectedVarColumnsSize(iter.varSizeListColumns, hit) +
iter.fixedSizeListColumnsSizeSum);
totalBytes += iter.estimatedRowSize.has_value()
? iter.estimatedRowSize.value()
: (joinProjectedVarColumnsSize(iter.varSizeListColumns, hit) +
iter.fixedSizeListColumnsSizeSum);
} else {
const auto numRows = rows->size();
auto num =
Expand All @@ -1873,11 +1875,16 @@ int32_t HashTable<ignoreNullKeys>::listJoinResults(
num * sizeof(char*));
iter.lastDuplicateRowIndex += num;
numOut += num;
for (const auto* dupRow : *rows) {
totalBytes +=
joinProjectedVarColumnsSize(iter.varSizeListColumns, dupRow);
if (iter.estimatedRowSize.has_value()) {
totalBytes += iter.estimatedRowSize.value() * numRows;
} else {
for (const auto* dupRow : *rows) {
totalBytes +=
joinProjectedVarColumnsSize(iter.varSizeListColumns, dupRow) +
iter.fixedSizeListColumnsSizeSum;
}
totalBytes += (iter.fixedSizeListColumnsSizeSum * numRows);
}
totalBytes += (iter.fixedSizeListColumnsSizeSum * numRows);
if (iter.lastDuplicateRowIndex >= numRows) {
iter.lastDuplicateRowIndex = 0;
iter.lastRowIndex++;
Expand All @@ -1900,8 +1907,8 @@ int32_t HashTable<ignoreNullKeys>::listJoinResultsFastPath(
int32_t numOut = 0;
const auto maxOut = std::min(
static_cast<uint64_t>(inputRows.size()),
(iter.fixedSizeListColumnsSizeSum != 0
? maxBytes / iter.fixedSizeListColumnsSizeSum
(iter.estimatedRowSize.value() != 0
? maxBytes / iter.estimatedRowSize.value()
: std::numeric_limits<uint64_t>::max()));
int32_t i = iter.lastRowIndex;
const auto numRows = iter.rows->size();
Expand Down
30 changes: 16 additions & 14 deletions velox/exec/HashTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,11 @@ class BaseHashTable {
struct JoinResultIterator {
JoinResultIterator(
std::vector<vector_size_t>&& _varSizeListColumns,
uint64_t _fixedSizeListColumnsSizeSum)
uint64_t _fixedSizeListColumnsSizeSum,
std::optional<uint64_t> _estimatedRowSize)
: varSizeListColumns(std::move(_varSizeListColumns)),
fixedSizeListColumnsSizeSum(_fixedSizeListColumnsSizeSum) {}
fixedSizeListColumnsSizeSum(_fixedSizeListColumnsSizeSum),
estimatedRowSize(_estimatedRowSize) {}

void reset(const HashLookup& lookup) {
rows = &lookup.rows;
Expand All @@ -157,6 +159,7 @@ class BaseHashTable {
return !rows || lastRowIndex == rows->size();
}

const std::optional<uint64_t> estimatedRowSize;
/// The indexes of the build side projected columns that are variable sized.
const std::vector<vector_size_t> varSizeListColumns;
/// The per row total bytes of the build side projected columns that are
Expand Down Expand Up @@ -635,18 +638,6 @@ class HashTable : public BaseHashTable {
/// purpose.
void checkConsistency() const;

/// Returns a reference to 'otherTables_'. The 'testing' prefix suggests this
/// accessor is meant for tests only.
auto& testingOtherTables() const {
return otherTables_;
}

/// Returns the value of rehashSize(). The 'testing' prefix suggests this
/// accessor is meant for tests only.
uint64_t testingRehashSize() const {
return rehashSize();
}

/// Returns the 'table_' pointer as is. The 'testing' prefix suggests this
/// accessor is meant for tests only.
char** testingTable() const {
return table_;
}

void extractColumn(
folly::Range<char* const*> rows,
int32_t columnIndex,
Expand All @@ -659,6 +650,17 @@ class HashTable : public BaseHashTable {
result);
}

/// Returns a reference to 'otherTables_'. The 'testing' prefix suggests this
/// accessor is meant for tests only.
auto& testingOtherTables() const {
return otherTables_;
}

/// Returns the value of rehashSize(). The 'testing' prefix suggests this
/// accessor is meant for tests only.
uint64_t testingRehashSize() const {
return rehashSize();
}

/// Returns the 'table_' pointer as is. The 'testing' prefix suggests this
/// accessor is meant for tests only.
char** testingTable() const {
return table_;
}
private:
// Enables debug stats for collisions for debug build.
#ifdef NDEBUG
Expand Down
Loading

0 comments on commit 9ed88ab

Please sign in to comment.