Skip to content

Commit

Permalink
clean deprecated codes
Browse files Browse the repository at this point in the history
  • Loading branch information
lgbo-ustc committed May 4, 2023
1 parent 8c55eba commit 5243326
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 465 deletions.
121 changes: 35 additions & 86 deletions src/Processors/QueryPlan/InnerShuffleStep.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,15 @@ InnerShuffleStep::InnerShuffleStep(const DataStream & input_stream_, const std::
, hash_columns(hash_columns_)
{
}

/**
* 1. InnerShuffleScatterTransform scatter each input block into num_streams blocks. The join keys are
* used as the hash keys. num_streams should be a number of power of 2.
* 2. To avoid createting too many edges between scatter and gather processors. we make a small set of
* InnerShuffleDispatchTransform. It collect split chunks from InnerShuffleScatterTransform and
* dispatch them into different output ports.
* 3. InnerShuffleGatherTransform gather data from InnerShuffleDispatchTransforms and merge them into
* one outport.
*/
void InnerShuffleStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & /*settings*/)
{
const auto & header = pipeline.getHeader();
Expand All @@ -47,91 +55,25 @@ void InnerShuffleStep::transformPipeline(QueryPipelineBuilder & pipeline, const
}
OutputPortRawPtrs current_outports;

// Split one block into n blocks by hash function, n is equal to num_streams.
// One input port will have n output ports.
// The result blocks are mark by hash id (0 <= id < num_streams), and are delivered into
// different output ports.
/*
size_t num_streams = toPowerOfTwo(static_cast<UInt32>(pipeline.getNumStreams()));
assert(num_streams > 1);
if (num_streams != pipeline.getNumStreams())
{
pipeline.resize(num_streams);
}
*/
size_t num_streams = pipeline.getNumStreams();
auto add_scatter_transform = [&](OutputPortRawPtrs outports)
if (num_streams != alignStreamsNum(static_cast<UInt32>(num_streams)))
{
Processors scatters;
if (outports.size() != num_streams)
{
throw Exception(
ErrorCodes::LOGICAL_ERROR, "The output ports size is expected to be {}, but got {}", num_streams, outports.size());
}
for (auto & outport : outports)
{
auto scatter = std::make_shared<InnerShuffleScatterTransform>(num_streams, header, keys);
connect(*outport, scatter->getInputs().front());
scatters.push_back(scatter);
}
return scatters;
};
pipeline.transform(add_scatter_transform);
throw Exception(
ErrorCodes::LOGICAL_ERROR, "The num_streams {} is not a power of 2", num_streams);
}

// Gather the blocks from the upstream output porst marked with the same id.
auto add_gather_transform = [&](OutputPortRawPtrs outports)
// should not be a high overhead operation, small number is OK.
size_t max_dispatcher_num = 8;
size_t dispatchers_num = num_streams/2;
if (dispatchers_num > max_dispatcher_num)
{
Processors gathers;
assert(outports.size() == num_streams * num_streams);
for (size_t i = 0; i < num_streams; ++i)
{
OutputPortRawPtrs gather_upstream_outports;
auto gather = std::make_shared<InnerShuffleGatherTransform>(header, num_streams);
gathers.push_back(gather);
auto & gather_inputs = gather->getInputs();
for (size_t j = 0; j < num_streams; ++j)
{
gather_upstream_outports.push_back(outports[j * num_streams + i]);
}
auto oiter = gather_upstream_outports.begin();
auto iiter = gather_inputs.begin();
for (; oiter != gather_upstream_outports.end(); oiter++, iiter++)
{
connect(**oiter, *iiter);
}
}
return gathers;
};
pipeline.transform(add_gather_transform);
}

void InnerShuffleStep::updateOutputStream()
{
output_stream = createOutputStream(
input_streams.front(),
input_streams.front().header,
getDataStreamTraits());
}
InnerShuffleStepV2::InnerShuffleStepV2(const DataStream & input_stream_, const std::vector<String> & hash_columns_)
: ITransformingStep(input_stream_, input_stream_.header, getTraits(input_stream_))
, hash_columns(hash_columns_)
{
}

