shuffle by #258
2049fc4 to 2edf66e
if (elem->as<ASTIdentifier>())
    shuffle_by_columns.push_back(elem->getColumnName());
If a shuffle-by key is an expression or a constant, do we ignore it?
I guess the shuffle-by keys should be calculated before light shuffling, so we can always call shuffle_by_columns.push_back(elem->getColumnName()), even if the element is not an identifier.
We will still do the work, I think, and it should be fine, just like for regular columns.
Meaning that if the end user aliases a constant or an expression / function as an identifier, we will do the shuffle work as expected.
By the time the logic reaches here, the shuffle-by AST elements can be assumed to be ASTIdentifier, since that has already been validated in the AST parser.
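To make the two options in this thread concrete, here is a minimal sketch that contrasts collecting only identifier elements with collecting every element's column name. The `ToyNode`, `ToyIdentifier` and `ToyFunction` types and both helper functions are hypothetical stand-ins written for illustration; they are not the project's AST classes, and `dynamic_cast` is used here in place of `as<ASTIdentifier>()`.

```cpp
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical, simplified AST node; only getColumnName() is modelled.
struct ToyNode
{
    virtual ~ToyNode() = default;
    virtual std::string getColumnName() const = 0;
};

// Stand-in for an identifier element such as a column name or an alias.
struct ToyIdentifier : ToyNode
{
    std::string name;
    explicit ToyIdentifier(std::string name_) : name(std::move(name_)) {}
    std::string getColumnName() const override { return name; }
};

// Stand-in for an expression / function element.
struct ToyFunction : ToyNode
{
    std::string text;
    explicit ToyFunction(std::string text_) : text(std::move(text_)) {}
    std::string getColumnName() const override { return text; }
};

using ToyNodePtr = std::shared_ptr<ToyNode>;

// Mirrors the reviewed snippet: non-identifier elements are silently skipped.
inline std::vector<std::string> collectIdentifiersOnly(const std::vector<ToyNodePtr> & children)
{
    std::vector<std::string> names;
    for (const auto & elem : children)
        if (dynamic_cast<const ToyIdentifier *>(elem.get()))
            names.push_back(elem->getColumnName());
    return names;
}

// The reviewer's suggestion: take the column name of every element.
inline std::vector<std::string> collectAll(const std::vector<ToyNodePtr> & children)
{
    std::vector<std::string> names;
    for (const auto & elem : children)
        names.push_back(elem->getColumnName());
    return names;
}
```

If the parser already rejects non-identifier elements, as stated above, the two helpers return the same list for any input that gets this far; they only differ once expressions are allowed in shuffle by.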
if (shuffle_by_expression_list)
{
    for (const auto & elem : shuffle_by_expression_list->children)
        if (!elem->as<ASTIdentifier>())
The identifier may be an alias of an expression/function.
If we want to validate whether it is a column, we can call IdentifierSemantic::getMembership after TranslateQualifiedNamesVisitor in TreeRewriter.
Yeah, I thought about this. For now it is probably fine, and we may extend this soon to support expressions for shuffle by.
const auto & key_col = columns[key_column_position]->convertToFullColumnIfConst();
const auto & key_col_no_lc = recursiveRemoveLowCardinality(recursiveRemoveSparse(key_col));
We don't need this code, since ColumnConst/ColumnLowCardinality/ColumnSparse already implement the updateWeakHash32 method.
I just checked the code. I am not quite sure they will produce the same hash results. Have you tested it?
By testing I mean: take, for example, two or more different low-cardinality blocks that contain some overlapping low-cardinality rows while their other rows differ. When we hash them, we need to make sure the same LC rows across different blocks are shuffled to the same shard (i.e. have the same hash). The same goes for sparse columns.
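The property asked for here can be illustrated with a toy model. The sketch below is not the project's column code: `ToyLowCardinalityBlock`, `fnv1a`, `shardsByValue` and `shardsByIndex` are hypothetical names, and FNV-1a is used only to keep the example deterministic. It shows why sharding on the decoded value is stable across blocks, while sharding on a per-block dictionary index is not. Whether updateWeakHash32 on the real low-cardinality and sparse columns hashes the underlying values is exactly what the test described above would need to confirm.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical stand-in for a low-cardinality column: a per-block dictionary
// plus one dictionary index per row.
struct ToyLowCardinalityBlock
{
    std::vector<std::string> dictionary;
    std::vector<std::size_t> indexes;
};

// 32-bit FNV-1a over the bytes of the value.
inline std::uint32_t fnv1a(const std::string & value)
{
    std::uint32_t hash = 2166136261u;
    for (unsigned char c : value)
    {
        hash ^= c;
        hash *= 16777619u;
    }
    return hash;
}

// Shard of each row computed from the decoded value.
// Equal values land on the same shard regardless of the block they come from.
inline std::vector<std::size_t> shardsByValue(const ToyLowCardinalityBlock & block, std::size_t num_shards)
{
    assert(num_shards > 0);
    std::vector<std::size_t> shards;
    shards.reserve(block.indexes.size());
    for (std::size_t index : block.indexes)
        shards.push_back(fnv1a(block.dictionary.at(index)) % num_shards);
    return shards;
}

// Shard of each row computed from the dictionary index.
// The result depends on each block's dictionary order, so it is not stable across blocks.
inline std::vector<std::size_t> shardsByIndex(const ToyLowCardinalityBlock & block, std::size_t num_shards)
{
    assert(num_shards > 0);
    std::vector<std::size_t> shards;
    shards.reserve(block.indexes.size());
    for (std::size_t index : block.indexes)
        shards.push_back(index % num_shards);
    return shards;
}
```

A test along these lines would build two blocks whose dictionaries overlap but are ordered differently, then check that the overlapping value gets the same shard in both.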
{
    if (!input.isFinished() && input.hasData())
    {
        shuffled_chunks.push_back(input.pull());
NOTE: for streaming aggregation we need to use pull(/*set_not_needed*/true), since the inputs can receive a heartbeat chunk, which triggers the upstream again, and the next loop then repeats the actions above, which will cause the pipeline to get stuck. There are a few reasons:
- The upstream is triggered first, so no thread calls the downstream processor.
- Failing to propagate the heartbeat chunk can also cause the pipeline to get stuck.
Of course, it works for historical aggregation.
This is purely historical data. Streaming/AggregatingTransform.cpp is not covered yet.
2edf66e to 621497a
PR checklist:
proton: starts/ends
for new code in existing community code base?
Please write a user-readable short description of the changes:
Closed #256.
The streaming part will be done in a separate PR.