Skip to content

Commit

Permalink
Merge branch 'main' into spill-parquet
Browse files Browse the repository at this point in the history
  • Loading branch information
Dousir9 authored Oct 16, 2024
2 parents ffba39b + 86cd608 commit cd8304a
Show file tree
Hide file tree
Showing 30 changed files with 638 additions and 90 deletions.
29 changes: 0 additions & 29 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 13 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -351,18 +351,26 @@ useless_format = "allow"
mutable_key_type = "allow"
result_large_err = "allow"

## DONT'T DELETE THIS: If we want best performance, we should use this profile but it will take longer time to compile.
## Test SQL:
## select sum(number) from numbers_mt(10000000000); ~ 3x performance
## select max(number) from numbers_mt(10000000000); ~ 3x performance
# [profile.release]
# debug = 1
# lto = "thin"
# overflow-checks = false
# incremental = false
# codegen-units = 1

[profile.release]
debug = 1
lto = "thin"
overflow-checks = false
opt-level = "s" ## defaults to be 3
incremental = false
opt-level = "s"

# codegen-units = 1 # Reduce number of codegen units to increase optimizations.

# [profile.release.package]
# arrow2 = { codegen-units = 4 }
# common-functions = { codegen-units = 16 }
# databend-common-arrow = { codegen-units = 16 }
# databend-query = { codegen-units = 4 }
# databend-binaries = { codegen-units = 4 }

Expand Down
20 changes: 20 additions & 0 deletions src/query/ast/src/ast/statements/dictionary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::fmt::Formatter;
use derive_visitor::Drive;
use derive_visitor::DriveMut;

use super::ShowLimit;
use crate::ast::write_comma_separated_list;
use crate::ast::write_dot_separated_list;
use crate::ast::write_space_separated_string_map;
Expand Down Expand Up @@ -123,3 +124,22 @@ impl Display for ShowCreateDictionaryStmt {
)
}
}

#[derive(Debug, Clone, PartialEq, Drive, DriveMut)]
pub struct ShowDictionariesStmt {
pub database: Option<Identifier>,
pub limit: Option<ShowLimit>,
}

