Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(query): use arrow-flight to exchange datas #16657

Merged
merged 18 commits into from
Oct 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -352,8 +352,8 @@ mutable_key_type = "allow"
result_large_err = "allow"

## DON'T DELETE THIS: If we want the best performance, we should use this profile, but it will take longer to compile.
## Test SQL:
## select sum(number) from numbers_mt(10000000000); ~ 3x performance
## Test SQL:
## select sum(number) from numbers_mt(10000000000); ~ 3x performance
## select max(number) from numbers_mt(10000000000); ~ 3x performance
# [profile.release]
# debug = 1
Expand Down
29 changes: 25 additions & 4 deletions src/common/arrow/src/arrow/datatypes/field.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,29 @@ impl Field {
}
}

// For databend's extension key
pub const EXTENSION_KEY: &str = "Extension";

#[cfg(feature = "arrow")]
impl From<Field> for arrow_schema::Field {
    /// Converts an owned [`Field`] into an `arrow_schema::Field`.
    ///
    /// Delegates to the `From<&Field>` conversion so that the owned and the
    /// borrowed conversions always produce the same result (including the
    /// handling of `DataType::Extension`).
    fn from(value: Field) -> Self {
        (&value).into()
    }
}

#[cfg(feature = "arrow")]
impl From<&Field> for arrow_schema::Field {
    /// Converts a borrowed [`Field`] into an `arrow_schema::Field`.
    ///
    /// When the field's data type is `DataType::Extension`, the extension name
    /// is stored in the metadata under [`EXTENSION_KEY`] and the wrapped inner
    /// type is the one that gets converted. Any other data type is converted
    /// as-is. The field's own metadata is copied over in both cases.
    fn from(value: &Field) -> Self {
        let mut metadata = value.metadata.clone();

        let inner_type = match &value.data_type {
            DataType::Extension(extension_name, wrapped, _) => {
                // Keep the extension name so the reverse conversion can restore it.
                metadata.insert(EXTENSION_KEY.to_string(), extension_name.clone());
                wrapped.as_ref().clone()
            }
            plain => plain.clone(),
        };

        let arrow_field = Self::new(value.name.clone(), inner_type.into(), value.is_nullable);
        arrow_field.with_metadata(metadata.into_iter().collect())
    }
}

