Skip to content

Commit

Permalink
Add a new arroyo visualize command to help debug query plans (#757)
Browse files Browse the repository at this point in the history
  • Loading branch information
mwylde authored Oct 16, 2024
1 parent 1d95e13 commit a431d75
Show file tree
Hide file tree
Showing 18 changed files with 559 additions and 53 deletions.
38 changes: 38 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/arroyo-connectors/src/blackhole/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use arroyo_operator::context::ArrowContext;
use arroyo_operator::operator::ArrowOperator;
use async_trait::async_trait;

/// Field-less operator struct; it carries no configuration or state of its own.
// NOTE(review): presumably a sink that discards its input (per the name) — confirm.
#[derive(Debug)]
pub struct BlackholeSinkFunc {}

impl BlackholeSinkFunc {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::{collections::HashMap, time::SystemTime};

use anyhow::Result;
use arrow::record_batch::RecordBatch;
use arroyo_operator::{context::ArrowContext, operator::ArrowOperator};
Expand All @@ -12,6 +10,8 @@ use arroyo_types::{Data, SignalMessage, TaskInfo, Watermark};
use async_trait::async_trait;
use bincode::config;
use prost::Message;
use std::fmt::Debug;
use std::{collections::HashMap, time::SystemTime};
use tracing::{info, warn};

pub struct TwoPhaseCommitterOperator<TPC: TwoPhaseCommitter> {
Expand Down
10 changes: 10 additions & 0 deletions crates/arroyo-connectors/src/fluvio/sink.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use arrow::array::RecordBatch;
use async_trait::async_trait;
use fluvio::{Fluvio, FluvioConfig, TopicProducerPool};
use std::fmt::Debug;

use arroyo_formats::ser::ArrowSerializer;
use tracing::info;
Expand All @@ -16,6 +17,15 @@ pub struct FluvioSinkFunc {
pub serializer: ArrowSerializer,
}

/// Hand-written `Debug` that reports only the `topic` and `endpoint` fields;
/// the `serializer` field is not part of the output.
impl Debug for FluvioSinkFunc {
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut repr = formatter.debug_struct("FluvioSinkFunc");
        repr.field("topic", &self.topic);
        repr.field("endpoint", &self.endpoint);
        repr.finish()
    }
}

#[async_trait]
impl ArrowOperator for FluvioSinkFunc {
fn name(&self) -> String {
Expand Down
34 changes: 32 additions & 2 deletions crates/arroyo-connectors/src/kafka/sink/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use anyhow::Result;
use std::borrow::Cow;

use arroyo_rpc::grpc::rpc::{GlobalKeyedTableConfig, TableConfig, TableEnum};
use arroyo_rpc::{CheckpointEvent, ControlMessage, ControlResp};
use arroyo_types::*;
use std::collections::HashMap;

use std::fmt::{Display, Formatter};
use tracing::{error, warn};

use rdkafka::producer::{DeliveryFuture, FutureProducer, FutureRecord, Producer};
Expand All @@ -17,7 +18,7 @@ use arrow::array::{Array, AsArray, RecordBatch};
use arrow::datatypes::{DataType, TimeUnit};
use arroyo_formats::ser::ArrowSerializer;
use arroyo_operator::context::ArrowContext;
use arroyo_operator::operator::ArrowOperator;
use arroyo_operator::operator::{ArrowOperator, AsDisplayable, DisplayableOperator};
use arroyo_rpc::df::ArroyoSchema;
use arroyo_types::CheckpointBarrier;
use async_trait::async_trait;
Expand Down Expand Up @@ -50,6 +51,15 @@ pub enum ConsistencyMode {
},
}

impl Display for ConsistencyMode {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
ConsistencyMode::AtLeastOnce => write!(f, "AtLeastOnce"),
ConsistencyMode::ExactlyOnce { .. } => write!(f, "ExactlyOnce"),
}
}
}

impl From<SinkCommitMode> for ConsistencyMode {
fn from(commit_mode: SinkCommitMode) -> Self {
match commit_mode {
Expand Down Expand Up @@ -219,6 +229,26 @@ impl ArrowOperator for KafkaSinkFunc {
format!("kafka-producer-{}", self.topic)
}

/// Builds the displayable description of this sink.
///
/// The description is named `KafkaSinkFunc` and lists, in order: the topic,
/// the bootstrap servers, the consistency mode (via `Display`), and the
/// timestamp field, key field and client config (via `Debug`).
fn display(&self) -> DisplayableOperator {
    let fields = vec![
        ("topic", AsDisplayable::Str(self.topic.as_str())),
        (
            "bootstrap_servers",
            AsDisplayable::Str(self.bootstrap_servers.as_str()),
        ),
        (
            "consistency_mode",
            AsDisplayable::Display(&self.consistency_mode),
        ),
        (
            "timestamp_field",
            AsDisplayable::Debug(&self.timestamp_field),
        ),
        ("key_field", AsDisplayable::Debug(&self.key_field)),
        ("client_config", AsDisplayable::Debug(&self.client_config)),
    ];

    DisplayableOperator {
        name: Cow::Borrowed("KafkaSinkFunc"),
        fields,
    }
}

fn tables(&self) -> HashMap<String, TableConfig> {
if self.is_committing() {
single_item_hash_map(
Expand Down
88 changes: 88 additions & 0 deletions crates/arroyo-operator/src/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::{CheckpointCounter, ControlOutcome, SourceFinishType};
use anyhow::anyhow;
use arrow::array::RecordBatch;
use arrow::datatypes::DataType;
use arrow::datatypes::Schema;
use arroyo_datastream::logical::{DylibUdfConfig, PythonUdfConfig};
use arroyo_metrics::TaskCounters;
use arroyo_rpc::grpc::rpc::{TableConfig, TaskCheckpointEventType};
Expand All @@ -22,10 +23,13 @@ use datafusion::logical_expr::planner::ExprPlanner;
use datafusion::logical_expr::{
create_udaf, AggregateUDF, ScalarUDF, Signature, TypeSignature, Volatility, WindowUDF,
};
use datafusion::physical_plan::{displayable, ExecutionPlan};
use dlopen2::wrapper::Container;
use futures::future::OptionFuture;
use std::any::Any;
use std::borrow::Cow;
use std::collections::{HashMap, HashSet};
use std::fmt::{Debug, Display, Formatter};
use std::future::Future;
use std::path::Path;
use std::pin::Pin;
Expand Down Expand Up @@ -68,6 +72,16 @@ impl OperatorNode {
}
}

/// Returns the displayable description of this node.
///
/// Operator nodes delegate to the wrapped operator's own `display()`;
/// source nodes are described by their `name()` alone, with no fields.
pub fn display(&self) -> DisplayableOperator {
    match self {
        OperatorNode::Operator(op) => op.display(),
        OperatorNode::Source(_) => DisplayableOperator {
            fields: Vec::new(),
            name: self.name().into(),
        },
    }
}

pub fn tables(&self) -> HashMap<String, TableConfig> {
match self {
OperatorNode::Source(s) => s.tables(),
Expand Down Expand Up @@ -314,6 +328,73 @@ async fn operator_run_behavior(
final_message
}

/// A value that can be rendered as text through this type's `Display` impl.
///
/// Each variant selects how the wrapped value is formatted.
pub enum AsDisplayable<'a> {
/// A borrowed string slice, written out as-is.
Str(&'a str),
/// An owned string, written out as-is.
String(String),
/// Any value, formatted with its `Display` implementation (`{}`).
Display(&'a dyn Display),
/// Any value, formatted with its `Debug` implementation (`{:?}`).
Debug(&'a dyn Debug),
/// An execution plan, formatted as a one-line summary wrapped in backticks.
Plan(&'a dyn ExecutionPlan),
/// A schema, formatted as one `\n * name: type, ` entry per field.
Schema(&'a Schema),
}

impl<'a> From<&'a str> for AsDisplayable<'a> {
fn from(s: &'a str) -> Self {
AsDisplayable::Str(s)
}
}

impl From<String> for AsDisplayable<'_> {
fn from(s: String) -> Self {
AsDisplayable::String(s)
}
}

/// Wraps a borrowed execution plan in the `Plan` variant.
impl<'a> From<&'a dyn ExecutionPlan> for AsDisplayable<'a> {
    fn from(plan: &'a dyn ExecutionPlan) -> Self {
        Self::Plan(plan)
    }
}

impl<'a> From<&'a Schema> for AsDisplayable<'a> {
fn from(s: &'a Schema) -> Self {
AsDisplayable::Schema(s)
}
}

/// Text rendering for each `AsDisplayable` variant.
///
/// * `Str` / `String` are written verbatim.
/// * `Display` / `Debug` use the wrapped value's `{}` / `{:?}` formatting.
/// * `Plan` is the plan's one-line form wrapped in backticks.
/// * `Schema` emits one `\n * name: type, ` entry per field.
impl Display for AsDisplayable<'_> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Str(text) => f.write_str(text),
            Self::String(text) => f.write_str(text),
            // `write!` is kept here so the wrapped value is formatted with a
            // default format spec rather than inheriting the outer one.
            Self::Display(value) => write!(f, "{}", value),
            Self::Debug(value) => write!(f, "{:?}", value),
            Self::Plan(plan) => write!(f, "`{}`", displayable(*plan).one_line()),
            // Stops at, and returns, the first write error.
            Self::Schema(schema) => schema.fields().iter().try_for_each(|field| {
                write!(f, "\n * {}: {:?}, ", field.name(), field.data_type())
            }),
        }
    }
}

/// Displayable description of an operator: a name plus a list of labelled values.
pub struct DisplayableOperator<'a> {
/// Name shown for the operator; may be borrowed or owned.
pub name: Cow<'a, str>,
/// `(label, value)` pairs, kept in the order they were supplied.
pub fields: Vec<(&'static str, AsDisplayable<'a>)>,
}

#[async_trait::async_trait]
pub trait ArrowOperator: Send + 'static {
async fn handle_watermark_int(&mut self, watermark: Watermark, ctx: &mut ArrowContext) {
Expand Down Expand Up @@ -461,6 +542,13 @@ pub trait ArrowOperator: Send + 'static {
None
}

/// Default displayable description: the operator's `name()` and no fields.
/// Implementors can override this to expose their configuration.
fn display(&self) -> DisplayableOperator {
    DisplayableOperator {
        fields: Vec::new(),
        name: self.name().into(),
    }
}

/// Hook with an empty default body; implementors can override it.
// NOTE(review): presumably invoked once when the operator starts (per the name) — confirm against the caller.
// `unused_variables` is allowed because the default body does not touch `ctx`.
#[allow(unused_variables)]
async fn on_start(&mut self, ctx: &mut ArrowContext) {}

Expand Down
39 changes: 38 additions & 1 deletion crates/arroyo-worker/src/arrow/async_udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ use arrow_schema::{Field, Schema};
use arroyo_datastream::logical::DylibUdfConfig;
use arroyo_df::ASYNC_RESULT_FIELD;
use arroyo_operator::context::ArrowContext;
use arroyo_operator::operator::{ArrowOperator, OperatorConstructor, OperatorNode, Registry};
use arroyo_operator::operator::{
ArrowOperator, AsDisplayable, DisplayableOperator, OperatorConstructor, OperatorNode, Registry,
};
use arroyo_rpc::grpc::api;
use arroyo_rpc::grpc::rpc::TableConfig;
use arroyo_state::global_table_config;
Expand All @@ -18,7 +20,9 @@ use datafusion_proto::physical_plan::from_proto::parse_physical_expr;
use datafusion_proto::physical_plan::DefaultPhysicalExtensionCodec;
use datafusion_proto::protobuf::PhysicalExprNode;
use prost::Message;
use std::borrow::Cow;
use std::collections::{BTreeMap, HashMap, VecDeque};
use std::fmt::Debug;
use std::sync::Arc;
use std::time::Duration;
use tracing::info;
Expand Down Expand Up @@ -106,6 +110,39 @@ impl ArrowOperator for AsyncUdfOperator {
self.name.clone()
}

/// Builds the displayable description of this operator.
///
/// The description is named `AsyncUdfOperator` and lists, in order: the
/// operator name, `ordered`, `allowed_in_flight` and `timeout` (all via
/// `Debug`), followed by the input and final expressions, each rendered with
/// `Display` and joined with `", "`.
fn display(&self) -> DisplayableOperator {
    /// Renders every item with its `Display` impl and joins the results with `", "`.
    fn join_displayed<I>(items: I) -> String
    where
        I: IntoIterator,
        I::Item: std::fmt::Display,
    {
        items
            .into_iter()
            .map(|item| item.to_string())
            .collect::<Vec<_>>()
            .join(", ")
    }

    DisplayableOperator {
        name: Cow::Borrowed("AsyncUdfOperator"),
        fields: vec![
            ("name", self.name.as_str().into()),
            ("ordered", AsDisplayable::Debug(&self.ordered)),
            (
                "allowed_in_flight",
                AsDisplayable::Debug(&self.allowed_in_flight),
            ),
            ("timeout", AsDisplayable::Debug(&self.timeout)),
            // The joined strings are owned, so they go through `From<String>`.
            (
                "input_exprs",
                join_displayed(self.input_exprs.iter()).into(),
            ),
            (
                "final_exprs",
                join_displayed(self.final_exprs.iter()).into(),
            ),
        ],
    }
}

/// Returns this operator's table configuration, as produced by
/// `global_table_config("a", "AsyncMapOperator state")`.
// NOTE(review): the two arguments look like a table name and a description — confirm against `global_table_config`.
fn tables(&self) -> HashMap<String, TableConfig> {
global_table_config("a", "AsyncMapOperator state")
}
Expand Down
Loading

0 comments on commit a431d75

Please sign in to comment.