diff --git a/Cargo.lock b/Cargo.lock index aca2915dc516..8329c8299f94 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -456,6 +456,7 @@ dependencies = [ "arrow-schema", "flatbuffers", "lz4_flex", + "zstd 0.13.2", ] [[package]] @@ -3417,6 +3418,7 @@ name = "databend-common-exception" version = "0.1.0" dependencies = [ "anyhow", + "arrow-flight", "arrow-schema", "backtrace", "bincode 2.0.0-rc.3", @@ -5124,6 +5126,7 @@ name = "databend-query" version = "0.1.0" dependencies = [ "arrow-array", + "arrow-buffer", "arrow-cast", "arrow-flight", "arrow-ipc", @@ -5241,7 +5244,6 @@ dependencies = [ "naive-cityhash", "num", "num_cpus", - "once_cell", "opendal 0.49.2", "opensrv-mysql", "opentelemetry", diff --git a/Cargo.toml b/Cargo.toml index 36927b95c90c..4c2bc082553b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -352,8 +352,8 @@ mutable_key_type = "allow" result_large_err = "allow" ## DONT'T DELETE THIS: If we want best performance, we should use this profile but it will take longer time to compile. -## Test SQL: -## select sum(number) from numbers_mt(10000000000); ~ 3x performance +## Test SQL: +## select sum(number) from numbers_mt(10000000000); ~ 3x performance ## select max(number) from numbers_mt(10000000000); ~ 3x performance # [profile.release] # debug = 1 diff --git a/src/common/arrow/src/arrow/datatypes/field.rs b/src/common/arrow/src/arrow/datatypes/field.rs index 4b9bef772430..ef852887ce7a 100644 --- a/src/common/arrow/src/arrow/datatypes/field.rs +++ b/src/common/arrow/src/arrow/datatypes/field.rs @@ -71,11 +71,29 @@ impl Field { } } +// For databend's extension key +pub const EXTENSION_KEY: &str = "Extension"; + #[cfg(feature = "arrow")] impl From<Field> for arrow_schema::Field { fn from(value: Field) -> Self { - Self::new(value.name, value.data_type.into(), value.is_nullable) - .with_metadata(value.metadata.into_iter().collect()) + (&value).into() + } +} + +#[cfg(feature = "arrow")] impl From<&Field> for arrow_schema::Field { + fn from(value: &Field) -> Self { + let mut metadata = value.metadata.clone(); + let ty = if let DataType::Extension(extension_type, ty, _) = &value.data_type { + metadata.insert(EXTENSION_KEY.to_string(), extension_type.clone()); + ty.as_ref().clone() + } else { + value.data_type.clone() + }; + + Self::new(value.name.clone(), ty.into(), value.is_nullable) + .with_metadata(metadata.into_iter().collect()) } } @@ -89,12 +107,15 @@ impl From<arrow_schema::Field> for Field { #[cfg(feature = "arrow")] impl From<&arrow_schema::Field> for Field { fn from(value: &arrow_schema::Field) -> Self { - let data_type = value.data_type().clone().into(); - let metadata = value + let mut data_type = value.data_type().clone().into(); + let mut metadata: Metadata = value .metadata() .iter() .map(|(k, v)| (k.clone(), v.clone())) .collect(); + if let Some(v) = metadata.remove(EXTENSION_KEY) { + data_type = DataType::Extension(v, Box::new(data_type), None); + } Self::new(value.name(), data_type, value.is_nullable()).with_metadata(metadata) } } diff --git a/src/common/exception/Cargo.toml b/src/common/exception/Cargo.toml index 84080a4ebe46..ac0872ba5d65 100644 --- a/src/common/exception/Cargo.toml +++ b/src/common/exception/Cargo.toml @@ -15,6 +15,7 @@ databend-common-arrow = { workspace = true } databend-common-ast = { workspace = true } anyhow = { workspace = true } +arrow-flight = { workspace = true } arrow-schema = { workspace = true } backtrace = { workspace = true } bincode = { workspace = true }
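The `field.rs` hunk above is the crux of the extension-type handling in this PR: arrow2's `DataType::Extension` wrapper is flattened into an `"Extension"` metadata entry on the arrow-rs field, and rebuilt from that entry on the way back. Below is a minimal sketch of that round trip; it is not part of the patch, and it assumes the arrow2-style `Field`/`DataType` exported by `databend_common_arrow` with the `arrow` feature enabled:

```rust
use databend_common_arrow::arrow::datatypes::DataType;
use databend_common_arrow::arrow::datatypes::Field;

fn main() {
    // An extension type wraps a physical type plus a name, e.g. EmptyArray over Boolean.
    let field = Field::new(
        "c0",
        DataType::Extension("EmptyArray".to_string(), Box::new(DataType::Boolean), None),
        false,
    );

    // Converting to arrow-rs moves the extension name into field metadata...
    let arrow_field = arrow_schema::Field::from(&field);
    assert_eq!(
        arrow_field.metadata().get("Extension").map(String::as_str),
        Some("EmptyArray")
    );

    // ...and converting back rebuilds DataType::Extension from that metadata.
    let roundtrip = Field::from(&arrow_field);
    assert_eq!(roundtrip.data_type, field.data_type);
}
```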
diff --git a/src/common/exception/src/exception_flight.rs b/src/common/exception/src/exception_flight.rs index 284ed1932e66..39ad2a3e8c89 100644 --- a/src/common/exception/src/exception_flight.rs +++ b/src/common/exception/src/exception_flight.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use databend_common_arrow::arrow_format::flight::data::FlightData; +use arrow_flight::FlightData; use crate::ErrorCode; use crate::Result; @@ -24,9 +24,9 @@ impl From<ErrorCode> for FlightData { serde_json::to_vec::<SerializedError>(&SerializedError::from(&error)).unwrap(); FlightData { - data_body: serialized_error, - app_metadata: vec![0x02], - data_header: error.code().to_be_bytes().to_vec(), + data_body: serialized_error.into(), + app_metadata: vec![0x02].into(), + data_header: error.code().to_be_bytes().to_vec().into(), flight_descriptor: None, } } diff --git a/src/common/exception/tests/it/exception_flight.rs b/src/common/exception/tests/it/exception_flight.rs index 1a8bf8346e2a..cf360a397b83 100644 --- a/src/common/exception/tests/it/exception_flight.rs +++ b/src/common/exception/tests/it/exception_flight.rs @@ -14,8 +14,8 @@ use std::sync::Arc; +use arrow_flight::FlightData; use backtrace::Backtrace; -use databend_common_arrow::arrow_format::flight::data::FlightData; use databend_common_exception::exception::ErrorCodeBacktrace; use databend_common_exception::ErrorCode; use databend_common_exception::Result; diff --git a/src/query/expression/src/block.rs b/src/query/expression/src/block.rs index 866f976644f3..ce909f97b5f3 100644 --- a/src/query/expression/src/block.rs +++ b/src/query/expression/src/block.rs @@ -28,6 +28,7 @@ use crate::types::AnyType; use crate::types::DataType; use crate::Column; use crate::ColumnBuilder; +use crate::DataField; use crate::DataSchemaRef; use crate::Domain; use crate::Scalar; @@ -589,6 +590,16 @@ impl DataBlock { debug_assert!(self.columns.last().unwrap().value.as_column().is_some()); self.columns.last().unwrap().value.as_column().unwrap() } + + pub fn infer_schema(&self) -> DataSchema { + let fields = self + .columns() + .iter() + .enumerate() + .map(|(index, e)| DataField::new(&format!("col_{index}"), e.data_type.clone())) + .collect(); + DataSchema::new(fields) + } } impl TryFrom<DataBlock> for ArrowChunk { diff --git a/src/query/expression/src/converts/arrow/from.rs b/src/query/expression/src/converts/arrow/from.rs index 53a50ef28343..e644777caae9 100644 --- a/src/query/expression/src/converts/arrow/from.rs +++ b/src/query/expression/src/converts/arrow/from.rs @@ -15,15 +15,12 @@ use std::sync::Arc; use arrow_array::RecordBatch; -use arrow_schema::DataType as ArrowDataType; -use arrow_schema::Field as ArrowField; +use arrow_schema::Field; use arrow_schema::Schema as ArrowSchema; -use databend_common_arrow::arrow::datatypes::DataType as Arrow2DataType; use databend_common_arrow::arrow::datatypes::Field as Arrow2Field; use databend_common_exception::ErrorCode; use databend_common_exception::Result; -use super::EXTENSION_KEY; use crate::types::DataType; use crate::Column; use crate::DataBlock; @@ -32,17 +29,30 @@ use crate::DataSchema; use crate::TableField; use crate::TableSchema; -impl TryFrom<&ArrowSchema> for DataSchema { +impl TryFrom<&Field> for DataField { + type Error = ErrorCode; + fn try_from(arrow_f: &Field) -> Result<Self> { + Ok(DataField::from(&TableField::try_from(arrow_f)?)) + } +} + +impl TryFrom<&Field> for TableField { type Error = ErrorCode; + fn try_from(arrow_f: &Field) -> Result<Self> { + TableField::try_from(&Arrow2Field::from(arrow_f)) + } +}
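For callers, the new impls mean a single arrow-rs `Field` can be turned into a Databend field without building a whole schema first. A hedged usage sketch (the `demo` function and its body are illustrative, not code from the patch):

```rust
use arrow_schema::Field as ArrowField;
use databend_common_exception::Result;
use databend_common_expression::DataField;
use databend_common_expression::TableField;

// Illustrative only: convert one arrow-rs field via the impls introduced above.
fn demo(arrow_f: &ArrowField) -> Result<()> {
    // arrow-rs Field -> TableField, bridged through an arrow2 Field internally.
    let table_field = TableField::try_from(arrow_f)?;
    // Or directly to a DataField, which wraps the same conversion.
    let data_field = DataField::try_from(arrow_f)?;
    println!("{} / {}", table_field.name(), data_field.name());
    Ok(())
}
```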
+impl TryFrom<&ArrowSchema> for DataSchema { + type Error = ErrorCode; fn try_from(schema: &ArrowSchema) -> Result<Self> { let fields = schema .fields .iter() .map(|arrow_f| { - Ok(DataField::from(&TableField::try_from( - &arrow2_field_from_arrow_field(arrow_f), - )?)) + Ok(DataField::from(&TableField::try_from(&Arrow2Field::from( + arrow_f, + ))?)) }) .collect::<Result<Vec<_>>>()?; Ok(DataSchema::new_from( @@ -54,12 +64,11 @@ impl TryFrom<&ArrowSchema> for TableSchema { type Error = ErrorCode; - fn try_from(schema: &ArrowSchema) -> Result<Self> { let fields = schema .fields .iter() - .map(|arrow_f| TableField::try_from(&arrow2_field_from_arrow_field(arrow_f))) + .map(|arrow_f| TableField::try_from(&Arrow2Field::from(arrow_f))) .collect::<Result<Vec<_>>>()?; Ok(TableSchema::new_from( fields, @@ -108,28 +117,3 @@ impl Column { Column::from_arrow(arrow2_array.as_ref(), data_type) } } - -fn arrow2_field_from_arrow_field(field: &ArrowField) -> Arrow2Field { - let mut data_type = match field.data_type() { - ArrowDataType::List(f) => Arrow2DataType::List(Box::new(arrow2_field_from_arrow_field(f))), - ArrowDataType::LargeList(f) => { - Arrow2DataType::LargeList(Box::new(arrow2_field_from_arrow_field(f))) - } - ArrowDataType::FixedSizeList(f, size) => { - Arrow2DataType::FixedSizeList(Box::new(arrow2_field_from_arrow_field(f)), *size as _) - } - ArrowDataType::Map(f, ordered) => { - Arrow2DataType::Map(Box::new(arrow2_field_from_arrow_field(f)), *ordered) - } - ArrowDataType::Struct(f) => { - Arrow2DataType::Struct(f.iter().map(|f| arrow2_field_from_arrow_field(f)).collect()) - } - other => other.clone().into(), - }; - - if let Some(extension_type) = field.metadata().get(EXTENSION_KEY) { - data_type = Arrow2DataType::Extension(extension_type.clone(), Box::new(data_type), None); - } - - Arrow2Field::new(field.name(), data_type, field.is_nullable()) -} diff --git a/src/query/expression/src/converts/arrow/to.rs b/src/query/expression/src/converts/arrow/to.rs index 391d2893f596..3860fc64d503 100644 --- a/src/query/expression/src/converts/arrow/to.rs +++ b/src/query/expression/src/converts/arrow/to.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use std::collections::HashMap; use std::sync::Arc; use arrow_array::cast::AsArray; @@ -20,16 +19,15 @@ use arrow_array::Array; use arrow_array::LargeListArray; use arrow_array::MapArray; use arrow_array::RecordBatch; +use arrow_array::RecordBatchOptions; use arrow_array::StructArray; use arrow_schema::DataType as ArrowDataType; use arrow_schema::Field as ArrowField; use arrow_schema::Fields; use arrow_schema::Schema as ArrowSchema; -use databend_common_arrow::arrow::datatypes::DataType as Arrow2DataType; use databend_common_arrow::arrow::datatypes::Field as Arrow2Field; use databend_common_exception::Result; -use super::EXTENSION_KEY; use crate::infer_table_schema; use crate::Column; use crate::DataBlock; @@ -43,7 +41,7 @@ impl From<&DataSchema> for ArrowSchema { let fields = schema .fields .iter() - .map(|f| arrow_field_from_arrow2_field(Arrow2Field::from(f))) + .map(|f| ArrowField::from(Arrow2Field::from(f))) .collect::<Vec<_>>(); ArrowSchema { fields: Fields::from(fields), @@ -57,7 +55,7 @@ impl From<&TableSchema> for ArrowSchema { let fields = schema .fields .iter() - .map(|f| arrow_field_from_arrow2_field(Arrow2Field::from(f))) + .map(|f| ArrowField::from(Arrow2Field::from(f))) .collect::<Vec<_>>(); ArrowSchema { fields: Fields::from(fields), @@ -70,7 +68,7 @@ pub fn table_schema_to_arrow_schema(schema: &TableSchema) -> ArrowSchema { let fields = schema .fields .iter() - .map(|f| arrow_field_from_arrow2_field(f.into())) + .map(|f| ArrowField::from(Arrow2Field::from(f))) .collect::<Vec<_>>(); ArrowSchema { fields: Fields::from(fields), @@ -80,13 +78,13 @@ impl From<&TableField> for ArrowField { fn from(field: &TableField) -> Self { - arrow_field_from_arrow2_field(Arrow2Field::from(field)) + ArrowField::from(Arrow2Field::from(field)) } } impl From<&DataField> for ArrowField { fn from(field: &DataField) -> Self { - arrow_field_from_arrow2_field(Arrow2Field::from(field)) + ArrowField::from(Arrow2Field::from(field)) } } @@ -98,6 +96,14 @@ impl DataBlock { } pub fn to_record_batch(self, table_schema: &TableSchema) -> Result<RecordBatch> { + if table_schema.num_fields() == 0 { + return Ok(RecordBatch::try_new_with_options( + Arc::new(ArrowSchema::empty()), + vec![], + &RecordBatchOptions::default().with_row_count(Some(self.num_rows())), + )?); + } + let arrow_schema = table_schema_to_arrow_schema(table_schema); let mut arrays = Vec::with_capacity(self.columns().len()); for (entry, arrow_field) in self
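The early return added to `to_record_batch` above exists because a `RecordBatch` with zero columns cannot infer its own length; arrow-rs requires the row count to be supplied explicitly. A standalone sketch of that API (plain arrow-rs, nothing Databend-specific is assumed):

```rust
use std::sync::Arc;

use arrow_array::RecordBatch;
use arrow_array::RecordBatchOptions;
use arrow_schema::ArrowError;
use arrow_schema::Schema;

// With no columns to measure, the row count must come from RecordBatchOptions.
fn empty_batch(num_rows: usize) -> Result<RecordBatch, ArrowError> {
    RecordBatch::try_new_with_options(
        Arc::new(Schema::empty()),
        vec![],
        &RecordBatchOptions::new().with_row_count(Some(num_rows)),
    )
}
```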
@@ -166,34 +172,3 @@ impl Column { arrow_array } } - -fn arrow_field_from_arrow2_field(field: Arrow2Field) -> ArrowField { - let mut metadata = HashMap::new(); - - let arrow2_data_type = if let Arrow2DataType::Extension(extension_type, ty, _) = field.data_type - { - metadata.insert(EXTENSION_KEY.to_string(), extension_type.clone()); - *ty - } else { - field.data_type - }; - - let data_type = match arrow2_data_type { - Arrow2DataType::List(f) => ArrowDataType::List(Arc::new(arrow_field_from_arrow2_field(*f))), - Arrow2DataType::LargeList(f) => { - ArrowDataType::LargeList(Arc::new(arrow_field_from_arrow2_field(*f))) - } - Arrow2DataType::FixedSizeList(f, size) => { - ArrowDataType::FixedSizeList(Arc::new(arrow_field_from_arrow2_field(*f)), size as _) - } - Arrow2DataType::Map(f, ordered) => { - ArrowDataType::Map(Arc::new(arrow_field_from_arrow2_field(*f)), ordered) - } - Arrow2DataType::Struct(f) => { - ArrowDataType::Struct(f.into_iter().map(arrow_field_from_arrow2_field).collect()) - } - other => other.into(), - }; - - ArrowField::new(field.name, data_type, field.is_nullable).with_metadata(metadata) -} diff --git a/src/query/expression/src/converts/arrow2/from.rs b/src/query/expression/src/converts/arrow2/from.rs index 0a264afce0d0..d354cb9c13af 100644 --- a/src/query/expression/src/converts/arrow2/from.rs +++ b/src/query/expression/src/converts/arrow2/from.rs @@ -176,20 +176,12 @@ impl Column { (DataType::Null, ArrowDataType::Null) => Column::Null { len: arrow_col.len(), }, - (DataType::EmptyArray, ArrowDataType::Extension(name, _, _)) - if name == ARROW_EXT_TYPE_EMPTY_ARRAY => - { - Column::EmptyArray { - len: arrow_col.len(), - } - } - (DataType::EmptyMap, ArrowDataType::Extension(name, _, _)) - if name == ARROW_EXT_TYPE_EMPTY_MAP => - { - Column::EmptyMap { - len: arrow_col.len(), - } - } + (DataType::EmptyArray, _) => Column::EmptyArray { + len: arrow_col.len(), + }, + (DataType::EmptyMap, _) => Column::EmptyMap { + len: arrow_col.len(), + }, (DataType::Number(NumberDataType::UInt8), ArrowDataType::UInt8) => { Column::Number(NumberColumn::UInt8( arrow_col diff --git a/src/query/expression/src/converts/arrow2/to.rs b/src/query/expression/src/converts/arrow2/to.rs index 5d01b76fe873..f248a1a38c0d 100644 --- a/src/query/expression/src/converts/arrow2/to.rs +++ b/src/query/expression/src/converts/arrow2/to.rs @@ -83,12 +83,12 @@ fn table_type_to_arrow_type(ty: &TableDataType) -> ArrowDataType { TableDataType::Null => ArrowDataType::Null, TableDataType::EmptyArray => ArrowDataType::Extension( ARROW_EXT_TYPE_EMPTY_ARRAY.to_string(), - Box::new(ArrowDataType::Null), + Box::new(ArrowDataType::Boolean), None, ), TableDataType::EmptyMap => ArrowDataType::Extension( ARROW_EXT_TYPE_EMPTY_MAP.to_string(), - Box::new(ArrowDataType::Null), + Box::new(ArrowDataType::Boolean), None, ), TableDataType::Boolean => ArrowDataType::Boolean, @@ -149,7 +149,8 @@ fn table_type_to_arrow_type(ty: &TableDataType) -> ArrowDataType { ArrowField::new( name.as_str(), table_type_to_arrow_type(ty), - ty.is_nullable(), + // null in tuple must be nullable + ty.is_nullable_or_null(), ) }) .collect(); @@ -185,10 +186,20 @@ impl Column { databend_common_arrow::arrow::array::NullArray::new_null(arrow_type, *len), ), Column::EmptyArray { len } => Box::new( - databend_common_arrow::arrow::array::NullArray::new_null(arrow_type, *len), + databend_common_arrow::arrow::array::BooleanArray::try_new( + arrow_type, + Bitmap::new_constant(true, *len), + None, + ) + .unwrap(), ), Column::EmptyMap { len } => Box::new( - databend_common_arrow::arrow::array::NullArray::new_null(arrow_type, *len), + databend_common_arrow::arrow::array::BooleanArray::try_new( + arrow_type, + Bitmap::new_constant(true, *len), + None, + ) + .unwrap(), ), Column::Number(NumberColumn::UInt8(col)) => Box::new( databend_common_arrow::arrow::array::PrimitiveArray::<u8>::try_new( diff --git a/src/query/expression/src/utils/arrow.rs b/src/query/expression/src/utils/arrow.rs index 3c7b7fbd75c4..c73f45933533 100644 --- a/src/query/expression/src/utils/arrow.rs +++ b/src/query/expression/src/utils/arrow.rs @@ -19,7 +19,7 @@ use std::io::Write; use std::sync::Arc; use arrow_array::RecordBatch; -use arrow_ipc::reader::FileReader; +use arrow_ipc::reader::FileReaderBuilder; use arrow_ipc::writer::FileWriter; use arrow_ipc::writer::IpcWriteOptions; use arrow_ipc::CompressionType; @@ -28,7 +28,6 @@ use databend_common_arrow::arrow::array::Array; use databend_common_arrow::arrow::bitmap::Bitmap; use databend_common_arrow::arrow::bitmap::MutableBitmap; use databend_common_arrow::arrow::buffer::Buffer; -use 
databend_common_arrow::arrow::io::ipc::read::read_file_metadata; use databend_common_exception::ErrorCode; use databend_common_exception::Result; @@ -100,17 +99,16 @@ pub fn deserialize_column(bytes: &[u8]) -> Result<Column> { } pub fn read_column<R: Read + Seek>(r: &mut R) -> Result<Column> { - let metadata = read_file_metadata(r)?; - let f = metadata.schema.fields[0].clone(); - let data_field = DataField::try_from(&f)?; + let mut reader = FileReaderBuilder::new().build(r)?; + let schema = reader.schema(); + let f = DataField::try_from(schema.field(0))?; - let mut reader = FileReader::try_new(r, None)?; let col = reader .next() .ok_or_else(|| ErrorCode::Internal("expected one arrow array"))?? .remove_column(0); - Column::from_arrow_rs(col, data_field.data_type()) + Column::from_arrow_rs(col, f.data_type()) } /// Convert a column to a arrow array. diff --git a/src/query/expression/tests/it/types.rs b/src/query/expression/tests/it/types.rs index d098600f2148..bfeb2b9e792d 100644 --- a/src/query/expression/tests/it/types.rs +++ b/src/query/expression/tests/it/types.rs @@ -12,8 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. +use arrow_schema::Schema; use chrono_tz::Tz; +use databend_common_expression::arrow::deserialize_column; +use databend_common_expression::arrow::serialize_column; use databend_common_expression::types::timestamp::timestamp_to_string; +use databend_common_expression::DataField; +use databend_common_expression::DataSchema; + +use crate::get_all_test_data_types; +use crate::rand_block_for_all_types; #[test] fn test_timestamp_to_string_formats() { @@ -26,3 +34,28 @@ fn test_timestamp_to_string_formats() { "2024-01-01 01:02:03.000000" ); } + +#[test] +fn test_convert_types() { + let all_types = get_all_test_data_types(); + let all_fields = all_types + .iter() + .enumerate() + .map(|(idx, data_type)| DataField::new(&format!("column_{idx}"), data_type.clone())) + .collect::<Vec<_>>(); + + let schema = DataSchema::new(all_fields); + let arrow_schema = Schema::from(&schema); + let schema2 = DataSchema::try_from(&arrow_schema).unwrap(); + assert_eq!(schema, schema2); + + let random_block = rand_block_for_all_types(1024); + + for c in random_block.columns() { + let c = c.value.as_column().unwrap().clone(); + let data = serialize_column(&c); + let c2 = deserialize_column(&data).unwrap(); + + assert_eq!(c, c2); + } +} diff --git a/src/query/service/Cargo.toml b/src/query/service/Cargo.toml index a13b5364b33f..4ce4711dff6f 100644 --- a/src/query/service/Cargo.toml +++ b/src/query/service/Cargo.toml @@ -30,8 +30,9 @@ enable_queries_executor = [] [dependencies] arrow-array = { workspace = true } +arrow-buffer = { workspace = true } arrow-flight = { workspace = true } -arrow-ipc = { workspace = true } +arrow-ipc = { workspace = true, features = ["lz4", "zstd"] } arrow-schema = { workspace = true } arrow-udf-js = { workspace = true } arrow-udf-python = { workspace = true, optional = true } @@ -139,7 +140,6 @@ match-template = { workspace = true } md-5 = "0.10.5" naive-cityhash = "0.2.0" num_cpus = "1.16.0" -once_cell = { workspace = true } opendal = { workspace = true } opensrv-mysql = { version = "0.7.0", features = ["tls"] } opentelemetry = { workspace = true } diff --git a/src/query/service/src/clusters/cluster.rs b/src/query/service/src/clusters/cluster.rs index 7186bbb82848..d4b46ee4cee4 100644 --- a/src/query/service/src/clusters/cluster.rs +++ b/src/query/service/src/clusters/cluster.rs @@ -22,7 +22,7 @@ use std::sync::Arc; use std::time::Duration; use 
std::time::Instant; -use databend_common_arrow::arrow_format::flight::service::flight_service_client::FlightServiceClient; +use arrow_flight::flight_service_client::FlightServiceClient; use databend_common_base::base::tokio::sync::Mutex; use databend_common_base::base::tokio::sync::Notify; use databend_common_base::base::tokio::task::JoinHandle; diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/mod.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/mod.rs index a22653d6b126..ae0b795b92a3 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/mod.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/mod.rs @@ -35,17 +35,12 @@ pub use transform_group_by_spill_writer::*; pub use transform_spill_reader::*; pub mod exchange_defines { - use databend_common_arrow::arrow::datatypes::Field; - use databend_common_arrow::arrow::datatypes::Schema as ArrowSchema; - use databend_common_arrow::arrow::io::flight::default_ipc_fields; - use databend_common_arrow::arrow::io::flight::WriteOptions; - use databend_common_arrow::arrow::io::ipc::IpcField; - use databend_common_arrow::arrow::io::ipc::IpcSchema; + use arrow_ipc::writer::IpcWriteOptions; + use arrow_schema::Schema; use databend_common_expression::types::DataType; use databend_common_expression::types::NumberDataType; use databend_common_expression::DataField; use databend_common_expression::DataSchema; - use once_cell::sync::OnceCell; pub fn spilled_schema() -> DataSchema { DataSchema::new(vec![ @@ -59,44 +54,12 @@ pub mod exchange_defines { ]) } - pub fn spilled_fields() -> &'static [Field] { - static IPC_SCHEMA: OnceCell> = OnceCell::new(); - - IPC_SCHEMA.get_or_init(|| { - let schema = spilled_schema(); - - ArrowSchema::from(&schema).fields - }) - } - - pub fn spilled_ipc_schema() -> &'static IpcSchema { - static IPC_SCHEMA: OnceCell = OnceCell::new(); - - IPC_SCHEMA.get_or_init(|| { - let schema = spilled_schema(); - - let arrow_schema = ArrowSchema::from(&schema); - let ipc_fields = default_ipc_fields(&arrow_schema.fields); - - IpcSchema { - fields: ipc_fields, - is_little_endian: true, - } - }) - } - - pub fn spilled_ipc_fields() -> &'static [IpcField] { - static IPC_FIELDS: OnceCell> = OnceCell::new(); - - IPC_FIELDS.get_or_init(|| { - let schema = spilled_schema(); - let arrow_schema = ArrowSchema::from(&schema); - default_ipc_fields(&arrow_schema.fields) - }) + pub fn spilled_arrow_schema() -> Schema { + let schema = spilled_schema(); + Schema::from(&schema) } - pub fn spilled_write_options() -> &'static WriteOptions { - static WRITE_OPTIONS: WriteOptions = WriteOptions { compression: None }; - &WRITE_OPTIONS + pub fn spilled_write_options() -> IpcWriteOptions { + IpcWriteOptions::default() } } diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_deserializer.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_deserializer.rs index 98abcec37315..2f86eef99a47 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_deserializer.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_deserializer.rs @@ -15,13 +15,7 @@ use std::marker::PhantomData; use std::sync::Arc; -use databend_common_arrow::arrow::datatypes::Field; -use databend_common_arrow::arrow::datatypes::Schema as ArrowSchema; -use databend_common_arrow::arrow::io::flight::default_ipc_fields; -use 
databend_common_arrow::arrow::io::flight::deserialize_batch; -use databend_common_arrow::arrow::io::flight::deserialize_dictionary; -use databend_common_arrow::arrow::io::ipc::read::Dictionaries; -use databend_common_arrow::arrow::io::ipc::IpcSchema; +use arrow_schema::Schema as ArrowSchema; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_expression::types::ArrayType; @@ -46,13 +40,13 @@ use crate::pipelines::processors::transforms::aggregator::AggregateSerdeMeta; use crate::pipelines::processors::transforms::aggregator::BucketSpilledPayload; use crate::pipelines::processors::transforms::aggregator::BUCKET_TYPE; use crate::pipelines::processors::transforms::group_by::HashMethodBounds; +use crate::servers::flight::v1::exchange::serde::deserialize_block; use crate::servers::flight::v1::exchange::serde::ExchangeDeserializeMeta; use crate::servers::flight::v1::packets::DataPacket; use crate::servers::flight::v1::packets::FragmentData; pub struct TransformDeserializer { schema: DataSchemaRef, - ipc_schema: IpcSchema, arrow_schema: Arc, _phantom: PhantomData<(Method, V)>, } @@ -64,17 +58,11 @@ impl TransformDeserializer Result { let arrow_schema = ArrowSchema::from(schema.as_ref()); - let ipc_fields = default_ipc_fields(&arrow_schema.fields); - let ipc_schema = IpcSchema { - fields: ipc_fields, - is_little_endian: true, - }; Ok(ProcessorPtr::create(BlockMetaTransformer::create( input, output, TransformDeserializer:: { - ipc_schema, arrow_schema: Arc::new(arrow_schema), schema: schema.clone(), _phantom: Default::default(), @@ -95,42 +83,36 @@ impl TransformDeserializer { - self.deserialize_data_block(dict, &fragment_data, fields, schema, &self.schema)? + deserialize_block(dict, fragment_data, &self.schema, self.arrow_schema.clone())? } Some(meta) => match AggregateSerdeMeta::downcast_ref_from(meta) { None => { - self.deserialize_data_block(dict, &fragment_data, fields, schema, &self.schema)? + deserialize_block(dict, fragment_data, &self.schema, self.arrow_schema.clone())? 
} Some(meta) => { return match meta.typ == BUCKET_TYPE { true => Ok(DataBlock::empty_with_meta( AggregateMeta::::create_serialized( meta.bucket, - self.deserialize_data_block( + deserialize_block( dict, - &fragment_data, - fields, - schema, + fragment_data, &self.schema, + self.arrow_schema.clone(), )?, meta.max_partition_count, ), )), false => { - let fields = exchange_defines::spilled_fields(); - let schema = exchange_defines::spilled_ipc_schema(); let data_schema = Arc::new(exchange_defines::spilled_schema()); - let data_block = self.deserialize_data_block( + let arrow_schema = Arc::new(exchange_defines::spilled_arrow_schema()); + let data_block = deserialize_block( dict, - &fragment_data, - fields, - schema, + fragment_data, &data_schema, + arrow_schema.clone(), )?; let columns = data_block @@ -183,28 +165,6 @@ impl TransformDeserializer data_block.add_meta(meta), } } - - fn deserialize_data_block( - &self, - dict: Vec, - fragment_data: &FragmentData, - arrow_fields: &[Field], - ipc_schema: &IpcSchema, - data_schema: &DataSchemaRef, - ) -> Result { - let mut dictionaries = Dictionaries::new(); - - for dict_packet in dict { - if let DataPacket::Dictionary(flight_data) = dict_packet { - deserialize_dictionary(&flight_data, arrow_fields, ipc_schema, &mut dictionaries)?; - } - } - - let batch = - deserialize_batch(&fragment_data.data, arrow_fields, ipc_schema, &dictionaries)?; - - DataBlock::from_arrow_chunk(&batch, data_schema) - } } impl BlockMetaTransform for TransformDeserializer diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs index 7e38f9ec41e0..aa0e3feba5cc 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs @@ -15,11 +15,8 @@ use std::sync::Arc; use std::time::Instant; -use databend_common_arrow::arrow::datatypes::Schema as ArrowSchema; -use databend_common_arrow::arrow::io::flight::default_ipc_fields; -use databend_common_arrow::arrow::io::flight::WriteOptions; -use databend_common_arrow::arrow::io::ipc::write::Compression; -use databend_common_arrow::arrow::io::ipc::IpcField; +use arrow_ipc::writer::IpcWriteOptions; +use arrow_ipc::CompressionType; use databend_common_base::base::GlobalUniqName; use databend_common_base::base::ProgressValues; use databend_common_base::runtime::profile::Profile; @@ -71,8 +68,7 @@ pub struct TransformExchangeAggregateSerializer { ctx: Arc, method: Method, local_pos: usize, - options: WriteOptions, - ipc_fields: Vec, + options: IpcWriteOptions, operator: Operator, location_prefix: String, @@ -90,16 +86,14 @@ impl TransformExchangeAggregateSerializer { location_prefix: String, params: Arc, compression: Option, - schema: DataSchemaRef, + _schema: DataSchemaRef, local_pos: usize, ) -> Box { - let arrow_schema = ArrowSchema::from(schema.as_ref()); - let ipc_fields = default_ipc_fields(&arrow_schema.fields); let compression = match compression { None => None, Some(compression) => match compression { - FlightCompression::Lz4 => Some(Compression::LZ4), - FlightCompression::Zstd => Some(Compression::ZSTD), + FlightCompression::Lz4 => Some(CompressionType::LZ4_FRAME), + FlightCompression::Zstd => Some(CompressionType::ZSTD), }, }; @@ -112,8 +106,9 @@ impl 
TransformExchangeAggregateSerializer { operator, location_prefix, local_pos, - ipc_fields, - options: WriteOptions { compression }, + options: IpcWriteOptions::default() + .try_with_compression(compression) + .unwrap(), }) } } @@ -201,7 +196,7 @@ impl BlockMetaTransform c.replace_meta(meta); } - let c = serialize_block(bucket, c, &self.ipc_fields, &self.options)?; + let c = serialize_block(bucket, c, &self.options)?; serialized_blocks.push(FlightSerialized::DataBlock(c)); } } @@ -231,7 +226,7 @@ impl BlockMetaTransform c.replace_meta(meta); } - let c = serialize_block(bucket, c, &self.ipc_fields, &self.options)?; + let c = serialize_block(bucket, c, &self.options)?; serialized_blocks.push(FlightSerialized::DataBlock(c)); } } @@ -358,9 +353,8 @@ fn agg_spilling_aggregate_payload( partition_count, )))?; - let ipc_fields = exchange_defines::spilled_ipc_fields(); let write_options = exchange_defines::spilled_write_options(); - return serialize_block(-1, data_block, ipc_fields, write_options); + return serialize_block(-1, data_block, &write_options); } Ok(DataBlock::empty()) @@ -481,9 +475,8 @@ fn spilling_aggregate_payload( vec![], )))?; - let ipc_fields = exchange_defines::spilled_ipc_fields(); let write_options = exchange_defines::spilled_write_options(); - return serialize_block(-1, data_block, ipc_fields, write_options); + return serialize_block(-1, data_block, &write_options); } Ok(DataBlock::empty()) diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_group_by_serializer.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_group_by_serializer.rs index d68a956d1ec9..a5a7777ad422 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_group_by_serializer.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_group_by_serializer.rs @@ -17,11 +17,8 @@ use std::fmt::Formatter; use std::sync::Arc; use std::time::Instant; -use databend_common_arrow::arrow::datatypes::Schema as ArrowSchema; -use databend_common_arrow::arrow::io::flight::default_ipc_fields; -use databend_common_arrow::arrow::io::flight::WriteOptions; -use databend_common_arrow::arrow::io::ipc::write::Compression; -use databend_common_arrow::arrow::io::ipc::IpcField; +use arrow_ipc::writer::IpcWriteOptions; +use arrow_ipc::CompressionType; use databend_common_base::base::GlobalUniqName; use databend_common_base::base::ProgressValues; use databend_common_base::runtime::profile::Profile; @@ -74,8 +71,7 @@ pub struct TransformExchangeGroupBySerializer { ctx: Arc, method: Method, local_pos: usize, - options: WriteOptions, - ipc_fields: Vec, + options: IpcWriteOptions, operator: Operator, location_prefix: String, @@ -90,17 +86,15 @@ impl TransformExchangeGroupBySerializer { method: Method, operator: Operator, location_prefix: String, - schema: DataSchemaRef, + _schema: DataSchemaRef, local_pos: usize, compression: Option, ) -> Box { - let arrow_schema = ArrowSchema::from(schema.as_ref()); - let ipc_fields = default_ipc_fields(&arrow_schema.fields); let compression = match compression { None => None, Some(compression) => match compression { - FlightCompression::Lz4 => Some(Compression::LZ4), - FlightCompression::Zstd => Some(Compression::ZSTD), + FlightCompression::Lz4 => Some(CompressionType::LZ4_FRAME), + FlightCompression::Zstd => Some(CompressionType::ZSTD), }, }; @@ -112,9 +106,10 @@ impl TransformExchangeGroupBySerializer { method, operator, 
local_pos, - ipc_fields, location_prefix, - options: WriteOptions { compression }, + options: IpcWriteOptions::default() + .try_with_compression(compression) + .unwrap(), }, ) } @@ -252,7 +247,7 @@ impl BlockMetaTransform c.replace_meta(meta); } - let c = serialize_block(bucket, c, &self.ipc_fields, &self.options)?; + let c = serialize_block(bucket, c, &self.options)?; serialized_blocks.push(FlightSerialized::DataBlock(c)); } } @@ -280,7 +275,7 @@ impl BlockMetaTransform c.replace_meta(meta); } - let c = serialize_block(bucket, c, &self.ipc_fields, &self.options)?; + let c = serialize_block(bucket, c, &self.options)?; serialized_blocks.push(FlightSerialized::DataBlock(c)); } } @@ -410,9 +405,8 @@ fn agg_spilling_group_by_payload( partition_count, )))?; - let ipc_fields = exchange_defines::spilled_ipc_fields(); let write_options = exchange_defines::spilled_write_options(); - return serialize_block(-1, data_block, ipc_fields, write_options); + return serialize_block(-1, data_block, &write_options); } Ok(DataBlock::empty()) @@ -531,9 +525,8 @@ fn spilling_group_by_payload( vec![], )))?; - let ipc_fields = exchange_defines::spilled_ipc_fields(); let write_options = exchange_defines::spilled_write_options(); - return serialize_block(-1, data_block, ipc_fields, write_options); + return serialize_block(-1, data_block, &write_options); } Ok(DataBlock::empty()) diff --git a/src/query/service/src/servers/flight/flight_client.rs b/src/query/service/src/servers/flight/flight_client.rs index 5c3b1012f050..a226dcf35d33 100644 --- a/src/query/service/src/servers/flight/flight_client.rs +++ b/src/query/service/src/servers/flight/flight_client.rs @@ -15,12 +15,12 @@ use std::str::FromStr; use std::sync::Arc; +use arrow_flight::flight_service_client::FlightServiceClient; +use arrow_flight::Action; +use arrow_flight::FlightData; +use arrow_flight::Ticket; use async_channel::Receiver; use async_channel::Sender; -use databend_common_arrow::arrow_format::flight::data::Action; -use databend_common_arrow::arrow_format::flight::data::FlightData; -use databend_common_arrow::arrow_format::flight::data::Ticket; -use databend_common_arrow::arrow_format::flight::service::flight_service_client::FlightServiceClient; use databend_common_base::base::tokio::time::Duration; use databend_common_base::runtime::drop_guard; use databend_common_exception::ErrorCode; @@ -82,7 +82,7 @@ impl FlightClient { drop(message); let mut request = databend_common_tracing::inject_span_to_tonic_request(Request::new(Action { - body, + body: body.into(), r#type: path.to_string(), })); diff --git a/src/query/service/src/servers/flight/flight_service.rs b/src/query/service/src/servers/flight/flight_service.rs index 063d61f1ceb1..f7742eb96b42 100644 --- a/src/query/service/src/servers/flight/flight_service.rs +++ b/src/query/service/src/servers/flight/flight_service.rs @@ -16,7 +16,7 @@ use std::future::Future; use std::net::SocketAddr; use std::sync::Arc; -use databend_common_arrow::arrow_format::flight::service::flight_service_server::FlightServiceServer; +use arrow_flight::flight_service_server::FlightServiceServer; use databend_common_base::base::tokio; use databend_common_base::base::tokio::sync::Notify; use databend_common_config::InnerConfig; diff --git a/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs b/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs index b3ff2fd1f465..a05b9a48c306 100644 --- a/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs +++ 
b/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs @@ -21,9 +21,9 @@ use std::sync::atomic::Ordering; use std::sync::Arc; use std::time::Duration; +use arrow_flight::flight_service_client::FlightServiceClient; +use arrow_flight::FlightData; use async_channel::Receiver; -use databend_common_arrow::arrow_format::flight::data::FlightData; -use databend_common_arrow::arrow_format::flight::service::flight_service_client::FlightServiceClient; use databend_common_base::base::GlobalInstance; use databend_common_base::runtime::GlobalIORuntime; use databend_common_base::runtime::Thread; diff --git a/src/query/service/src/servers/flight/v1/exchange/serde/exchange_deserializer.rs b/src/query/service/src/servers/flight/v1/exchange/serde/exchange_deserializer.rs index 999f7d990a5d..eac0f0e31c05 100644 --- a/src/query/service/src/servers/flight/v1/exchange/serde/exchange_deserializer.rs +++ b/src/query/service/src/servers/flight/v1/exchange/serde/exchange_deserializer.rs @@ -12,21 +12,21 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; use std::fmt::Debug; use std::fmt::Formatter; use std::sync::Arc; -use databend_common_arrow::arrow::datatypes::Schema as ArrowSchema; -use databend_common_arrow::arrow::io::flight::default_ipc_fields; -use databend_common_arrow::arrow::io::flight::deserialize_batch; -use databend_common_arrow::arrow::io::flight::deserialize_dictionary; -use databend_common_arrow::arrow::io::ipc::read::Dictionaries; -use databend_common_arrow::arrow::io::ipc::IpcSchema; +use arrow_buffer::Buffer; +use arrow_flight::utils::flight_data_to_arrow_batch; +use arrow_ipc::root_as_message; +use arrow_schema::Schema as ArrowSchema; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_expression::BlockMetaInfo; use databend_common_expression::BlockMetaInfoPtr; use databend_common_expression::DataBlock; +use databend_common_expression::DataSchema; use databend_common_expression::DataSchemaRef; use databend_common_io::prelude::bincode_deserialize_from_slice; use databend_common_io::prelude::BinaryRead; @@ -44,7 +44,6 @@ use crate::servers::flight::v1::packets::FragmentData; pub struct TransformExchangeDeserializer { schema: DataSchemaRef, - ipc_schema: IpcSchema, arrow_schema: Arc, } @@ -55,17 +54,11 @@ impl TransformExchangeDeserializer { schema: &DataSchemaRef, ) -> ProcessorPtr { let arrow_schema = ArrowSchema::from(schema.as_ref()); - let ipc_fields = default_ipc_fields(&arrow_schema.fields); - let ipc_schema = IpcSchema { - fields: ipc_fields, - is_little_endian: true, - }; ProcessorPtr::create(BlockMetaTransformer::create( input, output, TransformExchangeDeserializer { - ipc_schema, arrow_schema: Arc::new(arrow_schema), schema: schema.clone(), }, @@ -84,36 +77,45 @@ impl TransformExchangeDeserializer { return Ok(DataBlock::new_with_meta(vec![], 0, meta)); } - let mut dictionaries = Dictionaries::new(); - - for dict_packet in dict { - if let DataPacket::Dictionary(ff) = dict_packet { - deserialize_dictionary( - &ff, - &self.arrow_schema.fields, - &self.ipc_schema, - &mut dictionaries, - )?; - } - } - - let batch = deserialize_batch( - &fragment_data.data, - &self.arrow_schema.fields, - &self.ipc_schema, - &dictionaries, - )?; - - let data_block = DataBlock::from_arrow_chunk(&batch, &self.schema)?; - + let data_block = + deserialize_block(dict, fragment_data, &self.schema, self.arrow_schema.clone())?; if data_block.num_columns() == 0 { 
return Ok(DataBlock::new_with_meta(vec![], row_count as usize, meta)); } - data_block.add_meta(meta) } } +pub fn deserialize_block( + dict: Vec, + fragment_data: FragmentData, + schema: &DataSchema, + arrow_schema: Arc, +) -> Result { + let mut dictionaries_by_id = HashMap::new(); + for dict_packet in dict { + if let DataPacket::Dictionary(data) = dict_packet { + let message = + root_as_message(&data.data_header[..]).expect("Error parsing first message"); + let buffer = Buffer::from_bytes(data.data_body.into()); + arrow_ipc::reader::read_dictionary( + &buffer, + message + .header_as_dictionary_batch() + .expect("Error parsing dictionary"), + &arrow_schema, + &mut dictionaries_by_id, + &message.version(), + ) + .expect("Error reading dictionary"); + } + } + + let batch = flight_data_to_arrow_batch(&fragment_data.data, arrow_schema, &dictionaries_by_id)?; + let (data_block, _) = DataBlock::from_record_batch(schema, &batch)?; + Ok(data_block) +} + impl BlockMetaTransform for TransformExchangeDeserializer { const UNKNOWN_MODE: UnknownMode = UnknownMode::Pass; const NAME: &'static str = "TransformExchangeDeserializer"; diff --git a/src/query/service/src/servers/flight/v1/exchange/serde/exchange_serializer.rs b/src/query/service/src/servers/flight/v1/exchange/serde/exchange_serializer.rs index ce5a829558c5..9b2275be5524 100644 --- a/src/query/service/src/servers/flight/v1/exchange/serde/exchange_serializer.rs +++ b/src/query/service/src/servers/flight/v1/exchange/serde/exchange_serializer.rs @@ -16,13 +16,17 @@ use std::fmt::Debug; use std::fmt::Formatter; use std::sync::Arc; -use databend_common_arrow::arrow::chunk::Chunk; -use databend_common_arrow::arrow::datatypes::Schema as ArrowSchema; -use databend_common_arrow::arrow::io::flight::default_ipc_fields; -use databend_common_arrow::arrow::io::flight::serialize_batch; -use databend_common_arrow::arrow::io::flight::WriteOptions; -use databend_common_arrow::arrow::io::ipc::write::Compression; -use databend_common_arrow::arrow::io::ipc::IpcField; +use arrow_array::RecordBatch; +use arrow_array::RecordBatchOptions; +use arrow_flight::FlightData; +use arrow_flight::SchemaAsIpc; +use arrow_ipc::writer::DictionaryTracker; +use arrow_ipc::writer::IpcDataGenerator; +use arrow_ipc::writer::IpcWriteOptions; +use arrow_ipc::CompressionType; +use arrow_schema::ArrowError; +use arrow_schema::Schema as ArrowSchema; +use bytes::Bytes; use databend_common_base::runtime::profile::Profile; use databend_common_base::runtime::profile::ProfileStatisticsName; use databend_common_exception::ErrorCode; @@ -96,24 +100,21 @@ impl BlockMetaInfo for ExchangeSerializeMeta { } pub struct TransformExchangeSerializer { - options: WriteOptions, - ipc_fields: Vec, + options: IpcWriteOptions, } impl TransformExchangeSerializer { pub fn create( input: Arc, output: Arc, - params: &MergeExchangeParams, + _params: &MergeExchangeParams, compression: Option, ) -> Result { - let arrow_schema = ArrowSchema::from(params.schema.as_ref()); - let ipc_fields = default_ipc_fields(&arrow_schema.fields); let compression = match compression { None => None, Some(compression) => match compression { - FlightCompression::Lz4 => Some(Compression::LZ4), - FlightCompression::Zstd => Some(Compression::ZSTD), + FlightCompression::Lz4 => Some(CompressionType::LZ4_FRAME), + FlightCompression::Zstd => Some(CompressionType::ZSTD), }, }; @@ -121,8 +122,7 @@ impl TransformExchangeSerializer { input, output, TransformExchangeSerializer { - ipc_fields, - options: WriteOptions { compression }, + options: 
IpcWriteOptions::default().try_with_compression(compression)?, }, ))) } @@ -133,14 +133,13 @@ impl Transform for TransformExchangeSerializer { fn transform(&mut self, data_block: DataBlock) -> Result { Profile::record_usize_profile(ProfileStatisticsName::ExchangeRows, data_block.num_rows()); - serialize_block(0, data_block, &self.ipc_fields, &self.options) + serialize_block(0, data_block, &self.options) } } pub struct TransformScatterExchangeSerializer { local_pos: usize, - options: WriteOptions, - ipc_fields: Vec, + options: IpcWriteOptions, } impl TransformScatterExchangeSerializer { @@ -151,13 +150,11 @@ impl TransformScatterExchangeSerializer { params: &ShuffleExchangeParams, ) -> Result { let local_id = ¶ms.executor_id; - let arrow_schema = ArrowSchema::from(params.schema.as_ref()); - let ipc_fields = default_ipc_fields(&arrow_schema.fields); let compression = match compression { None => None, Some(compression) => match compression { - FlightCompression::Lz4 => Some(Compression::LZ4), - FlightCompression::Zstd => Some(Compression::ZSTD), + FlightCompression::Lz4 => Some(CompressionType::LZ4_FRAME), + FlightCompression::Zstd => Some(CompressionType::ZSTD), }, }; @@ -165,8 +162,7 @@ impl TransformScatterExchangeSerializer { input, output, TransformScatterExchangeSerializer { - ipc_fields, - options: WriteOptions { compression }, + options: IpcWriteOptions::default().try_with_compression(compression)?, local_pos: params .destination_ids .iter() @@ -191,7 +187,7 @@ impl BlockMetaTransform for TransformScatterExchangeSeriali new_blocks.push(match self.local_pos == index { true => block, - false => serialize_block(0, block, &self.ipc_fields, &self.options)?, + false => serialize_block(0, block, &self.options)?, }); } @@ -204,8 +200,7 @@ impl BlockMetaTransform for TransformScatterExchangeSeriali pub fn serialize_block( block_num: isize, data_block: DataBlock, - ipc_field: &[IpcField], - options: &WriteOptions, + options: &IpcWriteOptions, ) -> Result { if data_block.is_empty() && data_block.get_meta().is_none() { return Ok(DataBlock::empty_with_meta(ExchangeSerializeMeta::create( @@ -219,22 +214,66 @@ pub fn serialize_block( bincode_serialize_into_buf(&mut meta, &data_block.get_meta()) .map_err(|_| ErrorCode::BadBytes("block meta serialize error when exchange"))?; - let (dict, values) = match data_block.is_empty() { - true => serialize_batch(&Chunk::new(vec![]), &[], options)?, + let (_, dict, values) = match data_block.is_empty() { + true => batches_to_flight_data_with_options( + &ArrowSchema::empty(), + vec![ + RecordBatch::try_new_with_options( + Arc::new(ArrowSchema::empty()), + vec![], + &RecordBatchOptions::new().with_row_count(Some(0)), + ) + .unwrap(), + ], + options, + )?, false => { - let chunks = data_block.try_into()?; - serialize_batch(&chunks, ipc_field, options)? + let schema = data_block.infer_schema(); + let arrow_schema = ArrowSchema::from(&schema); + let batch = data_block.to_record_batch_with_dataschema(&schema)?; + batches_to_flight_data_with_options(&arrow_schema, vec![batch], options)? 
} }; - let mut packet = Vec::with_capacity(dict.len() + 1); - + let mut packet = Vec::with_capacity(dict.len() + values.len()); for dict_flight in dict { packet.push(DataPacket::Dictionary(dict_flight)); } - packet.push(DataPacket::FragmentData(FragmentData::create(meta, values))); + let meta: Bytes = meta.into(); + for value in values { + packet.push(DataPacket::FragmentData(FragmentData::create( + meta.clone(), + value, + ))); + } + Ok(DataBlock::empty_with_meta(ExchangeSerializeMeta::create( block_num, packet, ))) } + +/// Convert `RecordBatch`es to wire protocol `FlightData`s +/// Returns schema, dictionaries and flight data +pub fn batches_to_flight_data_with_options( + schema: &ArrowSchema, + batches: Vec<RecordBatch>, + options: &IpcWriteOptions, +) -> std::result::Result<(FlightData, Vec<FlightData>, Vec<FlightData>), ArrowError> { + let schema_flight_data: FlightData = SchemaAsIpc::new(schema, options).into(); + let mut dictionaries = Vec::with_capacity(batches.len()); + let mut flight_data = Vec::with_capacity(batches.len()); + + let data_gen = IpcDataGenerator::default(); + let mut dictionary_tracker = + DictionaryTracker::new_with_preserve_dict_id(false, options.preserve_dict_id()); + + for batch in batches.iter() { + let (encoded_dictionaries, encoded_batch) = + data_gen.encoded_batch(batch, &mut dictionary_tracker, options)?; + + dictionaries.extend(encoded_dictionaries.into_iter().map(Into::into)); + flight_data.push(encoded_batch.into()); + } + Ok((schema_flight_data, dictionaries, flight_data)) +} diff --git a/src/query/service/src/servers/flight/v1/exchange/serde/mod.rs b/src/query/service/src/servers/flight/v1/exchange/serde/mod.rs index 900f7b5bb639..7349b2f46b0c 100644 --- a/src/query/service/src/servers/flight/v1/exchange/serde/mod.rs +++ b/src/query/service/src/servers/flight/v1/exchange/serde/mod.rs @@ -15,6 +15,7 @@ mod exchange_deserializer; mod exchange_serializer; +pub use exchange_deserializer::deserialize_block; pub use exchange_deserializer::ExchangeDeserializeMeta; pub use exchange_deserializer::TransformExchangeDeserializer; pub use exchange_serializer::serialize_block;
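One way the new helper could be driven (a sketch under assumptions, not code from this PR): `batches_to_flight_data_with_options` mirrors arrow-flight's `batches_to_flight_data`, except the schema message is returned separately instead of being prepended to the stream, which is what lets `serialize_block` ship only dictionaries and data frames. The sketch assumes the helper is in scope:

```rust
use arrow_array::RecordBatch;
use arrow_ipc::writer::IpcWriteOptions;
use arrow_ipc::CompressionType;
use arrow_schema::ArrowError;

// Illustrative: encode one batch with LZ4 frames; dictionary messages must be
// sent before the data frames that reference them.
fn encode(batch: RecordBatch) -> Result<(), ArrowError> {
    let schema = batch.schema();
    let options =
        IpcWriteOptions::default().try_with_compression(Some(CompressionType::LZ4_FRAME))?;
    let (_schema_msg, dicts, frames) =
        batches_to_flight_data_with_options(&schema, vec![batch], &options)?;
    assert_eq!(frames.len(), 1);
    let _ = dicts;
    Ok(())
}
```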
diff --git a/src/query/service/src/servers/flight/v1/flight_service.rs b/src/query/service/src/servers/flight/v1/flight_service.rs index 2d8c763de701..ec90f9e1e4ba 100644 --- a/src/query/service/src/servers/flight/v1/flight_service.rs +++ b/src/query/service/src/servers/flight/v1/flight_service.rs @@ -14,20 +14,21 @@ use std::pin::Pin; -use databend_common_arrow::arrow_format::flight::data::Action; -use databend_common_arrow::arrow_format::flight::data::ActionType; -use databend_common_arrow::arrow_format::flight::data::Criteria; -use databend_common_arrow::arrow_format::flight::data::Empty; -use databend_common_arrow::arrow_format::flight::data::FlightData; -use databend_common_arrow::arrow_format::flight::data::FlightDescriptor; -use databend_common_arrow::arrow_format::flight::data::FlightInfo; -use databend_common_arrow::arrow_format::flight::data::HandshakeRequest; -use databend_common_arrow::arrow_format::flight::data::HandshakeResponse; -use databend_common_arrow::arrow_format::flight::data::PutResult; -use databend_common_arrow::arrow_format::flight::data::Result as FlightResult; -use databend_common_arrow::arrow_format::flight::data::SchemaResult; -use databend_common_arrow::arrow_format::flight::data::Ticket; -use databend_common_arrow::arrow_format::flight::service::flight_service_server::FlightService; +use arrow_flight::flight_service_server::FlightService; +use arrow_flight::Action; +use arrow_flight::ActionType; +use arrow_flight::Criteria; +use arrow_flight::Empty; +use arrow_flight::FlightData; +use arrow_flight::FlightDescriptor; +use arrow_flight::FlightInfo; +use arrow_flight::HandshakeRequest; +use arrow_flight::HandshakeResponse; +use arrow_flight::PollInfo; +use arrow_flight::PutResult; +use arrow_flight::Result as FlightResult; +use arrow_flight::SchemaResult; +use arrow_flight::Ticket; use databend_common_config::GlobalConfig; use databend_common_exception::ErrorCode; use fastrace::func_path; @@ -89,6 +90,13 @@ impl FlightService for DatabendQueryFlightService { )) } + #[async_backtrace::framed] + async fn poll_flight_info(&self, _request: Request<FlightDescriptor>) -> Response<PollInfo> { + Err(Status::unimplemented( + "DatabendQuery does not implement poll_flight_info.", + )) + } + #[async_backtrace::framed] async fn get_schema(&self, _: Request<FlightDescriptor>) -> Response<SchemaResult> { Err(Status::unimplemented( @@ -170,7 +178,7 @@ impl FlightService for DatabendQueryFlightService { { Err(cause) => Err(cause.into()), Ok(body) => Ok(RawResponse::new( - Box::pin(tokio_stream::once(Ok(FlightResult { body }))) + Box::pin(tokio_stream::once(Ok(FlightResult { body: body.into() }))) as FlightStream<FlightResult>, )), } diff --git a/src/query/service/src/servers/flight/v1/packets/packet_data.rs b/src/query/service/src/servers/flight/v1/packets/packet_data.rs index 2ca07a7a8dcd..9d48ffff5978 100644 --- a/src/query/service/src/servers/flight/v1/packets/packet_data.rs +++ b/src/query/service/src/servers/flight/v1/packets/packet_data.rs @@ -15,12 +15,14 @@ use std::collections::HashMap; use std::fmt::Debug; use std::fmt::Formatter; +use std::ops::Deref; use std::vec; +use arrow_flight::FlightData; use byteorder::BigEndian; use byteorder::ReadBytesExt; use byteorder::WriteBytesExt; -use databend_common_arrow::arrow_format::flight::data::FlightData; +use bytes::Bytes; use databend_common_catalog::statistics::data_cache_statistics::DataCacheMetricValues; use databend_common_exception::ErrorCode; use databend_common_exception::Result; @@ -32,7 +34,7 @@ use log::error; use crate::servers::flight::v1::packets::ProgressInfo; pub struct FragmentData { - meta: Vec<u8>, + meta: Bytes, pub data: FlightData, } @@ -41,7 +43,7 @@ impl FragmentData { &self.meta[0..self.meta.len() - 1] } - pub fn create(meta: Vec<u8>, data: FlightData) -> FragmentData { + pub fn create(meta: Bytes, data: FlightData) -> FragmentData { FragmentData { meta, data } } } @@ -93,9 +95,9 @@ impl TryFrom<DataPacket> for FlightData { } DataPacket::FragmentData(fragment_data) => FlightData::from(fragment_data), DataPacket::QueryProfiles(profiles) => FlightData { - app_metadata: vec![0x03], - data_body: serde_json::to_vec(&profiles)?, - data_header: vec![], + app_metadata: vec![0x03].into(), + data_body: serde_json::to_vec(&profiles)?.into(), + data_header: Default::default(), flight_descriptor: None, }, DataPacket::SerializeProgress(progress) => { @@ -108,32 +110,34 @@ impl TryFrom<DataPacket> for FlightData { } FlightData { - data_body, - data_header: vec![], + data_body: data_body.into(), + data_header: Default::default(), flight_descriptor: None, - app_metadata: vec![0x04], + app_metadata: vec![0x04].into(), } } DataPacket::Dictionary(mut flight_data) => { - flight_data.app_metadata.push(0x05); + let mut app_metadata = flight_data.app_metadata.to_vec(); + app_metadata.push(0x05); + flight_data.app_metadata = app_metadata.into(); flight_data } DataPacket::CopyStatus(status) => FlightData { - app_metadata: vec![0x06], - data_body: serde_json::to_vec(&status)?, - data_header: vec![], + app_metadata: vec![0x06].into(), + data_body: serde_json::to_vec(&status)?.into(),
+ data_header: Default::default(), flight_descriptor: None, }, DataPacket::MutationStatus(status) => FlightData { - app_metadata: vec![0x07], - data_body: serde_json::to_vec(&status)?, - data_header: vec![], + app_metadata: vec![0x07].into(), + data_body: serde_json::to_vec(&status)?.into(), + data_header: Default::default(), flight_descriptor: None, }, DataPacket::DataCacheMetrics(metrics) => FlightData { - app_metadata: vec![0x08], - data_body: serde_json::to_vec(&metrics)?, - data_header: vec![], + app_metadata: vec![0x08].into(), + data_body: serde_json::to_vec(&metrics)?.into(), + data_header: Default::default(), flight_descriptor: None, }, }) @@ -141,10 +145,11 @@ } impl From<FragmentData> for FlightData { - fn from(mut data: FragmentData) -> Self { - data.meta.push(0x01); + fn from(data: FragmentData) -> Self { + let mut metadata = data.meta.to_vec(); + metadata.push(0x01); FlightData { - app_metadata: data.meta, + app_metadata: metadata.into(), data_body: data.data.data_body, data_header: data.data.data_header, flight_descriptor: None, @@ -171,7 +176,7 @@ impl TryFrom<FlightData> for DataPacket { Ok(DataPacket::QueryProfiles(status)) } 0x04 => { - let mut bytes = flight_data.data_body.as_slice(); + let mut bytes = flight_data.data_body.deref(); let progress_size = bytes.read_u64::<BigEndian>()?; // Progress. @@ -206,7 +211,7 @@ impl TryFrom<FlightData> for FragmentData { fn try_from(flight_data: FlightData) -> Result<Self> { Ok(FragmentData::create(flight_data.app_metadata, FlightData { - app_metadata: vec![], + app_metadata: Default::default(), flight_descriptor: None, data_body: flight_data.data_body, data_header: flight_data.data_header, diff --git a/src/query/sql/src/planner/binder/copy_into_location.rs b/src/query/sql/src/planner/binder/copy_into_location.rs index 54c43cb150cd..8a88aebf8432 100644 --- a/src/query/sql/src/planner/binder/copy_into_location.rs +++ b/src/query/sql/src/planner/binder/copy_into_location.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use databend_common_ast::ast::quote::display_ident; use databend_common_ast::ast::CopyIntoLocationSource; use databend_common_ast::ast::CopyIntoLocationStmt; use databend_common_ast::ast::Statement; @@ -63,8 +64,14 @@ impl<'a> Binder { .with_options .as_ref() .map_or(String::new(), |with_options| format!(" {with_options}")); + + let quoted_ident_case_sensitive = + self.ctx.get_settings().get_quoted_ident_case_sensitive()?; let subquery = format!( - "SELECT * FROM \"{catalog_name}\".\"{database_name}\".\"{table_name}\"{with_options_str}" + "SELECT * FROM {}.{}.{}{with_options_str}", + display_ident(&catalog_name, quoted_ident_case_sensitive, self.dialect), + display_ident(&database_name, quoted_ident_case_sensitive, self.dialect), + display_ident(&table_name, quoted_ident_case_sensitive, self.dialect), ); let tokens = tokenize_sql(&subquery)?; let sub_stmt_msg = parse_sql(&tokens, self.dialect)?; diff --git a/src/query/storages/fuse/Cargo.toml b/src/query/storages/fuse/Cargo.toml index 17ef9a188256..52bd8d5d62c0 100644 --- a/src/query/storages/fuse/Cargo.toml +++ b/src/query/storages/fuse/Cargo.toml @@ -14,7 +14,7 @@ test = true ahash = "0.8.3" arrow = { workspace = true } arrow-array = { workspace = true } -arrow-ipc = { workspace = true } +arrow-ipc = { workspace = true, features = ["lz4", "zstd"] } async-backtrace = { workspace = true } async-channel = "1.7.1" async-trait = { workspace = true } diff --git a/src/query/storages/system/src/dictionaries_table.rs b/src/query/storages/system/src/dictionaries_table.rs index 9cc0c61fafcd..39cd6644b82a 100644 --- a/src/query/storages/system/src/dictionaries_table.rs +++ b/src/query/storages/system/src/dictionaries_table.rs @@ -70,8 +70,8 @@ impl AsyncSystemTable for DictionariesTable { let mut sources = vec![]; let mut comments = vec![]; - let mut created_ons = vec![]; - let mut updated_ons = vec![]; + let mut created_on_values = vec![]; + let mut updated_on_values = vec![]; let catalog = ctx.get_default_catalog().unwrap(); let databases = catalog.list_databases(&tenant).await?; @@ -91,12 +91,12 @@ impl AsyncSystemTable for DictionariesTable { comments.push(comment); let created_on = dict_meta.created_on.timestamp_micros(); - created_ons.push(created_on); + created_on_values.push(created_on); let updated_on = match dict_meta.updated_on { Some(updated_on) => updated_on.timestamp_micros(), None => created_on, }; - updated_ons.push(updated_on); + updated_on_values.push(updated_on); let schema = dict_meta.schema; let fields = &schema.fields; @@ -150,8 +150,8 @@ impl AsyncSystemTable for DictionariesTable { attribute_types_builder.build(), StringType::from_data(sources), StringType::from_data(comments), - TimestampType::from_data(created_ons), - TimestampType::from_data(updated_ons), + TimestampType::from_data(created_on_values), + TimestampType::from_data(updated_on_values), ])); } } diff --git a/tests/sqllogictests/suites/query/functions/02_0010_function_if.test b/tests/sqllogictests/suites/query/functions/02_0010_function_if.test index 9d8ee61cdd8e..20269ff5c552 100644 --- a/tests/sqllogictests/suites/query/functions/02_0010_function_if.test +++ b/tests/sqllogictests/suites/query/functions/02_0010_function_if.test @@ -1,3 +1,27 @@ +statement ok +set max_block_size = 3 + +query T +SELECT CASE WHEN number > 3 THEN [{}, {}, {}] WHEN number > 5 THEN [{}] END FROM numbers(10) order by number; +---- +NULL +NULL +NULL +NULL +[{},{},{}] +[{},{},{}] +[{},{},{}] +[{},{},{}] +[{},{},{}] +[{},{},{}] + +query T +select [{}] from numbers(3) +---- +[{}] +[{}] 
+[{}] + query B select if(number>1, true, false) from numbers(3) order by number ---- @@ -99,7 +123,7 @@ NULL 1.0 query T -select if(number = 4, 3::VARIANT, number = 5, null, number = 6, '"a"'::VARIANT, null) from numbers(10) +select if(number = 4, 3::VARIANT, number = 5, null, number = 6, '"a"'::VARIANT, null) from numbers(10) order by number ---- NULL NULL