Skip to content

Commit

Permalink
refactor(executor): use event cause to refactor shuffle processor
Browse files Browse the repository at this point in the history
  • Loading branch information
zhang2014 committed Sep 12, 2024
1 parent 5d756fc commit 1d21645
Showing 1 changed file with 27 additions and 3 deletions.
30 changes: 27 additions & 3 deletions src/query/pipeline/core/src/processors/shuffle_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,7 @@ impl Processor for ShuffleProcessor {

fn event_with_cause(&mut self, cause: EventCause) -> Result<Event> {
let (input, output) = match cause {
EventCause::Other => {
return Ok(Event::NeedConsume);
}
EventCause::Other => unreachable!(),
EventCause::Input(index) => {
(&self.inputs[index], &self.outputs[self.input2output[index]])
}
Expand All @@ -87,6 +85,19 @@ impl Processor for ShuffleProcessor {

if output.is_finished() {
input.finish();

for input in &self.inputs {
if !input.is_finished() {
return Ok(Event::NeedConsume);
}
}

for output in &self.outputs {
if !output.is_finished() {
return Ok(Event::NeedConsume);
}
}

return Ok(Event::Finished);
}

Expand All @@ -102,6 +113,19 @@ impl Processor for ShuffleProcessor {

if input.is_finished() {
output.finish();

for input in &self.inputs {
if !input.is_finished() {
return Ok(Event::NeedConsume);
}
}

for output in &self.outputs {
if !output.is_finished() {
return Ok(Event::NeedConsume);
}
}

return Ok(Event::Finished);
}

Expand Down

0 comments on commit 1d21645

Please sign in to comment.