Skip to content

Commit

Permalink
Add support for non-data (metadata) fields in the Kafka Connector (#765)
Browse files Browse the repository at this point in the history
  • Loading branch information
vaibhawvipul authored Oct 30, 2024
1 parent 790b47d commit 28360fe
Show file tree
Hide file tree
Showing 40 changed files with 617 additions and 47 deletions.
1 change: 1 addition & 0 deletions crates/arroyo-api/src/pipelines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ async fn compile_sql<'a>(
.unwrap_or(json!({})),
&table.config,
Some(&table.schema),
None,
)
.map_err(log_and_map)?;

Expand Down
6 changes: 5 additions & 1 deletion crates/arroyo-connectors/src/blackhole/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::blackhole::operator::BlackholeSinkFunc;
use anyhow::anyhow;
use arrow::datatypes::DataType;
use arroyo_operator::connector::{Connection, Connector};
use arroyo_operator::operator::OperatorNode;
use arroyo_rpc::api_types::connections::{
Expand Down Expand Up @@ -78,8 +79,9 @@ impl Connector for BlackholeConnector {
_options: &mut HashMap<String, String>,
schema: Option<&ConnectionSchema>,
_profile: Option<&ConnectionProfile>,
_metadata_fields: Option<HashMap<String, (String, DataType)>>,
) -> anyhow::Result<Connection> {
self.from_config(None, name, EmptyConfig {}, EmptyConfig {}, schema)
self.from_config(None, name, EmptyConfig {}, EmptyConfig {}, schema, None)
}

fn from_config(
Expand All @@ -89,6 +91,7 @@ impl Connector for BlackholeConnector {
config: Self::ProfileT,
table: Self::TableT,
s: Option<&ConnectionSchema>,
_metadata_fields: Option<HashMap<String, (String, DataType)>>,
) -> anyhow::Result<Connection> {
let description = "Blackhole".to_string();

Expand All @@ -99,6 +102,7 @@ impl Connector for BlackholeConnector {
format: None,
bad_data: None,
framing: None,
additional_fields: None,
};

Ok(Connection {
Expand Down
7 changes: 5 additions & 2 deletions crates/arroyo-connectors/src/confluent/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::kafka::{
};
use crate::{kafka, pull_opt};
use anyhow::anyhow;
use arrow::datatypes::DataType;
use arroyo_operator::connector::{Connection, Connector};
use arroyo_operator::operator::OperatorNode;
use arroyo_rpc::api_types::connections::{
Expand Down Expand Up @@ -161,6 +162,7 @@ impl Connector for ConfluentConnector {
options: &mut HashMap<String, String>,
schema: Option<&ConnectionSchema>,
profile: Option<&ConnectionProfile>,
_metadata_fields: Option<HashMap<String, (String, DataType)>>,
) -> anyhow::Result<Connection> {
let connection = profile
.map(|p| {
Expand All @@ -172,7 +174,7 @@ impl Connector for ConfluentConnector {

let table = KafkaConnector::table_from_options(options)?;

self.from_config(None, name, connection, table, schema)
self.from_config(None, name, connection, table, schema, None)
}

fn from_config(
Expand All @@ -182,11 +184,12 @@ impl Connector for ConfluentConnector {
config: Self::ProfileT,
mut table: Self::TableT,
schema: Option<&ConnectionSchema>,
_metadata_fields: Option<HashMap<String, (String, DataType)>>,
) -> anyhow::Result<Connection> {
table
.client_configs
.insert("client.id".to_string(), CLIENT_ID.to_string());
KafkaConnector {}.from_config(id, name, config.into(), table, schema)
KafkaConnector {}.from_config(id, name, config.into(), table, schema, None)
}

fn make_operator(
Expand Down
6 changes: 5 additions & 1 deletion crates/arroyo-connectors/src/filesystem/delta.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use anyhow::{anyhow, bail};
use arrow::datatypes::DataType;
use arroyo_operator::connector::Connection;
use arroyo_storage::BackendConfig;
use std::collections::HashMap;
Expand Down Expand Up @@ -77,6 +78,7 @@ impl Connector for DeltaLakeConnector {
config: Self::ProfileT,
table: Self::TableT,
schema: Option<&ConnectionSchema>,
_metadata_fields: Option<HashMap<String, (String, DataType)>>,
) -> anyhow::Result<arroyo_operator::connector::Connection> {
let TableType::Sink {
write_path,
Expand Down Expand Up @@ -123,6 +125,7 @@ impl Connector for DeltaLakeConnector {
format: Some(format),
bad_data: schema.bad_data.clone(),
framing: schema.framing.clone(),
additional_fields: None,
};

Ok(Connection {
Expand All @@ -142,10 +145,11 @@ impl Connector for DeltaLakeConnector {
options: &mut HashMap<String, String>,
schema: Option<&ConnectionSchema>,
_profile: Option<&ConnectionProfile>,
_metadata_fields: Option<HashMap<String, (String, DataType)>>,
) -> anyhow::Result<Connection> {
let table = file_system_sink_from_options(options, schema, CommitStyle::DeltaLake)?;

self.from_config(None, name, EmptyConfig {}, table, schema)
self.from_config(None, name, EmptyConfig {}, table, schema, None)
}

fn make_operator(
Expand Down
7 changes: 6 additions & 1 deletion crates/arroyo-connectors/src/filesystem/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ mod sink;
mod source;

use anyhow::{anyhow, bail, Result};
use arrow::datatypes::DataType;
use arroyo_storage::BackendConfig;
use regex::Regex;
use std::collections::HashMap;
Expand Down Expand Up @@ -114,6 +115,7 @@ impl Connector for FileSystemConnector {
config: Self::ProfileT,
table: Self::TableT,
schema: Option<&ConnectionSchema>,
_metadata_fields: Option<HashMap<String, (String, DataType)>>,
) -> anyhow::Result<Connection> {
let (description, connection_type) = match table.table_type {
TableType::Source { .. } => ("FileSystem".to_string(), ConnectionType::Source),
Expand Down Expand Up @@ -168,6 +170,7 @@ impl Connector for FileSystemConnector {
format: Some(format),
bad_data: schema.bad_data.clone(),
framing: schema.framing.clone(),
additional_fields: None,
};

Ok(Connection {
Expand All @@ -187,6 +190,7 @@ impl Connector for FileSystemConnector {
options: &mut HashMap<String, String>,
schema: Option<&ConnectionSchema>,
_profile: Option<&ConnectionProfile>,
_metadata_fields: Option<HashMap<String, (String, DataType)>>,
) -> anyhow::Result<Connection> {
match options.remove("type") {
Some(t) if t == "source" => {
Expand All @@ -210,12 +214,13 @@ impl Connector for FileSystemConnector {
},
},
schema,
None,
)
}
Some(t) if t == "sink" => {
let table = file_system_sink_from_options(options, schema, CommitStyle::Direct)?;

self.from_config(None, name, EmptyConfig {}, table, schema)
self.from_config(None, name, EmptyConfig {}, table, schema, None)
}
Some(t) => bail!("unknown type: {}", t),
None => bail!("must have type set"),
Expand Down
2 changes: 1 addition & 1 deletion crates/arroyo-connectors/src/filesystem/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ impl FileSystemSourceFunc {
line = line_reader.next() => {
match line.transpose()? {
Some(line) => {
ctx.deserialize_slice(line.as_bytes(), SystemTime::now()).await?;
ctx.deserialize_slice(line.as_bytes(), SystemTime::now(), None).await?;
records_read += 1;
if ctx.should_flush() {
ctx.flush_buffer().await?;
Expand Down
6 changes: 5 additions & 1 deletion crates/arroyo-connectors/src/fluvio/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use anyhow::{anyhow, bail};
use arrow::datatypes::DataType;
use arroyo_formats::ser::ArrowSerializer;
use arroyo_operator::connector::{Connection, Connector};
use arroyo_operator::operator::OperatorNode;
Expand Down Expand Up @@ -88,6 +89,7 @@ impl Connector for FluvioConnector {
options: &mut HashMap<String, String>,
schema: Option<&ConnectionSchema>,
_profile: Option<&ConnectionProfile>,
_metadata_fields: Option<HashMap<String, (String, DataType)>>,
) -> anyhow::Result<Connection> {
let endpoint = options.remove("endpoint");
let topic = pull_opt("topic", options)?;
Expand Down Expand Up @@ -116,7 +118,7 @@ impl Connector for FluvioConnector {
type_: table_type,
};

Self::from_config(self, None, name, EmptyConfig {}, table, schema)
Self::from_config(self, None, name, EmptyConfig {}, table, schema, None)
}

fn from_config(
Expand All @@ -126,6 +128,7 @@ impl Connector for FluvioConnector {
config: EmptyConfig,
table: FluvioTable,
schema: Option<&ConnectionSchema>,
_metadata_fields: Option<HashMap<String, (String, DataType)>>,
) -> anyhow::Result<Connection> {
let (typ, desc) = match table.type_ {
TableType::Source { .. } => (
Expand Down Expand Up @@ -154,6 +157,7 @@ impl Connector for FluvioConnector {
format: Some(format),
bad_data: schema.bad_data.clone(),
framing: schema.framing.clone(),
additional_fields: None,
};

Ok(Connection {
Expand Down
2 changes: 1 addition & 1 deletion crates/arroyo-connectors/src/fluvio/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ impl FluvioSourceFunc {
match message {
Some((_, Ok(msg))) => {
let timestamp = from_millis(msg.timestamp().max(0) as u64);
ctx.deserialize_slice(msg.value(), timestamp).await?;
ctx.deserialize_slice(msg.value(), timestamp, None).await?;

if ctx.should_flush() {
ctx.flush_buffer().await?;
Expand Down
5 changes: 5 additions & 0 deletions crates/arroyo-connectors/src/impulse/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
mod operator;

use anyhow::{anyhow, bail};
use arrow::datatypes::DataType;
use arroyo_operator::connector::{Connection, Connector};
use arroyo_operator::operator::OperatorNode;
use arroyo_rpc::api_types::connections::FieldType::Primitive;
Expand Down Expand Up @@ -101,6 +102,7 @@ impl Connector for ImpulseConnector {
options: &mut HashMap<String, String>,
schema: Option<&ConnectionSchema>,
_profile: Option<&ConnectionProfile>,
_metadata_fields: Option<HashMap<String, (String, DataType)>>,
) -> anyhow::Result<Connection> {
let event_rate = f64::from_str(&pull_opt("event_rate", options)?)
.map_err(|_| anyhow!("invalid value for event_rate; expected float"))?;
Expand Down Expand Up @@ -134,6 +136,7 @@ impl Connector for ImpulseConnector {
message_count,
},
None,
None,
)
}

Expand All @@ -144,6 +147,7 @@ impl Connector for ImpulseConnector {
config: Self::ProfileT,
table: Self::TableT,
_: Option<&ConnectionSchema>,
_metadata_fields: Option<HashMap<String, (String, DataType)>>,
) -> anyhow::Result<Connection> {
let description = format!(
"{}Impulse<{} eps{}>",
Expand All @@ -166,6 +170,7 @@ impl Connector for ImpulseConnector {
format: None,
bad_data: None,
framing: None,
additional_fields: None,
};

Ok(Connection {
Expand Down
50 changes: 46 additions & 4 deletions crates/arroyo-connectors/src/kafka/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use anyhow::{anyhow, bail};
use arrow::datatypes::DataType;
use arroyo_formats::de::ArrowDeserializer;
use arroyo_formats::ser::ArrowSerializer;
use arroyo_operator::connector::Connection;
Expand Down Expand Up @@ -188,6 +189,7 @@ impl Connector for KafkaConnector {
config: KafkaConfig,
table: KafkaTable,
schema: Option<&ConnectionSchema>,
metadata_fields: Option<HashMap<String, (String, DataType)>>,
) -> anyhow::Result<Connection> {
let (typ, desc) = match table.type_ {
TableType::Source { .. } => (
Expand All @@ -207,13 +209,21 @@ impl Connector for KafkaConnector {
.map(|t| t.to_owned())
.ok_or_else(|| anyhow!("'format' must be set for Kafka connection"))?;

let metadata_fields = metadata_fields.map(|fields| {
fields
.into_iter()
.map(|(k, (v, _))| (k, v))
.collect::<HashMap<String, String>>()
});

let config = OperatorConfig {
connection: serde_json::to_value(config).unwrap(),
table: serde_json::to_value(table).unwrap(),
rate_limit: None,
format: Some(format),
bad_data: schema.bad_data.clone(),
framing: schema.framing.clone(),
additional_fields: metadata_fields,
};

Ok(Connection {
Expand Down Expand Up @@ -312,6 +322,7 @@ impl Connector for KafkaConnector {
options: &mut HashMap<String, String>,
schema: Option<&ConnectionSchema>,
profile: Option<&ConnectionProfile>,
metadata_fields: Option<HashMap<String, (String, DataType)>>,
) -> anyhow::Result<Connection> {
let connection = profile
.map(|p| {
Expand All @@ -323,7 +334,37 @@ impl Connector for KafkaConnector {

let table = Self::table_from_options(options)?;

Self::from_config(self, None, name, connection, table, schema)
let allowed_metadata_udf_args: HashMap<&str, DataType> = [
("offset_id", DataType::Int64),
("partition", DataType::Int32),
("topic", DataType::Utf8),
]
.iter()
.cloned()
.collect();

if let Some(fields) = &metadata_fields {
for (field_name, data_type) in fields.values() {
match allowed_metadata_udf_args.get(field_name.as_str()) {
Some(expected_type) => {
if expected_type != data_type {
return Err(anyhow!(
"Invalid datatype for metadata field '{}': expected '{:?}', found '{:?}'",
field_name, expected_type, data_type
));
}
}
None => {
return Err(anyhow!(
"Invalid metadata field name for Kafka connector: '{}'",
field_name
));
}
}
}
}

Self::from_config(self, None, name, connection, table, schema, metadata_fields)
}

fn make_operator(
Expand Down Expand Up @@ -383,6 +424,7 @@ impl Connector for KafkaConnector {
.unwrap_or(u32::MAX),
)
.unwrap(),
metadata_fields: config.additional_fields,
})))
}
TableType::Sink {
Expand Down Expand Up @@ -622,7 +664,7 @@ impl KafkaTester {
let mut builders = aschema.builders();

let mut error = deserializer
.deserialize_slice(&mut builders, &msg, SystemTime::now())
.deserialize_slice(&mut builders, &msg, SystemTime::now(), None)
.await
.into_iter()
.next();
Expand All @@ -644,7 +686,7 @@ impl KafkaTester {
let mut builders = aschema.builders();

let mut error = deserializer
.deserialize_slice(&mut builders, &msg, SystemTime::now())
.deserialize_slice(&mut builders, &msg, SystemTime::now(), None)
.await
.into_iter()
.next();
Expand Down Expand Up @@ -678,7 +720,7 @@ impl KafkaTester {
let mut builders = aschema.builders();

let mut error = deserializer
.deserialize_slice(&mut builders, &msg, SystemTime::now())
.deserialize_slice(&mut builders, &msg, SystemTime::now(), None)
.await
.into_iter()
.next();
Expand Down
Loading

0 comments on commit 28360fe

Please sign in to comment.