diff --git a/datafusion/physical-plan/src/sorts/merge.rs b/datafusion/physical-plan/src/sorts/merge.rs index 3f0947d7f588..08085e6fc1f8 100644 --- a/datafusion/physical-plan/src/sorts/merge.rs +++ b/datafusion/physical-plan/src/sorts/merge.rs @@ -98,6 +98,20 @@ pub(crate) struct SortPreservingMergeStream { cursors: Vec>>, /// Configuration parameter to enable round-robin selection of tied winners of loser tree. + /// + /// To address the issue of unbalanced polling between partitions due to tie-breakers being based + /// on partition index, especially in cases of low cardinality, we are making changes to the winner + /// selection mechanism. Previously, partitions with smaller indices were consistently chosen as the winners, + /// leading to an uneven distribution of polling. This caused upstream operator buffers for the other partitions + /// to grow excessively, as they continued receiving data without consuming it. + /// + /// For example, an upstream operator like a repartition execution would keep sending data to certain partitions, + /// but those partitions wouldn't consume the data if they weren't selected as winners. This resulted in inefficient buffer usage. + /// + /// To resolve this, we are modifying the tie-breaking logic. Instead of always choosing the partition with the smallest index, + /// we now select the partition that has the lowest poll count for the same value. + /// This ensures that multiple partitions with the same value are chosen equally often, distributing the polling load in a round-robin fashion. + /// This approach balances the workload more effectively across partitions and avoids excessive buffer growth. Round robin tie breaker enable_round_robin_tie_breaker: bool, /// Flag indicating whether we are in the mode of round-robin