Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make random stream eps be query time settings #277

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/Core/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -782,6 +782,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(UInt64, javascript_uda_max_concurrency, 1, "Control the concurrency of JavaScript UDA in a query", 0) \
M(Float, replay_speed, 0., "Control the replay speed..0 < replay_speed < 1, means replay slower.replay_speed == 1, means replay by actual ingest interval.1 < replay_speed < <max_limit>, means replay faster", 0) \
M(UInt64, max_events, 0, "Total events to generate for random stream", 0) \
M(Int64, generate_eps, -1, "control the random stream eps in query time, defalut value is -1, if it is 0 means no limit.", 0) \
/** proton: ends. */
// End of FORMAT_FACTORY_SETTINGS
// Please add settings non-related to formats into the COMMON_SETTINGS above.
Expand Down
13 changes: 8 additions & 5 deletions src/Storages/Streaming/StorageRandom.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -698,9 +698,12 @@ Pipe StorageRandom::read(
auto max_events = context->getSettingsRef().max_events;
auto events_share = max_events / num_streams;
auto events_remainder = max_events % num_streams;
if (events_per_second < num_streams)

/// setting random stream eps in query time, if generate_eps is not defalut value, use generate_eps as eps first.
UInt64 eps = context->getSettingsRef().generate_eps < 0 ? events_per_second : static_cast<UInt64>(context->getSettingsRef().generate_eps);
if (eps < num_streams)
{
if (events_per_second == 0)
if (eps == 0)
{
/// special case eps = 0: means no limit
for (size_t i = 0; i < num_streams - 1; i++)
Expand Down Expand Up @@ -737,16 +740,16 @@ Pipe StorageRandom::read(
block_header,
our_columns,
context,
events_per_second,
eps,
1000,
query_info.syntax_analyzer_result->streaming,
max_events));
}
}
else
{
size_t eps_thread = events_per_second / num_streams;
size_t remainder = events_per_second % num_streams;
size_t eps_thread = eps / num_streams;
size_t remainder = eps % num_streams;
/// number of data generated per second is bigger than the number of thread;
for (size_t i = 0; i < num_streams - 1; i++)
{
Expand Down
68 changes: 67 additions & 1 deletion tests/stream/test_stream_smoke/0021_random_stream.json
lizhou1111 marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,72 @@
]
}
]
},
{
"id": 17,
"tags": ["random stream table query"],
"name": "test random stream table query, set max_events=1000",
"description": "test random stream table query",
"steps":[
{
"statements": [
{"client":"python", "query_type": "table", "wait":2, "query": "drop stream if exists test22_create_random"},
{"client":"python", "query_type": "table", "wait":2, "query": "create random stream test22_create_random(id int default rand()%4) engine Random() settings eps=100000"},
{"client":"python", "query_type": "table", "query_id":"2226", "wait":2, "query":"select count(1) from table(test22_create_random) settings max_events=1000"}
]
}
],
"expected_results": [
{
"query_id":"2226", "expected_results":[
["1000"]
]
}
]
},
{
"id": 18,
"tags": ["random stream table query"],
"name": "test random stream stream query, set max_events=1000",
"description": "test random stream stream query",
"steps":[
{
"statements": [
{"client":"python", "query_type": "table", "wait":2, "query": "drop stream if exists test22_create_random"},
{"client":"python", "query_type": "table", "wait":2, "query": "create random stream test22_create_random(id int default rand()%4) engine Random() settings eps=1000"},
{"client":"python", "query_type": "stream", "terminate" : "auto", "query_end_timer" : 7, "query_id":"2227", "wait":2, "query":"select count(1) from table(test22_create_random) settings max_events=5000"}
]
}
],
"expected_results": [
{
"query_id":"2227", "expected_results":[
["5000"]
]
}
]
},
{
"id": 19,
"tags": ["create random", "unstable"],
"name": "set generate_eps, test eps",
"description": "set generate_eps, test eps",
"steps":[
{
"statements": [
{"client":"python", "query_type": "table", "wait":2, "query": "drop stream if exists test22_create_random"},
{"client":"python", "query_type": "table", "wait":2, "query": "create random stream test22_create_random(id int default rand()%4) engine Random() settings eps=1000"},
{"client":"python", "query_type": "table", "query_id":"2228", "wait":2, "query":"select count(1) from tumble(test22_create_random, 1s) group by window_start limit 5 settings generate_eps=10000"}
]
}
],
"expected_results": [
{
"query_id":"2228", "expected_results":[
["10000"], ["10000"], ["10000"], ["10000"], ["10000"]
]
}
]
}
]
}
}