diff --git a/src/Processors/QueryPlan/InnerShuffleStep.cpp b/src/Processors/QueryPlan/InnerShuffleStep.cpp index 997a33af8e9f..0c7d43af1981 100644 --- a/src/Processors/QueryPlan/InnerShuffleStep.cpp +++ b/src/Processors/QueryPlan/InnerShuffleStep.cpp @@ -35,7 +35,15 @@ InnerShuffleStep::InnerShuffleStep(const DataStream & input_stream_, const std:: , hash_columns(hash_columns_) { } - +/** + * 1. InnerShuffleScatterTransform scatter each input block into num_streams blocks. The join keys are + * used as the hash keys. num_streams should be a number of power of 2. + * 2. To avoid createting too many edges between scatter and gather processors. we make a small set of + * InnerShuffleDispatchTransform. It collect split chunks from InnerShuffleScatterTransform and + * dispatch them into different output ports. + * 3. InnerShuffleGatherTransform gather data from InnerShuffleDispatchTransforms and merge them into + * one outport. + */ void InnerShuffleStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & /*settings*/) { const auto & header = pipeline.getHeader(); @@ -47,91 +55,25 @@ void InnerShuffleStep::transformPipeline(QueryPipelineBuilder & pipeline, const } OutputPortRawPtrs current_outports; - // Split one block into n blocks by hash function, n is equal to num_streams. - // One input port will have n output ports. - // The result blocks are mark by hash id (0 <= id < num_streams), and are delivered into - // different output ports. - /* - size_t num_streams = toPowerOfTwo(static_cast(pipeline.getNumStreams())); - assert(num_streams > 1); - if (num_streams != pipeline.getNumStreams()) - { - pipeline.resize(num_streams); - } - */ size_t num_streams = pipeline.getNumStreams(); - auto add_scatter_transform = [&](OutputPortRawPtrs outports) + if (num_streams != alignStreamsNum(static_cast(num_streams))) { - Processors scatters; - if (outports.size() != num_streams) - { - throw Exception( - ErrorCodes::LOGICAL_ERROR, "The output ports size is expected to be {}, but got {}", num_streams, outports.size()); - } - for (auto & outport : outports) - { - auto scatter = std::make_shared(num_streams, header, keys); - connect(*outport, scatter->getInputs().front()); - scatters.push_back(scatter); - } - return scatters; - }; - pipeline.transform(add_scatter_transform); + throw Exception( + ErrorCodes::LOGICAL_ERROR, "The num_streams {} is not a power of 2", num_streams); + } - // Gather the blocks from the upstream output porst marked with the same id. - auto add_gather_transform = [&](OutputPortRawPtrs outports) + // should not be a high overhead operation, small number is OK. + size_t max_dispatcher_num = 8; + size_t dispatchers_num = num_streams/2; + if (dispatchers_num > max_dispatcher_num) { - Processors gathers; - assert(outports.size() == num_streams * num_streams); - for (size_t i = 0; i < num_streams; ++i) - { - OutputPortRawPtrs gather_upstream_outports; - auto gather = std::make_shared(header, num_streams); - gathers.push_back(gather); - auto & gather_inputs = gather->getInputs(); - for (size_t j = 0; j < num_streams; ++j) - { - gather_upstream_outports.push_back(outports[j * num_streams + i]); - } - auto oiter = gather_upstream_outports.begin(); - auto iiter = gather_inputs.begin(); - for (; oiter != gather_upstream_outports.end(); oiter++, iiter++) - { - connect(**oiter, *iiter); - } - } - return gathers; - }; - pipeline.transform(add_gather_transform); -} - -void InnerShuffleStep::updateOutputStream() -{ - output_stream = createOutputStream( - input_streams.front(), - input_streams.front().header, - getDataStreamTraits()); -} -InnerShuffleStepV2::InnerShuffleStepV2(const DataStream & input_stream_, const std::vector & hash_columns_) - : ITransformingStep(input_stream_, input_stream_.header, getTraits(input_stream_)) - , hash_columns(hash_columns_) -{ -} - -void InnerShuffleStepV2::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & /*settings*/) -{ - const auto & header = pipeline.getHeader(); - std::vector keys; - keys.reserve(hash_columns.size()); - for (const auto & key_name : hash_columns) + dispatchers_num = max_dispatcher_num; + } + if (!dispatchers_num) { - keys.push_back(header.getPositionByName(key_name)); + dispatchers_num = 1; } - OutputPortRawPtrs current_outports; - size_t dispatchers_num = 2; - size_t num_streams = pipeline.getNumStreams(); - assert(num_streams % 2 == 0); auto add_scatter_transform = [&](OutputPortRawPtrs outports) { Processors scatters; @@ -154,7 +96,7 @@ void InnerShuffleStepV2::transformPipeline(QueryPipelineBuilder & pipeline, cons for (auto & outport : outports) { - auto scatter = std::make_shared(num_streams, header, keys); + auto scatter = std::make_shared(num_streams, header, keys); connect(*outport, scatter->getInputs().front()); scatters.push_back(scatter); } @@ -169,26 +111,27 @@ void InnerShuffleStepV2::transformPipeline(QueryPipelineBuilder & pipeline, cons return scatters; }; pipeline.transform(add_scatter_transform); - + auto add_gather_transform = [&](OutputPortRawPtrs outports) { Processors gathers; - assert(outports.size() == num_streams * num_streams); for (size_t i = 0; i < num_streams; ++i) { OutputPortRawPtrs gather_upstream_outports; - auto gather = std::make_shared(header, dispatchers_num); + auto gather = std::make_shared(header, dispatchers_num); gathers.push_back(gather); auto & gather_inputs = gather->getInputs(); for (size_t j = 0; j < dispatchers_num; ++j) { - gather_upstream_outports.push_back(outports[j * dispatchers_num + i]); + gather_upstream_outports.push_back(outports[j * num_streams + i]); } auto oiter = gather_upstream_outports.begin(); auto iiter = gather_inputs.begin(); - for (; oiter != gather_upstream_outports.end(); oiter++, iiter++) + for (; oiter != gather_upstream_outports.end();) { connect(**oiter, *iiter); + oiter++; + iiter++; } } return gathers; @@ -197,11 +140,17 @@ void InnerShuffleStepV2::transformPipeline(QueryPipelineBuilder & pipeline, cons } -void InnerShuffleStepV2::updateOutputStream() +void InnerShuffleStep::updateOutputStream() { output_stream = createOutputStream( input_streams.front(), input_streams.front().header, getDataStreamTraits()); } +UInt32 InnerShuffleStep::alignStreamsNum(UInt32 n) +{ + if (n <= 1) + return 1; + return static_cast(1) << (32 - std::countl_zero(n - 1)); +} } diff --git a/src/Processors/QueryPlan/InnerShuffleStep.h b/src/Processors/QueryPlan/InnerShuffleStep.h index 86eebc040b3f..27220819d459 100644 --- a/src/Processors/QueryPlan/InnerShuffleStep.h +++ b/src/Processors/QueryPlan/InnerShuffleStep.h @@ -18,24 +18,7 @@ class InnerShuffleStep : public ITransformingStep String getName() const override { return "InnerShuffle"; } // The shuffle buckets size is equal to pipeline's num_streams void transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & settings) override; - -private: - std::vector hash_columns; // columns' name to build the hash key - - void updateOutputStream() override; - -}; - -class InnerShuffleStepV2 : public ITransformingStep -{ -public: - explicit InnerShuffleStepV2(const DataStream & input_stream_, const std::vector & hash_columns_); - ~InnerShuffleStepV2() override = default; - - String getName() const override { return "InnerShuffle"; } - // The shuffle buckets size is equal to pipeline's num_streams - void transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & settings) override; - + static UInt32 alignStreamsNum(UInt32 n); private: std::vector hash_columns; // columns' name to build the hash key diff --git a/src/Processors/Transforms/InnerShuffleTransform.cpp b/src/Processors/Transforms/InnerShuffleTransform.cpp index ba0b7e8186c1..8d481b4d35cc 100644 --- a/src/Processors/Transforms/InnerShuffleTransform.cpp +++ b/src/Processors/Transforms/InnerShuffleTransform.cpp @@ -36,7 +36,7 @@ static OutputPorts buildMultiOutputports(size_t num_streams, const Block & heade return outports; } -InnerShuffleScatterTransformV2::InnerShuffleScatterTransformV2(size_t num_streams_, const Block & header_, const std::vector & hash_columns_) +InnerShuffleScatterTransform::InnerShuffleScatterTransform(size_t num_streams_, const Block & header_, const std::vector & hash_columns_) : IProcessor({header_}, {header_}) , num_streams(num_streams_) , header(header_) @@ -44,13 +44,13 @@ InnerShuffleScatterTransformV2::InnerShuffleScatterTransformV2(size_t num_stream {} -IProcessor::Status InnerShuffleScatterTransformV2::prepare() +IProcessor::Status InnerShuffleScatterTransform::prepare() { - auto & outport = outputs.front(); - auto & inport = inputs.front(); - if (outport.isFinished()) + auto & output = outputs.front(); + auto & input = inputs.front(); + if (output.isFinished()) { - inport.close(); + input.close(); return Status::Finished; } if (has_input) @@ -59,35 +59,35 @@ IProcessor::Status InnerShuffleScatterTransformV2::prepare() } if (has_output) { - if(outport.canPush()) + if (output.canPush()) { auto empty_block = header.cloneEmpty(); Chunk chunk(empty_block.getColumns(), 0); auto chunk_info = std::make_shared(); chunk_info->chunks.swap(output_chunks); chunk.setChunkInfo(chunk_info); - outport.push(std::move(chunk)); + output.push(std::move(chunk)); has_output = false; } return Status::PortFull; } - if (inport.isFinished()) + if (input.isFinished()) { - outport.finish(); + output.finish(); return Status::Finished; } - inport.setNeeded(); - if (!inport.hasData()) + input.setNeeded(); + if (!input.hasData()) { return Status::NeedData; } - input_chunk = inport.pull(true); + input_chunk = input.pull(true); has_input = true; return Status::Ready; } -void InnerShuffleScatterTransformV2::work() +void InnerShuffleScatterTransform::work() { if (!has_input) { @@ -127,7 +127,6 @@ void InnerShuffleScatterTransformV2::work() output_chunks.clear(); for (auto & scattered_block : result_blocks) { - LOG_ERROR(&Poco::Logger::get("InnerShuffleScatterTransformV2"), "scattered_block.rows() = {}, num_rows={}, num_streams:{}", scattered_block.rows(), num_rows, num_streams); Chunk chunk(scattered_block.getColumns(), scattered_block.rows()); output_chunks.emplace_back(std::move(chunk)); } @@ -171,13 +170,12 @@ IProcessor::Status InnerShuffleDispatchTransform::prepare() } return Status::Finished; } - + if (has_input) { return Status::Ready; } - - // if (!output_chunks.empty()) [[likely]] + { bool has_chunks_out = false; bool has_pending_chunks = false; @@ -206,15 +204,15 @@ IProcessor::Status InnerShuffleDispatchTransform::prepare() } bool all_input_finished = true; - for (auto & inport : inputs) + for (auto & input : inputs) { - if (inport.isFinished()) + if (input.isFinished()) continue; all_input_finished = false; - inport.setNeeded(); - if (inport.hasData()) + input.setNeeded(); + if (input.hasData()) { - input_chunks.emplace_back(inport.pull(true)); + input_chunks.emplace_back(input.pull(true)); has_input = true; } } @@ -247,21 +245,16 @@ void InnerShuffleDispatchTransform::work() { if (output_chunks[i].empty() || output_chunks[i].back().getNumRows() >= DEFAULT_BLOCK_SIZE) { - LOG_ERROR(&Poco::Logger::get("InnerShuffleDispatchTransform"), "xxx new_chunk cols:{}", chunk.getColumns().size()); Chunk new_chunk(chunk.getColumns(), chunk.getNumRows()); - LOG_ERROR(&Poco::Logger::get("InnerShuffleDispatchTransform"), "xxx 2 new_chunk cols:{}", new_chunk.getColumns().size()); output_chunks[i].push_back(std::move(new_chunk)); - LOG_ERROR(&Poco::Logger::get("InnerShuffleDispatchTransform"), "xxx 1 new_chunk cols:{}", output_chunks[i].back().getColumns().size()); } else { auto & last_chunk = output_chunks[i].back(); auto src_cols = chunk.getColumns(); auto dst_cols = last_chunk.mutateColumns(); - LOG_ERROR(&Poco::Logger::get("InnerShuffleDispatchTransform"), "xxxx src cols:{}, dst cols:{}/{}", src_cols.size(), dst_cols.size(), last_chunk.getColumns().size()); for (size_t n = 0; n < src_cols.size(); ++n) { - LOG_ERROR(&Poco::Logger::get("InnerShuffleDispatchTransform"), "xxx src_cols[{}] = {}, dst_cols[{}]:{}", n, fmt::ptr(src_cols[n].get()), n , fmt::ptr(dst_cols[n].get())); dst_cols[n]->insertRangeFrom(*src_cols[n], 0, src_cols[n]->size()); } auto rows = dst_cols[0]->size(); @@ -273,7 +266,7 @@ void InnerShuffleDispatchTransform::work() has_input = false; } -InnerShuffleGatherTransformV2::InnerShuffleGatherTransformV2(const Block & header_, size_t input_num_) +InnerShuffleGatherTransform::InnerShuffleGatherTransform(const Block & header_, size_t input_num_) : IProcessor(buildMultiInputports(input_num_, header_), {header_}) { for (auto & port : inputs) @@ -282,7 +275,7 @@ InnerShuffleGatherTransformV2::InnerShuffleGatherTransformV2(const Block & heade } } -IProcessor::Status InnerShuffleGatherTransformV2::prepare() +IProcessor::Status InnerShuffleGatherTransform::prepare() { if (outputs.front().isFinished()) { @@ -320,7 +313,6 @@ IProcessor::Status InnerShuffleGatherTransformV2::prepare() input_port_iter = (input_port_iter + 1) % inputs.size(); break; } - } input_port_iter = (input_port_iter + 1) % inputs.size(); i += 1; @@ -338,278 +330,9 @@ IProcessor::Status InnerShuffleGatherTransformV2::prepare() return Status::Ready; } -void InnerShuffleGatherTransformV2::work() +void InnerShuffleGatherTransform::work() { has_input = false; has_output = true; } - -InnerShuffleScatterTransform::InnerShuffleScatterTransform(size_t num_streams_, const Block & header_, const std::vector & hash_columns_) - : IProcessor({header_}, buildMultiOutputports(num_streams_, header_)) - , num_streams(num_streams_) - , header(header_) - , hash_columns(hash_columns_) -{ - for (size_t i = 0; i < num_streams; ++i) - { - pending_output_chunks.emplace_back(std::list()); - } -} - -IProcessor::Status InnerShuffleScatterTransform::prepare() -{ - bool all_output_finished = true; - bool has_port_full = false; - auto outport_it = outputs.begin(); - auto outchunk_it = pending_output_chunks.begin(); - for (; outchunk_it != pending_output_chunks.end();) - { - auto & outport = *outport_it; - auto & part_chunks = *outchunk_it; - if (outport.isFinished()) - { - part_chunks.clear(); - } - else - { - all_output_finished = false; - if (!part_chunks.empty()) - { - // No matter can push or not, mark as port full. - has_port_full = true; - if (outport.canPush()) - { - outport.push(std::move(part_chunks.front())); - part_chunks.pop_front(); - } - } - } - outport_it++; - outchunk_it++; - } - if (all_output_finished) - { - for (auto & input : inputs) - input.close(); - return Status::Finished; - } - if (has_port_full) - { - return Status::PortFull; - } - - if (has_input) - { - return Status::Ready; - } - - auto & input = inputs.front(); - if (input.isFinished()) - { - bool has_pending_chunks = false; - auto outport_iter = outputs.begin(); - auto outchunk_iter = pending_output_chunks.begin(); - for (; outchunk_iter != pending_output_chunks.end();) - { - const auto & chunks = *outchunk_iter; - if (chunks.empty() && !outport_iter->isFinished() && !outport_iter->hasData()) - { - outport_iter->finish(); - } - else - { - has_pending_chunks = true; - } - outport_iter++; - outchunk_iter++; - } - if (has_pending_chunks) - return Status::Ready; - return Status::Finished; - } - - input.setNeeded(); - if (!input.hasData()) - return Status::NeedData; - input_chunk = input.pull(true); - has_input = true; - return Status::Ready; -} - -void InnerShuffleScatterTransform::work() -{ - if (!has_input) - { - return; - } - Block block = header.cloneWithColumns(input_chunk.detachColumns()); - size_t num_rows = block.rows(); - WeakHash32 hash(num_rows); - for (const auto col_index : hash_columns) - { - const auto & key_col = block.getByPosition(col_index).column->convertToFullColumnIfConst(); - const auto & key_col_no_lc = recursiveRemoveLowCardinality(recursiveRemoveSparse(key_col)); - key_col_no_lc->updateWeakHash32(hash); - } - - IColumn::Selector selector(num_rows); - const auto & hash_data = hash.getData(); - for (size_t i = 0; i < num_rows; ++i) - { - selector[i] = hash_data[i] & (num_streams - 1); - } - - Blocks result_blocks; - for (size_t i = 0; i < num_streams; ++i) - { - result_blocks.emplace_back(header.cloneEmpty()); - } - - for (size_t i = 0, num_cols = header.columns(); i < num_cols; ++i) - { - auto shuffled_columms = block.getByPosition(i).column->scatter(num_streams, selector); - for (size_t block_index = 0; block_index < num_streams; ++block_index) - { - result_blocks[block_index].getByPosition(i).column = std::move(shuffled_columms[block_index]); - } - } - for (size_t i = 0; i < result_blocks.size(); ++i) - { - pending_output_chunks[i].emplace_back(result_blocks[i].getColumns(), result_blocks[i].rows()); - } - - has_output = true; - has_input = false; -} - -InnerShuffleGatherTransform::InnerShuffleGatherTransform(const Block & header_, size_t inputs_num_) - : IProcessor(buildMultiInputports(inputs_num_, header_), {header_}) -{ - for (auto & port : inputs) - { - running_inputs.emplace_back(&port); - } -} - -IProcessor::Status InnerShuffleGatherTransform::prepare() -{ - auto & output = outputs.front(); - if (output.isFinished()) - { - for (auto * input : running_inputs) - { - if (!input->isFinished()) - input->close(); - } - return Status::Finished; - } - - if (!output.canPush()) - { - return Status::PortFull; - } - - if (has_input) - { - return Status::Ready; - } - - if (has_output) - { - output.push(std::move(output_chunk)); - has_output = false; - return Status::PortFull; - } - - bool all_inputs_closed = true; - for (auto it = running_inputs.begin(); it != running_inputs.end();) - { - auto * input = *it; - if (input->isFinished()) - { - running_inputs.erase(it++); - continue; - } - it++; - all_inputs_closed = false; - input->setNeeded(); - if (!input->hasData()) - { - continue; - } - input_chunks.emplace_back(input->pull(true)); - input->setNeeded(); - pending_rows += input_chunks.back().getNumRows(); - pending_chunks += 1; - has_input = true; - } - - if (has_input) - return Status::Ready; - - if (all_inputs_closed) [[unlikely]] - { - if (pending_rows) - { - has_input = true; - return Status::Ready; - } - else - { - for (auto & port : outputs) - { - if (!port.isFinished()) - { - port.finish(); - } - } - return Status::Finished; - } - } - - if (!has_input) - return Status::NeedData; - return Status::Ready; -} - -void InnerShuffleGatherTransform::work() -{ - if (has_input) - { - has_input = false; - if (pending_rows >= DEFAULT_BLOCK_SIZE || running_inputs.empty() || pending_chunks > 100) - { - output_chunk = generateOneChunk(); - has_output = true; - } - } -} - -Chunk InnerShuffleGatherTransform::generateOneChunk() -{ - Chunk head_chunk = std::move(input_chunks.front()); - input_chunks.pop_front(); - auto mutable_cols = head_chunk.mutateColumns(); - while (!input_chunks.empty()) - { - if (!input_chunks.front().hasRows()) [[unlikely]] - { - input_chunks.pop_front(); - continue; - } - const auto & cols = input_chunks.front().getColumns(); - auto rows = input_chunks.front().getNumRows(); - for (size_t i = 0, n = mutable_cols.size(); i < n; ++i) - { - auto src_col = recursiveRemoveSparse(cols[i]); - mutable_cols[i]->insertRangeFrom(*src_col, 0, rows); - } - input_chunks.pop_front(); - } - auto chunk = Chunk(std::move(mutable_cols), pending_rows); - input_chunks.clear(); - pending_rows = 0; - pending_chunks = 0; - return chunk; -} } diff --git a/src/Processors/Transforms/InnerShuffleTransform.h b/src/Processors/Transforms/InnerShuffleTransform.h index 24a6ccff1b6c..f9e1617c547c 100644 --- a/src/Processors/Transforms/InnerShuffleTransform.h +++ b/src/Processors/Transforms/InnerShuffleTransform.h @@ -10,17 +10,18 @@ class InnerShuffleScatterChunkInfo : public ChunkInfo public: size_t finished_streams = 0; size_t count = 0; + // scatter result from InnerShuffleScatterTransform. chunks.size() == num_streams std::vector chunks; }; // Split one chunk into multiple chunks according to the hash value of the specified columns. // And pass a list of chunks to InnerShuffleDispatchTransform. -class InnerShuffleScatterTransformV2 : public IProcessor +class InnerShuffleScatterTransform : public IProcessor { public: - InnerShuffleScatterTransformV2(size_t num_streams_, const Block & header_, const std::vector & hash_columns_); - ~InnerShuffleScatterTransformV2() override = default; - String getName() const override { return "InnerShuffleScatterTransformV2"; } + InnerShuffleScatterTransform(size_t num_streams_, const Block & header_, const std::vector & hash_columns_); + ~InnerShuffleScatterTransform() override = default; + String getName() const override { return "InnerShuffleScatterTransform"; } Status prepare() override; void work() override; private: @@ -33,8 +34,8 @@ class InnerShuffleScatterTransformV2 : public IProcessor Chunk input_chunk; }; -// Collect all splitted chunks from multiple InnerShuffleScatterTransformV2 and dispatch them into -// corresponding partitions. +// Collect all hash split chunks from multiple InnerShuffleScatterTransform and dispatch them +// into corresponding partitions. class InnerShuffleDispatchTransform : public IProcessor { public: @@ -52,12 +53,14 @@ class InnerShuffleDispatchTransform : public IProcessor std::vector> output_chunks; }; -class InnerShuffleGatherTransformV2 : public IProcessor +// Collect result from InnerShuffleDispatchTransforms, make sure that each stream will be handled by +// one thread. +class InnerShuffleGatherTransform : public IProcessor { public: - InnerShuffleGatherTransformV2(const Block & header_, size_t input_num_); - ~InnerShuffleGatherTransformV2() override = default; - String getName() const override { return "InnerShuffleGatherTransformV2"; } + InnerShuffleGatherTransform(const Block & header_, size_t input_num_); + ~InnerShuffleGatherTransform() override = default; + String getName() const override { return "InnerShuffleGatherTransform"; } Status prepare() override; void work() override; private: @@ -67,41 +70,4 @@ class InnerShuffleGatherTransformV2 : public IProcessor std::vector input_port_ptrs; size_t input_port_iter = 0; }; - -class InnerShuffleScatterTransform : public IProcessor -{ -public: - InnerShuffleScatterTransform(size_t num_streams_, const Block & header_, const std::vector & hash_columns_); - String getName() const override { return "InnerShuffleScatterTransform"; } - Status prepare() override; - void work() override; - -private: - size_t num_streams; - Block header; - std::vector hash_columns; - bool has_output = false; - std::list output_chunks; - std::vector> pending_output_chunks; - bool has_input = false; - Chunk input_chunk; -}; - -class InnerShuffleGatherTransform : public IProcessor -{ -public: - InnerShuffleGatherTransform(const Block & header_, size_t inputs_num_); - String getName() const override { return "InnerShuffleGatherTransform"; } - Status prepare() override; - void work() override; -private: - bool has_input = false; - bool has_output = false; - std::list input_chunks; - size_t pending_rows = 0; - size_t pending_chunks = 0; - Chunk output_chunk; - std::list running_inputs; - Chunk generateOneChunk(); -}; } diff --git a/src/Processors/Transforms/JoiningTransform.cpp b/src/Processors/Transforms/JoiningTransform.cpp index 0090d011fa67..bba8ec6fa163 100644 --- a/src/Processors/Transforms/JoiningTransform.cpp +++ b/src/Processors/Transforms/JoiningTransform.cpp @@ -305,10 +305,7 @@ void FillingRightJoinSideTransform::work() if (for_totals) join->setTotals(block); else - { - LOG_ERROR(&Poco::Logger::get("FillingRightJoinSideTransform"), "xxx FillingRightJoinSideTransform::work. {}/{}", fmt::ptr(this), fmt::ptr(join.get())); stop_reading = !join->addJoinedBlock(block); - } set_totals = for_totals; } diff --git a/src/QueryPipeline/QueryPipelineBuilder.cpp b/src/QueryPipeline/QueryPipelineBuilder.cpp index e259c35ace12..ceead0c5028c 100644 --- a/src/QueryPipeline/QueryPipelineBuilder.cpp +++ b/src/QueryPipeline/QueryPipelineBuilder.cpp @@ -576,13 +576,6 @@ std::unique_ptr QueryPipelineBuilder::joinPipelinesRightLe return left; } -static UInt32 toPowerOfTwo(UInt32 x) -{ - if (x <= 1) - return 1; - return static_cast(1) << (32 - std::countl_zero(x - 1)); -} - std::unique_ptr QueryPipelineBuilder::joinPipelinesRightLeftByShuffle( std::unique_ptr left, std::unique_ptr right, @@ -607,7 +600,7 @@ std::unique_ptr QueryPipelineBuilder::joinPipelinesRightLe size_t original_num_strems = left->getNumStreams(); size_t num_streams = original_num_strems; size_t default_max_shuffle_parts = 32; - size_t max_shuffle_parts = toPowerOfTwo(static_cast(max_streams)); + size_t max_shuffle_parts = InnerShuffleStep::alignStreamsNum(static_cast(max_streams)); if (max_shuffle_parts > max_streams && max_streams != 1) { max_shuffle_parts /= 2; @@ -632,11 +625,11 @@ std::unique_ptr QueryPipelineBuilder::joinPipelinesRightLe // from both side. DataStream right_datestream; right_datestream.header = right->getHeader(); - InnerShuffleStepV2 right_shuffle_step(right_datestream, join->getTableJoin().getOnlyClause().key_names_right); + InnerShuffleStep right_shuffle_step(right_datestream, join->getTableJoin().getOnlyClause().key_names_right); right_shuffle_step.transformPipeline(*right, BuildQueryPipelineSettings()); DataStream left_datastream; left_datastream.header = left->getHeader(); - InnerShuffleStepV2 left_shuffle_step(left_datastream, join->getTableJoin().getOnlyClause().key_names_left); + InnerShuffleStep left_shuffle_step(left_datastream, join->getTableJoin().getOnlyClause().key_names_left); left_shuffle_step.transformPipeline(*left, BuildQueryPipelineSettings());