From a431d753999fd663fe9ee5f1cf45a17960113599 Mon Sep 17 00:00:00 2001 From: Micah Wylde Date: Wed, 16 Oct 2024 12:58:42 -0700 Subject: [PATCH] Add a new `arroyo visualize` command to help debug query plans (#757) --- Cargo.lock | 38 +++++++ .../src/blackhole/operator.rs | 1 + .../filesystem/sink/two_phase_committer.rs | 4 +- crates/arroyo-connectors/src/fluvio/sink.rs | 10 ++ .../arroyo-connectors/src/kafka/sink/mod.rs | 34 +++++- crates/arroyo-operator/src/operator.rs | 88 +++++++++++++++ crates/arroyo-worker/src/arrow/async_udf.rs | 39 ++++++- .../arroyo-worker/src/arrow/instant_join.rs | 30 +++-- .../src/arrow/join_with_expiration.rs | 36 ++++-- crates/arroyo-worker/src/arrow/mod.rs | 22 +++- .../src/arrow/sliding_aggregating_window.rs | 35 ++++-- .../src/arrow/tumbling_aggregating_window.rs | 48 ++++++-- .../src/arrow/updating_aggregator.rs | 41 ++++++- .../src/arrow/watermark_generator.rs | 16 ++- crates/arroyo-worker/src/lib.rs | 4 + crates/arroyo-worker/src/utils.rs | 104 ++++++++++++++++++ crates/arroyo/Cargo.toml | 3 + crates/arroyo/src/main.rs | 59 +++++++++- 18 files changed, 559 insertions(+), 53 deletions(-) create mode 100644 crates/arroyo-worker/src/utils.rs diff --git a/Cargo.lock b/Cargo.lock index f6ab24d10..81d65e691 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -424,6 +424,7 @@ dependencies = [ "arroyo-compiler-service", "arroyo-connectors", "arroyo-controller", + "arroyo-df", "arroyo-node", "arroyo-openapi", "arroyo-rpc", @@ -438,6 +439,7 @@ dependencies = [ "cornucopia_async", "deadpool-postgres", "dirs", + "open", "postgres-types", "rand 0.8.5", "refinery", @@ -4907,6 +4909,15 @@ version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3" +[[package]] +name = "is-docker" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "928bae27f42bc99b60d9ac7334e3a21d10ad8f1835a4e12ec3ec0464765ed1b3" +dependencies = [ + "once_cell", +] + [[package]] name = "is-terminal" version = "0.4.12" @@ -4918,6 +4929,16 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "is-wsl" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "173609498df190136aa7dea1a91db051746d339e18476eed5ca40521f02d7aa5" +dependencies = [ + "is-docker", + "once_cell", +] + [[package]] name = "is_ci" version = "1.2.0" @@ -5904,6 +5925,17 @@ version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" +[[package]] +name = "open" +version = "5.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61a877bf6abd716642a53ef1b89fb498923a4afca5c754f9050b4d081c05c4b3" +dependencies = [ + "is-wsl", + "libc", + "pathdiff", +] + [[package]] name = "openapiv3" version = "2.0.0" @@ -6160,6 +6192,12 @@ version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a" +[[package]] +name = "pathdiff" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d61c5ce1153ab5b689d0c074c4e7fc613e942dfb7dd9eea5ab202d2ad91fe361" + [[package]] name = "pear" version = "0.2.9" diff --git a/crates/arroyo-connectors/src/blackhole/operator.rs b/crates/arroyo-connectors/src/blackhole/operator.rs index 7a549f93c..e5ca2eec3 100644 --- a/crates/arroyo-connectors/src/blackhole/operator.rs 
+++ b/crates/arroyo-connectors/src/blackhole/operator.rs @@ -3,6 +3,7 @@ use arroyo_operator::context::ArrowContext; use arroyo_operator::operator::ArrowOperator; use async_trait::async_trait; +#[derive(Debug)] pub struct BlackholeSinkFunc {} impl BlackholeSinkFunc { diff --git a/crates/arroyo-connectors/src/filesystem/sink/two_phase_committer.rs b/crates/arroyo-connectors/src/filesystem/sink/two_phase_committer.rs index 5f1c7c6b3..c75d7079f 100644 --- a/crates/arroyo-connectors/src/filesystem/sink/two_phase_committer.rs +++ b/crates/arroyo-connectors/src/filesystem/sink/two_phase_committer.rs @@ -1,5 +1,3 @@ -use std::{collections::HashMap, time::SystemTime}; - use anyhow::Result; use arrow::record_batch::RecordBatch; use arroyo_operator::{context::ArrowContext, operator::ArrowOperator}; @@ -12,6 +10,8 @@ use arroyo_types::{Data, SignalMessage, TaskInfo, Watermark}; use async_trait::async_trait; use bincode::config; use prost::Message; +use std::fmt::Debug; +use std::{collections::HashMap, time::SystemTime}; use tracing::{info, warn}; pub struct TwoPhaseCommitterOperator { diff --git a/crates/arroyo-connectors/src/fluvio/sink.rs b/crates/arroyo-connectors/src/fluvio/sink.rs index 62de04bbf..edf1ad513 100644 --- a/crates/arroyo-connectors/src/fluvio/sink.rs +++ b/crates/arroyo-connectors/src/fluvio/sink.rs @@ -1,6 +1,7 @@ use arrow::array::RecordBatch; use async_trait::async_trait; use fluvio::{Fluvio, FluvioConfig, TopicProducerPool}; +use std::fmt::Debug; use arroyo_formats::ser::ArrowSerializer; use tracing::info; @@ -16,6 +17,15 @@ pub struct FluvioSinkFunc { pub serializer: ArrowSerializer, } +impl Debug for FluvioSinkFunc { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("FluvioSinkFunc") + .field("topic", &self.topic) + .field("endpoint", &self.endpoint) + .finish() + } +} + #[async_trait] impl ArrowOperator for FluvioSinkFunc { fn name(&self) -> String { diff --git a/crates/arroyo-connectors/src/kafka/sink/mod.rs b/crates/arroyo-connectors/src/kafka/sink/mod.rs index 520629f8c..5d756d3c2 100644 --- a/crates/arroyo-connectors/src/kafka/sink/mod.rs +++ b/crates/arroyo-connectors/src/kafka/sink/mod.rs @@ -1,10 +1,11 @@ use anyhow::Result; +use std::borrow::Cow; use arroyo_rpc::grpc::rpc::{GlobalKeyedTableConfig, TableConfig, TableEnum}; use arroyo_rpc::{CheckpointEvent, ControlMessage, ControlResp}; use arroyo_types::*; use std::collections::HashMap; - +use std::fmt::{Display, Formatter}; use tracing::{error, warn}; use rdkafka::producer::{DeliveryFuture, FutureProducer, FutureRecord, Producer}; @@ -17,7 +18,7 @@ use arrow::array::{Array, AsArray, RecordBatch}; use arrow::datatypes::{DataType, TimeUnit}; use arroyo_formats::ser::ArrowSerializer; use arroyo_operator::context::ArrowContext; -use arroyo_operator::operator::ArrowOperator; +use arroyo_operator::operator::{ArrowOperator, AsDisplayable, DisplayableOperator}; use arroyo_rpc::df::ArroyoSchema; use arroyo_types::CheckpointBarrier; use async_trait::async_trait; @@ -50,6 +51,15 @@ pub enum ConsistencyMode { }, } +impl Display for ConsistencyMode { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + ConsistencyMode::AtLeastOnce => write!(f, "AtLeastOnce"), + ConsistencyMode::ExactlyOnce { .. 
} => write!(f, "ExactlyOnce"), + } + } +} + impl From for ConsistencyMode { fn from(commit_mode: SinkCommitMode) -> Self { match commit_mode { @@ -219,6 +229,26 @@ impl ArrowOperator for KafkaSinkFunc { format!("kafka-producer-{}", self.topic) } + fn display(&self) -> DisplayableOperator { + DisplayableOperator { + name: Cow::Borrowed("KafkaSinkFunc"), + fields: vec![ + ("topic", self.topic.as_str().into()), + ("bootstrap_servers", self.bootstrap_servers.as_str().into()), + ( + "consistency_mode", + AsDisplayable::Display(&self.consistency_mode), + ), + ( + "timestamp_field", + AsDisplayable::Debug(&self.timestamp_field), + ), + ("key_field", AsDisplayable::Debug(&self.key_field)), + ("client_config", AsDisplayable::Debug(&self.client_config)), + ], + } + } + fn tables(&self) -> HashMap { if self.is_committing() { single_item_hash_map( diff --git a/crates/arroyo-operator/src/operator.rs b/crates/arroyo-operator/src/operator.rs index cd0acf9ac..76e55b818 100644 --- a/crates/arroyo-operator/src/operator.rs +++ b/crates/arroyo-operator/src/operator.rs @@ -5,6 +5,7 @@ use crate::{CheckpointCounter, ControlOutcome, SourceFinishType}; use anyhow::anyhow; use arrow::array::RecordBatch; use arrow::datatypes::DataType; +use arrow::datatypes::Schema; use arroyo_datastream::logical::{DylibUdfConfig, PythonUdfConfig}; use arroyo_metrics::TaskCounters; use arroyo_rpc::grpc::rpc::{TableConfig, TaskCheckpointEventType}; @@ -22,10 +23,13 @@ use datafusion::logical_expr::planner::ExprPlanner; use datafusion::logical_expr::{ create_udaf, AggregateUDF, ScalarUDF, Signature, TypeSignature, Volatility, WindowUDF, }; +use datafusion::physical_plan::{displayable, ExecutionPlan}; use dlopen2::wrapper::Container; use futures::future::OptionFuture; use std::any::Any; +use std::borrow::Cow; use std::collections::{HashMap, HashSet}; +use std::fmt::{Debug, Display, Formatter}; use std::future::Future; use std::path::Path; use std::pin::Pin; @@ -68,6 +72,16 @@ impl OperatorNode { } } + pub fn display(&self) -> DisplayableOperator { + match self { + OperatorNode::Source(_) => DisplayableOperator { + name: self.name().into(), + fields: vec![], + }, + OperatorNode::Operator(op) => op.display(), + } + } + pub fn tables(&self) -> HashMap { match self { OperatorNode::Source(s) => s.tables(), @@ -314,6 +328,73 @@ async fn operator_run_behavior( final_message } +pub enum AsDisplayable<'a> { + Str(&'a str), + String(String), + Display(&'a dyn Display), + Debug(&'a dyn Debug), + Plan(&'a dyn ExecutionPlan), + Schema(&'a Schema), +} + +impl<'a> From<&'a str> for AsDisplayable<'a> { + fn from(s: &'a str) -> Self { + AsDisplayable::Str(s) + } +} + +impl From for AsDisplayable<'_> { + fn from(s: String) -> Self { + AsDisplayable::String(s) + } +} + +impl<'a> From<&'a dyn ExecutionPlan> for AsDisplayable<'a> { + fn from(p: &'a dyn ExecutionPlan) -> Self { + AsDisplayable::Plan(p) + } +} + +impl<'a> From<&'a Schema> for AsDisplayable<'a> { + fn from(s: &'a Schema) -> Self { + AsDisplayable::Schema(s) + } +} + +impl Display for AsDisplayable<'_> { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + AsDisplayable::Str(s) => { + write!(f, "{}", s) + } + AsDisplayable::String(s) => { + write!(f, "{}", s) + } + AsDisplayable::Display(d) => { + write!(f, "{}", d) + } + AsDisplayable::Debug(d) => { + write!(f, "{:?}", d) + } + AsDisplayable::Plan(p) => { + write!(f, "`{}`", displayable(*p).one_line()) + } + AsDisplayable::Schema(s) => { + for field in s.fields() { + write!(f, "\n * {}: {:?}, ", field.name(), 
field.data_type())?; + } + + Ok(()) + } + } + } +} + +pub struct DisplayableOperator<'a> { + pub name: Cow<'a, str>, + pub fields: Vec<(&'static str, AsDisplayable<'a>)>, +} + #[async_trait::async_trait] pub trait ArrowOperator: Send + 'static { async fn handle_watermark_int(&mut self, watermark: Watermark, ctx: &mut ArrowContext) { @@ -461,6 +542,13 @@ pub trait ArrowOperator: Send + 'static { None } + fn display(&self) -> DisplayableOperator { + DisplayableOperator { + name: self.name().into(), + fields: vec![], + } + } + #[allow(unused_variables)] async fn on_start(&mut self, ctx: &mut ArrowContext) {} diff --git a/crates/arroyo-worker/src/arrow/async_udf.rs b/crates/arroyo-worker/src/arrow/async_udf.rs index 9fa93becc..147e0ec42 100644 --- a/crates/arroyo-worker/src/arrow/async_udf.rs +++ b/crates/arroyo-worker/src/arrow/async_udf.rs @@ -5,7 +5,9 @@ use arrow_schema::{Field, Schema}; use arroyo_datastream::logical::DylibUdfConfig; use arroyo_df::ASYNC_RESULT_FIELD; use arroyo_operator::context::ArrowContext; -use arroyo_operator::operator::{ArrowOperator, OperatorConstructor, OperatorNode, Registry}; +use arroyo_operator::operator::{ + ArrowOperator, AsDisplayable, DisplayableOperator, OperatorConstructor, OperatorNode, Registry, +}; use arroyo_rpc::grpc::api; use arroyo_rpc::grpc::rpc::TableConfig; use arroyo_state::global_table_config; @@ -18,7 +20,9 @@ use datafusion_proto::physical_plan::from_proto::parse_physical_expr; use datafusion_proto::physical_plan::DefaultPhysicalExtensionCodec; use datafusion_proto::protobuf::PhysicalExprNode; use prost::Message; +use std::borrow::Cow; use std::collections::{BTreeMap, HashMap, VecDeque}; +use std::fmt::Debug; use std::sync::Arc; use std::time::Duration; use tracing::info; @@ -106,6 +110,39 @@ impl ArrowOperator for AsyncUdfOperator { self.name.clone() } + fn display(&self) -> DisplayableOperator { + DisplayableOperator { + name: Cow::Borrowed("AsyncUdfOperator"), + fields: vec![ + ("name", self.name.as_str().into()), + ("ordered", AsDisplayable::Debug(&self.ordered)), + ( + "allowed_in_flight", + AsDisplayable::Debug(&self.allowed_in_flight), + ), + ("timeout", AsDisplayable::Debug(&self.timeout)), + ( + "input_exprs", + self.input_exprs + .iter() + .map(|e| format!("{}", e)) + .collect::>() + .join(", ") + .into(), + ), + ( + "final_exprs", + self.final_exprs + .iter() + .map(|e| format!("{}", e)) + .collect::>() + .join(", ") + .into(), + ), + ], + } + } + fn tables(&self) -> HashMap { global_table_config("a", "AsyncMapOperator state") } diff --git a/crates/arroyo-worker/src/arrow/instant_join.rs b/crates/arroyo-worker/src/arrow/instant_join.rs index bb459deac..c638714ce 100644 --- a/crates/arroyo-worker/src/arrow/instant_join.rs +++ b/crates/arroyo-worker/src/arrow/instant_join.rs @@ -1,17 +1,12 @@ -use std::{ - any::Any, - collections::{BTreeMap, HashMap}, - pin::Pin, - sync::{Arc, RwLock}, - time::{Duration, SystemTime}, -}; - +use super::sync::streams::KeyedCloneableStreamFuture; use anyhow::Result; use arrow::compute::{max, min, partition, sort_to_indices, take}; use arrow_array::{RecordBatch, TimestampNanosecondArray}; use arroyo_df::physical::{ArroyoPhysicalExtensionCodec, DecodingContext}; use arroyo_operator::context::ArrowContext; -use arroyo_operator::operator::{ArrowOperator, OperatorConstructor, OperatorNode, Registry}; +use arroyo_operator::operator::{ + ArrowOperator, DisplayableOperator, OperatorConstructor, OperatorNode, Registry, +}; use arroyo_rpc::{ df::{ArroyoSchema, ArroyoSchemaRef}, grpc::{api, rpc::TableConfig}, 
@@ -28,10 +23,16 @@ use datafusion_proto::{physical_plan::AsExecutionPlan, protobuf::PhysicalPlanNod use futures::StreamExt; use futures::{lock::Mutex, stream::FuturesUnordered, Future}; use prost::Message; +use std::borrow::Cow; +use std::{ + any::Any, + collections::{BTreeMap, HashMap}, + pin::Pin, + sync::{Arc, RwLock}, + time::{Duration, SystemTime}, +}; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; use tracing::debug; - -use super::sync::streams::KeyedCloneableStreamFuture; type NextBatchFuture = KeyedCloneableStreamFuture; pub struct InstantJoin { @@ -192,6 +193,13 @@ impl ArrowOperator for InstantJoin { "InstantJoin".to_string() } + fn display(&self) -> DisplayableOperator { + DisplayableOperator { + name: Cow::Borrowed("InstantJoin"), + fields: vec![("join_execution_plan", self.join_exec.as_ref().into())], + } + } + async fn on_start(&mut self, ctx: &mut ArrowContext) { let watermark = ctx.last_present_watermark(); let left_table = ctx diff --git a/crates/arroyo-worker/src/arrow/join_with_expiration.rs b/crates/arroyo-worker/src/arrow/join_with_expiration.rs index e6f9f76f8..f7510aabf 100644 --- a/crates/arroyo-worker/src/arrow/join_with_expiration.rs +++ b/crates/arroyo-worker/src/arrow/join_with_expiration.rs @@ -1,15 +1,11 @@ -use std::{ - collections::HashMap, - sync::{Arc, RwLock}, - time::Duration, -}; - use anyhow::Result; use arrow::compute::concat_batches; use arrow_array::RecordBatch; use arroyo_df::physical::{ArroyoPhysicalExtensionCodec, DecodingContext}; use arroyo_operator::context::ArrowContext; -use arroyo_operator::operator::{ArrowOperator, OperatorConstructor, OperatorNode, Registry}; +use arroyo_operator::operator::{ + ArrowOperator, AsDisplayable, DisplayableOperator, OperatorConstructor, OperatorNode, Registry, +}; use arroyo_rpc::{ df::ArroyoSchema, grpc::{api, rpc::TableConfig}, @@ -21,6 +17,12 @@ use datafusion::physical_plan::ExecutionPlan; use datafusion_proto::{physical_plan::AsExecutionPlan, protobuf::PhysicalPlanNode}; use futures::StreamExt; use prost::Message; +use std::borrow::Cow; +use std::{ + collections::HashMap, + sync::{Arc, RwLock}, + time::Duration, +}; use tracing::warn; pub struct JoinWithExpiration { @@ -140,6 +142,26 @@ impl ArrowOperator for JoinWithExpiration { "JoinWithExpiration".to_string() } + fn display(&self) -> DisplayableOperator { + DisplayableOperator { + name: Cow::Borrowed("JoinWithExpiration"), + fields: vec![ + ( + "left_expiration", + AsDisplayable::Debug(&self.left_expiration), + ), + ( + "right_expiration", + AsDisplayable::Debug(&self.right_expiration), + ), + ( + "join_execution_plan", + self.join_execution_plan.as_ref().into(), + ), + ], + } + } + async fn process_batch(&mut self, _record_batch: RecordBatch, _ctx: &mut ArrowContext) { unreachable!(); } diff --git a/crates/arroyo-worker/src/arrow/mod.rs b/crates/arroyo-worker/src/arrow/mod.rs index f15e5a727..1dbdd3733 100644 --- a/crates/arroyo-worker/src/arrow/mod.rs +++ b/crates/arroyo-worker/src/arrow/mod.rs @@ -3,7 +3,9 @@ use arrow_array::RecordBatch; use arroyo_df::physical::ArroyoPhysicalExtensionCodec; use arroyo_df::physical::DecodingContext; use arroyo_operator::context::ArrowContext; -use arroyo_operator::operator::{ArrowOperator, OperatorConstructor, OperatorNode, Registry}; +use arroyo_operator::operator::{ + ArrowOperator, AsDisplayable, DisplayableOperator, OperatorConstructor, OperatorNode, Registry, +}; use arroyo_rpc::grpc::api; use datafusion::common::DataFusionError; use datafusion::common::Result as DFResult; @@ 
-18,6 +20,7 @@ use datafusion_proto::physical_plan::AsExecutionPlan; use datafusion_proto::protobuf::PhysicalPlanNode; use futures::StreamExt; use prost::Message as ProstMessage; +use std::borrow::Cow; use std::sync::Arc; use std::sync::RwLock; @@ -61,6 +64,13 @@ impl ArrowOperator for ValueExecutionOperator { self.name.clone() } + fn display(&self) -> DisplayableOperator { + DisplayableOperator { + name: (&self.name).into(), + fields: vec![("plan", (&*self.executor.plan).into())], + } + } + async fn process_batch(&mut self, record_batch: RecordBatch, ctx: &mut ArrowContext) { let mut records = self.executor.process_batch(record_batch).await; while let Some(batch) = records.next().await { @@ -176,6 +186,16 @@ impl ArrowOperator for KeyExecutionOperator { self.name.clone() } + fn display(&self) -> DisplayableOperator { + DisplayableOperator { + name: Cow::Borrowed(&self.name), + fields: vec![ + ("key_fields", AsDisplayable::Debug(&self.key_fields)), + ("plan", AsDisplayable::Plan(self.executor.plan.as_ref())), + ], + } + } + async fn process_batch(&mut self, batch: RecordBatch, ctx: &mut ArrowContext) { let mut records = self.executor.process_batch(batch).await; while let Some(batch) = records.next().await { diff --git a/crates/arroyo-worker/src/arrow/sliding_aggregating_window.rs b/crates/arroyo-worker/src/arrow/sliding_aggregating_window.rs index 514daba89..57c8f6a55 100644 --- a/crates/arroyo-worker/src/arrow/sliding_aggregating_window.rs +++ b/crates/arroyo-worker/src/arrow/sliding_aggregating_window.rs @@ -1,10 +1,3 @@ -use std::{ - collections::{BTreeMap, HashMap, VecDeque}, - fmt::{Display, Formatter}, - sync::{Arc, RwLock}, - time::SystemTime, -}; - use anyhow::{anyhow, bail, Result}; use arrow::compute::{partition, sort_to_indices, take}; use arrow_array::{types::TimestampNanosecondType, Array, PrimitiveArray, RecordBatch}; @@ -18,11 +11,18 @@ use arroyo_state::timestamp_table_config; use arroyo_types::{from_nanos, print_time, to_nanos, CheckpointBarrier, Watermark}; use datafusion::common::ScalarValue; use datafusion::{execution::context::SessionContext, physical_plan::ExecutionPlan}; +use std::borrow::Cow; +use std::{ + collections::{BTreeMap, HashMap, VecDeque}, + fmt::{Display, Formatter}, + sync::{Arc, RwLock}, + time::SystemTime, +}; use futures::stream::FuturesUnordered; use arroyo_df::physical::{ArroyoPhysicalExtensionCodec, DecodingContext}; -use arroyo_operator::operator::Registry; +use arroyo_operator::operator::{AsDisplayable, DisplayableOperator, Registry}; use arroyo_rpc::df::ArroyoSchema; use datafusion::execution::{ runtime_env::{RuntimeConfig, RuntimeEnv}, @@ -535,6 +535,25 @@ impl ArrowOperator for SlidingAggregatingWindowFunc { "sliding_window".to_string() } + fn display(&self) -> DisplayableOperator { + DisplayableOperator { + name: Cow::Borrowed("SlidingAggregatingWindowFunc"), + fields: vec![ + ("slide", AsDisplayable::Debug(&self.slide)), + ("width", AsDisplayable::Debug(&self.width)), + ( + "partial_aggregation_plan", + self.partial_aggregation_plan.as_ref().into(), + ), + ( + "finish_execution_plan", + self.finish_execution_plan.as_ref().into(), + ), + ("final_projection", self.final_projection.as_ref().into()), + ], + } + } + async fn on_start(&mut self, ctx: &mut ArrowContext) { let watermark = ctx.last_present_watermark(); let table = ctx diff --git a/crates/arroyo-worker/src/arrow/tumbling_aggregating_window.rs b/crates/arroyo-worker/src/arrow/tumbling_aggregating_window.rs index 8a617fde6..448b243d6 100644 --- 
a/crates/arroyo-worker/src/arrow/tumbling_aggregating_window.rs +++ b/crates/arroyo-worker/src/arrow/tumbling_aggregating_window.rs @@ -1,26 +1,28 @@ -use std::any::Any; -use std::future::Future; -use std::pin::Pin; -use std::{ - collections::{BTreeMap, HashMap}, - mem, - sync::{Arc, RwLock}, - time::SystemTime, -}; - use anyhow::{anyhow, Result}; use arrow::compute::{partition, sort_to_indices, take}; use arrow_array::{types::TimestampNanosecondType, Array, PrimitiveArray, RecordBatch}; use arrow_schema::SchemaRef; use arroyo_df::schemas::add_timestamp_field_arrow; use arroyo_operator::context::ArrowContext; -use arroyo_operator::operator::{ArrowOperator, OperatorConstructor, OperatorNode, Registry}; +use arroyo_operator::operator::{ + ArrowOperator, AsDisplayable, DisplayableOperator, OperatorConstructor, OperatorNode, Registry, +}; use arroyo_rpc::grpc::{api, rpc::TableConfig}; use arroyo_state::timestamp_table_config; use arroyo_types::{from_nanos, print_time, to_nanos, CheckpointBarrier, Watermark}; use datafusion::common::ScalarValue; use datafusion::{execution::context::SessionContext, physical_plan::ExecutionPlan}; use futures::{stream::FuturesUnordered, StreamExt}; +use std::any::Any; +use std::borrow::Cow; +use std::future::Future; +use std::pin::Pin; +use std::{ + collections::{BTreeMap, HashMap}, + mem, + sync::{Arc, RwLock}, + time::SystemTime, +}; use arroyo_df::physical::{ArroyoPhysicalExtensionCodec, DecodingContext}; use arroyo_rpc::df::ArroyoSchema; @@ -204,6 +206,30 @@ impl ArrowOperator for TumblingAggregatingWindowFunc { "tumbling_window".to_string() } + fn display(&self) -> DisplayableOperator { + DisplayableOperator { + name: Cow::Borrowed("TumblingAggregatingWindowFunc"), + fields: vec![ + ("width", AsDisplayable::Debug(&self.width)), + ( + "partial_aggregation_plan", + self.partial_aggregation_plan.as_ref().into(), + ), + ( + "finish_execution_plan", + self.finish_execution_plan.as_ref().into(), + ), + ( + "final_projection", + self.final_projection + .as_ref() + .map(|t| t.as_ref().into()) + .unwrap_or_else(|| "None".into()), + ), + ], + } + } + async fn on_start(&mut self, ctx: &mut ArrowContext) { let watermark = ctx.last_present_watermark(); let table = ctx diff --git a/crates/arroyo-worker/src/arrow/updating_aggregator.rs b/crates/arroyo-worker/src/arrow/updating_aggregator.rs index 118fb4c6f..feb88c3c3 100644 --- a/crates/arroyo-worker/src/arrow/updating_aggregator.rs +++ b/crates/arroyo-worker/src/arrow/updating_aggregator.rs @@ -1,3 +1,7 @@ +use anyhow::{anyhow, Result}; +use arrow::compute::concat_batches; +use arrow_array::RecordBatch; +use std::borrow::Cow; use std::{ any::Any, collections::HashMap, @@ -5,10 +9,6 @@ use std::{ sync::{Arc, RwLock}, }; -use anyhow::{anyhow, Result}; -use arrow::compute::concat_batches; -use arrow_array::RecordBatch; - use arroyo_operator::{ context::ArrowContext, operator::{ArrowOperator, OperatorConstructor, OperatorNode}, @@ -19,7 +19,7 @@ use arroyo_types::{CheckpointBarrier, SignalMessage, Watermark}; use datafusion::{execution::context::SessionContext, physical_plan::ExecutionPlan}; use arroyo_df::physical::{ArroyoPhysicalExtensionCodec, DecodingContext}; -use arroyo_operator::operator::Registry; +use arroyo_operator::operator::{AsDisplayable, DisplayableOperator, Registry}; use arroyo_rpc::df::ArroyoSchemaRef; use datafusion::common::ScalarValue; use datafusion::execution::{ @@ -188,6 +188,37 @@ impl ArrowOperator for UpdatingAggregatingFunc { "UpdatingAggregatingFunc".to_string() } + fn display(&self) -> 
DisplayableOperator { + DisplayableOperator { + name: Cow::Borrowed("UpdatingAggregatingFunc"), + fields: vec![ + ("flush_interval", AsDisplayable::Debug(&self.flush_interval)), + ("ttl", AsDisplayable::Debug(&self.ttl)), + ( + "partial_aggregation_schema", + (&*self.partial_schema.schema).into(), + ), + ( + "partial_aggregation_plan", + self.partial_aggregation_plan.as_ref().into(), + ), + ( + "state_partial_schema", + (&*self.state_partial_schema.schema).into(), + ), + ("combine_plan", self.combine_plan.as_ref().into()), + ( + "state_final_schema", + (&*self.state_final_schema.schema).into(), + ), + ( + "finish_execution_plan", + self.finish_execution_plan.as_ref().into(), + ), + ], + } + } + async fn process_batch(&mut self, batch: RecordBatch, _ctx: &mut ArrowContext) { if self.sender.is_none() { self.init_exec(); diff --git a/crates/arroyo-worker/src/arrow/watermark_generator.rs b/crates/arroyo-worker/src/arrow/watermark_generator.rs index 2d61d1dad..8eefd3343 100644 --- a/crates/arroyo-worker/src/arrow/watermark_generator.rs +++ b/crates/arroyo-worker/src/arrow/watermark_generator.rs @@ -2,7 +2,9 @@ use arrow::compute::kernels; use arrow_array::RecordBatch; use arroyo_operator::context::ArrowContext; use arroyo_operator::get_timestamp_col; -use arroyo_operator::operator::{ArrowOperator, OperatorConstructor, OperatorNode, Registry}; +use arroyo_operator::operator::{ + ArrowOperator, AsDisplayable, DisplayableOperator, OperatorConstructor, OperatorNode, Registry, +}; use arroyo_rpc::df::ArroyoSchema; use arroyo_rpc::grpc::api::ExpressionWatermarkConfig; use arroyo_rpc::grpc::rpc::TableConfig; @@ -17,6 +19,7 @@ use datafusion_proto::physical_plan::from_proto::parse_physical_expr; use datafusion_proto::physical_plan::DefaultPhysicalExtensionCodec; use datafusion_proto::protobuf::PhysicalExprNode; use prost::Message; +use std::borrow::Cow; use std::collections::HashMap; use std::sync::Arc; use std::time::{Duration, SystemTime}; @@ -95,6 +98,17 @@ impl ArrowOperator for WatermarkGenerator { "expression_watermark_generator".to_string() } + fn display(&self) -> DisplayableOperator { + DisplayableOperator { + name: Cow::Borrowed("WatermarkGenerator"), + fields: vec![ + ("interval", AsDisplayable::Debug(&self.interval)), + ("idle_time", AsDisplayable::Debug(&self.idle_time)), + ("expression", AsDisplayable::Debug(&self.expression)), + ], + } + } + fn tick_interval(&self) -> Option { Some(Duration::from_secs(1)) } diff --git a/crates/arroyo-worker/src/lib.rs b/crates/arroyo-worker/src/lib.rs index d3b251038..a6f2f914a 100644 --- a/crates/arroyo-worker/src/lib.rs +++ b/crates/arroyo-worker/src/lib.rs @@ -38,6 +38,7 @@ pub use ordered_float::OrderedFloat; use prometheus::{Encoder, ProtobufEncoder}; use prost::Message; +use crate::utils::to_d2; use arroyo_datastream::logical::LogicalProgram; use arroyo_df::physical::new_registry; use arroyo_rpc::config::config; @@ -48,6 +49,7 @@ pub mod arrow; pub mod engine; mod network_manager; +pub mod utils; pub static TIMER_TABLE: char = '['; @@ -410,6 +412,8 @@ impl WorkerGrpc for WorkerServer { let logical = LogicalProgram::try_from(req.program.expect("Program is None")) .expect("Failed to create LogicalProgram"); + debug!("Starting execution for graph\n{}", to_d2(&logical)); + for (udf_name, dylib_config) in &logical.program_config.udf_dylibs { info!("Loading UDF {}", udf_name); registry diff --git a/crates/arroyo-worker/src/utils.rs b/crates/arroyo-worker/src/utils.rs new file mode 100644 index 000000000..cf153cd7f --- /dev/null +++ 
b/crates/arroyo-worker/src/utils.rs @@ -0,0 +1,104 @@ +use crate::engine::construct_operator; +use arrow_schema::Schema; +use arroyo_datastream::logical::LogicalProgram; +use arroyo_df::physical::new_registry; +use std::fmt::Write; +use std::sync::Arc; + +fn format_arrow_schema_fields(schema: &Schema) -> Vec<(String, String)> { + schema + .fields() + .iter() + .map(|field| (field.name().clone(), field.data_type().to_string())) + .collect() +} + +pub fn to_d2(logical: &LogicalProgram) -> String { + let registry = Arc::new(new_registry()); + assert!( + logical.program_config.udf_dylibs.is_empty(), + "UDFs not supported" + ); + assert!( + logical.program_config.python_udfs.is_empty(), + "UDFs not supported" + ); + + let mut d2 = String::new(); + + // Nodes + for idx in logical.graph.node_indices() { + let node = logical.graph.node_weight(idx).unwrap(); + let operator = construct_operator( + node.operator_name, + node.operator_config.clone(), + registry.clone(), + ); + let display = operator.display(); + + // Create a Markdown-formatted label with operator details + let mut label = format!("### {} ({})", operator.name(), &display.name); + for (field, value) in display.fields { + label.push_str(&format!("\n- **{}**: {}", field, value)); + } + + writeln!( + &mut d2, + "{}: {{ + label: |markdown +{} + | + shape: rectangle +}}", + idx.index(), + label + ) + .unwrap(); + } + + // Edges and Schema Nodes + for idx in logical.graph.edge_indices() { + let edge = logical.graph.edge_weight(idx).unwrap(); + let (from, to) = logical.graph.edge_endpoints(idx).unwrap(); + + // Edge label (could be empty or minimal) + let edge_label = format!("{}", edge.edge_type); + + // Create a schema node using sql_table shape + let schema_node_name = format!("schema_{}", idx.index()); + let schema_fields = format_arrow_schema_fields(&edge.schema.schema); + + // Begin schema node definition + writeln!(&mut d2, "{}: {{", schema_node_name).unwrap(); + writeln!(&mut d2, " shape: sql_table").unwrap(); + + // Add fields to the schema node + for (field_name, field_type) in schema_fields { + writeln!( + &mut d2, + " \"{}\": \"{}\"", // Field definition + field_name.replace("\"", "\\\""), + field_type.replace("\"", "\\\"") + ) + .unwrap(); + } + + // End schema node definition + writeln!(&mut d2, "}}").unwrap(); + + // Connect source operator to schema node + writeln!( + &mut d2, + "{} -> {}: \"{}\"", + from.index(), + schema_node_name, + edge_label.replace("\"", "\\\"") + ) + .unwrap(); + + // Connect schema node to destination operator + writeln!(&mut d2, "{} -> {}", schema_node_name, to.index()).unwrap(); + } + + d2 +} diff --git a/crates/arroyo/Cargo.toml b/crates/arroyo/Cargo.toml index d7a5d262d..78e3501d9 100644 --- a/crates/arroyo/Cargo.toml +++ b/crates/arroyo/Cargo.toml @@ -20,6 +20,7 @@ arroyo-rpc = { path = "../arroyo-rpc" } arroyo-openapi = { path ="../arroyo-openapi" } arroyo-storage = { path = "../arroyo-storage" } arroyo-udf-python = { path = "../arroyo-udf/arroyo-udf-python" } +arroyo-df = { path = "../arroyo-planner" } clap = { version = "4", features = ["derive"] } tokio = { version = "1", features = ["full"] } @@ -41,6 +42,8 @@ rand = "0.8.5" reqwest = "0.11" clio = { version = "0.3.5", features = ["clap", "clap-parse"] } async-trait = "0.1.80" +open = '5.3.0' + [target.'cfg(all(not(target_env = "msvc"), any(target_arch = "x86_64", target_arch = "aarch64")))'.dependencies] tikv-jemallocator = { version = "0.5.4", features = ["unprefixed_malloc_on_supported_platforms"]} \ No newline at end of file diff --git 
a/crates/arroyo/src/main.rs b/crates/arroyo/src/main.rs index bf531d57d..01c025f5c 100644 --- a/crates/arroyo/src/main.rs +++ b/crates/arroyo/src/main.rs @@ -1,22 +1,23 @@ mod run; use anyhow::{anyhow, bail}; -use std::path::PathBuf; -use std::{env, fs}; - +use arroyo_df::{ArroyoSchemaProvider, SqlConfig}; use arroyo_rpc::config; use arroyo_rpc::config::{config, DatabaseType}; use arroyo_server_common::shutdown::{Shutdown, SignalBehavior}; use arroyo_server_common::{log_event, start_admin_server}; -use arroyo_worker::WorkerServer; +use arroyo_worker::{utils, WorkerServer}; use clap::{Args, Parser, Subcommand}; use clio::Input; use cornucopia_async::DatabaseSource; use deadpool_postgres::{ManagerConfig, Pool, RecyclingMethod}; use serde_json::json; +use std::env::temp_dir; +use std::path::PathBuf; use std::process::exit; use std::sync::Arc; use std::time::Duration; +use std::{env, fs}; use tokio::io::{AsyncRead, AsyncWrite}; use tokio::time::timeout; use tokio_postgres::{Client, Connection, NoTls}; @@ -108,6 +109,17 @@ enum Commands { #[arg(long)] wait: Option, }, + + /// Visualizes a query plan + Visualize { + /// Open the visualization in the browser + #[clap(short, long, action)] + open: bool, + + /// The query to visualize + #[clap(value_parser, default_value = "-")] + query: Input, + }, } #[derive(Debug, Eq, PartialEq, Copy, Clone)] @@ -166,6 +178,9 @@ async fn main() { Commands::Run(args) => { run::run(args).await; } + Commands::Visualize { query, open } => { + visualize(query, open).await; + } }; } @@ -458,3 +473,39 @@ async fn start_node() { Shutdown::handle_shutdown(shutdown.wait_for_shutdown(Duration::from_secs(30)).await); } + +async fn visualize(query: Input, open: bool) { + let query = std::io::read_to_string(query).expect("Failed to read query"); + + let schema_provider = ArroyoSchemaProvider::new(); + let compiled = arroyo_df::parse_and_get_program(&query, schema_provider, SqlConfig::default()) + .await + .expect("Failed while planning query"); + + if open { + let tmp = temp_dir().join("plan.d2"); + tokio::fs::write(&tmp, utils::to_d2(&compiled.program)) + .await + .expect("Failed to write plan"); + let output = tmp.with_extension("svg"); + let result = tokio::process::Command::new("d2") + .arg(&tmp) + .arg(&output) + .output() + .await + .expect("d2 must be installed to visualize the plan"); + + if !result.status.success() { + panic!( + "Failed to run d2: {}", + String::from_utf8_lossy(&result.stderr) + ); + } + + eprintln!("Wrote svg to {:?}", output); + + let _ = open::that(format!("file://{}", output.to_string_lossy())); + } else { + println!("{}", utils::to_d2(&compiled.program)); + } +}