impl Display for ShowDictionariesStmt {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
write!(f, "SHOW DICTIONARIES")?;
if let Some(database) = &self.database {
write!(f, " FROM {database}")?;
}
if let Some(limit) = &self.limit {
write!(f, " {limit}")?;
}
Ok(())
}
}
11 changes: 2 additions & 9 deletions src/query/ast/src/ast/statements/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,7 @@ pub enum Statement {
CreateDictionary(CreateDictionaryStmt),
DropDictionary(DropDictionaryStmt),
ShowCreateDictionary(ShowCreateDictionaryStmt),
ShowDictionaries {
show_options: Option<ShowOptions>,
},
ShowDictionaries(ShowDictionariesStmt),

// Columns
ShowColumns(ShowColumnsStmt),
Expand Down Expand Up @@ -613,12 +611,7 @@ impl Display for Statement {
Statement::CreateDictionary(stmt) => write!(f, "{stmt}")?,
Statement::DropDictionary(stmt) => write!(f, "{stmt}")?,
Statement::ShowCreateDictionary(stmt) => write!(f, "{stmt}")?,
Statement::ShowDictionaries { show_options } => {
write!(f, "SHOW DICTIONARIES")?;
if let Some(show_options) = show_options {
write!(f, " {show_options}")?;
}
}
Statement::ShowDictionaries(stmt) => write!(f, "{stmt}")?,
Statement::CreateView(stmt) => write!(f, "{stmt}")?,
Statement::AlterView(stmt) => write!(f, "{stmt}")?,
Statement::DropView(stmt) => write!(f, "{stmt}")?,
Expand Down
10 changes: 8 additions & 2 deletions src/query/ast/src/parser/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -957,9 +957,15 @@ pub fn statement_body(i: Input) -> IResult<Statement> {
);
let show_dictionaries = map(
rule! {
SHOW ~ DICTIONARIES ~ #show_options?
SHOW ~ DICTIONARIES ~ ((FROM|IN) ~ #ident)? ~ #show_limit?
},
|(_, _, db, limit)| {
let database = match db {
Some((_, d)) => Some(d),
_ => None,
};
Statement::ShowDictionaries(ShowDictionariesStmt { database, limit })
},
|(_, _, show_options)| Statement::ShowDictionaries { show_options },
);
let show_create_dictionary = map(
rule! {
Expand Down
2 changes: 2 additions & 0 deletions src/query/expression/src/aggregate/aggregate_hashtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ pub struct AggregateHashTable {
// use for append rows directly during deserialize
pub direct_append: bool,
pub config: HashTableConfig,

current_radix_bits: u64,
entries: Vec<Entry>,
count: usize,
Expand Down Expand Up @@ -585,6 +586,7 @@ impl AggregateHashTable {
.iter()
.map(|arena| arena.allocated_bytes())
.sum::<usize>()
+ self.entries.len() * std::mem::size_of::<Entry>()
}
}

Expand Down
12 changes: 12 additions & 0 deletions src/query/expression/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,18 @@ impl DataBlock {
self.columns().iter().map(|entry| entry.memory_size()).sum()
}

pub fn consume_convert_to_full(self) -> Self {
if self
.columns()
.iter()
.all(|entry| entry.value.as_column().is_some())
{
return self;
}

self.convert_to_full()
}

pub fn convert_to_full(&self) -> Self {
let columns = self
.columns()
Expand Down
2 changes: 1 addition & 1 deletion src/query/expression/src/converts/arrow/to.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ impl DataBlock {
let arrow_schema = table_schema_to_arrow_schema(table_schema);
let mut arrays = Vec::with_capacity(self.columns().len());
for (entry, arrow_field) in self
.convert_to_full()
.consume_convert_to_full()
.columns()
.iter()
.zip(arrow_schema.fields())
Expand Down
50 changes: 50 additions & 0 deletions src/query/expression/tests/it/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use databend_common_expression::FromData;
use databend_common_expression::SortColumnDescription;

use crate::common::new_block;
use crate::rand_block_for_all_types;

#[test]
fn test_block_sort() -> Result<()> {
Expand Down Expand Up @@ -201,3 +202,52 @@ fn test_block_sort() -> Result<()> {

Ok(())
}

#[test]
fn sort_concat() {
// Sort(Sort A || Sort B) = Sort (A || B)
use databend_common_expression::DataBlock;
use itertools::Itertools;
use rand::seq::SliceRandom;
use rand::Rng;

let mut rng = rand::thread_rng();
let num_blocks = 100;

for _i in 0..num_blocks {
let block_a = rand_block_for_all_types(rng.gen_range(0..100));
let block_b = rand_block_for_all_types(rng.gen_range(0..100));

let mut sort_index: Vec<usize> = (0..block_a.num_columns()).collect();
sort_index.shuffle(&mut rng);

let sort_desc = sort_index
.iter()
.map(|i| SortColumnDescription {
offset: *i,
asc: rng.gen_bool(0.5),
nulls_first: rng.gen_bool(0.5),
is_nullable: rng.gen_bool(0.5),
})
.collect_vec();

let concat_ab_0 = DataBlock::concat(&[block_a.clone(), block_b.clone()]).unwrap();

let sort_a = DataBlock::sort(&block_a, &sort_desc, None).unwrap();
let sort_b = DataBlock::sort(&block_b, &sort_desc, None).unwrap();
let concat_ab_1 = DataBlock::concat(&[sort_a, sort_b]).unwrap();

let block_1 = DataBlock::sort(&concat_ab_0, &sort_desc, None).unwrap();
let block_2 = DataBlock::sort(&concat_ab_1, &sort_desc, None).unwrap();

assert_eq!(block_1.num_columns(), block_2.num_columns());
assert_eq!(block_1.num_rows(), block_2.num_rows());

let columns_1 = block_1.columns();
let columns_2 = block_2.columns();
for idx in 0..columns_1.len() {
assert_eq!(columns_1[idx].data_type, columns_2[idx].data_type);
assert_eq!(columns_1[idx].value, columns_2[idx].value);
}
}
}
1 change: 0 additions & 1 deletion src/query/functions/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ lexical-core = "0.8.5"
libm = "0.2.6"
match-template = { workspace = true }
md-5 = "0.10.5"
multiversion = "0.7.4"
naive-cityhash = "0.2.0"
num-traits = "0.2.15"
once_cell = { workspace = true }
Expand Down
31 changes: 31 additions & 0 deletions src/query/functions/src/aggregates/aggregate_min_max_any.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::sync::Arc;

use borsh::BorshDeserialize;
use borsh::BorshSerialize;
use databend_common_arrow::arrow::bitmap::Bitmap;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::types::decimal::*;
Expand Down Expand Up @@ -92,6 +93,36 @@ where
Ok(())
}

fn add_batch(
&mut self,
other: T::Column,
validity: Option<&Bitmap>,
function_data: Option<&dyn FunctionData>,
) -> Result<()> {
let column_len = T::column_len(&other);
if column_len == 0 {
return Ok(());
}

let column_iter = T::iter_column(&other);
if let Some(validity) = validity {
if validity.unset_bits() == column_len {
return Ok(());
}
for (data, valid) in column_iter.zip(validity.iter()) {
if valid {
let _ = self.add(data, function_data);
}
}
} else {
let v = column_iter.reduce(|l, r| if !C::change_if(&l, &r) { l } else { r });
if let Some(v) = v {
let _ = self.add(v, function_data);
}
}
Ok(())
}

fn merge(&mut self, rhs: &Self) -> Result<()> {
if let Some(v) = &rhs.value {
self.add(T::to_scalar_ref(v), None)?;
Expand Down
42 changes: 29 additions & 13 deletions src/query/functions/src/aggregates/aggregate_sum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use borsh::BorshDeserialize;
use borsh::BorshSerialize;
use databend_common_arrow::arrow::bitmap::Bitmap;
use databend_common_arrow::arrow::buffer::Buffer;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::types::decimal::*;
Expand Down Expand Up @@ -80,21 +81,33 @@ where
}
}

#[multiversion::multiversion(targets("x86_64+avx", "x86_64+sse"))]
fn sum_batch<T, N>(other: T::Column) -> N::Scalar
// #[multiversion::multiversion(targets("x86_64+avx", "x86_64+sse"))]
#[inline]
pub fn sum_batch<T, TSum>(inner: Buffer<T>, validity: Option<&Bitmap>) -> TSum
where
T: ValueType + Sync + Send,
N: ValueType,
T::Scalar: Number + AsPrimitive<N::Scalar>,
N::Scalar: Number + AsPrimitive<f64> + std::ops::AddAssign,
for<'a> T::ScalarRef<'a>: Number + AsPrimitive<N::Scalar>,
T: Number + AsPrimitive<TSum>,
TSum: Number + std::ops::AddAssign,
{
// use temp variable to hint the compiler to unroll the loop
let mut sum = N::Scalar::default();
for value in T::iter_column(&other) {
sum += value.as_();
match validity {
Some(v) if v.unset_bits() > 0 => {
let mut sum = TSum::default();
inner.iter().zip(v.iter()).for_each(|(t, b)| {
if b {
sum += t.as_();
}
});

sum
}
_ => {
let mut sum = TSum::default();
inner.iter().for_each(|t| {
sum += t.as_();
});

sum
}
}
sum
}

impl<T, N> UnaryState<T, N> for NumberSumState<N>
Expand All @@ -117,9 +130,12 @@ where
fn add_batch(
&mut self,
other: T::Column,
validity: Option<&Bitmap>,
_function_data: Option<&dyn FunctionData>,
) -> Result<()> {
self.value += sum_batch::<T, N>(other);
let col = T::upcast_column(other);
let buffer = NumberType::<T::Scalar>::try_downcast_column(&col).unwrap();
self.value += sum_batch::<T::Scalar, N::Scalar>(buffer, validity);
Ok(())
}

Expand Down
Loading

0 comments on commit cd8304a

Please sign in to comment.