Skip to content

Commit

Permalink
efficient shuffle dag
Browse files Browse the repository at this point in the history
  • Loading branch information
lgbo-ustc committed May 4, 2023
1 parent a8cb813 commit 8c55eba
Show file tree
Hide file tree
Showing 7 changed files with 526 additions and 15 deletions.
8 changes: 6 additions & 2 deletions src/Interpreters/ConcurrentHashJoin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,11 @@ bool ConcurrentHashJoin::addJoinedBlock(const Block & block, bool check_limits)
if (!inner_join->addJoinedBlock(block, false))
return false;
if (check_limits)
return shared_context->size_limit_per_clone.check(getTotalRowCount(), getTotalByteCount(), "JOIN", ErrorCodes::SET_SIZE_LIMIT_EXCEEDED);
{
auto total_rows = getTotalRowCount();
auto total_bytes = getTotalByteCount();
return shared_context->size_limit_per_clone.check(total_rows, total_bytes, "JOIN", ErrorCodes::SET_SIZE_LIMIT_EXCEEDED);
}
return true;
}

Expand All @@ -67,7 +71,7 @@ const Block & ConcurrentHashJoin::getTotals() const

size_t ConcurrentHashJoin::getTotalRowCount() const
{
return inner_join->getTotalByteCount();
return inner_join->getTotalRowCount();
}

size_t ConcurrentHashJoin::getTotalByteCount() const
Expand Down
101 changes: 100 additions & 1 deletion src/Processors/QueryPlan/InnerShuffleStep.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,15 @@ void InnerShuffleStep::transformPipeline(QueryPipelineBuilder & pipeline, const
// One input port will have n output ports.
// The result blocks are mark by hash id (0 <= id < num_streams), and are delivered into
// different output ports.
size_t num_streams = pipeline.getNumStreams();
/*
size_t num_streams = toPowerOfTwo(static_cast<UInt32>(pipeline.getNumStreams()));
assert(num_streams > 1);
if (num_streams != pipeline.getNumStreams())
{
pipeline.resize(num_streams);
}
*/
size_t num_streams = pipeline.getNumStreams();
auto add_scatter_transform = [&](OutputPortRawPtrs outports)
{
Processors scatters;
Expand Down Expand Up @@ -105,4 +112,96 @@ void InnerShuffleStep::updateOutputStream()
input_streams.front().header,
getDataStreamTraits());
}
InnerShuffleStepV2::InnerShuffleStepV2(const DataStream & input_stream_, const std::vector<String> & hash_columns_)
: ITransformingStep(input_stream_, input_stream_.header, getTraits(input_stream_))
, hash_columns(hash_columns_)
{
}

void InnerShuffleStepV2::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & /*settings*/)
{
const auto & header = pipeline.getHeader();
std::vector<size_t> keys;
keys.reserve(hash_columns.size());
for (const auto & key_name : hash_columns)
{
keys.push_back(header.getPositionByName(key_name));
}
OutputPortRawPtrs current_outports;

size_t dispatchers_num = 2;
size_t num_streams = pipeline.getNumStreams();
assert(num_streams % 2 == 0);
auto add_scatter_transform = [&](OutputPortRawPtrs outports)
{
Processors scatters;
Processors dispatchers;
std::vector<InputPort *> dispatcher_input_ports;

if (outports.size() != num_streams)
{
throw Exception(
ErrorCodes::LOGICAL_ERROR, "The output ports size is expected to be {}, but got {}", num_streams, outports.size());
}
for (size_t i = 0; i < dispatchers_num; ++i)
{
dispatchers.push_back(std::make_shared<InnerShuffleDispatchTransform>(num_streams/dispatchers_num, num_streams, header));
for (auto & port : dispatchers.back()->getInputs())
{
dispatcher_input_ports.push_back(&port);
}
}

for (auto & outport : outports)
{
auto scatter = std::make_shared<InnerShuffleScatterTransformV2>(num_streams, header, keys);
connect(*outport, scatter->getInputs().front());
scatters.push_back(scatter);
}
for (size_t i = 0; i < num_streams; ++i)
{
connect(scatters[i]->getOutputs().front(), *dispatcher_input_ports[i]);
}
for (auto & dispatcher : dispatchers)
{
scatters.push_back(dispatcher);
}
return scatters;
};
pipeline.transform(add_scatter_transform);

auto add_gather_transform = [&](OutputPortRawPtrs outports)
{
Processors gathers;
assert(outports.size() == num_streams * num_streams);
for (size_t i = 0; i < num_streams; ++i)
{
OutputPortRawPtrs gather_upstream_outports;
auto gather = std::make_shared<InnerShuffleGatherTransformV2>(header, dispatchers_num);
gathers.push_back(gather);
auto & gather_inputs = gather->getInputs();
for (size_t j = 0; j < dispatchers_num; ++j)
{
gather_upstream_outports.push_back(outports[j * dispatchers_num + i]);
}
auto oiter = gather_upstream_outports.begin();
auto iiter = gather_inputs.begin();
for (; oiter != gather_upstream_outports.end(); oiter++, iiter++)
{
connect(**oiter, *iiter);
}
}
return gathers;
};
pipeline.transform(add_gather_transform);

}

void InnerShuffleStepV2::updateOutputStream()
{
output_stream = createOutputStream(
input_streams.front(),
input_streams.front().header,
getDataStreamTraits());
}
}
16 changes: 16 additions & 0 deletions src/Processors/QueryPlan/InnerShuffleStep.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,20 @@ class InnerShuffleStep : public ITransformingStep
void updateOutputStream() override;

};

class InnerShuffleStepV2 : public ITransformingStep
{
public:
explicit InnerShuffleStepV2(const DataStream & input_stream_, const std::vector<String> & hash_columns_);
~InnerShuffleStepV2() override = default;

String getName() const override { return "InnerShuffle"; }
// The shuffle buckets size is equal to pipeline's num_streams
void transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & settings) override;

private:
std::vector<String> hash_columns; // columns' name to build the hash key

void updateOutputStream() override;
};
}
Loading

0 comments on commit 8c55eba

Please sign in to comment.