Merge pull request ClickHouse#59448 from nickitat/insert_with_max_insert_threads_into_remote_tables

More parallel insert-select pipeline
nickitat authored Feb 27, 2024
2 parents 3cdd349 + 4e5cfd1 commit 90c9ae1
Showing 15 changed files with 88 additions and 8 deletions.
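The core of this change is in InterpreterInsertQuery: when max_insert_threads > 1 and the target table is not a view and has no dependent views, the insert side of an INSERT ... SELECT pipeline is resized up to max_insert_threads instead of being capped by the number of source streams. That extra parallelism changes how blocks are formed on the insert path, which is presumably why several existing tests below pin max_insert_threads = 1. A minimal sketch of the new behaviour (the table name insert_select_demo is illustrative, not taken from this diff):

-- Illustrative sketch only, assuming a plain MergeTree target with no dependent views.
CREATE TABLE insert_select_demo (n UInt64) ENGINE = MergeTree ORDER BY n;

-- With this change the insert side of the pipeline below is resized to
-- max_insert_threads sinks even if the SELECT side produces fewer streams.
EXPLAIN PIPELINE
INSERT INTO insert_select_demo
SELECT number FROM numbers_mt(1000000)
SETTINGS max_insert_threads = 4;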
8 changes: 7 additions & 1 deletion src/Interpreters/InterpreterInsertQuery.cpp
@@ -499,7 +499,13 @@ BlockIO InterpreterInsertQuery::execute()

if (settings.max_insert_threads > 1)
{
pre_streams_size = std::min(static_cast<size_t>(settings.max_insert_threads), pipeline.getNumStreams());
auto table_id = table->getStorageID();
auto views = DatabaseCatalog::instance().getDependentViews(table_id);

/// It breaks some views-related tests and we have dedicated `parallel_view_processing` for views, so let's just skip them.
const bool resize_to_max_insert_threads = !table->isView() && views.empty();
pre_streams_size = resize_to_max_insert_threads ? settings.max_insert_threads
: std::min<size_t>(settings.max_insert_threads, pipeline.getNumStreams());
if (table->supportsParallelInsert())
sink_streams_size = pre_streams_size;
}
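The comment above spells out why the resize is skipped for view targets and for tables with dependent views: fan-out into views already has its own setting, parallel_view_processing. A hedged sketch of that distinction (object names are made up for illustration):

-- Illustrative sketch only: a target table with a dependent materialized view.
CREATE TABLE src (n UInt64) ENGINE = MergeTree ORDER BY n;
CREATE MATERIALIZED VIEW src_mv ENGINE = MergeTree ORDER BY n AS SELECT n FROM src;

-- The new resize logic does not apply here because src has a dependent view;
-- parallelism of pushing to views stays governed by parallel_view_processing.
INSERT INTO src
SELECT number FROM numbers_mt(1000000)
SETTINGS max_insert_threads = 4, parallel_view_processing = 1;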
1 change: 0 additions & 1 deletion src/Processors/Transforms/buildPushingToViewsChain.cpp
@@ -66,7 +66,6 @@ struct ViewsData
StorageID source_storage_id;
StorageMetadataPtr source_metadata_snapshot;
StoragePtr source_storage;
/// This value is actually only for logs.
size_t max_threads = 1;

/// In case of exception happened while inserting into main table, it is pushed to pipeline.
2 changes: 2 additions & 0 deletions tests/queries/0_stateless/00047_stored_aggregates_complex.sql
@@ -1,5 +1,7 @@
DROP TABLE IF EXISTS stored_aggregates;

set max_insert_threads = 1;

set allow_deprecated_syntax_for_merge_tree=1;
CREATE TABLE stored_aggregates
(
2 changes: 2 additions & 0 deletions tests/queries/0_stateless/00340_squashing_insert_select.sql
@@ -7,6 +7,8 @@ SET max_block_size = 10000;
SET min_insert_block_size_rows = 1000000;
SET min_insert_block_size_bytes = 0;

set max_insert_threads = 1;

INSERT INTO numbers_squashed SELECT * FROM system.numbers LIMIT 10000000;
SELECT blockSize() AS b, count() / b AS c FROM numbers_squashed GROUP BY blockSize() ORDER BY c DESC;

Another changed file (path not shown):
@@ -16,6 +16,8 @@ CREATE TABLE default_codec_synthetic
id UInt64 Codec(ZSTD(3))
) ENGINE MergeTree() ORDER BY tuple() SETTINGS min_bytes_for_wide_part = 0, compress_marks = false, compress_primary_key = false, ratio_of_defaults_for_sparse_serialization = 1;

set max_insert_threads = 1;

INSERT INTO delta_codec_synthetic SELECT number FROM system.numbers LIMIT 5000000;
INSERT INTO default_codec_synthetic SELECT number FROM system.numbers LIMIT 5000000;

Another changed file (path not shown):
@@ -25,6 +25,7 @@ CREATE TABLE buffer_ (key UInt64) Engine=Buffer(currentDatabase(), null_,

SET max_memory_usage=10e6;
SET max_block_size=100e3;
SET max_insert_threads=1;

-- Check that max_memory_usage is ignored only on flush and not on squash
SET min_insert_block_size_bytes=9e6;
Another changed file (path not shown):
@@ -25,7 +25,8 @@ AS
minState( toUInt64(-1) ) as fastest_session,
maxState( toUInt64(0) ) as biggest_inactivity_period
FROM numbers(50000)
GROUP BY id;
GROUP BY id
SETTINGS max_insert_threads=1;

-- source table #1

Another changed file (path not shown):
@@ -1,3 +1,5 @@
-- Tags: long

DROP TABLE IF EXISTS t;

CREATE TABLE t
@@ -8,7 +10,7 @@ ENGINE = MergeTree
ORDER BY number
SETTINGS index_granularity = 128, ratio_of_defaults_for_sparse_serialization = 1.0, index_granularity_bytes = '10Mi';

SET min_insert_block_size_rows = 0, min_insert_block_size_bytes = 0;
SET min_insert_block_size_rows = 0, min_insert_block_size_bytes = 0, max_insert_threads = 1;
INSERT INTO t SELECT number FROM numbers(10000000);

SET max_threads = 1, max_block_size = 12345;
Another changed file (path not shown):
@@ -1,5 +1,6 @@
drop table if exists test;
create table test (x AggregateFunction(uniq, UInt64), y Int64) engine=Memory;
set max_insert_threads = 1;
insert into test select uniqState(number) as x, number as y from numbers(10) group by number order by x, y;
select uniqStateMap(map(1, x)) OVER (PARTITION BY y) from test;
select uniqStateForEach([x]) OVER (PARTITION BY y) from test;
Another changed file (path not shown):
@@ -4,6 +4,6 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh

${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&send_progress_in_http_headers=1&http_headers_progress_interval_ms=0" -d @- <<< "insert into function null('_ Int') select * from numbers(5) settings max_block_size=1" -v |& {
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&send_progress_in_http_headers=1&http_headers_progress_interval_ms=0" -d @- <<< "insert into function null('_ Int') select * from numbers(5) settings max_block_size=1, max_insert_threads=1" -v |& {
grep -F -e X-ClickHouse-Progress: -e X-ClickHouse-Summary: | sed 's/,\"elapsed_ns[^}]*//'
}
Another changed file (path not shown):
@@ -5,7 +5,7 @@ create table rmt1 (n int) engine=ReplicatedMergeTree('/test/02468/{database}', '
system stop cleanup rmt;
system stop merges rmt1;

insert into rmt select * from numbers(10) settings max_block_size=1;
insert into rmt select * from numbers(10) settings max_block_size=1, max_insert_threads=1;

alter table rmt drop partition id '0';
truncate table rmt1;
@@ -31,7 +31,7 @@ create table rmt2 (n int) engine=ReplicatedMergeTree('/test/02468/{database}2',

system stop cleanup rmt;
system stop merges rmt1;
insert into rmt select * from numbers(10) settings max_block_size=1;
insert into rmt select * from numbers(10) settings max_block_size=1, max_insert_threads=1;
system sync replica rmt1 lightweight;

alter table rmt replace partition id '0' from rmt2;
Another changed file (path not shown):
@@ -14,7 +14,7 @@ $CLICKHOUSE_CLIENT -q "SYSTEM STOP MERGES test_02521_insert_delay"
for i in {0..4}
do
query_id="${CLICKHOUSE_DATABASE}_02521_${i}_$RANDOM$RANDOM"
$CLICKHOUSE_CLIENT --query_id="$query_id" -q "INSERT INTO test_02521_insert_delay SELECT number, toString(number) FROM numbers(${i}, 1)"
$CLICKHOUSE_CLIENT --query_id="$query_id" --max_insert_threads 1 -q "INSERT INTO test_02521_insert_delay SELECT number, toString(number) FROM numbers(${i}, 1)"
$CLICKHOUSE_CLIENT -q "SYSTEM FLUSH LOGS"
$CLICKHOUSE_CLIENT --param_query_id="$query_id" -q "select ProfileEvents['DelayedInsertsMilliseconds'] as delay from system.query_log where event_date >= yesterday() and current_database = '$CLICKHOUSE_DATABASE' and query_id = {query_id:String} order by delay desc limit 1"
done
New file (path not shown):
@@ -0,0 +1,6 @@
inserting into a remote table from local with concurrency equal to max_insert_threads
9
inserting into a remote table from remote with concurrency max_insert_threads
9
inserting into a remote table from remote (reading with parallel replicas) with concurrency max_insert_threads
9
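These are the expected outputs of the new test below: each scenario counts Sink processors (excluding EmptySink) in the EXPLAIN PIPELINE output of an INSERT ... SELECT run with max_insert_threads = 9, so each line reports 9 sinks. A standalone sketch of the property being checked (table names match the ones the test creates below):

-- Not part of the test; shows the property being asserted.
EXPLAIN PIPELINE
INSERT INTO t3_dist
SELECT * FROM t1_local
SETTINGS max_insert_threads = 9;
-- Expected: 9 non-EmptySink Sink processors in the printed pipeline.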
New file (path not shown):
@@ -0,0 +1,56 @@
#!/usr/bin/env bash
# Tags: no-random-settings

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh


${CLICKHOUSE_CLIENT} -nq """
CREATE TABLE t1_local
(
n UInt64,
)
ENGINE = MergeTree
ORDER BY n;
CREATE TABLE t3_dist
(
n UInt64,
)
ENGINE = Distributed('test_cluster_two_shards', currentDatabase(), 't1_local', rand());
CREATE TABLE t4_pr
(
n UInt64,
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/02981_insert_select', '1')
ORDER BY n;
SYSTEM STOP MERGES t1_local;
INSERT INTO t1_local SELECT * FROM numbers_mt(1e6);
"""

max_insert_threads=9

echo "inserting into a remote table from local with concurrency equal to max_insert_threads"
${CLICKHOUSE_CLIENT} --max_insert_threads "$max_insert_threads" -q """
EXPLAIN PIPELINE
INSERT INTO t3_dist
SELECT * FROM t1_local;
""" | grep -v EmptySink | grep -c Sink

echo "inserting into a remote table from remote with concurrency max_insert_threads"
${CLICKHOUSE_CLIENT} --max_insert_threads "$max_insert_threads" --parallel_distributed_insert_select 0 -q """
EXPLAIN PIPELINE
INSERT INTO t3_dist
SELECT * FROM t3_dist;
""" | grep -v EmptySink | grep -c Sink

echo "inserting into a remote table from remote (reading with parallel replicas) with concurrency max_insert_threads"
${CLICKHOUSE_CLIENT} --max_insert_threads "$max_insert_threads" --allow_experimental_parallel_reading_from_replicas 2 --cluster_for_parallel_replicas 'parallel_replicas' --max_parallel_replicas 3 -q """
EXPLAIN PIPELINE
INSERT INTO t3_dist
SELECT * FROM t4_pr;
""" | grep -v EmptySink | grep -c Sink
2 changes: 2 additions & 0 deletions utils/check-style/check-style
@@ -180,6 +180,8 @@ for test_case in "${tests_with_query_log[@]}"; do
} || echo "Queries to system.query_log/system.query_thread_log does not have current_database = currentDatabase() condition in $test_case"
done

grep -iE 'SYSTEM STOP MERGES;?$' -R $ROOT_PATH/tests/queries && echo "Merges cannot be disabled globally in fast/stateful/stateless tests, because it will break concurrently running queries"

# There shouldn't be large jumps between test numbers (since they should be consecutive)
max_diff=$(
find $ROOT_PATH/tests/queries -iname '*.sql' -or -iname '*.sh' -or -iname '*.py' -or -iname '*.j2' |
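The added check-style rule rejects the bare, global form of SYSTEM STOP MERGES in test queries, since stopping merges server-wide would interfere with concurrently running tests; the per-table form remains allowed. For example (the table name is illustrative):

-- Allowed by the new check: merges are stopped only for one named table.
SYSTEM STOP MERGES some_test_table;
-- Rejected by the new check (would match the added grep): a server-wide stop.
-- SYSTEM STOP MERGES;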
