Commit
Merge branch 'main' into fix_order_by
xudong963 authored Sep 14, 2024
2 parents 44384aa + 26b1a17 commit 80cdcfc
Showing 63 changed files with 1,334 additions and 1,206 deletions.
1 change: 1 addition & 0 deletions src/common/base/src/headers.rs
@@ -14,6 +14,7 @@

pub const HEADER_TENANT: &str = "X-DATABEND-TENANT";
pub const HEADER_QUERY_ID: &str = "X-DATABEND-QUERY-ID";
pub const HEADER_SESSION_ID: &str = "X-DATABEND-SESSION-ID";

pub const HEADER_USER: &str = "X-DATABEND-USER";

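For context, a sketch of a client sending the new session header. Only the header name comes from this change; the endpoint, port, payload, and the use of the reqwest crate are illustrative assumptions.

use reqwest::blocking::Client;

fn main() -> Result<(), reqwest::Error> {
    // Hypothetical query request carrying the new X-DATABEND-SESSION-ID header.
    let resp = Client::new()
        .post("http://localhost:8000/v1/query")
        .header("X-DATABEND-SESSION-ID", "some-session-id")
        .body(r#"{"sql": "SELECT 1"}"#)
        .send()?;
    println!("status: {}", resp.status());
    Ok(())
}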
27 changes: 27 additions & 0 deletions src/meta/app/src/principal/user_token.rs
@@ -12,6 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt::Display;

use databend_common_exception::ErrorCode;
use serde::Deserialize;
use serde::Serialize;

@@ -26,6 +29,30 @@ pub enum TokenType {
Session = 2,
}

impl Display for TokenType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", match self {
TokenType::Refresh => 'r',
TokenType::Session => 's',
})
}
}

impl TryFrom<u8> for TokenType {
type Error = ErrorCode;

fn try_from(value: u8) -> Result<Self, Self::Error> {
let value = value as char;
match value {
'r' => Ok(TokenType::Refresh),
's' => Ok(TokenType::Session),
_ => Err(ErrorCode::AuthenticateFailure(format!(
"invalid token type '{value}'"
))),
}
}
}

#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct QueryTokenInfo {
pub token_type: TokenType,
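The two impls above give the token type a one-character wire form. A standalone sketch of the intended round-trip, with the enum restated and String standing in for ErrorCode so the snippet compiles on its own:

use std::fmt::{self, Display};

#[derive(Debug, PartialEq)]
enum TokenType {
    Refresh,
    Session,
}

impl Display for TokenType {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let c = match self {
            TokenType::Refresh => 'r',
            TokenType::Session => 's',
        };
        write!(f, "{c}")
    }
}

impl TryFrom<u8> for TokenType {
    // String stands in for ErrorCode so this compiles standalone.
    type Error = String;

    fn try_from(value: u8) -> Result<Self, Self::Error> {
        match value as char {
            'r' => Ok(TokenType::Refresh),
            's' => Ok(TokenType::Session),
            v => Err(format!("invalid token type '{v}'")),
        }
    }
}

fn main() {
    // Display yields the one-byte tag; TryFrom<u8> decodes it again,
    // e.g. from the first byte of a serialized token.
    let tag = TokenType::Session.to_string();
    assert_eq!(tag, "s");
    assert_eq!(TokenType::try_from(tag.as_bytes()[0]), Ok(TokenType::Session));
}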
16 changes: 2 additions & 14 deletions src/query/catalog/src/table.rs
@@ -215,13 +215,8 @@ pub trait Table: Sync + Send {
}

/// Assembly the pipeline of appending data to storage
fn append_data(
&self,
ctx: Arc<dyn TableContext>,
pipeline: &mut Pipeline,
append_mode: AppendMode,
) -> Result<()> {
let (_, _, _) = (ctx, pipeline, append_mode);
fn append_data(&self, ctx: Arc<dyn TableContext>, pipeline: &mut Pipeline) -> Result<()> {
let (_, _) = (ctx, pipeline);

Err(ErrorCode::Unimplemented(format!(
"The 'append_data' operation is not available for the table '{}'. Current table engine: '{}'.",
@@ -539,13 +534,6 @@ pub enum CompactTarget {
Segments,
}

pub enum AppendMode {
// From INSERT and RECLUSTER operation
Normal,
// From COPY, Streaming load operation
Copy,
}

pub trait ColumnStatisticsProvider: Send {
// returns the statistics of the given column, if any.
// column_id is just the index of the column in table's schema
13 changes: 8 additions & 5 deletions src/query/expression/src/block.rs
@@ -318,16 +318,19 @@ impl DataBlock {
res
}

pub fn split_by_rows_if_needed_no_tail(&self, min_rows_per_block: usize) -> Vec<Self> {
let max_rows_per_block = min_rows_per_block * 2;
pub fn split_by_rows_if_needed_no_tail(&self, rows_per_block: usize) -> Vec<Self> {
// Since rows_per_block represents the expected number of rows per block,
// and the minimum number of rows per block is 0.8 * rows_per_block,
// the maximum is taken as 1.8 * rows_per_block.
let max_rows_per_block = (rows_per_block * 9).div_ceil(5);
let mut res = vec![];
let mut offset = 0;
let mut remain_rows = self.num_rows;
while remain_rows >= max_rows_per_block {
let cut = self.slice(offset..(offset + min_rows_per_block));
let cut = self.slice(offset..(offset + rows_per_block));
res.push(cut);
offset += min_rows_per_block;
remain_rows -= min_rows_per_block;
offset += rows_per_block;
remain_rows -= rows_per_block;
}
res.push(self.slice(offset..(offset + remain_rows)));
res
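A worked example of the new cut rule, with illustrative numbers only: for rows_per_block = 1_000 the cap is div_ceil(9_000, 5) = 1_800, so any remainder below the cap rides along in the final block instead of becoming a tiny tail.

fn main() {
    // Simulates the cut points of split_by_rows_if_needed_no_tail for a
    // hypothetical 2_500-row block with rows_per_block = 1_000.
    let (rows_per_block, mut remain) = (1_000usize, 2_500usize);
    let max_rows_per_block = (rows_per_block * 9).div_ceil(5); // 1_800
    let mut sizes = vec![];
    while remain >= max_rows_per_block {
        sizes.push(rows_per_block);
        remain -= rows_per_block;
    }
    sizes.push(remain); // the tail is folded into one final, larger block
    assert_eq!(sizes, vec![1_000, 1_500]);
}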
49 changes: 31 additions & 18 deletions src/query/expression/src/utils/block_thresholds.rs
@@ -66,28 +66,41 @@ impl BlockThresholds {
total_rows <= self.max_rows_per_block && total_bytes <= self.max_bytes_per_block
}

#[inline]
pub fn calc_rows_per_block(&self, total_bytes: usize, total_rows: usize) -> usize {
let mut block_num = std::cmp::max(total_bytes / self.max_bytes_per_block, 1);
let mut rows_per_block = total_rows.div_ceil(block_num);
if self.check_for_compact(total_rows, total_bytes) {
return total_rows;
}

let max_bytes_per_block = if rows_per_block < self.max_rows_per_block / 10 {
// If block rows < 100_000, max_bytes_per_block set to 200M
2 * self.max_bytes_per_block
} else if rows_per_block < self.max_rows_per_block / 2 {
// If block rows < 500_000, max_bytes_per_block set to 150M
3 * self.max_bytes_per_block / 2
} else if rows_per_block < self.min_rows_per_block {
// If block rows < 800_000, max_bytes_per_block set to 125M
5 * self.max_bytes_per_block / 4
} else {
self.max_bytes_per_block
let block_num_by_size = std::cmp::max(total_bytes / self.max_bytes_per_block, 1);
let block_num_by_rows = std::cmp::max(total_rows / self.min_rows_per_block, 1);
if block_num_by_rows >= block_num_by_size {
return self.max_rows_per_block;
}

let mut rows_per_block = total_rows.div_ceil(block_num_by_size);
let max_bytes_per_block = match rows_per_block {
v if v < self.max_rows_per_block / 10 => {
// If block rows < 100_000, max_bytes_per_block set to 200M
2 * self.max_bytes_per_block
}
v if v < self.max_rows_per_block / 2 => {
// If block rows < 500_000, max_bytes_per_block set to 150M
3 * self.max_bytes_per_block / 2
}
v if v < self.min_rows_per_block => {
// If block rows < 800_000, max_bytes_per_block set to 125M
5 * self.max_bytes_per_block / 4
}
_ => self.max_bytes_per_block,
};

if block_num > 1 && max_bytes_per_block > self.max_bytes_per_block {
block_num = std::cmp::max(total_bytes / max_bytes_per_block, 1);
rows_per_block = total_rows.div_ceil(block_num);
if max_bytes_per_block > self.max_bytes_per_block {
rows_per_block = std::cmp::max(
total_rows / (std::cmp::max(total_bytes / max_bytes_per_block, 1)),
1,
);
}

rows_per_block.min(self.max_rows_per_block)
rows_per_block
}
}
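To make the rebalancing concrete, a worked example under assumed defaults (max_rows_per_block = 1_000_000, min_rows_per_block = 800_000, max_bytes_per_block = 100 MiB; these values are illustrative, not taken from this diff):

fn main() {
    // Hypothetical input: 300 MiB spread over 2_000_000 rows.
    let (max_bytes, min_rows) = (100usize << 20, 800_000usize);
    let (total_bytes, total_rows) = (300usize << 20, 2_000_000usize);

    let block_num_by_size = (total_bytes / max_bytes).max(1); // 3
    let block_num_by_rows = (total_rows / min_rows).max(1); // 2
    assert!(block_num_by_rows < block_num_by_size); // no early return

    let rows_per_block = total_rows.div_ceil(block_num_by_size); // 666_667
    // 666_667 is above the 100_000 and 500_000 arms but below min_rows,
    // so the byte cap is relaxed to 5/4, i.e. 125 MiB.
    assert!(rows_per_block < min_rows);
    let relaxed_bytes = 5 * max_bytes / 4;

    // Re-derive the row count from the relaxed cap: 2 blocks instead of 3.
    let rows = (total_rows / (total_bytes / relaxed_bytes).max(1)).max(1);
    assert_eq!(rows, 1_000_000); // two ~1M-row blocks of ~150 MiB each
}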
12 changes: 6 additions & 6 deletions src/query/pipeline/transforms/src/processors/transforms/mod.rs
@@ -17,10 +17,10 @@ mod transform;
mod transform_accumulating;
mod transform_accumulating_async;
mod transform_async;
mod transform_block_compact;
mod transform_block_compact_for_copy;
mod transform_blocking;
mod transform_compact;
mod transform_compact_block;
mod transform_compact_builder;
mod transform_compact_no_split_builder;
mod transform_dummy;
mod transform_multi_sort_merge;
mod transform_pipeline_helper;
@@ -34,10 +34,10 @@ pub use transform::*;
pub use transform_accumulating::*;
pub use transform_accumulating_async::*;
pub use transform_async::*;
pub use transform_block_compact::*;
pub use transform_block_compact_for_copy::*;
pub use transform_blocking::*;
pub use transform_compact::*;
pub use transform_compact_block::*;
pub use transform_compact_builder::*;
pub use transform_compact_no_split_builder::*;
pub use transform_dummy::*;
pub use transform_multi_sort_merge::try_add_multi_sort_merge;
pub use transform_pipeline_helper::TransformPipelineHelper;
@@ -181,6 +181,8 @@ pub trait BlockMetaTransform<B: BlockMetaInfo>: Send + 'static {
fn on_finish(&mut self) -> Result<()> {
Ok(())
}

fn interrupt(&self) {}
}

pub struct BlockMetaTransformer<B: BlockMetaInfo, T: BlockMetaTransform<B>> {
@@ -315,4 +317,8 @@ impl<B: BlockMetaInfo, T: BlockMetaTransform<B>> Processor for BlockMetaTransformer<B, T> {

Ok(())
}

fn interrupt(&self) {
self.transform.interrupt();
}
}
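The new hook lets BlockMetaTransformer forward Processor::interrupt() to the wrapped transform. A minimal standalone sketch of the pattern a transform might use; MyTransform and its flag are hypothetical:

use std::sync::atomic::{AtomicBool, Ordering};

struct MyTransform {
    aborting: AtomicBool,
}

impl MyTransform {
    // Mirror of the new `fn interrupt(&self)` hook: set a flag that the
    // transform's processing loop can poll to stop early.
    fn interrupt(&self) {
        self.aborting.store(true, Ordering::Relaxed);
    }
}

fn main() {
    let t = MyTransform { aborting: AtomicBool::new(false) };
    t.interrupt(); // what the pipeline would call on query cancellation
    assert!(t.aborting.load(Ordering::Relaxed));
}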

This file was deleted.
