Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: split if block too big during append #16435

Merged
merged 15 commits into from
Sep 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 2 additions & 14 deletions src/query/catalog/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,13 +215,8 @@ pub trait Table: Sync + Send {
}

/// Assembly the pipeline of appending data to storage
fn append_data(
&self,
ctx: Arc<dyn TableContext>,
pipeline: &mut Pipeline,
append_mode: AppendMode,
) -> Result<()> {
let (_, _, _) = (ctx, pipeline, append_mode);
fn append_data(&self, ctx: Arc<dyn TableContext>, pipeline: &mut Pipeline) -> Result<()> {
let (_, _) = (ctx, pipeline);

Err(ErrorCode::Unimplemented(format!(
"The 'append_data' operation is not available for the table '{}'. Current table engine: '{}'.",
Expand Down Expand Up @@ -539,13 +534,6 @@ pub enum CompactTarget {
Segments,
}

/// Distinguishes how appended data should be treated by the storage pipeline.
///
/// NOTE(review): this enum is removed by the surrounding diff; documentation
/// reflects its meaning as of the pre-change code.
pub enum AppendMode {
    // From INSERT and RECLUSTER operation
    Normal,
    // From COPY, Streaming load operation
    Copy,
}

pub trait ColumnStatisticsProvider: Send {
// returns the statistics of the given column, if any.
// column_id is just the index of the column in table's schema
Expand Down
13 changes: 8 additions & 5 deletions src/query/expression/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,16 +318,19 @@ impl DataBlock {
res
}

pub fn split_by_rows_if_needed_no_tail(&self, min_rows_per_block: usize) -> Vec<Self> {
let max_rows_per_block = min_rows_per_block * 2;
pub fn split_by_rows_if_needed_no_tail(&self, rows_per_block: usize) -> Vec<Self> {
// Since rows_per_block represents the expected number of rows per block,
// and the minimum number of rows per block is 0.8 * rows_per_block,
// the maximum is taken as 1.8 * rows_per_block.
let max_rows_per_block = (rows_per_block * 9).div_ceil(5);
let mut res = vec![];
let mut offset = 0;
let mut remain_rows = self.num_rows;
while remain_rows >= max_rows_per_block {
let cut = self.slice(offset..(offset + min_rows_per_block));
let cut = self.slice(offset..(offset + rows_per_block));
res.push(cut);
offset += min_rows_per_block;
remain_rows -= min_rows_per_block;
offset += rows_per_block;
remain_rows -= rows_per_block;
}
res.push(self.slice(offset..(offset + remain_rows)));
res
Expand Down
49 changes: 31 additions & 18 deletions src/query/expression/src/utils/block_thresholds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,28 +66,41 @@ impl BlockThresholds {
total_rows <= self.max_rows_per_block && total_bytes <= self.max_bytes_per_block
}

#[inline]
pub fn calc_rows_per_block(&self, total_bytes: usize, total_rows: usize) -> usize {
let mut block_num = std::cmp::max(total_bytes / self.max_bytes_per_block, 1);
let mut rows_per_block = total_rows.div_ceil(block_num);
if self.check_for_compact(total_rows, total_bytes) {
return total_rows;
}

let max_bytes_per_block = if rows_per_block < self.max_rows_per_block / 10 {
// If block rows < 100_000, max_bytes_per_block set to 200M
2 * self.max_bytes_per_block
} else if rows_per_block < self.max_rows_per_block / 2 {
// If block rows < 500_000, max_bytes_per_block set to 150M
3 * self.max_bytes_per_block / 2
} else if rows_per_block < self.min_rows_per_block {
// If block rows < 800_000, max_bytes_per_block set to 125M
5 * self.max_bytes_per_block / 4
} else {
self.max_bytes_per_block
let block_num_by_size = std::cmp::max(total_bytes / self.max_bytes_per_block, 1);
let block_num_by_rows = std::cmp::max(total_rows / self.min_rows_per_block, 1);
if block_num_by_rows >= block_num_by_size {
return self.max_rows_per_block;
}

let mut rows_per_block = total_rows.div_ceil(block_num_by_size);
let max_bytes_per_block = match rows_per_block {
v if v < self.max_rows_per_block / 10 => {
// If block rows < 100_000, max_bytes_per_block set to 200M
2 * self.max_bytes_per_block
}
v if v < self.max_rows_per_block / 2 => {
// If block rows < 500_000, max_bytes_per_block set to 150M
3 * self.max_bytes_per_block / 2
}
v if v < self.min_rows_per_block => {
// If block rows < 800_000, max_bytes_per_block set to 125M
5 * self.max_bytes_per_block / 4
}
_ => self.max_bytes_per_block,
};

if block_num > 1 && max_bytes_per_block > self.max_bytes_per_block {
block_num = std::cmp::max(total_bytes / max_bytes_per_block, 1);
rows_per_block = total_rows.div_ceil(block_num);
if max_bytes_per_block > self.max_bytes_per_block {
rows_per_block = std::cmp::max(
total_rows / (std::cmp::max(total_bytes / max_bytes_per_block, 1)),
1,
);
}

rows_per_block.min(self.max_rows_per_block)
rows_per_block
}
}
12 changes: 6 additions & 6 deletions src/query/pipeline/transforms/src/processors/transforms/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ mod transform;
mod transform_accumulating;
mod transform_accumulating_async;
mod transform_async;
mod transform_block_compact;
mod transform_block_compact_for_copy;
mod transform_blocking;
mod transform_compact;
mod transform_compact_block;
mod transform_compact_builder;
mod transform_compact_no_split_builder;
mod transform_dummy;
mod transform_multi_sort_merge;
mod transform_pipeline_helper;
Expand All @@ -34,10 +34,10 @@ pub use transform::*;
pub use transform_accumulating::*;
pub use transform_accumulating_async::*;
pub use transform_async::*;
pub use transform_block_compact::*;
pub use transform_block_compact_for_copy::*;
pub use transform_blocking::*;
pub use transform_compact::*;
pub use transform_compact_block::*;
pub use transform_compact_builder::*;
pub use transform_compact_no_split_builder::*;
pub use transform_dummy::*;
pub use transform_multi_sort_merge::try_add_multi_sort_merge;
pub use transform_pipeline_helper::TransformPipelineHelper;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,8 @@ pub trait BlockMetaTransform<B: BlockMetaInfo>: Send + 'static {
fn on_finish(&mut self) -> Result<()> {
Ok(())
}

fn interrupt(&self) {}
}

pub struct BlockMetaTransformer<B: BlockMetaInfo, T: BlockMetaTransform<B>> {
Expand Down Expand Up @@ -315,4 +317,8 @@ impl<B: BlockMetaInfo, T: BlockMetaTransform<B>> Processor for BlockMetaTransfor

Ok(())
}

fn interrupt(&self) {
self.transform.interrupt();
}
}

This file was deleted.

Loading
Loading