Skip to content

Commit

Permalink
support append-only left all join append-only (#739)
Browse files Browse the repository at this point in the history
  • Loading branch information
yl-lisen authored Aug 13, 2024
1 parent 273900f commit 7a56aa8
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 7 deletions.
7 changes: 4 additions & 3 deletions src/Interpreters/Streaming/HashJoin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -785,6 +785,7 @@ size_t insertFromBlockImpl(
const HashJoin::SupportMatrix HashJoin::support_matrix = {
/// <left_stroage_semantic, join_kind, join_strictness, right_storage_semantic> - supported
/// Append ...
{{StorageSemantic::Append, JoinKind::Left, JoinStrictness::All, StorageSemantic::Append}, true},
{{StorageSemantic::Append, JoinKind::Left, JoinStrictness::All, StorageSemantic::ChangelogKV}, true},
{{StorageSemantic::Append, JoinKind::Left, JoinStrictness::All, StorageSemantic::VersionedKV}, true},
{{StorageSemantic::Append, JoinKind::Left, JoinStrictness::All, StorageSemantic::Changelog}, true},
Expand Down Expand Up @@ -927,15 +928,15 @@ void HashJoin::init()

bidirectional_hash_join = !data_enrichment_join;

/// append-only inner join append-only on ... and date_diff_within(10s)
/// append-only inner/left join append-only on ... and date_diff_within(10s)
/// In case when emitChangeLog()
if (streaming_strictness == Strictness::Range
&& (left_data.join_stream_desc->data_stream_semantic != DataStreamSemantic::Append
|| right_data.join_stream_desc->data_stream_semantic != DataStreamSemantic::Append
|| streaming_kind != Kind::Inner))
|| (streaming_kind != Kind::Inner && streaming_kind != Kind::Left)))
throw Exception(
ErrorCodes::NOT_IMPLEMENTED,
"Only inner range join is supported and the left and right stream must be append-only streams in range join");
"Only inner/left range join is supported and the left and right stream must be append-only streams in range join");

range_bidirectional_hash_join = bidirectional_hash_join && (streaming_strictness == Strictness::Range);

Expand Down
114 changes: 113 additions & 1 deletion src/Interpreters/Streaming/tests/gtest_streaming_hash_join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -806,7 +806,7 @@ TEST(StreamingHashJoin, SimpleJoinTests)
context);