Expand All @@ -89,12 +107,15 @@ impl From<arrow_schema::Field> for Field {
#[cfg(feature = "arrow")]
impl From<&arrow_schema::Field> for Field {
    /// Builds a [`Field`] from a borrowed `arrow_schema::Field`.
    ///
    /// The arrow metadata is copied entry by entry. If it contains an
    /// [`EXTENSION_KEY`] entry, that entry is removed from the metadata and its
    /// value becomes the name of a `DataType::Extension` wrapping the converted
    /// data type.
    fn from(value: &arrow_schema::Field) -> Self {
        let mut data_type = value.data_type().clone().into();
        let mut metadata: Metadata = value
            .metadata()
            .iter()
            .map(|(k, v)| (k.clone(), v.clone()))
            .collect();
        // The extension marker is carried in metadata; turn it back into a type.
        if let Some(v) = metadata.remove(EXTENSION_KEY) {
            data_type = DataType::Extension(v, Box::new(data_type), None);
        }
        Self::new(value.name(), data_type, value.is_nullable()).with_metadata(metadata)
    }
}
Expand Down
1 change: 1 addition & 0 deletions src/common/exception/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ databend-common-arrow = { workspace = true }
databend-common-ast = { workspace = true }

anyhow = { workspace = true }
arrow-flight = { workspace = true }
arrow-schema = { workspace = true }
backtrace = { workspace = true }
bincode = { workspace = true }
Expand Down
8 changes: 4 additions & 4 deletions src/common/exception/src/exception_flight.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use databend_common_arrow::arrow_format::flight::data::FlightData;
use arrow_flight::FlightData;

use crate::ErrorCode;
use crate::Result;
Expand All @@ -24,9 +24,9 @@ impl From<ErrorCode> for FlightData {
serde_json::to_vec::<SerializedError>(&SerializedError::from(&error)).unwrap();

FlightData {
data_body: serialized_error,
app_metadata: vec![0x02],
data_header: error.code().to_be_bytes().to_vec(),
data_body: serialized_error.into(),
app_metadata: vec![0x02].into(),
data_header: error.code().to_be_bytes().to_vec().into(),
flight_descriptor: None,
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/common/exception/tests/it/exception_flight.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@

use std::sync::Arc;

use arrow_flight::FlightData;
use backtrace::Backtrace;
use databend_common_arrow::arrow_format::flight::data::FlightData;
use databend_common_exception::exception::ErrorCodeBacktrace;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
Expand Down
11 changes: 11 additions & 0 deletions src/query/expression/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use crate::types::AnyType;
use crate::types::DataType;
use crate::Column;
use crate::ColumnBuilder;
use crate::DataField;
use crate::DataSchemaRef;
use crate::Domain;
use crate::Scalar;
Expand Down Expand Up @@ -589,6 +590,16 @@ impl DataBlock {
debug_assert!(self.columns.last().unwrap().value.as_column().is_some());
self.columns.last().unwrap().value.as_column().unwrap()
}

/// Builds a [`DataSchema`] describing this block's columns.
///
/// Each column yields one field named `col_{index}` (its position in
/// `columns()`), carrying a clone of that column's data type.
pub fn infer_schema(&self) -> DataSchema {
    let entries = self.columns().iter().enumerate();
    let fields = entries
        .map(|(position, entry)| {
            let name = format!("col_{position}");
            DataField::new(&name, entry.data_type.clone())
        })
        .collect();
    DataSchema::new(fields)
}
}

impl TryFrom<DataBlock> for ArrowChunk<ArrayRef> {
Expand Down
54 changes: 19 additions & 35 deletions src/query/expression/src/converts/arrow/from.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,12 @@
use std::sync::Arc;

use arrow_array::RecordBatch;
use arrow_schema::DataType as ArrowDataType;
use arrow_schema::Field as ArrowField;
use arrow_schema::Field;
use arrow_schema::Schema as ArrowSchema;
use databend_common_arrow::arrow::datatypes::DataType as Arrow2DataType;
use databend_common_arrow::arrow::datatypes::Field as Arrow2Field;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;

use super::EXTENSION_KEY;
use crate::types::DataType;
use crate::Column;
use crate::DataBlock;
Expand All @@ -32,17 +29,30 @@ use crate::DataSchema;
use crate::TableField;
use crate::TableSchema;

impl TryFrom<&ArrowSchema> for DataSchema {
impl TryFrom<&Field> for DataField {
type Error = ErrorCode;
fn try_from(arrow_f: &Field) -> Result<DataField> {
Ok(DataField::from(&TableField::try_from(arrow_f)?))
}
}

impl TryFrom<&Field> for TableField {
    type Error = ErrorCode;

    /// Converts an arrow `Field` into a [`TableField`] by going through the
    /// arrow2 field representation.
    fn try_from(arrow_f: &Field) -> Result<TableField> {
        let arrow2_field = Arrow2Field::from(arrow_f);
        TableField::try_from(&arrow2_field)
    }
}

impl TryFrom<&ArrowSchema> for DataSchema {
type Error = ErrorCode;
fn try_from(schema: &ArrowSchema) -> Result<DataSchema> {
let fields = schema
.fields
.iter()
.map(|arrow_f| {
Ok(DataField::from(&TableField::try_from(
&arrow2_field_from_arrow_field(arrow_f),
)?))
Ok(DataField::from(&TableField::try_from(&Arrow2Field::from(
arrow_f,
))?))
})
.collect::<Result<Vec<_>>>()?;
Ok(DataSchema::new_from(
Expand All @@ -54,12 +64,11 @@ impl TryFrom<&ArrowSchema> for DataSchema {

impl TryFrom<&ArrowSchema> for TableSchema {
type Error = ErrorCode;

fn try_from(schema: &ArrowSchema) -> Result<TableSchema> {
let fields = schema
.fields
.iter()
.map(|arrow_f| TableField::try_from(&arrow2_field_from_arrow_field(arrow_f)))
.map(|arrow_f| TableField::try_from(&Arrow2Field::from(arrow_f)))
.collect::<Result<Vec<_>>>()?;
Ok(TableSchema::new_from(
fields,
Expand Down Expand Up @@ -108,28 +117,3 @@ impl Column {
Column::from_arrow(arrow2_array.as_ref(), data_type)
}
}

/// Recursively converts an arrow field into an arrow2 field.
///
/// Nested types (`List`, `LargeList`, `FixedSizeList`, `Map`, `Struct`) have
/// their child fields converted by this same function; every other data type
/// goes through the plain `Into` conversion. If the arrow field's metadata has
/// an `EXTENSION_KEY` entry, the resulting type is wrapped in
/// `Arrow2DataType::Extension` named after that entry's value.
fn arrow2_field_from_arrow_field(field: &ArrowField) -> Arrow2Field {
    let converted = match field.data_type() {
        ArrowDataType::List(child) => {
            Arrow2DataType::List(Box::new(arrow2_field_from_arrow_field(child)))
        }
        ArrowDataType::LargeList(child) => {
            Arrow2DataType::LargeList(Box::new(arrow2_field_from_arrow_field(child)))
        }
        ArrowDataType::FixedSizeList(child, size) => {
            let inner = Box::new(arrow2_field_from_arrow_field(child));
            Arrow2DataType::FixedSizeList(inner, *size as _)
        }
        ArrowDataType::Map(child, ordered) => {
            let inner = Box::new(arrow2_field_from_arrow_field(child));
            Arrow2DataType::Map(inner, *ordered)
        }
        ArrowDataType::Struct(children) => {
            let members = children
                .iter()
                .map(|child| arrow2_field_from_arrow_field(child))
                .collect();
            Arrow2DataType::Struct(members)
        }
        other => other.clone().into(),
    };

    // An extension marker in the metadata wraps whatever type was produced above.
    let data_type = match field.metadata().get(EXTENSION_KEY) {
        Some(extension_type) => {
            Arrow2DataType::Extension(extension_type.clone(), Box::new(converted), None)
        }
        None => converted,
    };

    Arrow2Field::new(field.name(), data_type, field.is_nullable())
}
53 changes: 14 additions & 39 deletions src/query/expression/src/converts/arrow/to.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,22 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;

use arrow_array::cast::AsArray;
use arrow_array::Array;
use arrow_array::LargeListArray;
use arrow_array::MapArray;
use arrow_array::RecordBatch;
use arrow_array::RecordBatchOptions;
use arrow_array::StructArray;
use arrow_schema::DataType as ArrowDataType;
use arrow_schema::Field as ArrowField;
use arrow_schema::Fields;
use arrow_schema::Schema as ArrowSchema;
use databend_common_arrow::arrow::datatypes::DataType as Arrow2DataType;
use databend_common_arrow::arrow::datatypes::Field as Arrow2Field;
use databend_common_exception::Result;

use super::EXTENSION_KEY;
use crate::infer_table_schema;
use crate::Column;
use crate::DataBlock;
Expand All @@ -43,7 +41,7 @@ impl From<&DataSchema> for ArrowSchema {
let fields = schema
.fields
.iter()
.map(|f| arrow_field_from_arrow2_field(Arrow2Field::from(f)))
.map(|f| ArrowField::from(Arrow2Field::from(f)))
.collect::<Vec<_>>();
ArrowSchema {
fields: Fields::from(fields),
Expand All @@ -57,7 +55,7 @@ impl From<&TableSchema> for ArrowSchema {
let fields = schema
.fields
.iter()
.map(|f| arrow_field_from_arrow2_field(Arrow2Field::from(f)))
.map(|f| ArrowField::from(Arrow2Field::from(f)))
.collect::<Vec<_>>();
ArrowSchema {
fields: Fields::from(fields),
Expand All @@ -70,7 +68,7 @@ pub fn table_schema_to_arrow_schema(schema: &TableSchema) -> ArrowSchema {
let fields = schema
.fields
.iter()
.map(|f| arrow_field_from_arrow2_field(f.into()))
.map(|f| ArrowField::from(Arrow2Field::from(f)))
.collect::<Vec<_>>();
ArrowSchema {
fields: Fields::from(fields),
Expand All @@ -80,13 +78,13 @@ pub fn table_schema_to_arrow_schema(schema: &TableSchema) -> ArrowSchema {

impl From<&TableField> for ArrowField {
    /// Converts a [`TableField`] into an arrow field by first building the
    /// arrow2 field and then converting that into `ArrowField`.
    fn from(field: &TableField) -> Self {
        ArrowField::from(Arrow2Field::from(field))
    }
}

impl From<&DataField> for ArrowField {
    /// Converts a [`DataField`] into an arrow field by first building the
    /// arrow2 field and then converting that into `ArrowField`.
    fn from(field: &DataField) -> Self {
        ArrowField::from(Arrow2Field::from(field))
    }
}

Expand All @@ -98,6 +96,14 @@ impl DataBlock {
}

pub fn to_record_batch(self, table_schema: &TableSchema) -> Result<RecordBatch> {
if table_schema.num_fields() == 0 {
return Ok(RecordBatch::try_new_with_options(
Arc::new(ArrowSchema::empty()),
vec![],
&RecordBatchOptions::default().with_row_count(Some(self.num_rows())),
)?);
}

let arrow_schema = table_schema_to_arrow_schema(table_schema);
let mut arrays = Vec::with_capacity(self.columns().len());
for (entry, arrow_field) in self
Expand Down Expand Up @@ -166,34 +172,3 @@ impl Column {
arrow_array
}
}

/// Recursively converts an owned arrow2 field into an arrow field.
///
/// A top-level `Arrow2DataType::Extension` is unwrapped: its name is stored in
/// the resulting field's metadata under `EXTENSION_KEY` and the inner type is
/// converted instead. Nested types (`List`, `LargeList`, `FixedSizeList`,
/// `Map`, `Struct`) have their child fields converted by this same function;
/// all other types use the plain `Into` conversion.
fn arrow_field_from_arrow2_field(field: Arrow2Field) -> ArrowField {
    let mut metadata = HashMap::new();

    let unwrapped = match field.data_type {
        Arrow2DataType::Extension(extension_type, inner, _) => {
            metadata.insert(EXTENSION_KEY.to_string(), extension_type);
            *inner
        }
        plain => plain,
    };

    let data_type = match unwrapped {
        Arrow2DataType::List(child) => {
            ArrowDataType::List(Arc::new(arrow_field_from_arrow2_field(*child)))
        }
        Arrow2DataType::LargeList(child) => {
            ArrowDataType::LargeList(Arc::new(arrow_field_from_arrow2_field(*child)))
        }
        Arrow2DataType::FixedSizeList(child, size) => {
            let inner = Arc::new(arrow_field_from_arrow2_field(*child));
            ArrowDataType::FixedSizeList(inner, size as _)
        }
        Arrow2DataType::Map(child, ordered) => {
            let inner = Arc::new(arrow_field_from_arrow2_field(*child));
            ArrowDataType::Map(inner, ordered)
        }
        Arrow2DataType::Struct(children) => {
            let members = children
                .into_iter()
                .map(arrow_field_from_arrow2_field)
                .collect();
            ArrowDataType::Struct(members)
        }
        other => other.into(),
    };

    let arrow_field = ArrowField::new(field.name, data_type, field.is_nullable);
    arrow_field.with_metadata(metadata)
}
20 changes: 6 additions & 14 deletions src/query/expression/src/converts/arrow2/from.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,20 +176,12 @@ impl Column {
(DataType::Null, ArrowDataType::Null) => Column::Null {
len: arrow_col.len(),
},
(DataType::EmptyArray, ArrowDataType::Extension(name, _, _))
if name == ARROW_EXT_TYPE_EMPTY_ARRAY =>
{
Column::EmptyArray {
len: arrow_col.len(),
}
}
(DataType::EmptyMap, ArrowDataType::Extension(name, _, _))
if name == ARROW_EXT_TYPE_EMPTY_MAP =>
{
Column::EmptyMap {
len: arrow_col.len(),
}
}
(DataType::EmptyArray, _) => Column::EmptyArray {
len: arrow_col.len(),
},
(DataType::EmptyMap, _) => Column::EmptyMap {
len: arrow_col.len(),
},
(DataType::Number(NumberDataType::UInt8), ArrowDataType::UInt8) => {
Column::Number(NumberColumn::UInt8(
arrow_col
Expand Down
Loading
Loading