Skip to content

Commit

Permalink
Revert "refactor(cluster): active upstream first in cluster scatter (d…
Browse files Browse the repository at this point in the history
  • Loading branch information
zhang2014 authored and andylokandy committed Nov 27, 2023
1 parent dfead6f commit 9c7d027
Showing 1 changed file with 79 additions and 74 deletions.
153 changes: 79 additions & 74 deletions src/query/service/src/api/rpc/exchange/exchange_transform_shuffle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,40 +80,48 @@ impl BlockMetaInfo for ExchangeShuffleMeta {
}
}

struct OutputBuffer {
blocks: VecDeque<DataBlock>,
}

impl OutputBuffer {
pub fn create(capacity: usize) -> OutputBuffer {
OutputBuffer {
blocks: VecDeque::with_capacity(capacity),
}
}
}

struct OutputsBuffer {
inner: Vec<VecDeque<DataBlock>>,
capacity: usize,
inner: Vec<OutputBuffer>,
}

impl OutputsBuffer {
pub fn create(capacity: usize, outputs: usize) -> OutputsBuffer {
OutputsBuffer {
inner: vec![capacity; outputs]
.into_iter()
.map(VecDeque::with_capacity)
.collect::<Vec<_>>(),
let mut inner = Vec::with_capacity(outputs);

for _index in 0..outputs {
inner.push(OutputBuffer::create(capacity))
}
}

pub fn is_empty(&self) -> bool {
self.inner.iter().all(VecDeque::is_empty)
OutputsBuffer { inner, capacity }
}

pub fn is_full(&self) -> bool {
self.inner.iter().any(|x| x.len() == x.capacity())
pub fn is_empty(&self) -> bool {
self.inner.iter().all(|x| x.blocks.is_empty())
}

pub fn clear(&mut self, index: usize) -> usize {
let len = self.inner[index].len();
self.inner[index].clear();
len
pub fn is_fill(&self, index: usize) -> bool {
self.inner[index].blocks.len() == self.capacity
}

pub fn pop(&mut self, index: usize) -> Option<DataBlock> {
self.inner[index].pop_front()
self.inner[index].blocks.pop_front()
}

pub fn push_back(&mut self, index: usize, block: DataBlock) {
self.inner[index].push_back(block)
self.inner[index].blocks.push_back(block)
}
}

Expand All @@ -122,7 +130,6 @@ pub struct ExchangeShuffleTransform {
outputs: Vec<Arc<OutputPort>>,

buffer: OutputsBuffer,
cur_input_index: usize,
all_inputs_finished: bool,
all_outputs_finished: bool,
}
Expand All @@ -143,8 +150,7 @@ impl ExchangeShuffleTransform {
ExchangeShuffleTransform {
inputs: inputs_port,
outputs: outputs_port,
buffer: OutputsBuffer::create(inputs, outputs),
cur_input_index: 0,
buffer: OutputsBuffer::create(10, outputs),
all_inputs_finished: false,
all_outputs_finished: false,
}
Expand All @@ -169,103 +175,102 @@ impl Processor for ExchangeShuffleTransform {
}

fn event(&mut self) -> Result<Event> {
if !self.all_inputs_finished {
self.try_pull_inputs()?;
}
loop {
if !self.try_push_outputs() {
for input in &self.inputs {
input.set_need_data();
}

return Ok(Event::NeedConsume);
}

if !self.all_outputs_finished {
let consumed_buffer = self.try_push_outputs();
if self.all_outputs_finished {
for input in &self.inputs {
input.finish();
}

// try pull inputs again if consumed buffer.
if consumed_buffer && !self.all_outputs_finished && !self.all_inputs_finished {
self.try_pull_inputs()?;
return Ok(Event::Finished);
}
}

if self.all_outputs_finished {
for input in &self.inputs {
input.finish();
if let Some(mut data_block) = self.try_pull_inputs()? {
if let Some(block_meta) = data_block.take_meta() {
if let Some(shuffle_meta) = ExchangeShuffleMeta::downcast_from(block_meta) {
for (index, block) in shuffle_meta.blocks.into_iter().enumerate() {
if !block.is_empty() || block.get_meta().is_some() {
self.buffer.push_back(index, block);
}
}

// Try push again.
continue;
}
}

return Err(ErrorCode::Internal(
"ExchangeShuffleTransform only recv ExchangeShuffleMeta.",
));
}

return Ok(Event::Finished);
}
if self.all_inputs_finished && self.buffer.is_empty() {
for output in &self.outputs {
output.finish();
}

if self.all_inputs_finished && self.buffer.is_empty() {
for output in &self.outputs {
output.finish();
return Ok(Event::Finished);
}

return Ok(Event::Finished);
return Ok(Event::NeedData);
}

Ok(Event::NeedData)
}
}

impl ExchangeShuffleTransform {
fn try_push_outputs(&mut self) -> bool {
self.all_outputs_finished = true;
let mut consumed_buffer = false;
let mut pushed_all_outputs = true;

for (index, output) in self.outputs.iter().enumerate() {
if output.is_finished() {
consumed_buffer = self.buffer.clear(index) != 0;
continue;
}

self.all_outputs_finished = false;

if output.can_push() {
if let Some(data_block) = self.buffer.pop(index) {
consumed_buffer = true;
output.push_data(Ok(data_block));
}

continue;
}

if !output.can_push() && self.buffer.is_fill(index) {
pushed_all_outputs = false;
}
}

consumed_buffer
pushed_all_outputs
}

fn try_pull_inputs(&mut self) -> Result<()> {
fn try_pull_inputs(&mut self) -> Result<Option<DataBlock>> {
let mut data_block = None;
self.all_inputs_finished = true;
for index in (self.cur_input_index..self.inputs.len()).chain(0..self.cur_input_index) {
if self.inputs[index].is_finished() {

for input_port in &self.inputs {
if input_port.is_finished() {
continue;
}

input_port.set_need_data();
self.all_inputs_finished = false;
if !self.inputs[index].has_data() {
self.inputs[index].set_need_data();
if !input_port.has_data() || data_block.is_some() {
continue;
}

if self.buffer.is_full() {
self.cur_input_index = index;
return Ok(());
}

let mut data_block = self.inputs[index].pull_data().unwrap()?;
self.inputs[index].set_need_data();

if let Some(block_meta) = data_block.take_meta() {
if let Some(shuffle_meta) = ExchangeShuffleMeta::downcast_from(block_meta) {
for (index, block) in shuffle_meta.blocks.into_iter().enumerate() {
if !block.is_empty() || block.get_meta().is_some() {
self.buffer.push_back(index, block);
}
}

continue;
}
}

self.cur_input_index = index;
return Err(ErrorCode::Internal(
"ExchangeShuffleTransform only recv ExchangeShuffleMeta.",
));
data_block = input_port.pull_data().transpose()?;
}

Ok(())
Ok(data_block)
}
}

Expand Down

0 comments on commit 9c7d027

Please sign in to comment.