diff --git a/smartmodule/examples/Cargo.lock b/smartmodule/examples/Cargo.lock index 0725dc5898..4ae10cbeae 100644 --- a/smartmodule/examples/Cargo.lock +++ b/smartmodule/examples/Cargo.lock @@ -137,7 +137,7 @@ dependencies = [ [[package]] name = "fluvio-compression" -version = "0.3.0" +version = "0.3.2" dependencies = [ "serde", "thiserror", @@ -192,15 +192,15 @@ dependencies = [ [[package]] name = "fluvio-protocol" -version = "0.10.5" +version = "0.10.7" dependencies = [ "bytes", "content_inspector", "crc32c", "eyre", - "fluvio-compression 0.3.0", + "fluvio-compression 0.3.2", "fluvio-protocol-derive 0.5.4", - "fluvio-types 0.4.3", + "fluvio-types 0.4.4", "flv-util", "once_cell", "semver", @@ -246,7 +246,7 @@ name = "fluvio-smartmodule" version = "0.7.3" dependencies = [ "eyre", - "fluvio-protocol 0.10.5", + "fluvio-protocol 0.10.7", "fluvio-smartmodule-derive 0.6.2", "thiserror", "tracing", @@ -388,7 +388,7 @@ dependencies = [ [[package]] name = "fluvio-types" -version = "0.4.3" +version = "0.4.4" dependencies = [ "thiserror", "tracing", diff --git a/smartmodule/examples/aggregate-average/src/lib.rs b/smartmodule/examples/aggregate-average/src/lib.rs index 823a442993..47a9692d2d 100644 --- a/smartmodule/examples/aggregate-average/src/lib.rs +++ b/smartmodule/examples/aggregate-average/src/lib.rs @@ -1,5 +1,5 @@ use serde::{Serialize, Deserialize}; -use fluvio_smartmodule::{smartmodule, Result, Record, RecordData}; +use fluvio_smartmodule::{smartmodule, Result, SmartModuleRecord, RecordData}; #[derive(Default, Serialize, Deserialize)] struct IncrementalAverage { @@ -22,7 +22,7 @@ impl IncrementalAverage { } #[smartmodule(aggregate)] -pub fn aggregate(accumulator: RecordData, current: &Record) -> Result { +pub fn aggregate(accumulator: RecordData, current: &SmartModuleRecord) -> Result { // Parse the average from JSON let mut average: IncrementalAverage = serde_json::from_slice(accumulator.as_ref()).unwrap_or_default(); diff --git a/smartmodule/examples/aggregate-init/src/lib.rs b/smartmodule/examples/aggregate-init/src/lib.rs index eea4232554..20567e91bc 100644 --- a/smartmodule/examples/aggregate-init/src/lib.rs +++ b/smartmodule/examples/aggregate-init/src/lib.rs @@ -2,7 +2,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::OnceLock; use fluvio_smartmodule::{ - dataplane::smartmodule::SmartModuleExtraParams, smartmodule, Record, RecordData, Result, + dataplane::smartmodule::SmartModuleExtraParams, smartmodule, SmartModuleRecord, RecordData, Result, }; static INITIAL_VALUE: OnceLock> = OnceLock::new(); @@ -10,7 +10,7 @@ static INITIAL_VALUE: OnceLock> = OnceLock::new(); const PARAM_NAME: &str = "initial_value"; #[smartmodule(aggregate)] -pub fn aggregate(accumulator: RecordData, current: &Record) -> Result { +pub fn aggregate(accumulator: RecordData, current: &SmartModuleRecord) -> Result { let accumulator = if let Some(initial_value) = INITIAL_VALUE.get() { initial_value.get_or(&accumulator) } else { diff --git a/smartmodule/examples/aggregate-json/src/lib.rs b/smartmodule/examples/aggregate-json/src/lib.rs index 2c2d04d7ea..96532092a0 100644 --- a/smartmodule/examples/aggregate-json/src/lib.rs +++ b/smartmodule/examples/aggregate-json/src/lib.rs @@ -1,5 +1,5 @@ use std::collections::HashMap; -use fluvio_smartmodule::{smartmodule, Result, Record, RecordData}; +use fluvio_smartmodule::{smartmodule, Result, SmartModuleRecord, RecordData}; use serde::{Serialize, Deserialize}; #[derive(Default, Serialize, Deserialize)] @@ -20,7 +20,7 @@ impl std::ops::Add for GithubStars { } #[smartmodule(aggregate)] -pub fn aggregate(accumulator: RecordData, current: &Record) -> Result { +pub fn aggregate(accumulator: RecordData, current: &SmartModuleRecord) -> Result { // Parse accumulator let accumulated_stars: GithubStars = serde_json::from_slice(accumulator.as_ref()).unwrap_or_default(); diff --git a/smartmodule/examples/aggregate-sum/src/lib.rs b/smartmodule/examples/aggregate-sum/src/lib.rs index 0865daac35..ce0ccdd17b 100644 --- a/smartmodule/examples/aggregate-sum/src/lib.rs +++ b/smartmodule/examples/aggregate-sum/src/lib.rs @@ -1,7 +1,7 @@ -use fluvio_smartmodule::{smartmodule, Result, Record, RecordData}; +use fluvio_smartmodule::{smartmodule, Result, SmartModuleRecord, RecordData}; #[smartmodule(aggregate)] -pub fn aggregate(accumulator: RecordData, current: &Record) -> Result { +pub fn aggregate(accumulator: RecordData, current: &SmartModuleRecord) -> Result { // Parse the accumulator and current record as strings let accumulator_string = std::str::from_utf8(accumulator.as_ref())?; let current_string = std::str::from_utf8(current.value.as_ref())?; diff --git a/smartmodule/examples/aggregate/src/lib.rs b/smartmodule/examples/aggregate/src/lib.rs index 539fc4d27d..32388ce00e 100644 --- a/smartmodule/examples/aggregate/src/lib.rs +++ b/smartmodule/examples/aggregate/src/lib.rs @@ -1,11 +1,11 @@ -use fluvio_smartmodule::{smartmodule, Result, Record, RecordData}; +use fluvio_smartmodule::{smartmodule, Result, SmartModuleRecord, RecordData}; /// This aggregate concatenate accumulator and current value /// values: "a","b" // accumulator: "1", // "1a","1ab" #[smartmodule(aggregate)] -pub fn aggregate(accumulator: RecordData, current: &Record) -> Result { +pub fn aggregate(accumulator: RecordData, current: &SmartModuleRecord) -> Result { let mut acc = String::from_utf8(accumulator.as_ref().to_vec())?; let next = std::str::from_utf8(current.value.as_ref())?; acc.push_str(next); diff --git a/smartmodule/examples/array_map_json_array/src/lib.rs b/smartmodule/examples/array_map_json_array/src/lib.rs index ec51851fd0..69533a446a 100644 --- a/smartmodule/examples/array_map_json_array/src/lib.rs +++ b/smartmodule/examples/array_map_json_array/src/lib.rs @@ -33,10 +33,10 @@ //! "Cranberry" //! ``` -use fluvio_smartmodule::{smartmodule, Record, RecordData, Result}; +use fluvio_smartmodule::{smartmodule, SmartModuleRecord, RecordData, Result}; #[smartmodule(array_map)] -pub fn array_map(record: &Record) -> Result, RecordData)>> { +pub fn array_map(record: &SmartModuleRecord) -> Result, RecordData)>> { // Deserialize a JSON array with any kind of values inside let array: Vec = serde_json::from_slice(record.value.as_ref())?; diff --git a/smartmodule/examples/array_map_json_object/src/lib.rs b/smartmodule/examples/array_map_json_object/src/lib.rs index 6a8945c01d..abbcfe74af 100644 --- a/smartmodule/examples/array_map_json_object/src/lib.rs +++ b/smartmodule/examples/array_map_json_object/src/lib.rs @@ -51,11 +51,11 @@ //! [c] "Cranberry" //! ``` -use fluvio_smartmodule::{smartmodule, Record, RecordData, Result}; +use fluvio_smartmodule::{smartmodule, SmartModuleRecord, RecordData, Result}; use serde_json::{Map, Value}; #[smartmodule(array_map)] -pub fn array_map(record: &Record) -> Result, RecordData)>> { +pub fn array_map(record: &SmartModuleRecord) -> Result, RecordData)>> { // Deserialize a JSON object (Map) with any kind of values inside let object: Map = serde_json::from_slice(record.value.as_ref())?; diff --git a/smartmodule/examples/array_map_json_reddit/src/lib.rs b/smartmodule/examples/array_map_json_reddit/src/lib.rs index e3281eda55..a1a407cf6a 100644 --- a/smartmodule/examples/array_map_json_reddit/src/lib.rs +++ b/smartmodule/examples/array_map_json_reddit/src/lib.rs @@ -4,7 +4,7 @@ //! zero or many output records. This example showcases taking a stream of Reddit API //! responses and converting it into a stream of the individual posts. -use fluvio_smartmodule::{smartmodule, Record, RecordData, Result}; +use fluvio_smartmodule::{smartmodule, SmartModuleRecord, RecordData, Result}; use serde::{Serialize, Deserialize}; #[derive(Debug, Serialize, Deserialize)] @@ -33,7 +33,7 @@ struct RedditPostData { } #[smartmodule(array_map)] -pub fn array_map(record: &Record) -> Result, RecordData)>> { +pub fn array_map(record: &SmartModuleRecord) -> Result, RecordData)>> { // Deserialize a RedditListing from JSON let listing: RedditListing = serde_json::from_slice(record.value.as_ref())?; diff --git a/smartmodule/examples/filter/src/lib.rs b/smartmodule/examples/filter/src/lib.rs index 880e9c0632..71822a2095 100644 --- a/smartmodule/examples/filter/src/lib.rs +++ b/smartmodule/examples/filter/src/lib.rs @@ -1,7 +1,7 @@ -use fluvio_smartmodule::{smartmodule, Record, Result}; +use fluvio_smartmodule::{smartmodule, SmartModuleRecord, Result}; #[smartmodule(filter)] -pub fn filter(record: &Record) -> Result { +pub fn filter(record: &SmartModuleRecord) -> Result { let string = std::str::from_utf8(record.value.as_ref())?; Ok(string.contains('a')) } diff --git a/smartmodule/examples/filter_hashset/src/lib.rs b/smartmodule/examples/filter_hashset/src/lib.rs index d06e7d11d5..8fb7c53a8d 100644 --- a/smartmodule/examples/filter_hashset/src/lib.rs +++ b/smartmodule/examples/filter_hashset/src/lib.rs @@ -7,19 +7,19 @@ use std::{ }; use fluvio_smartmodule::{ - smartmodule, Record, Result, dataplane::smartmodule::SmartModuleExtraParams, eyre, + smartmodule, SmartModuleRecord, Result, dataplane::smartmodule::SmartModuleExtraParams, eyre, }; static SET: OnceLock> = OnceLock::new(); #[smartmodule(filter)] -pub fn filter(record: &Record) -> Result { +pub fn filter(record: &SmartModuleRecord) -> Result { let string = std::str::from_utf8(record.value.as_ref())?; Ok(get_mut_set()?.insert(string.to_owned())) } #[smartmodule(look_back)] -pub fn look_back(record: &Record) -> Result<()> { +pub fn look_back(record: &SmartModuleRecord) -> Result<()> { let string = std::str::from_utf8(record.value.as_ref())?; get_mut_set()?.insert(string.to_owned()); Ok(()) diff --git a/smartmodule/examples/filter_init/src/lib.rs b/smartmodule/examples/filter_init/src/lib.rs index 94815b4003..13cf218f51 100644 --- a/smartmodule/examples/filter_init/src/lib.rs +++ b/smartmodule/examples/filter_init/src/lib.rs @@ -1,7 +1,7 @@ use std::sync::OnceLock; use fluvio_smartmodule::{ - smartmodule, Record, Result, eyre, + smartmodule, SmartModuleRecord, Result, eyre, dataplane::smartmodule::{SmartModuleExtraParams, SmartModuleInitError}, }; @@ -19,7 +19,7 @@ fn init(params: SmartModuleExtraParams) -> Result<()> { } #[smartmodule(filter)] -pub fn filter(record: &Record) -> Result { +pub fn filter(record: &SmartModuleRecord) -> Result { let string = std::str::from_utf8(record.value.as_ref())?; Ok(string.contains(CRITERIA.get().unwrap())) } diff --git a/smartmodule/examples/filter_json/src/lib.rs b/smartmodule/examples/filter_json/src/lib.rs index e4903b482d..e7fae4e07c 100644 --- a/smartmodule/examples/filter_json/src/lib.rs +++ b/smartmodule/examples/filter_json/src/lib.rs @@ -45,7 +45,7 @@ //! {"level":"error","message":"Unable to connect to database"} //! ``` -use fluvio_smartmodule::{smartmodule, Record, Result}; +use fluvio_smartmodule::{smartmodule, SmartModuleRecord, Result}; #[derive(PartialEq, Eq, PartialOrd, Ord, serde::Deserialize)] #[serde(rename_all = "lowercase")] @@ -64,7 +64,7 @@ struct StructuredLog { } #[smartmodule(filter)] -pub fn filter_log_level(record: &Record) -> Result { +pub fn filter_log_level(record: &SmartModuleRecord) -> Result { let log = serde_json::from_slice::(record.value.as_ref())?; Ok(log.level > LogLevel::Debug) } diff --git a/smartmodule/examples/filter_look_back/src/lib.rs b/smartmodule/examples/filter_look_back/src/lib.rs index d1bf272034..569389840a 100644 --- a/smartmodule/examples/filter_look_back/src/lib.rs +++ b/smartmodule/examples/filter_look_back/src/lib.rs @@ -1,11 +1,11 @@ use std::sync::atomic::{AtomicI32, Ordering::SeqCst}; -use fluvio_smartmodule::{smartmodule, Record, Result}; +use fluvio_smartmodule::{smartmodule, SmartModuleRecord, Result}; static PREV: AtomicI32 = AtomicI32::new(0); #[smartmodule(filter)] -pub fn filter(record: &Record) -> Result { +pub fn filter(record: &SmartModuleRecord) -> Result { let string = std::str::from_utf8(record.value.as_ref())?; let current: i32 = string.parse()?; let last = PREV.load(SeqCst); @@ -18,7 +18,7 @@ pub fn filter(record: &Record) -> Result { } #[smartmodule(look_back)] -pub fn look_back(record: &Record) -> Result<()> { +pub fn look_back(record: &SmartModuleRecord) -> Result<()> { let string = std::str::from_utf8(record.value.as_ref())?; let last: i32 = string.parse()?; PREV.store(last, SeqCst); diff --git a/smartmodule/examples/filter_map/src/lib.rs b/smartmodule/examples/filter_map/src/lib.rs index 7af895f444..48178ec7ea 100644 --- a/smartmodule/examples/filter_map/src/lib.rs +++ b/smartmodule/examples/filter_map/src/lib.rs @@ -1,9 +1,9 @@ //! This SmartModule filters out all odd numbers, and divides all even numbers by 2. -use fluvio_smartmodule::{smartmodule, Record, RecordData, Result}; +use fluvio_smartmodule::{smartmodule, SmartModuleRecord, RecordData, Result}; #[smartmodule(filter_map)] -pub fn filter_map(record: &Record) -> Result, RecordData)>> { +pub fn filter_map(record: &SmartModuleRecord) -> Result, RecordData)>> { let key = record.key.clone(); let string = String::from_utf8_lossy(record.value.as_ref()).to_string(); let int: i32 = string.parse()?; diff --git a/smartmodule/examples/filter_odd/src/lib.rs b/smartmodule/examples/filter_odd/src/lib.rs index c671828613..cbd3c520c5 100644 --- a/smartmodule/examples/filter_odd/src/lib.rs +++ b/smartmodule/examples/filter_odd/src/lib.rs @@ -22,7 +22,7 @@ //! filter_odd/src/lib.rs:45:38 //! ``` -use fluvio_smartmodule::{smartmodule, Record, Result}; +use fluvio_smartmodule::{smartmodule, SmartModuleRecord, Result}; #[derive(Debug, thiserror::Error)] pub enum SecondErrorWrapper { @@ -37,7 +37,7 @@ pub enum FirstErrorWrapper { } #[smartmodule(filter)] -pub fn filter(record: &Record) -> Result { +pub fn filter(record: &SmartModuleRecord) -> Result { let string = std::str::from_utf8(record.value.as_ref())?; let int = string .parse::() diff --git a/smartmodule/examples/filter_regex/src/lib.rs b/smartmodule/examples/filter_regex/src/lib.rs index 18ac797bda..1894abfa56 100644 --- a/smartmodule/examples/filter_regex/src/lib.rs +++ b/smartmodule/examples/filter_regex/src/lib.rs @@ -1,8 +1,8 @@ -use fluvio_smartmodule::{smartmodule, Record, Result}; +use fluvio_smartmodule::{smartmodule, SmartModuleRecord, Result}; use regex::Regex; #[smartmodule(filter)] -pub fn filter(record: &Record) -> Result { +pub fn filter(record: &SmartModuleRecord) -> Result { let string = std::str::from_utf8(record.value.as_ref())?; // Check whether the Record contains a Social Security number diff --git a/smartmodule/examples/filter_with_param/src/lib.rs b/smartmodule/examples/filter_with_param/src/lib.rs index 6490d3dafd..41d658ba13 100644 --- a/smartmodule/examples/filter_with_param/src/lib.rs +++ b/smartmodule/examples/filter_with_param/src/lib.rs @@ -1,7 +1,7 @@ use std::sync::OnceLock; use fluvio_smartmodule::{ - smartmodule, Record, Result, eyre, + smartmodule, SmartModuleRecord, Result, eyre, dataplane::smartmodule::{SmartModuleExtraParams}, }; @@ -22,7 +22,7 @@ fn init(params: SmartModuleExtraParams) -> Result<()> { } #[smartmodule(filter)] -pub fn filter(record: &Record) -> Result { +pub fn filter(record: &SmartModuleRecord) -> Result { let string = std::str::from_utf8(record.value.as_ref())?; Ok(string.contains(CRITERIA.get().unwrap())) } diff --git a/smartmodule/examples/map/src/lib.rs b/smartmodule/examples/map/src/lib.rs index b1942543f4..de3e4d9e34 100644 --- a/smartmodule/examples/map/src/lib.rs +++ b/smartmodule/examples/map/src/lib.rs @@ -1,7 +1,7 @@ -use fluvio_smartmodule::{smartmodule, Record, RecordData, Result}; +use fluvio_smartmodule::{smartmodule, SmartModuleRecord, RecordData, Result}; #[smartmodule(map)] -pub fn map(record: &Record) -> Result<(Option, RecordData)> { +pub fn map(record: &SmartModuleRecord) -> Result<(Option, RecordData)> { let key = record.key.clone(); let mut value = Vec::from(record.value.as_ref()); diff --git a/smartmodule/examples/map_double/src/lib.rs b/smartmodule/examples/map_double/src/lib.rs index f3b32e86a0..a0def3064b 100644 --- a/smartmodule/examples/map_double/src/lib.rs +++ b/smartmodule/examples/map_double/src/lib.rs @@ -1,7 +1,7 @@ -use fluvio_smartmodule::{smartmodule, Record, RecordData, Result}; +use fluvio_smartmodule::{smartmodule, SmartModuleRecord, RecordData, Result}; #[smartmodule(map)] -pub fn map(record: &Record) -> Result<(Option, RecordData)> { +pub fn map(record: &SmartModuleRecord) -> Result<(Option, RecordData)> { let key = record.key.clone(); let string = std::str::from_utf8(record.value.as_ref())?; diff --git a/smartmodule/examples/map_json/src/lib.rs b/smartmodule/examples/map_json/src/lib.rs index bfdd653626..a14cfec740 100644 --- a/smartmodule/examples/map_json/src/lib.rs +++ b/smartmodule/examples/map_json/src/lib.rs @@ -1,7 +1,7 @@ -use fluvio_smartmodule::{smartmodule, Record, RecordData, Result}; +use fluvio_smartmodule::{smartmodule, SmartModuleRecord, RecordData, Result}; #[smartmodule(map)] -pub fn map(record: &Record) -> Result<(Option, RecordData)> { +pub fn map(record: &SmartModuleRecord) -> Result<(Option, RecordData)> { let json = serde_json::from_slice::(record.value.as_ref())?; let yaml_bytes = serde_yaml::to_string(&json)?.into_bytes(); diff --git a/smartmodule/examples/map_regex/src/lib.rs b/smartmodule/examples/map_regex/src/lib.rs index 42a1594b1b..58b4d99dcc 100644 --- a/smartmodule/examples/map_regex/src/lib.rs +++ b/smartmodule/examples/map_regex/src/lib.rs @@ -1,11 +1,11 @@ use regex::Regex; use once_cell::sync::Lazy; -use fluvio_smartmodule::{smartmodule, Result, Record, RecordData}; +use fluvio_smartmodule::{smartmodule, Result, SmartModuleRecord, RecordData}; static SSN_RE: Lazy = Lazy::new(|| Regex::new(r"\d{3}-\d{2}-\d{4}").unwrap()); #[smartmodule(map)] -pub fn map(record: &Record) -> Result<(Option, RecordData)> { +pub fn map(record: &SmartModuleRecord) -> Result<(Option, RecordData)> { let key = record.key.clone(); let string = std::str::from_utf8(record.value.as_ref())?; diff --git a/smartmodule/regex-filter/src/lib.rs b/smartmodule/regex-filter/src/lib.rs index 486bcd25cb..48297e8df2 100644 --- a/smartmodule/regex-filter/src/lib.rs +++ b/smartmodule/regex-filter/src/lib.rs @@ -4,7 +4,7 @@ use regex::Regex; use fluvio_smartmodule::{ - smartmodule, Record, Result, eyre, + smartmodule, SmartModuleRecord, Result, eyre, dataplane::smartmodule::{SmartModuleExtraParams, SmartModuleInitError}, }; @@ -22,7 +22,7 @@ fn init(params: SmartModuleExtraParams) -> Result<()> { } #[smartmodule(filter)] -pub fn filter(record: &Record) -> Result { +pub fn filter(record: &SmartModuleRecord) -> Result { let string = std::str::from_utf8(record.value.as_ref())?; Ok(REGEX.get().unwrap().is_match(string)) }