Skip to content

Commit

Permalink
fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
mwylde committed Nov 14, 2024
1 parent eec4f91 commit e0e0a4b
Show file tree
Hide file tree
Showing 28 changed files with 221 additions and 153 deletions.
4 changes: 2 additions & 2 deletions crates/arroyo-api/src/cloud.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use axum_extra::headers::Authorization;
use crate::{rest_utils::ErrorResp, AuthData, OrgMetadata};
use axum_extra::headers::authorization::Bearer;
use axum_extra::headers::Authorization;
use axum_extra::TypedHeader;
use crate::{rest_utils::ErrorResp, AuthData, OrgMetadata};
use cornucopia_async::Database;

pub(crate) async fn authenticate(
Expand Down
10 changes: 4 additions & 6 deletions crates/arroyo-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use axum::Json;
use cornucopia_async::DatabaseSource;
use http::StatusCode;
use serde::{Deserialize, Serialize};
use std::net::{SocketAddr};
use std::net::SocketAddr;
use time::OffsetDateTime;
use tokio::net::TcpListener;
use tonic::transport::Channel;
Expand Down Expand Up @@ -138,11 +138,9 @@ pub async fn start_server(database: DatabaseSource, guard: ShutdownGuard) -> any
);

info!("Starting API server on {:?}", local_addr);
guard.into_spawn_task(wrap_start(
"api",
local_addr,
async { axum::serve(listener, app.into_make_service()).await },
));
guard.into_spawn_task(wrap_start("api", local_addr, async {
axum::serve(listener, app.into_make_service()).await
}));

