Skip to content

Commit

Permalink
Merge branch 'main' into fix-virtual-column-write
Browse files Browse the repository at this point in the history
  • Loading branch information
sundy-li authored Oct 23, 2024
2 parents 1480c5a + a6be8e7 commit 0788ff3
Show file tree
Hide file tree
Showing 54 changed files with 680 additions and 617 deletions.
6 changes: 5 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -352,8 +352,8 @@ mutable_key_type = "allow"
result_large_err = "allow"

## DON'T DELETE THIS: If we want the best performance, we should use this profile, but it will take longer to compile.
## Test SQL:
## select sum(number) from numbers_mt(10000000000); ~ 3x performance
## Test SQL:
## select sum(number) from numbers_mt(10000000000); ~ 3x performance
## select max(number) from numbers_mt(10000000000); ~ 3x performance
# [profile.release]
# debug = 1
Expand Down
29 changes: 25 additions & 4 deletions src/common/arrow/src/arrow/datatypes/field.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,29 @@ impl Field {
}
}

// For databend's extension key
pub const EXTENSION_KEY: &str = "Extension";

#[cfg(feature = "arrow")]
impl From<Field> for arrow_schema::Field {
fn from(value: Field) -> Self {
Self::new(value.name, value.data_type.into(), value.is_nullable)
.with_metadata(value.metadata.into_iter().collect())
(&value).into()
}
}

#[cfg(feature = "arrow")]
impl From<&Field> for arrow_schema::Field {
    /// Builds an `arrow_schema::Field` from a borrowed [`Field`].
    ///
    /// When the field's data type is `DataType::Extension`, the extension name is
    /// recorded in the metadata under [`EXTENSION_KEY`] and the wrapped inner type
    /// is the one handed to arrow. Any other data type is converted as-is.
    fn from(value: &Field) -> Self {
        let mut metadata = value.metadata.clone();

        let data_type = match &value.data_type {
            DataType::Extension(extension_name, inner, _) => {
                // Carry the extension name through the metadata map.
                metadata.insert(EXTENSION_KEY.to_string(), extension_name.clone());
                inner.as_ref().clone()
            }
            plain => plain.clone(),
        };

        let arrow_field = Self::new(value.name.clone(), data_type.into(), value.is_nullable);
        arrow_field.with_metadata(metadata.into_iter().collect())
    }
}

