feat: ANSI support for Add #616

Open · wants to merge 10 commits into base: main

Changes from 6 commits
202 changes: 202 additions & 0 deletions core/src/execution/datafusion/expressions/binary.rs
@@ -0,0 +1,202 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::any::Any;
use std::fmt::{Display, Formatter};
use std::hash::Hasher;
use std::sync::Arc;

use arrow_array::{BooleanArray, RecordBatch};
use arrow_schema::{DataType, Schema};
use datafusion_common::{DataFusionError, Result, ScalarValue};
use datafusion_expr::interval_arithmetic::Interval;
use datafusion_expr::sort_properties::ExprProperties;
use datafusion_expr::{ColumnarValue, Operator};
use datafusion_physical_expr::expressions::{BinaryExpr, Literal};
use datafusion_physical_expr_common::physical_expr::{down_cast_any_ref, PhysicalExpr};

use crate::execution::datafusion::expressions::EvalMode;

use super::arithmetic_overflow_error;

#[derive(Debug, Hash, Clone)]
pub struct CometBinaryExpr {
    left: Arc<dyn PhysicalExpr>,
    op: Operator,
    right: Arc<dyn PhysicalExpr>,
    eval_mode: EvalMode,
    inner: Arc<BinaryExpr>,
}

impl CometBinaryExpr {
    pub fn new(
        left: Arc<dyn PhysicalExpr>,
        op: Operator,
        right: Arc<dyn PhysicalExpr>,
        eval_mode: EvalMode,
    ) -> Self {
        Self {
            left: Arc::clone(&left),
            op,
            right: Arc::clone(&right),
            eval_mode,
            inner: Arc::new(BinaryExpr::new(left, op, right)),
        }
    }

    fn fail_on_overflow(&self, batch: &RecordBatch, result: &ColumnarValue) -> Result<()> {
        if self.eval_mode == EvalMode::Ansi && self.op == Operator::Plus {
            match result.data_type() {
                DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => {
                    self.check_int_overflow(batch, result)?
                }
                _ => {}
            }
        }
        Ok(())
    }

    fn check_int_overflow(&self, batch: &RecordBatch, result: &ColumnarValue) -> Result<()> {
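        // Overflow detection mirrors Java's Math.addExact: with r = x + y,
        // signed overflow occurred iff ((x ^ r) & (y ^ r)) < 0, i.e. the sign
        // of the result differs from the signs of both operands. Note that
        // `self.inner` re-computes the sum r here.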
        let check_overflow_expr = Arc::new(BinaryExpr::new(
Contributor (@eejbyfeldt):

Since arrow provides overflow-checked kernels (apache/arrow-rs#2643), does it make sense to use those directly rather than re-implementing this?
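A minimal standalone sketch of the kernels in question (assuming the current arrow-rs numeric kernels, where `add` returns an error on overflow and `add_wrapping` wraps):

    use arrow::array::{Array, Int32Array};
    use arrow::compute::kernels::numeric::{add, add_wrapping};

    fn main() {
        let a = Int32Array::from(vec![i32::MAX, 1]);
        let b = Int32Array::from(vec![1, 1]);

        // The checked kernel returns an error on i32::MAX + 1 (ANSI-style).
        assert!(add(&a, &b).is_err());

        // The wrapping kernel wraps i32::MAX + 1 to i32::MIN (legacy-style).
        let wrapped = add_wrapping(&a, &b).unwrap();
        assert_eq!(wrapped.len(), 2);
    }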

Contributor Author:

I'm going to review it. Thank you!

Contributor Author:

Hi @eejbyfeldt, I've been looking at DataFusion and I don't see any way to call those arrow operations from DataFusion physical expressions. Do you know if this is implemented yet?

Contributor:

@planga82 Can we make use of the arithmetic kernel https://docs.rs/arrow/latest/arrow/compute/kernels/numeric/fn.add.html to compute the addition and throw an error based on the result?

Contributor @dharanad (Jul 8, 2024):

Can we do something like this?

    fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
        use arrow::compute::kernels::numeric::add;
        let lhs = self.left.evaluate(batch)?;
        let rhs = self.right.evaluate(batch)?; // evaluate the right child (the original snippet evaluated `left` twice)
        match (self.op, self.eval_mode) {
            (Operator::Plus, EvalMode::Ansi) => apply(&lhs, &rhs, add),
            _ => self.inner.evaluate(batch),
        }
    }

But the visibility of the apply fn is restricted to pub(crate). We might need to raise a PR with DataFusion to make it public.
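If `apply` stays private, one could inline the same array/scalar dispatch it performs. A rough sketch under that assumption (the `checked_add` name is hypothetical):

    use arrow::compute::kernels::numeric::add;
    use datafusion_common::{Result, ScalarValue};
    use datafusion_expr::ColumnarValue;

    /// Adds two ColumnarValues with arrow's overflow-checked `add` kernel,
    /// returning an error instead of wrapping on integer overflow.
    fn checked_add(lhs: &ColumnarValue, rhs: &ColumnarValue) -> Result<ColumnarValue> {
        match (lhs, rhs) {
            (ColumnarValue::Array(l), ColumnarValue::Array(r)) => {
                Ok(ColumnarValue::Array(add(&l.as_ref(), &r.as_ref())?))
            }
            (ColumnarValue::Array(l), ColumnarValue::Scalar(r)) => {
                Ok(ColumnarValue::Array(add(&l.as_ref(), &r.to_scalar()?)?))
            }
            (ColumnarValue::Scalar(l), ColumnarValue::Array(r)) => {
                Ok(ColumnarValue::Array(add(&l.to_scalar()?, &r.as_ref())?))
            }
            (ColumnarValue::Scalar(l), ColumnarValue::Scalar(r)) => {
                // Scalar + scalar: compute on length-1 arrays, then extract.
                let sum = add(&l.to_scalar()?, &r.to_scalar()?)?;
                Ok(ColumnarValue::Scalar(ScalarValue::try_from_array(&sum, 0)?))
            }
        }
    }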

Contributor Author @planga82 (Jul 8, 2024):

My concern is that if we use the kernels directly to perform the operations, instead of reusing the DataFusion physical expression, we may lose functionality or have to reimplement it here. From my point of view, in Comet we should try to translate from Spark to DataFusion and add any missing functionality in the form of a wrapper.

Contributor:

Well put, and I agree with what you are saying. I was thinking of overriding the implementation, but your approach makes much more sense: safer and cleaner.

Member:

I agree with @eejbyfeldt that we should just use the existing add_checked or add_wrapped kernels in arrow-rs, which already provide the functionality that we need (unless we discover a compatibility issue compared to the JVM addExact logic). I will create an example to demonstrate how to use this and will post it here later today.

            Arc::new(BinaryExpr::new(
                Arc::new(BinaryExpr::new(
                    self.left.clone(),
                    Operator::BitwiseXor,
                    self.inner.clone(),
                )),
                Operator::BitwiseAnd,
                Arc::new(BinaryExpr::new(
                    self.right.clone(),
                    Operator::BitwiseXor,
                    self.inner.clone(),
                )),
            )),
            Operator::Lt,
            Self::zero_literal(&result.data_type())?,
        ));
        match check_overflow_expr.evaluate(batch)? {
Comment on lines +75 to +91
Member:

This is a very expensive way of implementing this. We don't need to use DataFusion to perform simple math operations when we can implement this in Rust directly as we process the arrays. I think we can delegate to arrow-rs, though, and avoid all of this. As stated in another comment, I will provide an example later today.

            ColumnarValue::Array(array) => {
                let boolean_array = array
                    .as_any()
                    .downcast_ref::<BooleanArray>()
                    .expect("Expected BooleanArray");
Contributor:

Since we are using expect here, this function may panic. Can we instead return an error?
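A sketch of the non-panicking alternative being suggested (hypothetical; the actual fix in a later commit may differ):

    let boolean_array = array
        .as_any()
        .downcast_ref::<BooleanArray>()
        .ok_or_else(|| DataFusionError::Internal("expected BooleanArray".to_string()))?;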

Contributor Author:

Solved! Thanks.

                if boolean_array.true_count() > 0 {
                    return Err(arithmetic_overflow_error(&result.data_type().to_string()).into());
                }
                Ok(())
            }
            ColumnarValue::Scalar(ScalarValue::Boolean(Some(true))) => {
                Err(arithmetic_overflow_error(&result.data_type().to_string()).into())
            }
            _ => Ok(()),
        }
    }

    fn zero_literal(data_type: &DataType) -> Result<Arc<dyn PhysicalExpr>> {
        let zero_literal = match data_type {
            DataType::Int8 => ScalarValue::Int8(Some(0)),
            DataType::Int16 => ScalarValue::Int16(Some(0)),
            DataType::Int32 => ScalarValue::Int32(Some(0)),
            DataType::Int64 => ScalarValue::Int64(Some(0)),
            _ => {
                return Err(DataFusionError::Internal(format!(
                    "Unsupported data type: {:?}",
                    data_type
                )))
            }
        };
        Ok(Arc::new(Literal::new(zero_literal)))
    }
}

impl Display for CometBinaryExpr {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        self.inner.fmt(f)
    }
}

impl PhysicalExpr for CometBinaryExpr {
    fn as_any(&self) -> &dyn Any {
        self.inner.as_any()
    }

    fn data_type(&self, input_schema: &Schema) -> Result<DataType> {
        self.inner.data_type(input_schema)
    }

    fn nullable(&self, input_schema: &Schema) -> Result<bool> {
        self.inner.nullable(input_schema)
    }

    fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
        match self.inner.evaluate(batch) {
            Ok(result) => {
                self.fail_on_overflow(batch, &result)?;
Comment on lines +155 to +157
Member:

Evaluating the same expression twice is going to be expensive. We should evaluate just once, either checking for overflows or not, depending on the eval mode.
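A sketch of the single-evaluation shape this suggests (hypothetical; it reuses the `checked_add` helper sketched earlier in this thread):

    fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
        // Evaluate each child exactly once, then pick the kernel by eval mode.
        match (self.op, self.eval_mode) {
            (Operator::Plus, EvalMode::Ansi) => {
                let lhs = self.left.evaluate(batch)?;
                let rhs = self.right.evaluate(batch)?;
                checked_add(&lhs, &rhs)
            }
            _ => self.inner.evaluate(batch),
        }
    }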

                Ok(result)
            }
            Err(e) => Err(e),
        }
    }

    fn evaluate_selection(
        &self,
        batch: &RecordBatch,
        selection: &BooleanArray,
    ) -> Result<ColumnarValue> {
        self.inner.evaluate_selection(batch, selection)
    }

    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
        self.inner.children()
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn PhysicalExpr>>,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        Arc::clone(&self.inner).with_new_children(children)
    }

    fn evaluate_bounds(&self, children: &[&Interval]) -> Result<Interval> {
        self.inner.evaluate_bounds(children)
    }

    fn propagate_constraints(
        &self,
        interval: &Interval,
        children: &[&Interval],
    ) -> Result<Option<Vec<Interval>>> {
        self.inner.propagate_constraints(interval, children)
    }

    fn dyn_hash(&self, state: &mut dyn Hasher) {
        self.inner.dyn_hash(state)
    }

    fn get_properties(&self, children: &[ExprProperties]) -> Result<ExprProperties> {
        self.inner.get_properties(children)
    }
}

impl PartialEq<dyn Any> for CometBinaryExpr {
    fn eq(&self, other: &dyn Any) -> bool {
        down_cast_any_ref(other)
            .downcast_ref::<Self>()
            .map(|x| self.left.eq(&x.left) && self.op == x.op && self.right.eq(&x.right))
            .unwrap_or(false)
    }
}
1 change: 1 addition & 0 deletions core/src/execution/datafusion/expressions/mod.rs
@@ -30,6 +30,7 @@ use crate::{errors::CometError, execution::spark_expression};
pub mod abs;
pub mod avg;
pub mod avg_decimal;
+pub mod binary;
pub mod bloom_filter_might_contain;
pub mod correlation;
pub mod covariance;
31 changes: 22 additions & 9 deletions core/src/execution/datafusion/planner.rs
@@ -97,7 +97,7 @@ use crate::{
},
};

-use super::expressions::{abs::CometAbsFunc, EvalMode};
+use super::expressions::{abs::CometAbsFunc, binary::CometBinaryExpr, EvalMode};

// For clippy error on type_complexity.
type ExecResult<T> = Result<T, ExecutionError>;
@@ -160,40 +160,52 @@ impl PhysicalPlanner {
        input_schema: SchemaRef,
    ) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> {
        match spark_expr.expr_struct.as_ref().unwrap() {
-            ExprStruct::Add(expr) => self.create_binary_expr(
-                expr.left.as_ref().unwrap(),
-                expr.right.as_ref().unwrap(),
-                expr.return_type.as_ref(),
-                DataFusionOperator::Plus,
-                input_schema,
-            ),
+            ExprStruct::Add(expr) => {
+                let mode = if expr.fail_on_error {
+                    EvalMode::Ansi
+                } else {
+                    EvalMode::Legacy
+                };
+                self.create_binary_expr(
+                    expr.left.as_ref().unwrap(),
+                    expr.right.as_ref().unwrap(),
+                    expr.return_type.as_ref(),
+                    DataFusionOperator::Plus,
+                    input_schema,
+                    mode,
+                )
+            }
            ExprStruct::Subtract(expr) => self.create_binary_expr(
                expr.left.as_ref().unwrap(),
                expr.right.as_ref().unwrap(),
                expr.return_type.as_ref(),
                DataFusionOperator::Minus,
                input_schema,
+                EvalMode::Legacy,
            ),
            ExprStruct::Multiply(expr) => self.create_binary_expr(
                expr.left.as_ref().unwrap(),
                expr.right.as_ref().unwrap(),
                expr.return_type.as_ref(),
                DataFusionOperator::Multiply,
                input_schema,
+                EvalMode::Legacy,
            ),
            ExprStruct::Divide(expr) => self.create_binary_expr(
                expr.left.as_ref().unwrap(),
                expr.right.as_ref().unwrap(),
                expr.return_type.as_ref(),
                DataFusionOperator::Divide,
                input_schema,
+                EvalMode::Legacy,
            ),
            ExprStruct::Remainder(expr) => self.create_binary_expr(
                expr.left.as_ref().unwrap(),
                expr.right.as_ref().unwrap(),
                expr.return_type.as_ref(),
                DataFusionOperator::Modulo,
                input_schema,
+                EvalMode::Legacy,
            ),
            ExprStruct::Eq(expr) => {
                let left = self.create_expr(expr.left.as_ref().unwrap(), input_schema.clone())?;
@@ -627,6 +639,7 @@ impl PhysicalPlanner {
        return_type: Option<&spark_expression::DataType>,
        op: DataFusionOperator,
        input_schema: SchemaRef,
+        eval_mode: EvalMode,
    ) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> {
        let left = self.create_expr(left, input_schema.clone())?;
        let right = self.create_expr(right, input_schema.clone())?;
@@ -681,7 +694,7 @@
                    data_type,
                )))
            }
-            _ => Ok(Arc::new(BinaryExpr::new(left, op, right))),
+            _ => Ok(Arc::new(CometBinaryExpr::new(left, op, right, eval_mode))),
        }
    }
