Skip to content

Commit

Permalink
refactor(executor): use event cause to refactor shuffle processor
Browse files Browse the repository at this point in the history
  • Loading branch information
zhang2014 committed Sep 12, 2024
1 parent 1d21645 commit 03c5b42
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 30 deletions.
64 changes: 35 additions & 29 deletions src/query/pipeline/core/src/processors/shuffle_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ pub struct ShuffleProcessor {
input2output: Vec<usize>,
output2input: Vec<usize>,

inputs: Vec<Arc<InputPort>>,
outputs: Vec<Arc<OutputPort>>,
finished_port: usize,
inputs: Vec<(bool, Arc<InputPort>)>,
outputs: Vec<(bool, Arc<OutputPort>)>,
}

impl ShuffleProcessor {
Expand Down Expand Up @@ -56,8 +57,9 @@ impl ShuffleProcessor {
ShuffleProcessor {
input2output,
output2input,
inputs,
outputs,
finished_port: 0,
inputs: inputs.into_iter().map(|x| (false, x)).collect(),
outputs: outputs.into_iter().map(|x| (false, x)).collect(),
}
}
}
Expand All @@ -73,32 +75,35 @@ impl Processor for ShuffleProcessor {
}

fn event_with_cause(&mut self, cause: EventCause) -> Result<Event> {
let (input, output) = match cause {
let ((input_finished, input), (output_finished, output)) = match cause {
EventCause::Other => unreachable!(),
EventCause::Input(index) => {
(&self.inputs[index], &self.outputs[self.input2output[index]])
}
EventCause::Output(index) => {
(&self.inputs[self.output2input[index]], &self.outputs[index])
}
EventCause::Input(index) => (
&mut self.inputs[index],
&mut self.outputs[self.input2output[index]],
),
EventCause::Output(index) => (
&mut self.inputs[self.output2input[index]],
&mut self.outputs[index],
),
};

if output.is_finished() {
input.finish();

for input in &self.inputs {
if !input.is_finished() {
return Ok(Event::NeedConsume);
}
if !*input_finished {
*input_finished = true;
self.finished_port += 1;
}

for output in &self.outputs {
if !output.is_finished() {
return Ok(Event::NeedConsume);
}
if !*output_finished {
*output_finished = true;
self.finished_port += 1;
}

return Ok(Event::Finished);
return match self.finished_port == (self.inputs.len() + self.outputs.len()) {
true => Ok(Event::Finished),
false => Ok(Event::NeedConsume),
};
}

if !output.can_push() {
Expand All @@ -114,19 +119,20 @@ impl Processor for ShuffleProcessor {
if input.is_finished() {
output.finish();

for input in &self.inputs {
if !input.is_finished() {
return Ok(Event::NeedConsume);
}
if !*input_finished {
*input_finished = true;
self.finished_port += 1;
}

for output in &self.outputs {
if !output.is_finished() {
return Ok(Event::NeedConsume);
}
if !*output_finished {
*output_finished = true;
self.finished_port += 1;
}

return Ok(Event::Finished);
return match self.finished_port == (self.inputs.len() + self.outputs.len()) {
true => Ok(Event::Finished),
false => Ok(Event::NeedConsume),
};
}

input.set_need_data();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ async fn test_shuffle_output_finish() -> Result<()> {

assert!(matches!(
processor.event_with_cause(EventCause::Output(0))?,
Event::Finished
Event::NeedConsume
));
assert!(input1.is_finished());
assert!(!input2.is_finished());
Expand Down

0 comments on commit 03c5b42

Please sign in to comment.