/// Additional range between
if (Streaming::isAppendStorage(left_data_stream_semantic) && kind == JoinKind::Inner && strictness == JoinStrictness::All
if (Streaming::isAppendStorage(left_data_stream_semantic) && (kind == JoinKind::Inner || kind == JoinKind::Left) && strictness == JoinStrictness::All
&& Streaming::isAppendStorage(right_data_stream_semantic))
{
commonTest(
Expand All @@ -826,6 +826,118 @@ TEST(StreamingHashJoin, SimpleJoinTests)
}
}

TEST(StreamingHashJoin, AppendLeftAllJoinAppend)
{
auto context = getContext().context;
Block left_header = prepareBlock(/*types*/ {"int", "datetime64(3, 'UTC')"}, /*no data*/ "", context);
Block right_header = prepareBlock(/*types*/ {"int", "datetime64(3, 'UTC')"}, /*no data*/ "", context);

/// stream(t1) left all join stream(t2) on t1.col_1 = t2.col_1
commonTest(
"left",
"all",
/*on_clause*/ "t1.col_1 = t2.col_1",
left_header,
Streaming::StorageSemantic::Append,
/*left_primary_key_column_indexes*/ std::nullopt,
right_header,
Streaming::StorageSemantic::Append,
/*right_primary_key_column_indexes*/ std::nullopt,
/*to_join_steps*/
{
{
/*to join pos*/ ToJoinStep::RIGHT,
/*to join block*/ prepareBlockByHeader(right_header, "(1, '2023-1-1 00:00:00')", context),
/*expected join results*/ ExpectedJoinResults{},
},
{
/*to join pos*/ ToJoinStep::LEFT,
/*to join block*/ prepareBlockByHeader(left_header, "(1, '2023-1-1 00:00:00')(2, '2023-1-1 00:00:00')", context),
/*expected join results*/
ExpectedJoinResults{
/// output header: col_1, col_2, t2.col_2
.values = "(1, '2023-1-1 00:00:00', '2023-1-1 00:00:00')"
"(2, '2023-1-1 00:00:00', '1970-1-1 00:00:00')",
},
},
},
context);
}

TEST(StreamingHashJoin, AppendLeftRangeJoinAppend)
{
auto context = getContext().context;
Block left_header = prepareBlock(/*types*/ {"int", "datetime64(3, 'UTC')"}, /*no data*/ "", context);
Block right_header = prepareBlock(/*types*/ {"int", "datetime64(3, 'UTC')"}, /*no data*/ "", context);

commonTest(
"left",
"all",
/*on_clause*/ "t1.col_1 = t2.col_1 and date_diff_within(2s, t1.col_2, t2.col_2)",
left_header,
Streaming::StorageSemantic::Append,
/*left_primary_key_column_indexes*/ std::nullopt,
right_header,
Streaming::StorageSemantic::Append,
/*right_primary_key_column_indexes*/ std::nullopt,
/*to_join_steps*/
{
{
/*to join pos*/ ToJoinStep::RIGHT,
/*to join block*/ prepareBlockByHeader(right_header, "(1, '2023-1-1 00:00:00')(1, '2023-1-1 00:00:01')", context),
/*expected join results*/ ExpectedJoinResults{},
},
{
/*to join pos*/ ToJoinStep::LEFT,
/*to join block*/ prepareBlockByHeader(left_header, "(1, '2023-1-1 00:00:00')(2, '2023-1-1 00:00:00')", context),
/*expected join results*/
ExpectedJoinResults{
/// output header: col_1, col_2, t2.col_2
.values = "(1, '2023-1-1 00:00:00', '2023-1-1 00:00:00')(1, '2023-1-1 00:00:00', '2023-1-1 00:00:01')"
"(2, '2023-1-1 00:00:00', '1970-1-1 00:00:00')",
},
},
{
/*to join pos*/ ToJoinStep::RIGHT,
/*to join block*/ prepareBlockByHeader(right_header, "(1, '2023-1-1 00:00:02')(1, '2023-1-1 00:00:03')(2, '2023-1-1 00:00:02')(2, '2023-1-1 00:00:03')", context),
/*expected join results*/
ExpectedJoinResults{
/// output header: col_1, col_2, t2.col_2
.values = "(1, '2023-1-1 00:00:00', '2023-1-1 00:00:02')"
"(2, '2023-1-1 00:00:00', '2023-1-1 00:00:02')",
},
},
{
/*to join pos*/ ToJoinStep::LEFT,
/*to join block*/ prepareBlockByHeader(left_header, "(1, '2023-1-1 00:00:00')(1, '2023-1-1 00:00:02')", context),
/*expected join results*/
ExpectedJoinResults{
/// output header: col_1, col_2, t2.col_2
.values = "(1, '2023-1-1 00:00:00', '2023-1-1 00:00:00')"
"(1, '2023-1-1 00:00:00', '2023-1-1 00:00:01')"
"(1, '2023-1-1 00:00:00', '2023-1-1 00:00:02')"
"(1, '2023-1-1 00:00:02', '2023-1-1 00:00:00')"
"(1, '2023-1-1 00:00:02', '2023-1-1 00:00:01')"
"(1, '2023-1-1 00:00:02', '2023-1-1 00:00:02')"
"(1, '2023-1-1 00:00:02', '2023-1-1 00:00:03')",
},
},
{
/*to join pos*/ ToJoinStep::LEFT,
/*to join block*/ prepareBlockByHeader(left_header, "(2, '2023-1-1 00:00:00')(2, '2023-1-1 00:00:02')(3, '2023-1-1 00:00:03')", context),
/*expected join results*/
ExpectedJoinResults{
/// output header: col_1, col_2, t2.col_2
.values = "(2, '2023-1-1 00:00:00', '2023-1-1 00:00:02')"
"(2, '2023-1-1 00:00:02', '2023-1-1 00:00:02')"
"(2, '2023-1-1 00:00:02', '2023-1-1 00:00:03')"
"(3, '2023-1-1 00:00:03', '1970-1-1 00:00:00')",
},
},
},
context);
}

TEST(StreamingHashJoin, AppendLeftAsofJoinAppend)
{
auto context = getContext().context;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,15 +203,19 @@
{"client":"python", "query_type": "stream", "query_id":"1056", "wait":1, "terminate":"manual", "query":"select i, k, j, kk from test10_append_left_stream left join test10_append_right_stream on k == kk;"},
{"client":"python", "query_type": "table", "depends_on":"1056", "wait":1, "query": "insert into test10_append_right_stream (j, kk) values (1, 'a') (1, 'b');"},
{"client":"python", "query_type": "table", "wait":1, "query": "insert into test10_append_left_stream (i, k) values (2, 'a');"},
{"client":"python", "query_type": "table", "wait":1, "query": "insert into test10_append_left_stream (i, k) values (2, 'b');"},
{"client":"python", "query_type": "table", "kill":"1056", "kill_wait":3, "wait":1, "query": "insert into test10_append_left_stream (i, k) values (2, 'c');"}
{"client":"python", "query_type": "table", "query": "insert into test10_append_left_stream (i, k) values (2, 'b');"},
{"client":"python", "query_type": "table", "kill":"1056", "kill_wait":3, "query": "insert into test10_append_left_stream (i, k) values (2, 'c');"}
]
}
],
"expected_results": [
{
"query_id":"1056",
"expected_results": "error_code:48"
"expected_results":[
[2, "a", 1, "a"],
[2, "b", 1, "b"],
[2, "c", 0, ""]
]
}
]
},
Expand Down

0 comments on commit 7a56aa8

Please sign in to comment.