diff --git a/src/query/service/src/api/rpc/exchange/exchange_transform_shuffle.rs b/src/query/service/src/api/rpc/exchange/exchange_transform_shuffle.rs index 9862b5e9c2c1..35fbaf336de7 100644 --- a/src/query/service/src/api/rpc/exchange/exchange_transform_shuffle.rs +++ b/src/query/service/src/api/rpc/exchange/exchange_transform_shuffle.rs @@ -80,48 +80,40 @@ impl BlockMetaInfo for ExchangeShuffleMeta { } } -struct OutputBuffer { - blocks: VecDeque, -} - -impl OutputBuffer { - pub fn create(capacity: usize) -> OutputBuffer { - OutputBuffer { - blocks: VecDeque::with_capacity(capacity), - } - } -} - struct OutputsBuffer { - capacity: usize, - inner: Vec, + inner: Vec>, } impl OutputsBuffer { pub fn create(capacity: usize, outputs: usize) -> OutputsBuffer { - let mut inner = Vec::with_capacity(outputs); - - for _index in 0..outputs { - inner.push(OutputBuffer::create(capacity)) + OutputsBuffer { + inner: vec![capacity; outputs] + .into_iter() + .map(VecDeque::with_capacity) + .collect::>(), } - - OutputsBuffer { inner, capacity } } pub fn is_empty(&self) -> bool { - self.inner.iter().all(|x| x.blocks.is_empty()) + self.inner.iter().all(VecDeque::is_empty) + } + + pub fn is_full(&self) -> bool { + self.inner.iter().any(|x| x.len() == x.capacity()) } - pub fn is_fill(&self, index: usize) -> bool { - self.inner[index].blocks.len() == self.capacity + pub fn clear(&mut self, index: usize) -> usize { + let len = self.inner[index].len(); + self.inner[index].clear(); + len } pub fn pop(&mut self, index: usize) -> Option { - self.inner[index].blocks.pop_front() + self.inner[index].pop_front() } pub fn push_back(&mut self, index: usize, block: DataBlock) { - self.inner[index].blocks.push_back(block) + self.inner[index].push_back(block) } } @@ -130,6 +122,7 @@ pub struct ExchangeShuffleTransform { outputs: Vec>, buffer: OutputsBuffer, + cur_input_index: usize, all_inputs_finished: bool, all_outputs_finished: bool, } @@ -150,7 +143,8 @@ impl ExchangeShuffleTransform { ExchangeShuffleTransform { inputs: inputs_port, outputs: outputs_port, - buffer: OutputsBuffer::create(10, outputs), + buffer: OutputsBuffer::create(inputs, outputs), + cur_input_index: 0, all_inputs_finished: false, all_outputs_finished: false, } @@ -175,62 +169,47 @@ impl Processor for ExchangeShuffleTransform { } fn event(&mut self) -> Result { - loop { - if !self.try_push_outputs() { - for input in &self.inputs { - input.set_need_data(); - } - - return Ok(Event::NeedConsume); - } + if !self.all_inputs_finished { + self.try_pull_inputs()?; + } - if self.all_outputs_finished { - for input in &self.inputs { - input.finish(); - } + if !self.all_outputs_finished { + let consumed_buffer = self.try_push_outputs(); - return Ok(Event::Finished); + // try pull inputs again if consumed buffer. + if consumed_buffer && !self.all_outputs_finished && !self.all_inputs_finished { + self.try_pull_inputs()?; } + } - if let Some(mut data_block) = self.try_pull_inputs()? { - if let Some(block_meta) = data_block.take_meta() { - if let Some(shuffle_meta) = ExchangeShuffleMeta::downcast_from(block_meta) { - for (index, block) in shuffle_meta.blocks.into_iter().enumerate() { - if !block.is_empty() || block.get_meta().is_some() { - self.buffer.push_back(index, block); - } - } - - // Try push again. - continue; - } - } - - return Err(ErrorCode::Internal( - "ExchangeShuffleTransform only recv ExchangeShuffleMeta.", - )); + if self.all_outputs_finished { + for input in &self.inputs { + input.finish(); } - if self.all_inputs_finished && self.buffer.is_empty() { - for output in &self.outputs { - output.finish(); - } + return Ok(Event::Finished); + } - return Ok(Event::Finished); + if self.all_inputs_finished && self.buffer.is_empty() { + for output in &self.outputs { + output.finish(); } - return Ok(Event::NeedData); + return Ok(Event::Finished); } + + Ok(Event::NeedData) } } impl ExchangeShuffleTransform { fn try_push_outputs(&mut self) -> bool { self.all_outputs_finished = true; - let mut pushed_all_outputs = true; + let mut consumed_buffer = false; for (index, output) in self.outputs.iter().enumerate() { if output.is_finished() { + consumed_buffer |= self.buffer.clear(index) != 0; continue; } @@ -238,39 +217,55 @@ impl ExchangeShuffleTransform { if output.can_push() { if let Some(data_block) = self.buffer.pop(index) { + consumed_buffer = true; output.push_data(Ok(data_block)); } - - continue; - } - - if !output.can_push() && self.buffer.is_fill(index) { - pushed_all_outputs = false; } } - pushed_all_outputs + consumed_buffer } - fn try_pull_inputs(&mut self) -> Result> { - let mut data_block = None; + fn try_pull_inputs(&mut self) -> Result<()> { self.all_inputs_finished = true; - - for input_port in &self.inputs { - if input_port.is_finished() { + for index in (self.cur_input_index..self.inputs.len()).chain(0..self.cur_input_index) { + if self.inputs[index].is_finished() { continue; } - input_port.set_need_data(); self.all_inputs_finished = false; - if !input_port.has_data() || data_block.is_some() { + if !self.inputs[index].has_data() { + self.inputs[index].set_need_data(); continue; } - data_block = input_port.pull_data().transpose()?; + if self.buffer.is_full() { + self.cur_input_index = index; + return Ok(()); + } + + let mut data_block = self.inputs[index].pull_data().unwrap()?; + self.inputs[index].set_need_data(); + + if let Some(block_meta) = data_block.take_meta() { + if let Some(shuffle_meta) = ExchangeShuffleMeta::downcast_from(block_meta) { + for (index, block) in shuffle_meta.blocks.into_iter().enumerate() { + if !block.is_empty() || block.get_meta().is_some() { + self.buffer.push_back(index, block); + } + } + + continue; + } + } + + self.cur_input_index = index; + return Err(ErrorCode::Internal( + "ExchangeShuffleTransform only recv ExchangeShuffleMeta.", + )); } - Ok(data_block) + Ok(()) } }