Expand All @@ -89,12 +107,15 @@ impl From<arrow_schema::Field> for Field {
#[cfg(feature = "arrow")]
impl From<&arrow_schema::Field> for Field {
fn from(value: &arrow_schema::Field) -> Self {
let data_type = value.data_type().clone().into();
let metadata = value
let mut data_type = value.data_type().clone().into();
let mut metadata: Metadata = value
.metadata()
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect();
if let Some(v) = metadata.remove(EXTENSION_KEY) {
data_type = DataType::Extension(v, Box::new(data_type), None);
}
Self::new(value.name(), data_type, value.is_nullable()).with_metadata(metadata)
}
}
Expand Down
1 change: 1 addition & 0 deletions src/common/exception/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ databend-common-arrow = { workspace = true }
databend-common-ast = { workspace = true }

anyhow = { workspace = true }
arrow-flight = { workspace = true }
arrow-schema = { workspace = true }
backtrace = { workspace = true }
bincode = { workspace = true }
Expand Down
8 changes: 4 additions & 4 deletions src/common/exception/src/exception_flight.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use databend_common_arrow::arrow_format::flight::data::FlightData;
use arrow_flight::FlightData;

use crate::ErrorCode;
use crate::Result;
Expand All @@ -24,9 +24,9 @@ impl From<ErrorCode> for FlightData {
serde_json::to_vec::<SerializedError>(&SerializedError::from(&error)).unwrap();

FlightData {
data_body: serialized_error,
app_metadata: vec![0x02],
data_header: error.code().to_be_bytes().to_vec(),
data_body: serialized_error.into(),
app_metadata: vec![0x02].into(),
data_header: error.code().to_be_bytes().to_vec().into(),
flight_descriptor: None,
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/common/exception/tests/it/exception_flight.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@

use std::sync::Arc;

use arrow_flight::FlightData;
use backtrace::Backtrace;
use databend_common_arrow::arrow_format::flight::data::FlightData;
use databend_common_exception::exception::ErrorCodeBacktrace;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
Expand Down
17 changes: 17 additions & 0 deletions src/meta/api/src/meta_txn_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use databend_common_exception::ErrorCode;
use databend_common_meta_app::app_error::AppErrorMessage;
use databend_common_meta_app::app_error::TxnRetryMaxTimes;
use databend_common_meta_types::InvalidArgument;
use databend_common_meta_types::MetaError;
Expand All @@ -33,3 +35,18 @@ impl From<InvalidArgument> for MetaTxnError {
Self::MetaError(MetaError::from(network_error))
}
}

impl From<MetaNetworkError> for MetaTxnError {
fn from(value: MetaNetworkError) -> Self {
Self::MetaError(MetaError::from(value))
}
}

impl From<MetaTxnError> for ErrorCode {
    /// Maps a transaction error onto an `ErrorCode`.
    ///
    /// A `TxnRetryMaxTimes` error becomes `ErrorCode::TxnRetryMaxTimes` carrying the
    /// error's message; a wrapped `MetaError` goes through its own conversion.
    fn from(meta_err: MetaTxnError) -> Self {
        match meta_err {
            MetaTxnError::MetaError(inner) => inner.into(),
            MetaTxnError::TxnRetryMaxTimes(exhausted) => {
                let message = exhausted.message();
                Self::TxnRetryMaxTimes(message)
            }
        }
    }
}
1 change: 1 addition & 0 deletions src/query/expression/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ test = true
[dependencies]
arrow-array = { workspace = true }
arrow-flight = { workspace = true }
arrow-ipc = { workspace = true, features = ["lz4"] }
arrow-schema = { workspace = true }
arrow-select = { workspace = true }
async-backtrace = { workspace = true }
Expand Down
11 changes: 11 additions & 0 deletions src/query/expression/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use crate::types::AnyType;
use crate::types::DataType;
use crate::Column;
use crate::ColumnBuilder;
use crate::DataField;
use crate::DataSchemaRef;
use crate::Domain;
use crate::Scalar;
Expand Down Expand Up @@ -589,6 +590,16 @@ impl DataBlock {
debug_assert!(self.columns.last().unwrap().value.as_column().is_some());
self.columns.last().unwrap().value.as_column().unwrap()
}

/// Derives a `DataSchema` from the block's columns.
///
/// Each column yields one field named `col_<index>` (zero-based position in the
/// block) that carries a clone of the column's data type.
pub fn infer_schema(&self) -> DataSchema {
    let entries = self.columns();
    let mut fields = Vec::with_capacity(entries.len());
    for (position, entry) in entries.iter().enumerate() {
        let name = format!("col_{position}");
        fields.push(DataField::new(&name, entry.data_type.clone()));
    }
    DataSchema::new(fields)
}
}

impl TryFrom<DataBlock> for ArrowChunk<ArrayRef> {
Expand Down
54 changes: 19 additions & 35 deletions src/query/expression/src/converts/arrow/from.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,12 @@
use std::sync::Arc;

use arrow_array::RecordBatch;
use arrow_schema::DataType as ArrowDataType;
use arrow_schema::Field as ArrowField;
use arrow_schema::Field;
use arrow_schema::Schema as ArrowSchema;
use databend_common_arrow::arrow::datatypes::DataType as Arrow2DataType;
use databend_common_arrow::arrow::datatypes::Field as Arrow2Field;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;

use super::EXTENSION_KEY;
use crate::types::DataType;
use crate::Column;
use crate::DataBlock;
Expand All @@ -32,17 +29,30 @@ use crate::DataSchema;
use crate::TableField;
use crate::TableSchema;

impl TryFrom<&ArrowSchema> for DataSchema {
impl TryFrom<&Field> for DataField {
    type Error = ErrorCode;

    /// Converts an arrow field into a `DataField` by first converting it into a
    /// `TableField`; any error from that conversion is returned unchanged.
    fn try_from(arrow_f: &Field) -> Result<DataField> {
        let table_field = TableField::try_from(arrow_f)?;
        Ok(DataField::from(&table_field))
    }
}

impl TryFrom<&Field> for TableField {
    type Error = ErrorCode;

    /// Converts an arrow field into a `TableField` by way of the arrow2 field
    /// representation.
    fn try_from(arrow_f: &Field) -> Result<TableField> {
        let arrow2_field = Arrow2Field::from(arrow_f);
        let converted = TableField::try_from(&arrow2_field);
        converted
    }
}

impl TryFrom<&ArrowSchema> for DataSchema {
type Error = ErrorCode;
fn try_from(schema: &ArrowSchema) -> Result<DataSchema> {
let fields = schema
.fields
.iter()
.map(|arrow_f| {
Ok(DataField::from(&TableField::try_from(
&arrow2_field_from_arrow_field(arrow_f),
)?))
Ok(DataField::from(&TableField::try_from(&Arrow2Field::from(
arrow_f,
))?))
})
.collect::<Result<Vec<_>>>()?;
Ok(DataSchema::new_from(
Expand All @@ -54,12 +64,11 @@ impl TryFrom<&ArrowSchema> for DataSchema {

impl TryFrom<&ArrowSchema> for TableSchema {
type Error = ErrorCode;

fn try_from(schema: &ArrowSchema) -> Result<TableSchema> {
let fields = schema
.fields
.iter()
.map(|arrow_f| TableField::try_from(&arrow2_field_from_arrow_field(arrow_f)))
.map(|arrow_f| TableField::try_from(&Arrow2Field::from(arrow_f)))
.collect::<Result<Vec<_>>>()?;
Ok(TableSchema::new_from(
fields,
Expand Down Expand Up @@ -108,28 +117,3 @@ impl Column {
Column::from_arrow(arrow2_array.as_ref(), data_type)
}
}

/// Recursively converts an arrow field into an arrow2 field.
///
/// Nested types (`List`, `LargeList`, `FixedSizeList`, `Map`, `Struct`) have their
/// child fields converted by this same function; every other data type uses the
/// plain `Into` conversion. If the field's metadata contains `EXTENSION_KEY`, the
/// resulting type is wrapped in `Arrow2DataType::Extension` under that name.
/// The returned field keeps the original name and nullability.
fn arrow2_field_from_arrow_field(field: &ArrowField) -> Arrow2Field {
    let inner_type = match field.data_type() {
        ArrowDataType::List(child) => {
            Arrow2DataType::List(Box::new(arrow2_field_from_arrow_field(child)))
        }
        ArrowDataType::LargeList(child) => {
            Arrow2DataType::LargeList(Box::new(arrow2_field_from_arrow_field(child)))
        }
        ArrowDataType::FixedSizeList(child, size) => {
            let converted = arrow2_field_from_arrow_field(child);
            Arrow2DataType::FixedSizeList(Box::new(converted), *size as _)
        }
        ArrowDataType::Map(child, ordered) => {
            let converted = arrow2_field_from_arrow_field(child);
            Arrow2DataType::Map(Box::new(converted), *ordered)
        }
        ArrowDataType::Struct(children) => Arrow2DataType::Struct(
            children
                .iter()
                .map(|child| arrow2_field_from_arrow_field(child))
                .collect(),
        ),
        other => other.clone().into(),
    };

    // An extension name stored in the metadata wraps the converted type.
    let data_type = match field.metadata().get(EXTENSION_KEY) {
        Some(extension_name) => {
            Arrow2DataType::Extension(extension_name.clone(), Box::new(inner_type), None)
        }
        None => inner_type,
    };

    Arrow2Field::new(field.name(), data_type, field.is_nullable())
}
Loading

0 comments on commit 0788ff3

Please sign in to comment.