Skip to content

Commit

Permalink
refactor(cluster): active upstream first for cluster exchange
Browse files Browse the repository at this point in the history
  • Loading branch information
zhang2014 committed Aug 21, 2023
1 parent 67bfa76 commit 2213a36
Showing 1 changed file with 74 additions and 79 deletions.
153 changes: 74 additions & 79 deletions src/query/service/src/api/rpc/exchange/exchange_transform_shuffle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,48 +80,40 @@ impl BlockMetaInfo for ExchangeShuffleMeta {
}
}

/// Queue of buffered `DataBlock`s.
struct OutputBuffer {
    blocks: VecDeque<DataBlock>,
}

impl OutputBuffer {
    /// Builds an empty buffer whose queue is pre-allocated for at least
    /// `capacity` blocks.
    pub fn create(capacity: usize) -> OutputBuffer {
        let blocks = VecDeque::with_capacity(capacity);
        OutputBuffer { blocks }
    }
}

struct OutputsBuffer {
capacity: usize,
inner: Vec<OutputBuffer>,
inner: Vec<VecDeque<DataBlock>>,
}

impl OutputsBuffer {
pub fn create(capacity: usize, outputs: usize) -> OutputsBuffer {
let mut inner = Vec::with_capacity(outputs);

for _index in 0..outputs {
inner.push(OutputBuffer::create(capacity))
OutputsBuffer {
inner: vec![capacity; outputs]
.into_iter()
.map(VecDeque::with_capacity)
.collect::<Vec<_>>(),
}

OutputsBuffer { inner, capacity }
}

pub fn is_empty(&self) -> bool {
self.inner.iter().all(|x| x.blocks.is_empty())
self.inner.iter().all(VecDeque::is_empty)
}

pub fn is_full(&self) -> bool {
self.inner.iter().any(|x| x.len() == x.capacity())
}

pub fn is_fill(&self, index: usize) -> bool {
self.inner[index].blocks.len() == self.capacity
pub fn clear(&mut self, index: usize) -> usize {
let len = self.inner[index].len();
self.inner[index].clear();
len
}

pub fn pop(&mut self, index: usize) -> Option<DataBlock> {
self.inner[index].blocks.pop_front()
self.inner[index].pop_front()
}

pub fn push_back(&mut self, index: usize, block: DataBlock) {
self.inner[index].blocks.push_back(block)
self.inner[index].push_back(block)
}
}

Expand All @@ -130,6 +122,7 @@ pub struct ExchangeShuffleTransform {
outputs: Vec<Arc<OutputPort>>,

buffer: OutputsBuffer,
cur_input_index: usize,
all_inputs_finished: bool,
all_outputs_finished: bool,
}
Expand All @@ -150,7 +143,8 @@ impl ExchangeShuffleTransform {
ExchangeShuffleTransform {
inputs: inputs_port,
outputs: outputs_port,
buffer: OutputsBuffer::create(10, outputs),
buffer: OutputsBuffer::create(inputs, outputs),
cur_input_index: 0,
all_inputs_finished: false,
all_outputs_finished: false,
}
Expand All @@ -175,102 +169,103 @@ impl Processor for ExchangeShuffleTransform {
}

fn event(&mut self) -> Result<Event> {
loop {
if !self.try_push_outputs() {
for input in &self.inputs {
input.set_need_data();
}

return Ok(Event::NeedConsume);
}
if !self.all_inputs_finished {
self.try_pull_inputs()?;
}

if self.all_outputs_finished {
for input in &self.inputs {
input.finish();
}
if !self.all_outputs_finished {
let consumed_buffer = self.try_push_outputs();

return Ok(Event::Finished);
// try pull inputs again if consumed buffer.
if consumed_buffer && !self.all_outputs_finished && !self.all_inputs_finished {
self.try_pull_inputs()?;
}
}

if let Some(mut data_block) = self.try_pull_inputs()? {
if let Some(block_meta) = data_block.take_meta() {
if let Some(shuffle_meta) = ExchangeShuffleMeta::downcast_from(block_meta) {
for (index, block) in shuffle_meta.blocks.into_iter().enumerate() {
if !block.is_empty() || block.get_meta().is_some() {
self.buffer.push_back(index, block);
}
}

// Try push again.
continue;
}
}

return Err(ErrorCode::Internal(
"ExchangeShuffleTransform only recv ExchangeShuffleMeta.",
));
if self.all_outputs_finished {
for input in &self.inputs {
input.finish();
}

if self.all_inputs_finished && self.buffer.is_empty() {
for output in &self.outputs {
output.finish();
}
return Ok(Event::Finished);
}

return Ok(Event::Finished);
if self.all_inputs_finished && self.buffer.is_empty() {
for output in &self.outputs {
output.finish();
}

return Ok(Event::NeedData);
return Ok(Event::Finished);
}

Ok(Event::NeedData)
}
}

impl ExchangeShuffleTransform {
fn try_push_outputs(&mut self) -> bool {
self.all_outputs_finished = true;
let mut pushed_all_outputs = true;
let mut consumed_buffer = false;

for (index, output) in self.outputs.iter().enumerate() {
if output.is_finished() {
consumed_buffer |= self.buffer.clear(index) != 0;
continue;
}

self.all_outputs_finished = false;

if output.can_push() {
if let Some(data_block) = self.buffer.pop(index) {
consumed_buffer = true;
output.push_data(Ok(data_block));
}

continue;
}

if !output.can_push() && self.buffer.is_fill(index) {
pushed_all_outputs = false;
}
}

pushed_all_outputs
consumed_buffer
}

fn try_pull_inputs(&mut self) -> Result<Option<DataBlock>> {
let mut data_block = None;
fn try_pull_inputs(&mut self) -> Result<()> {
self.all_inputs_finished = true;

for input_port in &self.inputs {
if input_port.is_finished() {
for index in (self.cur_input_index..self.inputs.len()).chain(0..self.cur_input_index) {
if self.inputs[index].is_finished() {
continue;
}

input_port.set_need_data();
self.all_inputs_finished = false;
if !input_port.has_data() || data_block.is_some() {
if !self.inputs[index].has_data() {
self.inputs[index].set_need_data();
continue;
}

data_block = input_port.pull_data().transpose()?;
if self.buffer.is_full() {
self.cur_input_index = index;
return Ok(());
}

let mut data_block = self.inputs[index].pull_data().unwrap()?;
self.inputs[index].set_need_data();

if let Some(block_meta) = data_block.take_meta() {
if let Some(shuffle_meta) = ExchangeShuffleMeta::downcast_from(block_meta) {
for (index, block) in shuffle_meta.blocks.into_iter().enumerate() {
if !block.is_empty() || block.get_meta().is_some() {
self.buffer.push_back(index, block);
}
}

continue;
}
}

self.cur_input_index = index;
return Err(ErrorCode::Internal(
"ExchangeShuffleTransform only recv ExchangeShuffleMeta.",
));
}

Ok(data_block)
Ok(())
}
}

Expand Down

0 comments on commit 2213a36

Please sign in to comment.