Skip to content

Commit

Permalink
Properly plan subqueries with async UDFs (#780)
Browse files Browse the repository at this point in the history
  • Loading branch information
mwylde authored Nov 2, 2024
1 parent 347c2f7 commit 5aa08b7
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 13 deletions.
16 changes: 4 additions & 12 deletions crates/arroyo-planner/src/extension/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,20 @@ use datafusion::logical_expr::{
};
use datafusion_proto::physical_plan::to_proto::serialize_physical_expr;
use datafusion_proto::physical_plan::DefaultPhysicalExtensionCodec;
use datafusion_proto::protobuf::ProjectionNode;
use prost::Message;
use watermark_node::WatermarkNode;

use crate::builder::{NamedNode, Planner};
use crate::schemas::{add_timestamp_field, has_timestamp_field};
use crate::{fields_with_qualifiers, schema_from_df_fields, DFField, ASYNC_RESULT_FIELD};
use join::JoinExtension;

use self::debezium::{DebeziumUnrollingExtension, ToDebeziumExtension};
use self::updating_aggregate::UpdatingAggregateExtension;
use self::{
aggregate::AggregateExtension, key_calculation::KeyCalculationExtension,
remote_table::RemoteTableExtension, sink::SinkExtension, table_source::TableSourceExtension,
window_fn::WindowFunctionExtension,
};
use crate::builder::{NamedNode, Planner};
use crate::schemas::{add_timestamp_field, has_timestamp_field};
use crate::{fields_with_qualifiers, schema_from_df_fields, DFField, ASYNC_RESULT_FIELD};
use join::JoinExtension;

pub(crate) mod aggregate;
pub(crate) mod debezium;
Expand Down Expand Up @@ -207,12 +205,6 @@ impl ArroyoExtension for AsyncUDFExtension {
})
.collect::<Result<Vec<_>>>()?;

ProjectionNode {
input: None,
expr: vec![],
optional_alias: None,
};

let config = AsyncUdfOperator {
name: self.name.clone(),
udf: Some(self.udf.clone().into()),
Expand Down
18 changes: 17 additions & 1 deletion crates/arroyo-planner/src/rewriters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -536,9 +536,25 @@ impl<'a> TreeNodeRewriter for AsyncUdfRewriter<'a> {

let udf = self.provider.dylib_udfs.get(&name).unwrap().clone();

let input = if matches!(*projection.input, LogicalPlan::Projection(..)) {
// if our input is a projection, we need to plan it separately -- this happens
// for subqueries

Arc::new(LogicalPlan::Extension(Extension {
node: Arc::new(RemoteTableExtension {
input: (*projection.input).clone(),
name: TableReference::bare("subquery_projection"),
schema: projection.input.schema().clone(),
materialize: false,
}),
}))
} else {
projection.input
};

Ok(Transformed::yes(LogicalPlan::Extension(Extension {
node: Arc::new(AsyncUDFExtension {
input: projection.input,
input,
name,
udf,
arg_exprs: args,
Expand Down

0 comments on commit 5aa08b7

Please sign in to comment.