Skip to content

Commit

Permalink
Propagate driver context in parallel join build
Browse files Browse the repository at this point in the history
  • Loading branch information
tanjialiang committed Oct 23, 2024
1 parent 5d8823f commit a16d444
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 12 deletions.
10 changes: 0 additions & 10 deletions velox/exec/HashBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -745,16 +745,6 @@ bool HashBuild::finishHashBuild() {
// https://github.com/facebookincubator/velox/issues/3567 is fixed.
CpuWallTiming timing;
{
// If there is a chance the join build is parallel, we suspend the driver
// while the hash table is being built. This is because off-driver thread
// memory allocations inside parallel join build might trigger memory
// arbitration.
std::unique_ptr<SuspendedSection> suspendedSection;
if (allowParallelJoinBuild) {
suspendedSection = std::make_unique<SuspendedSection>(
driverThreadContext()->driverCtx.driver);
}

CpuWallTimer cpuWallTimer{timing};
table_->prepareJoinTable(
std::move(otherTables),
Expand Down
19 changes: 17 additions & 2 deletions velox/exec/HashTable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -928,6 +928,7 @@ void HashTable<ignoreNullKeys>::parallelJoinBuild() {
rowPartitions.push_back(table->rows()->createRowPartitions(*rows_->pool()));
}

const auto* driverThreadCtx = driverThreadContext();
// The parallel table partitioning step.
for (auto i = 0; i < numPartitions; ++i) {
auto* table = getTable(i);
Expand All @@ -937,7 +938,14 @@ void HashTable<ignoreNullKeys>::parallelJoinBuild() {
return std::make_unique<bool>(true);
}));
VELOX_CHECK(!partitionSteps.empty());
buildExecutor_->add([step = partitionSteps.back()]() { step->prepare(); });
buildExecutor_->add([driverThreadCtx, step = partitionSteps.back()]() {
std::unique_ptr<ScopedDriverThreadContext> scopedDriverThreadContext =
driverThreadCtx == nullptr
? nullptr
: std::make_unique<ScopedDriverThreadContext>(
driverThreadCtx->driverCtx);
step->prepare();
});
}

std::exception_ptr error;
Expand All @@ -961,7 +969,14 @@ void HashTable<ignoreNullKeys>::parallelJoinBuild() {
return std::make_unique<bool>(true);
}));
VELOX_CHECK(!buildSteps.empty());
buildExecutor_->add([step = buildSteps.back()]() { step->prepare(); });
buildExecutor_->add([driverThreadCtx, step = buildSteps.back()]() {
std::unique_ptr<ScopedDriverThreadContext> scopedDriverThreadContext =
driverThreadCtx == nullptr
? nullptr
: std::make_unique<ScopedDriverThreadContext>(
driverThreadCtx->driverCtx);
step->prepare();
});
}
syncWorkItems(buildSteps, error, offThreadBuildTiming_);

Expand Down

0 comments on commit a16d444

Please sign in to comment.