From 9c7d027e304f8450cc86b1fed46e9ddee6dd4cb7 Mon Sep 17 00:00:00 2001 From: Winter Zhang Date: Mon, 21 Aug 2023 22:20:31 +0800 Subject: [PATCH] Revert "refactor(cluster): active upstream first in cluster scatter (#12481)" (#12532) This reverts commit 84b1cdc08cd12b4745085bc14feb9001033598bf. --- .../exchange/exchange_transform_shuffle.rs | 153 +++++++++--------- 1 file changed, 79 insertions(+), 74 deletions(-) diff --git a/src/query/service/src/api/rpc/exchange/exchange_transform_shuffle.rs b/src/query/service/src/api/rpc/exchange/exchange_transform_shuffle.rs index 6fbfa727fae61..9862b5e9c2c12 100644 --- a/src/query/service/src/api/rpc/exchange/exchange_transform_shuffle.rs +++ b/src/query/service/src/api/rpc/exchange/exchange_transform_shuffle.rs @@ -80,40 +80,48 @@ impl BlockMetaInfo for ExchangeShuffleMeta { } } +struct OutputBuffer { + blocks: VecDeque, +} + +impl OutputBuffer { + pub fn create(capacity: usize) -> OutputBuffer { + OutputBuffer { + blocks: VecDeque::with_capacity(capacity), + } + } +} + struct OutputsBuffer { - inner: Vec>, + capacity: usize, + inner: Vec, } impl OutputsBuffer { pub fn create(capacity: usize, outputs: usize) -> OutputsBuffer { - OutputsBuffer { - inner: vec![capacity; outputs] - .into_iter() - .map(VecDeque::with_capacity) - .collect::>(), + let mut inner = Vec::with_capacity(outputs); + + for _index in 0..outputs { + inner.push(OutputBuffer::create(capacity)) } - } - pub fn is_empty(&self) -> bool { - self.inner.iter().all(VecDeque::is_empty) + OutputsBuffer { inner, capacity } } - pub fn is_full(&self) -> bool { - self.inner.iter().any(|x| x.len() == x.capacity()) + pub fn is_empty(&self) -> bool { + self.inner.iter().all(|x| x.blocks.is_empty()) } - pub fn clear(&mut self, index: usize) -> usize { - let len = self.inner[index].len(); - self.inner[index].clear(); - len + pub fn is_fill(&self, index: usize) -> bool { + self.inner[index].blocks.len() == self.capacity } pub fn pop(&mut self, index: usize) -> Option { - self.inner[index].pop_front() + self.inner[index].blocks.pop_front() } pub fn push_back(&mut self, index: usize, block: DataBlock) { - self.inner[index].push_back(block) + self.inner[index].blocks.push_back(block) } } @@ -122,7 +130,6 @@ pub struct ExchangeShuffleTransform { outputs: Vec>, buffer: OutputsBuffer, - cur_input_index: usize, all_inputs_finished: bool, all_outputs_finished: bool, } @@ -143,8 +150,7 @@ impl ExchangeShuffleTransform { ExchangeShuffleTransform { inputs: inputs_port, outputs: outputs_port, - buffer: OutputsBuffer::create(inputs, outputs), - cur_input_index: 0, + buffer: OutputsBuffer::create(10, outputs), all_inputs_finished: false, all_outputs_finished: false, } @@ -169,47 +175,62 @@ impl Processor for ExchangeShuffleTransform { } fn event(&mut self) -> Result { - if !self.all_inputs_finished { - self.try_pull_inputs()?; - } + loop { + if !self.try_push_outputs() { + for input in &self.inputs { + input.set_need_data(); + } + + return Ok(Event::NeedConsume); + } - if !self.all_outputs_finished { - let consumed_buffer = self.try_push_outputs(); + if self.all_outputs_finished { + for input in &self.inputs { + input.finish(); + } - // try pull inputs again if consumed buffer. - if consumed_buffer && !self.all_outputs_finished && !self.all_inputs_finished { - self.try_pull_inputs()?; + return Ok(Event::Finished); } - } - if self.all_outputs_finished { - for input in &self.inputs { - input.finish(); + if let Some(mut data_block) = self.try_pull_inputs()? { + if let Some(block_meta) = data_block.take_meta() { + if let Some(shuffle_meta) = ExchangeShuffleMeta::downcast_from(block_meta) { + for (index, block) in shuffle_meta.blocks.into_iter().enumerate() { + if !block.is_empty() || block.get_meta().is_some() { + self.buffer.push_back(index, block); + } + } + + // Try push again. + continue; + } + } + + return Err(ErrorCode::Internal( + "ExchangeShuffleTransform only recv ExchangeShuffleMeta.", + )); } - return Ok(Event::Finished); - } + if self.all_inputs_finished && self.buffer.is_empty() { + for output in &self.outputs { + output.finish(); + } - if self.all_inputs_finished && self.buffer.is_empty() { - for output in &self.outputs { - output.finish(); + return Ok(Event::Finished); } - return Ok(Event::Finished); + return Ok(Event::NeedData); } - - Ok(Event::NeedData) } } impl ExchangeShuffleTransform { fn try_push_outputs(&mut self) -> bool { self.all_outputs_finished = true; - let mut consumed_buffer = false; + let mut pushed_all_outputs = true; for (index, output) in self.outputs.iter().enumerate() { if output.is_finished() { - consumed_buffer = self.buffer.clear(index) != 0; continue; } @@ -217,55 +238,39 @@ impl ExchangeShuffleTransform { if output.can_push() { if let Some(data_block) = self.buffer.pop(index) { - consumed_buffer = true; output.push_data(Ok(data_block)); } + + continue; + } + + if !output.can_push() && self.buffer.is_fill(index) { + pushed_all_outputs = false; } } - consumed_buffer + pushed_all_outputs } - fn try_pull_inputs(&mut self) -> Result<()> { + fn try_pull_inputs(&mut self) -> Result> { + let mut data_block = None; self.all_inputs_finished = true; - for index in (self.cur_input_index..self.inputs.len()).chain(0..self.cur_input_index) { - if self.inputs[index].is_finished() { + + for input_port in &self.inputs { + if input_port.is_finished() { continue; } + input_port.set_need_data(); self.all_inputs_finished = false; - if !self.inputs[index].has_data() { - self.inputs[index].set_need_data(); + if !input_port.has_data() || data_block.is_some() { continue; } - if self.buffer.is_full() { - self.cur_input_index = index; - return Ok(()); - } - - let mut data_block = self.inputs[index].pull_data().unwrap()?; - self.inputs[index].set_need_data(); - - if let Some(block_meta) = data_block.take_meta() { - if let Some(shuffle_meta) = ExchangeShuffleMeta::downcast_from(block_meta) { - for (index, block) in shuffle_meta.blocks.into_iter().enumerate() { - if !block.is_empty() || block.get_meta().is_some() { - self.buffer.push_back(index, block); - } - } - - continue; - } - } - - self.cur_input_index = index; - return Err(ErrorCode::Internal( - "ExchangeShuffleTransform only recv ExchangeShuffleMeta.", - )); + data_block = input_port.pull_data().transpose()?; } - Ok(()) + Ok(data_block) } }