Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BugFix] isLocalBucketShuffleJoin return wrong result (backport #51954) #51981

Closed
wants to merge 1 commit into from

Conversation

mergify[bot]
Copy link
Contributor

@mergify mergify bot commented Oct 16, 2024

Why I'm doing:

No data sent from the right side of Bucket Shuffle Right Join Build (ExchangeSink)

  1. First, the Bucket seqs 3, 5, 7, 8, 9 of the left table in the Bucket shuffle join were pruned.

  2. Generally, when bucket seqs are pruned, the instance id on the receiver side in the channel map is recorded as -1 to indicate no data is being sent. However, for right outer join, the NULL values from the build side need to be output, so for such cases, any valid instance id must be assigned to the pruned bucket seq.

for (int32_t channel_id : _channel_indices) {
    if (_channels[channel_id]->get_fragment_instance_id().lo == -1) {
        // dest bucket is not used, continue
        continue;
    }

    for (int32_t i = 0; i < _num_shuffles_per_channel; ++i) {
        int shuffle_id = channel_id * _num_shuffles_per_channel + i;
        int driver_sequence = _driver_sequence_per_shuffle[shuffle_id];

        size_t from = _channel_row_idx_start_points[shuffle_id];
        size_t size = _channel_row_idx_start_points[shuffle_id + 1] - from;
        if (size == 0) {
            // no data for this channel, continue;
            continue;
        }

        RETURN_IF_ERROR(_channels[channel_id]->add_rows_selective(send_chunk, 
driver_sequence,
                                                                  
_row_indexes.data(), from, size, state));
    }
}
  1. In practice, all the join column data on the right-hand side is NULL, and no valid instance id is assigned in the channel map, resulting in data being filtered.
    image

2. The issue where NULL values on the HashJoin Build side need to be output, and assigning any instance id to the pruned bucketSeqs

  1. Cases where NULL values on the build side need to be output:

    a. Right join, full join, Right Anti JOIN

    b. Null-safe-eq join: <=>. It seems this is not currently considered in the logic, further confirmation is required.

  2. Determine whether NULL values on the build side need to be output by isRightOrFullBucketShuffleFragment.

  3. isRightOrFullBucketShuffleFragment is expected to be true, but is actually false.

if (isRightOrFullBucketShuffleFragment && colocatedAssignment.isAllScanNodesAssigned()) {
    int bucketNum = colocatedAssignment.bucketNum;

    for (int bucketSeq = 0; bucketSeq < bucketNum; ++bucketSeq) {
        if (!bucketSeqToWorkerId.containsKey(bucketSeq)) {
            long workerId = workerProvider.selectNextWorker();
            bucketSeqToWorkerId.put(bucketSeq, workerId);
        }
        if (!bucketSeqToScanRange.containsKey(bucketSeq)) {
            bucketSeqToScanRange.put(bucketSeq, Maps.newHashMap());
            bucketSeqToScanRange.get(bucketSeq).put(scanNode.getId().asInt(), 
Lists.newArrayList());
        }
    }
}

Top-Down Determination of isRightOrFullBucketShuffle: depends on first visited

private boolean isLocalBucketShuffleJoin(PlanNode root) {
    if (root instanceof ExchangeNode) {
        return false;
    }

    if (root instanceof JoinNode) {
        JoinNode joinNode = (JoinNode) root;
        if (joinNode.isLocalHashBucket()) {
            isRightOrFullBucketShuffle = 
joinNode.getJoinOp().isFullOuterJoin() || joinNode.getJoinOp().isRightJoin();
            return true;
        }
    }

    boolean childHasBucketShuffle = false;
    for (PlanNode child : root.getChildren()) {
        childHasBucketShuffle |= isLocalBucketShuffleJoin(child);
    }

    return childHasBucketShuffle;
}

image

In the erroneous plan, there are three JOINS, and the above bucket shuffle left join is the first visited, directly determining isRightOrFullBucketShuffle = false.

What I'm doing:

Fix function isLocalBucketShuffleJoin to return correct result.

What type of PR is this:

  • BugFix
  • Feature
  • Enhancement
  • Refactor
  • UT
  • Doc
  • Tool

Does this PR entail a change in behavior?

  • Yes, this PR will result in a change in behavior.
  • No, this PR will not result in a change in behavior.

If yes, please specify the type of change:

  • Interface/UI changes: syntax, type conversion, expression evaluation, display information
  • Parameter changes: default values, similar parameters but with different default values
  • Policy changes: use new policy to replace old one, functionality automatically enabled
  • Feature removed
  • Miscellaneous: upgrade & downgrade compatibility, etc.

Checklist:

  • I have added test cases for my bug fix or my new feature
  • This pr needs user documentation (for new or modified features or behaviors)
    • I have added documentation for my new feature or new function
  • This is a backport pr

Bugfix cherry-pick branch check:

  • I have checked the version labels which the pr will be auto-backported to the target branch
    • 3.3
    • 3.2
    • 3.1
    • 3.0
    • 2.5

This is an automatic backport of pull request #51954 done by [Mergify](https://mergify.com). ## Why I'm doing:

No data sent from the right side of Bucket Shuffle Right Join Build (ExchangeSink)

  1. First, the Bucket seqs 3, 5, 7, 8, 9 of the left table in the Bucket shuffle join were pruned.

  2. Generally, when bucket seqs are pruned, the instance id on the receiver side in the channel map is recorded as -1 to indicate no data is being sent. However, for right outer join, the NULL values from the build side need to be output, so for such cases, any valid instance id must be assigned to the pruned bucket seq.

for (int32_t channel_id : _channel_indices) {
    if (_channels[channel_id]->get_fragment_instance_id().lo == -1) {
        // dest bucket is not used, continue
        continue;
    }

    for (int32_t i = 0; i < _num_shuffles_per_channel; ++i) {
        int shuffle_id = channel_id * _num_shuffles_per_channel + i;
        int driver_sequence = _driver_sequence_per_shuffle[shuffle_id];

        size_t from = _channel_row_idx_start_points[shuffle_id];
        size_t size = _channel_row_idx_start_points[shuffle_id + 1] - from;
        if (size == 0) {
            // no data for this channel, continue;
            continue;
        }

        RETURN_IF_ERROR(_channels[channel_id]->add_rows_selective(send_chunk, 
driver_sequence,
                                                                  
_row_indexes.data(), from, size, state));
    }
}
  1. In practice, all the join column data on the right-hand side is NULL, and no valid instance id is assigned in the channel map, resulting in data being filtered.
    image

2. The issue where NULL values on the HashJoin Build side need to be output, and assigning any instance id to the pruned bucketSeqs

  1. Cases where NULL values on the build side need to be output:

    a. Right join, full join, Right Anti JOIN

    b. Null-safe-eq join: <=>. It seems this is not currently considered in the logic, further confirmation is required.

  2. Determine whether NULL values on the build side need to be output by isRightOrFullBucketShuffleFragment.

  3. isRightOrFullBucketShuffleFragment is expected to be true, but is actually false.

if (isRightOrFullBucketShuffleFragment && colocatedAssignment.isAllScanNodesAssigned()) {
    int bucketNum = colocatedAssignment.bucketNum;

    for (int bucketSeq = 0; bucketSeq < bucketNum; ++bucketSeq) {
        if (!bucketSeqToWorkerId.containsKey(bucketSeq)) {
            long workerId = workerProvider.selectNextWorker();
            bucketSeqToWorkerId.put(bucketSeq, workerId);
        }
        if (!bucketSeqToScanRange.containsKey(bucketSeq)) {
            bucketSeqToScanRange.put(bucketSeq, Maps.newHashMap());
            bucketSeqToScanRange.get(bucketSeq).put(scanNode.getId().asInt(), 
Lists.newArrayList());
        }
    }
}

Top-Down Determination of isRightOrFullBucketShuffle: depends on first visited

private boolean isLocalBucketShuffleJoin(PlanNode root) {
    if (root instanceof ExchangeNode) {
        return false;
    }

    if (root instanceof JoinNode) {
        JoinNode joinNode = (JoinNode) root;
        if (joinNode.isLocalHashBucket()) {
            isRightOrFullBucketShuffle = 
joinNode.getJoinOp().isFullOuterJoin() || joinNode.getJoinOp().isRightJoin();
            return true;
        }
    }

    boolean childHasBucketShuffle = false;
    for (PlanNode child : root.getChildren()) {
        childHasBucketShuffle |= isLocalBucketShuffleJoin(child);
    }

    return childHasBucketShuffle;
}

image

In the erroneous plan, there are three JOINS, and the above bucket shuffle left join is the first visited, directly determining isRightOrFullBucketShuffle = false.

What I'm doing:

Fix function isLocalBucketShuffleJoin to return correct result.

What type of PR is this:

  • BugFix
  • Feature
  • Enhancement
  • Refactor
  • UT
  • Doc
  • Tool

Does this PR entail a change in behavior?

  • Yes, this PR will result in a change in behavior.
  • No, this PR will not result in a change in behavior.

If yes, please specify the type of change:

  • Interface/UI changes: syntax, type conversion, expression evaluation, display information
  • Parameter changes: default values, similar parameters but with different default values
  • Policy changes: use new policy to replace old one, functionality automatically enabled
  • Feature removed
  • Miscellaneous: upgrade & downgrade compatibility, etc.

Checklist:

  • I have added test cases for my bug fix or my new feature
  • This pr needs user documentation (for new or modified features or behaviors)
    • I have added documentation for my new feature or new function
  • This is a backport pr

Signed-off-by: satanson <[email protected]>
(cherry picked from commit 1077a6d)

# Conflicts:
#	fe/fe-core/src/main/java/com/starrocks/qe/scheduler/dag/ExecutionFragment.java
@mergify mergify bot added the conflicts label Oct 16, 2024
Copy link
Contributor Author

mergify bot commented Oct 16, 2024

Cherry-pick of 1077a6d has failed:

On branch mergify/bp/branch-2.5/pr-51954
Your branch is up to date with 'origin/branch-2.5'.

You are currently cherry-picking commit 1077a6df4f.
  (fix conflicts and run "git cherry-pick --continue")
  (use "git cherry-pick --skip" to skip this patch)
  (use "git cherry-pick --abort" to cancel the cherry-pick operation)

Changes to be committed:
	new file:   fe/fe-core/src/test/java/com/starrocks/qe/TestBucketShuffleRightJoin.java
	new file:   test/sql/test_bucket_shuffle_right_join/R/test_bucket_shuffle_right_join
	new file:   test/sql/test_bucket_shuffle_right_join/T/test_bucket_shuffle_right_join

Unmerged paths:
  (use "git add/rm <file>..." as appropriate to mark resolution)
	deleted by us:   fe/fe-core/src/main/java/com/starrocks/qe/scheduler/dag/ExecutionFragment.java

To fix up this pull request, you can check it out locally. See documentation: https://docs.github.com/en/pull-requests/collaborating-with-pull-requests/reviewing-changes-in-pull-requests/checking-out-pull-requests-locally

@wanpengfei-git wanpengfei-git enabled auto-merge (squash) October 16, 2024 08:40
@mergify mergify bot closed this Oct 16, 2024
auto-merge was automatically disabled October 16, 2024 08:40

Pull request was closed

Copy link
Contributor Author

mergify bot commented Oct 16, 2024

@mergify[bot]: Backport conflict, please reslove the conflict and resubmit the pr

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant