Improve performance of external aggregation with a lot of temporary f… #262

Merged
4 changes: 4 additions & 0 deletions src/Common/HashTable/HashTable.h
@@ -1238,6 +1238,10 @@ class HashTable : private boost::noncopyable,
return !buf[place_value].isZero(*this);
}

bool ALWAYS_INLINE contains(const Key & x) const
{
return has(x);
}

void write(DB::WriteBuffer & wb) const
{
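The new contains() simply forwards to has(), giving HashTable the membership spelling used by the wait_input_ports_numbers set introduced below. A minimal sketch of the call pattern, with std::unordered_set (C++20) standing in for ClickHouse's HashSet:

#include <cstdint>
#include <unordered_set>

int main()
{
    // Stand-in for HashSet<uint64_t> wait_input_ports_numbers.
    std::unordered_set<uint64_t> wait_input_ports_numbers{3, 7};

    // contains() reads as a plain membership question, which is exactly
    // how prepare() filters the executor's updated-port list.
    return wait_input_ports_numbers.contains(3) ? 0 : 1;
}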
src/Processors/Transforms/MergingAggregatedMemoryEfficientTransform.cpp
@@ -17,37 +17,9 @@ GroupingAggregatedTransform::GroupingAggregatedTransform(
, num_inputs(num_inputs_)
, params(std::move(params_))
, last_bucket_number(num_inputs, -1)
, read_from_input(num_inputs, false)
{
}

void GroupingAggregatedTransform::readFromAllInputs()
{
auto in = inputs.begin();
read_from_all_inputs = true;

for (size_t i = 0; i < num_inputs; ++i, ++in)
{
if (in->isFinished())
continue;

if (read_from_input[i])
continue;

in->setNeeded();

if (!in->hasData())
{
read_from_all_inputs = false;
continue;
}

auto chunk = in->pull();
read_from_input[i] = true;
addChunk(std::move(chunk), i);
}
}

void GroupingAggregatedTransform::pushData(Chunks chunks, Int32 bucket, bool is_overflows)
{
auto & output = outputs.front();
@@ -116,7 +88,7 @@ bool GroupingAggregatedTransform::tryPushOverflowData()
return true;
}

IProcessor::Status GroupingAggregatedTransform::prepare()
IProcessor::Status GroupingAggregatedTransform::prepare(const PortNumbers & updated_input_ports, const PortNumbers &)
{
/// Check can output.
auto & output = outputs.front();
@@ -131,19 +103,51 @@ IProcessor::Status GroupingAggregatedTransform::prepare()
return Status::Finished;
}

/// Read first time from each input to understand if we have two-level aggregation.
if (!read_from_all_inputs)
if (!initialized_index_to_input)
{
readFromAllInputs();
if (!read_from_all_inputs)
return Status::NeedData;
initialized_index_to_input = true;
auto in = inputs.begin();
index_to_input.resize(num_inputs);

for (size_t i = 0; i < num_inputs; ++i, ++in)
index_to_input[i] = in;
}

/// Convert single level to two levels if have two-level input.
if (has_two_level && !single_level_chunks.empty())
return Status::Ready;
auto need_input = [this](size_t input_num)
{
if (last_bucket_number[input_num] < current_bucket)
return true;

return expect_several_chunks_for_single_bucket_per_source && last_bucket_number[input_num] == current_bucket;
};

if (!wait_input_ports_numbers.empty())
{
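/// Among the ports the executor reported as updated, pull from the ones we are still waiting on.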
for (const auto & updated_input_port_number : updated_input_ports)
{
if (!wait_input_ports_numbers.contains(updated_input_port_number))
continue;

auto & input = index_to_input[updated_input_port_number];
if (!input->hasData())
{
wait_input_ports_numbers.erase(updated_input_port_number);
continue;
}

auto chunk = input->pull();
addChunk(std::move(chunk), updated_input_port_number);

if (!input->isFinished() && need_input(updated_input_port_number))
continue;

wait_input_ports_numbers.erase(updated_input_port_number);
}

if (!wait_input_ports_numbers.empty())
return Status::NeedData;
}

/// Check can push (to avoid data caching).
if (!output.canPush())
{
for (auto & input : inputs)
@@ -152,20 +156,16 @@ IProcessor::Status GroupingAggregatedTransform::prepare()
return Status::PortFull;
}

/// Convert single level to two levels if have two-level input.
if (has_two_level && !single_level_chunks.empty())
return Status::Ready;

bool pushed_to_output = false;

/// Output if has data.
if (has_two_level)
pushed_to_output = tryPushTwoLevelData();

auto need_input = [this](size_t input_num)
{
if (last_bucket_number[input_num] < current_bucket)
return true;

return expect_several_chunks_for_single_bucket_per_source && last_bucket_number[input_num] == current_bucket;
};

/// Read next bucket if can.
for (; ; ++current_bucket)
{
@@ -187,20 +187,24 @@ IProcessor::Status GroupingAggregatedTransform::prepare()

if (!in->hasData())
{
wait_input_ports_numbers.insert(input_num);
need_data = true;
continue;
}

auto chunk = in->pull();
addChunk(std::move(chunk), input_num);

if (has_two_level && !single_level_chunks.empty())
return Status::Ready;

if (!in->isFinished() && need_input(input_num))
{
wait_input_ports_numbers.insert(input_num);
need_data = true;
}
}

if (has_two_level && !single_level_chunks.empty())
return Status::Ready;

if (finished)
{
all_inputs_finished = true;
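The rewrite replaces the old eager pattern, where every prepare() call scanned all num_inputs ports via readFromAllInputs(), with an event-driven one: the transform records the ports it still needs in wait_input_ports_numbers and on the next wakeup inspects only the ports the executor passes in updated_input_ports. With external aggregation spilling to many temporary files, num_inputs can be large, so an O(num_inputs) scan per wakeup becomes work proportional to the number of ports that actually changed. A simplified, self-contained sketch of the pattern, with std containers standing in for ClickHouse's HashSet and InputPort (the real prepare() additionally tracks buckets, overflows, and two-level conversion):

#include <cstdint>
#include <list>
#include <unordered_set>
#include <vector>

// Minimal stand-in for an input port.
struct Port
{
    bool has_data = false;
    void pull() { has_data = false; }  // the real code hands the chunk to addChunk(...)
};

struct Sketch
{
    std::vector<std::list<Port>::iterator> index_to_input;  // built once on the first prepare() call
    std::unordered_set<uint64_t> wait_input_ports_numbers;  // ports we still expect data from

    // Visit only the updated ports; the old code re-scanned all inputs instead.
    bool drainUpdated(const std::vector<uint64_t> & updated_ports)
    {
        for (uint64_t num : updated_ports)
        {
            if (!wait_input_ports_numbers.contains(num))
                continue;  // not a port we asked data from

            auto & port = *index_to_input[num];
            if (!port.has_data)
            {
                wait_input_ports_numbers.erase(num);  // updated without data, e.g. the port finished
                continue;
            }

            port.pull();
            wait_input_ports_numbers.erase(num);  // simplified; the real code may keep waiting for more chunks
        }
        return wait_input_ports_numbers.empty();  // false corresponds to Status::NeedData
    }
};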
src/Processors/Transforms/MergingAggregatedMemoryEfficientTransform.h
@@ -1,9 +1,11 @@
#pragma once
#include <Processors/IProcessor.h>

#include <Common/HashTable/HashSet.h>
#include <Interpreters/Aggregator.h>
#include <Processors/IProcessor.h>
#include <Processors/ISimpleTransform.h>
#include <Processors/Transforms/AggregatingTransform.h>
#include <Processors/ResizeProcessor.h>
#include <Processors/Transforms/AggregatingTransform.h>


namespace DB
@@ -66,7 +68,7 @@ class GroupingAggregatedTransform final : public IProcessor
void allowSeveralChunksForSingleBucketPerSource() { expect_several_chunks_for_single_bucket_per_source = true; }

protected:
Status prepare() override;
Status prepare(const PortNumbers & updated_input_ports, const PortNumbers &) override;
void work() override;

private:
@@ -82,15 +84,14 @@ class GroupingAggregatedTransform final : public IProcessor
bool has_two_level = false;

bool all_inputs_finished = false;
bool read_from_all_inputs = false;
std::vector<bool> read_from_input;
bool initialized_index_to_input = false;
std::vector<InputPorts::iterator> index_to_input;
HashSet<uint64_t> wait_input_ports_numbers;

bool expect_several_chunks_for_single_bucket_per_source = false;

/// Add chunk read from input to chunks_map, overflow_chunks or single_level_chunks according to its chunk info.
void addChunk(Chunk chunk, size_t input);
/// Read from all inputs first chunk. It is needed to detect if any source has two-level aggregation.
void readFromAllInputs();
/// Push chunks if all inputs have single level.
bool tryPushSingleLevelData();
/// Push chunks from ready bucket if has one.
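In the header, the per-input read_from_input flags give way to index_to_input and wait_input_ports_numbers. Caching iterators is what makes the by-number access in prepare() cheap: IProcessor keeps its ports in a std::list, where reaching port i positionally costs O(i). A tiny illustration, using int as a stand-in for InputPort:

#include <list>
#include <vector>

int main()
{
    std::list<int> inputs{10, 20, 30};  // stand-in for InputPorts, which is a std::list

    // Built once; afterwards any port is reachable by number in O(1)
    // instead of walking the list with std::next(inputs.begin(), i).
    std::vector<std::list<int>::iterator> index_to_input;
    index_to_input.reserve(inputs.size());
    for (auto it = inputs.begin(); it != inputs.end(); ++it)
        index_to_input.push_back(it);

    return *index_to_input[2] == 30 ? 0 : 1;
}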
10 changes: 10 additions & 0 deletions tests/performance/aggregation_external.xml
@@ -0,0 +1,10 @@
<test>
<settings>
<!-- A 10 MiB spill threshold (10485760 bytes) with 30 threads forces external aggregation and many temporary files, the case this change optimizes. -->
<max_threads>30</max_threads>
<max_bytes_before_external_group_by>10485760</max_bytes_before_external_group_by>
</settings>

<query>SELECT number, count() FROM numbers_mt(5000000) GROUP BY number FORMAT Null;</query>
<query>SELECT number, count() FROM numbers_mt(15000000) GROUP BY number FORMAT Null;</query>
<query>SELECT number, count() FROM numbers_mt(30000000) GROUP BY number FORMAT Null;</query>
</test>