void InnerShuffleStepV2::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & /*settings*/)
{
const auto & header = pipeline.getHeader();
std::vector<size_t> keys;
keys.reserve(hash_columns.size());
for (const auto & key_name : hash_columns)
dispatchers_num = max_dispatcher_num;
}
if (!dispatchers_num)
{
keys.push_back(header.getPositionByName(key_name));
dispatchers_num = 1;
}
OutputPortRawPtrs current_outports;

size_t dispatchers_num = 2;
size_t num_streams = pipeline.getNumStreams();
assert(num_streams % 2 == 0);
auto add_scatter_transform = [&](OutputPortRawPtrs outports)
{
Processors scatters;
Expand All @@ -154,7 +96,7 @@ void InnerShuffleStepV2::transformPipeline(QueryPipelineBuilder & pipeline, cons

for (auto & outport : outports)
{
auto scatter = std::make_shared<InnerShuffleScatterTransformV2>(num_streams, header, keys);
auto scatter = std::make_shared<InnerShuffleScatterTransform>(num_streams, header, keys);
connect(*outport, scatter->getInputs().front());
scatters.push_back(scatter);
}
Expand All @@ -169,26 +111,27 @@ void InnerShuffleStepV2::transformPipeline(QueryPipelineBuilder & pipeline, cons
return scatters;
};
pipeline.transform(add_scatter_transform);

auto add_gather_transform = [&](OutputPortRawPtrs outports)
{
Processors gathers;
assert(outports.size() == num_streams * num_streams);
for (size_t i = 0; i < num_streams; ++i)
{
OutputPortRawPtrs gather_upstream_outports;
auto gather = std::make_shared<InnerShuffleGatherTransformV2>(header, dispatchers_num);
auto gather = std::make_shared<InnerShuffleGatherTransform>(header, dispatchers_num);
gathers.push_back(gather);
auto & gather_inputs = gather->getInputs();
for (size_t j = 0; j < dispatchers_num; ++j)
{
gather_upstream_outports.push_back(outports[j * dispatchers_num + i]);
gather_upstream_outports.push_back(outports[j * num_streams + i]);
}
auto oiter = gather_upstream_outports.begin();
auto iiter = gather_inputs.begin();
for (; oiter != gather_upstream_outports.end(); oiter++, iiter++)
for (; oiter != gather_upstream_outports.end();)
{
connect(**oiter, *iiter);
oiter++;
iiter++;
}
}
return gathers;
Expand All @@ -197,11 +140,17 @@ void InnerShuffleStepV2::transformPipeline(QueryPipelineBuilder & pipeline, cons

}

void InnerShuffleStepV2::updateOutputStream()
void InnerShuffleStep::updateOutputStream()
{
output_stream = createOutputStream(
input_streams.front(),
input_streams.front().header,
getDataStreamTraits());
}
UInt32 InnerShuffleStep::alignStreamsNum(UInt32 n)
{
if (n <= 1)
return 1;
return static_cast<UInt32>(1) << (32 - std::countl_zero(n - 1));
}
}
19 changes: 1 addition & 18 deletions src/Processors/QueryPlan/InnerShuffleStep.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,7 @@ class InnerShuffleStep : public ITransformingStep
String getName() const override { return "InnerShuffle"; }
// The shuffle buckets size is equal to pipeline's num_streams
void transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & settings) override;

private:
std::vector<String> hash_columns; // columns' name to build the hash key

void updateOutputStream() override;

};

class InnerShuffleStepV2 : public ITransformingStep
{
public:
explicit InnerShuffleStepV2(const DataStream & input_stream_, const std::vector<String> & hash_columns_);
~InnerShuffleStepV2() override = default;

String getName() const override { return "InnerShuffle"; }
// The shuffle buckets size is equal to pipeline's num_streams
void transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & settings) override;

static UInt32 alignStreamsNum(UInt32 n);
private:
std::vector<String> hash_columns; // columns' name to build the hash key

Expand Down
Loading

0 comments on commit 5243326

Please sign in to comment.