Skip to content

Commit

Permalink
refactor(cluster): remove useless code
Browse files Browse the repository at this point in the history
  • Loading branch information
zhang2014 committed Aug 17, 2023
1 parent ba206ab commit 6e50d2b
Showing 1 changed file with 16 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ pub struct ExchangeShuffleTransform {
outputs: Vec<Arc<OutputPort>>,

buffer: OutputsBuffer,
cur_input_index: usize,
all_inputs_finished: bool,
all_outputs_finished: bool,
}
Expand All @@ -141,6 +142,7 @@ impl ExchangeShuffleTransform {
inputs: inputs_port,
outputs: outputs_port,
buffer: OutputsBuffer::create(inputs, outputs),
cur_input_index: 0,
all_inputs_finished: false,
all_outputs_finished: false,
}
Expand All @@ -165,17 +167,12 @@ impl Processor for ExchangeShuffleTransform {
}

fn event(&mut self) -> Result<Event> {
self.try_pull_inputs()?;

if !self.all_outputs_finished && !self.try_push_outputs() {
// buffer is full and cannot push any output port
// if !self.all_outputs_finished && self.buffer.is_full() {
// for input in &self.inputs {
// input.set_not_need_data();
// }
if !self.all_inputs_finished {
self.try_pull_inputs()?;
}

// return Ok(Event::NeedConsume);
// }
if !self.all_outputs_finished {
self.try_push_outputs();
}

if self.all_outputs_finished {
Expand All @@ -199,9 +196,8 @@ impl Processor for ExchangeShuffleTransform {
}

impl ExchangeShuffleTransform {
fn try_push_outputs(&mut self) -> bool {
fn try_push_outputs(&mut self) {
self.all_outputs_finished = true;
let mut pushed_any_output = false;

for (index, output) in self.outputs.iter().enumerate() {
if output.is_finished() {
Expand All @@ -213,39 +209,32 @@ impl ExchangeShuffleTransform {

if output.can_push() {
if let Some(data_block) = self.buffer.pop(index) {
pushed_any_output = true;
output.push_data(Ok(data_block));
}
}
}

pushed_any_output
}

fn try_pull_inputs(&mut self) -> Result<()> {
if self.all_inputs_finished {
return Ok(());
}

self.all_inputs_finished = true;
for input_port in &self.inputs {
if input_port.is_finished() {
for index in (self.cur_input_index..self.inputs.len()).chain(0..self.cur_input_index) {
if self.inputs[index].is_finished() {
continue;
}

self.all_inputs_finished = false;

if !input_port.has_data() {
input_port.set_need_data();
if !self.inputs[index].has_data() {
self.inputs[index].set_need_data();
continue;
}

if self.buffer.is_full() {
self.cur_input_index = index;
return Ok(());
}

let mut data_block = input_port.pull_data().unwrap()?;
input_port.set_need_data();
let mut data_block = self.inputs[index].pull_data().unwrap()?;
self.inputs[index].set_need_data();

if let Some(block_meta) = data_block.take_meta() {
if let Some(shuffle_meta) = ExchangeShuffleMeta::downcast_from(block_meta) {
Expand All @@ -259,6 +248,7 @@ impl ExchangeShuffleTransform {
}
}

self.cur_input_index = index;
return Err(ErrorCode::Internal(
"ExchangeShuffleTransform only recv ExchangeShuffleMeta.",
));
Expand Down

0 comments on commit 6e50d2b

Please sign in to comment.