Ok(local_addr.port())
}
Expand Down
4 changes: 2 additions & 2 deletions crates/arroyo-api/src/rest_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ use arroyo_server_common::log_event;
use axum::extract::rejection::JsonRejection;
use axum::http::StatusCode;
use axum::response::{IntoResponse, Response};
use axum::{Json};
use axum_extra::headers::Authorization;
use axum::Json;
use axum_extra::headers::authorization::Bearer;
use axum_extra::headers::Authorization;
use axum_extra::TypedHeader;
use serde_json::json;
use tracing::{error, warn};
Expand Down
4 changes: 2 additions & 2 deletions crates/arroyo-connectors/src/filesystem/sink/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use arroyo_storage::StorageProvider;
use async_trait::async_trait;
use bincode::{Decode, Encode};
use chrono::{DateTime, Utc};
use datafusion::execution::SessionStateBuilder;
use datafusion::prelude::concat;
use datafusion::{
common::{Column, Result as DFResult},
Expand All @@ -35,7 +36,6 @@ use datafusion::{
physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner},
scalar::ScalarValue,
};
use datafusion::execution::SessionStateBuilder;
use futures::{stream::FuturesUnordered, Future};
use futures::{stream::StreamExt, TryStreamExt};
use object_store::{multipart::PartId, path::Path, MultipartId};
Expand Down Expand Up @@ -212,7 +212,7 @@ fn partition_string_for_fields_and_time(
fn compile_expression(expr: &Expr, schema: ArroyoSchemaRef) -> Result<Arc<dyn PhysicalExpr>> {
let physical_planner = DefaultPhysicalPlanner::default();
let session_state = SessionStateBuilder::new().build();

let plan = physical_planner.create_physical_expr(
expr,
&(schema.schema.as_ref().clone()).try_into()?,
Expand Down
2 changes: 1 addition & 1 deletion crates/arroyo-connectors/src/kinesis/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ impl KinesisSourceFunc {
}

async fn init_client(&mut self) {
let mut loader = aws_config::defaults(BehaviorVersion::v2024_03_28());
let mut loader = aws_config::defaults(BehaviorVersion::v2024_03_28());
if let Some(region) = &self.aws_region {
loader = loader.region(Region::new(region.clone()));
}
Expand Down
2 changes: 1 addition & 1 deletion crates/arroyo-controller/src/schedulers/kubernetes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ use arroyo_rpc::grpc::rpc::{HeartbeatNodeReq, RegisterNodeReq, WorkerFinishedReq
use arroyo_types::{WorkerId, JOB_ID_ENV, RUN_ID_ENV};
use async_trait::async_trait;
use k8s_openapi::api::core::v1::Pod;
use k8s_openapi::apimachinery::pkg::api::resource::Quantity;
use kube::api::{DeleteParams, ListParams};
use kube::{Api, Client};
use serde_json::json;
use std::time::Duration;
use k8s_openapi::apimachinery::pkg::api::resource::Quantity;
use tonic::Status;
use tracing::{info, warn};

Expand Down
11 changes: 8 additions & 3 deletions crates/arroyo-planner/src/extension/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,14 @@ use datafusion_proto::physical_plan::DefaultPhysicalExtensionCodec;
use datafusion_proto::{physical_plan::AsExecutionPlan, protobuf::PhysicalPlanNode};
use prost::Message;

use crate::{builder::{NamedNode, Planner, SplitPlanOutput}, fields_with_qualifiers, multifield_partial_ord, physical::ArroyoPhysicalExtensionCodec, schema_from_df_fields, schema_from_df_fields_with_metadata, DFField, WindowBehavior};
use crate::physical::window;
use super::{ArroyoExtension, NodeWithIncomingEdges, TimestampAppendExtension};
use crate::physical::window;
use crate::{
builder::{NamedNode, Planner, SplitPlanOutput},
fields_with_qualifiers, multifield_partial_ord,
physical::ArroyoPhysicalExtensionCodec,
schema_from_df_fields, schema_from_df_fields_with_metadata, DFField, WindowBehavior,
};

pub(crate) const AGGREGATE_EXTENSION_NAME: &str = "AggregateExtension";

Expand Down Expand Up @@ -331,7 +336,7 @@ impl AggregateExtension {
let timestamp_column =
Column::new(timestamp_field.qualifier().cloned(), timestamp_field.name());
aggregate_fields.insert(window_index, window_field.clone());

let window_expression = Expr::ScalarFunction(ScalarFunction {
func: window(),
args: vec![
Expand Down
7 changes: 6 additions & 1 deletion crates/arroyo-planner/src/extension/debezium.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,12 @@ pub struct DebeziumUnrollingExtension {
primary_key_names: Arc<Vec<String>>,
}

multifield_partial_ord!(DebeziumUnrollingExtension, input, primary_keys, primary_key_names);
multifield_partial_ord!(
DebeziumUnrollingExtension,
input,
primary_keys,
primary_key_names
);

impl DebeziumUnrollingExtension {
pub(crate) fn as_debezium_schema(
Expand Down
7 changes: 6 additions & 1 deletion crates/arroyo-planner/src/extension/key_calculation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,12 @@ use datafusion::logical_expr::{Expr, LogicalPlan, UserDefinedLogicalNodeCore};
use datafusion_proto::{physical_plan::AsExecutionPlan, protobuf::PhysicalPlanNode};
use prost::Message;

use crate::{builder::{NamedNode, Planner}, fields_with_qualifiers, multifield_partial_ord, physical::ArroyoPhysicalExtensionCodec, schema_from_df_fields_with_metadata};
use crate::{
builder::{NamedNode, Planner},
fields_with_qualifiers, multifield_partial_ord,
physical::ArroyoPhysicalExtensionCodec,
schema_from_df_fields_with_metadata,
};

use super::{ArroyoExtension, NodeWithIncomingEdges};

Expand Down
13 changes: 11 additions & 2 deletions crates/arroyo-planner/src/extension/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ macro_rules! multifield_partial_ord {
}
}


#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub(crate) struct TimestampAppendExtension {
pub(crate) input: LogicalPlan,
Expand Down Expand Up @@ -187,7 +186,17 @@ pub(crate) struct AsyncUDFExtension {
pub(crate) final_schema: DFSchemaRef,
}

multifield_partial_ord!(AsyncUDFExtension, input, name, udf, arg_exprs, final_exprs, ordered, max_concurrency, timeout);
multifield_partial_ord!(
AsyncUDFExtension,
input,
name,
udf,
arg_exprs,
final_exprs,
ordered,
max_concurrency,
timeout
);

impl ArroyoExtension for AsyncUDFExtension {
fn node_name(&self) -> Option<NamedNode> {
Expand Down
6 changes: 5 additions & 1 deletion crates/arroyo-planner/src/extension/remote_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@ use datafusion::logical_expr::{Expr, LogicalPlan, UserDefinedLogicalNodeCore};
use datafusion_proto::{physical_plan::AsExecutionPlan, protobuf::PhysicalPlanNode};
use prost::Message;

use crate::{builder::{NamedNode, Planner}, multifield_partial_ord, physical::ArroyoPhysicalExtensionCodec};
use crate::{
builder::{NamedNode, Planner},
multifield_partial_ord,
physical::ArroyoPhysicalExtensionCodec,
};

use super::{ArroyoExtension, NodeWithIncomingEdges};

Expand Down
6 changes: 5 additions & 1 deletion crates/arroyo-planner/src/extension/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@ use datafusion::logical_expr::{Expr, Extension, LogicalPlan, UserDefinedLogicalN

use prost::Message;

use crate::{builder::{NamedNode, Planner}, multifield_partial_ord, tables::Table};
use crate::{
builder::{NamedNode, Planner},
multifield_partial_ord,
tables::Table,
};

use super::{
debezium::ToDebeziumExtension, remote_table::RemoteTableExtension, ArroyoExtension,
Expand Down
7 changes: 6 additions & 1 deletion crates/arroyo-planner/src/extension/table_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,12 @@ use prost::Message;

use super::{ArroyoExtension, DebeziumUnrollingExtension, NodeWithIncomingEdges};
use crate::tables::FieldSpec;
use crate::{builder::{NamedNode, Planner}, multifield_partial_ord, schema_from_df_fields, schemas::add_timestamp_field, tables::ConnectorTable};
use crate::{
builder::{NamedNode, Planner},
multifield_partial_ord, schema_from_df_fields,
schemas::add_timestamp_field,
tables::ConnectorTable,
};
pub(crate) const TABLE_SOURCE_NAME: &str = "TableSourceExtension";

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
Expand Down
10 changes: 8 additions & 2 deletions crates/arroyo-planner/src/extension/watermark_node.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::builder::{NamedNode, Planner};
use crate::extension::{ArroyoExtension, NodeWithIncomingEdges};
use crate::multifield_partial_ord;
use crate::schemas::add_timestamp_field;
use arroyo_datastream::logical::{LogicalEdge, LogicalEdgeType, LogicalNode, OperatorName};
use arroyo_rpc::df::{ArroyoSchema, ArroyoSchemaRef};
Expand All @@ -12,7 +13,6 @@ use datafusion_proto::physical_plan::DefaultPhysicalExtensionCodec;
use prost::Message;
use std::fmt::Formatter;
use std::sync::Arc;
use crate::multifield_partial_ord;

pub(crate) const WATERMARK_NODE_NAME: &str = "WatermarkNode";
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
Expand All @@ -24,7 +24,13 @@ pub struct WatermarkNode {
timestamp_index: usize,
}

multifield_partial_ord!(WatermarkNode, input, qualifier, watermark_expression, timestamp_index);
multifield_partial_ord!(
WatermarkNode,
input,
qualifier,
watermark_expression,
timestamp_index
);

impl UserDefinedLogicalNodeCore for WatermarkNode {
fn name(&self) -> &str {
Expand Down
6 changes: 1 addition & 5 deletions crates/arroyo-planner/src/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,7 @@ pub fn register_all(registry: &mut dyn FunctionRegistry) {
.register_udf(Arc::new(create_udf(
"extract_json",
vec![DataType::Utf8, DataType::Utf8],
DataType::List(Arc::new(Field::new(
"item",
DataType::Utf8,
true,
))),
DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
Volatility::Immutable,
Arc::new(extract_json),
)))
Expand Down
Loading

0 comments on commit e0e0a4b

Please sign in to comment.