Skip to content

Commit

Permalink
chore: smartmodule latest interface adoption (#3661)
Browse files Browse the repository at this point in the history
Closes #3639

Co-authored-by: MuhtasimTanmoy <[email protected]>
  • Loading branch information
MuhtasimTanmoy and MuhtasimTanmoy committed Nov 2, 2023
1 parent 61455a4 commit 3985fb9
Show file tree
Hide file tree
Showing 23 changed files with 52 additions and 52 deletions.
12 changes: 6 additions & 6 deletions smartmodule/examples/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions smartmodule/examples/aggregate-average/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use serde::{Serialize, Deserialize};
use fluvio_smartmodule::{smartmodule, Result, Record, RecordData};
use fluvio_smartmodule::{smartmodule, Result, SmartModuleRecord, RecordData};

#[derive(Default, Serialize, Deserialize)]
struct IncrementalAverage {
Expand All @@ -22,7 +22,7 @@ impl IncrementalAverage {
}

#[smartmodule(aggregate)]
pub fn aggregate(accumulator: RecordData, current: &Record) -> Result<RecordData> {
pub fn aggregate(accumulator: RecordData, current: &SmartModuleRecord) -> Result<RecordData> {
// Parse the average from JSON
let mut average: IncrementalAverage =
serde_json::from_slice(accumulator.as_ref()).unwrap_or_default();
Expand Down
4 changes: 2 additions & 2 deletions smartmodule/examples/aggregate-init/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::OnceLock;

use fluvio_smartmodule::{
dataplane::smartmodule::SmartModuleExtraParams, smartmodule, Record, RecordData, Result,
dataplane::smartmodule::SmartModuleExtraParams, smartmodule, SmartModuleRecord, RecordData, Result,
};

static INITIAL_VALUE: OnceLock<UseOnce<RecordData>> = OnceLock::new();

const PARAM_NAME: &str = "initial_value";

#[smartmodule(aggregate)]
pub fn aggregate(accumulator: RecordData, current: &Record) -> Result<RecordData> {
pub fn aggregate(accumulator: RecordData, current: &SmartModuleRecord) -> Result<RecordData> {
let accumulator = if let Some(initial_value) = INITIAL_VALUE.get() {
initial_value.get_or(&accumulator)
} else {
Expand Down
4 changes: 2 additions & 2 deletions smartmodule/examples/aggregate-json/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::collections::HashMap;
use fluvio_smartmodule::{smartmodule, Result, Record, RecordData};
use fluvio_smartmodule::{smartmodule, Result, SmartModuleRecord, RecordData};
use serde::{Serialize, Deserialize};

#[derive(Default, Serialize, Deserialize)]
Expand All @@ -20,7 +20,7 @@ impl std::ops::Add for GithubStars {
}

#[smartmodule(aggregate)]
pub fn aggregate(accumulator: RecordData, current: &Record) -> Result<RecordData> {
pub fn aggregate(accumulator: RecordData, current: &SmartModuleRecord) -> Result<RecordData> {
// Parse accumulator
let accumulated_stars: GithubStars =
serde_json::from_slice(accumulator.as_ref()).unwrap_or_default();
Expand Down
4 changes: 2 additions & 2 deletions smartmodule/examples/aggregate-sum/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use fluvio_smartmodule::{smartmodule, Result, Record, RecordData};
use fluvio_smartmodule::{smartmodule, Result, SmartModuleRecord, RecordData};

#[smartmodule(aggregate)]
pub fn aggregate(accumulator: RecordData, current: &Record) -> Result<RecordData> {
pub fn aggregate(accumulator: RecordData, current: &SmartModuleRecord) -> Result<RecordData> {
// Parse the accumulator and current record as strings
let accumulator_string = std::str::from_utf8(accumulator.as_ref())?;
let current_string = std::str::from_utf8(current.value.as_ref())?;
Expand Down
4 changes: 2 additions & 2 deletions smartmodule/examples/aggregate/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use fluvio_smartmodule::{smartmodule, Result, Record, RecordData};
use fluvio_smartmodule::{smartmodule, Result, SmartModuleRecord, RecordData};

/// This aggregate concatenate accumulator and current value
/// values: "a","b"
// accumulator: "1",
// "1a","1ab"
#[smartmodule(aggregate)]
pub fn aggregate(accumulator: RecordData, current: &Record) -> Result<RecordData> {
pub fn aggregate(accumulator: RecordData, current: &SmartModuleRecord) -> Result<RecordData> {
let mut acc = String::from_utf8(accumulator.as_ref().to_vec())?;
let next = std::str::from_utf8(current.value.as_ref())?;
acc.push_str(next);
Expand Down
4 changes: 2 additions & 2 deletions smartmodule/examples/array_map_json_array/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@
//! "Cranberry"
//! ```
use fluvio_smartmodule::{smartmodule, Record, RecordData, Result};
use fluvio_smartmodule::{smartmodule, SmartModuleRecord, RecordData, Result};

#[smartmodule(array_map)]
pub fn array_map(record: &Record) -> Result<Vec<(Option<RecordData>, RecordData)>> {
pub fn array_map(record: &SmartModuleRecord) -> Result<Vec<(Option<RecordData>, RecordData)>> {
// Deserialize a JSON array with any kind of values inside
let array: Vec<serde_json::Value> = serde_json::from_slice(record.value.as_ref())?;

Expand Down
4 changes: 2 additions & 2 deletions smartmodule/examples/array_map_json_object/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,11 @@
//! [c] "Cranberry"
//! ```
use fluvio_smartmodule::{smartmodule, Record, RecordData, Result};
use fluvio_smartmodule::{smartmodule, SmartModuleRecord, RecordData, Result};
use serde_json::{Map, Value};

#[smartmodule(array_map)]
pub fn array_map(record: &Record) -> Result<Vec<(Option<RecordData>, RecordData)>> {
pub fn array_map(record: &SmartModuleRecord) -> Result<Vec<(Option<RecordData>, RecordData)>> {
// Deserialize a JSON object (Map) with any kind of values inside
let object: Map<String, Value> = serde_json::from_slice(record.value.as_ref())?;

Expand Down
4 changes: 2 additions & 2 deletions smartmodule/examples/array_map_json_reddit/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
//! zero or many output records. This example showcases taking a stream of Reddit API
//! responses and converting it into a stream of the individual posts.
use fluvio_smartmodule::{smartmodule, Record, RecordData, Result};
use fluvio_smartmodule::{smartmodule, SmartModuleRecord, RecordData, Result};
use serde::{Serialize, Deserialize};

#[derive(Debug, Serialize, Deserialize)]
Expand Down Expand Up @@ -33,7 +33,7 @@ struct RedditPostData {
}

#[smartmodule(array_map)]
pub fn array_map(record: &Record) -> Result<Vec<(Option<RecordData>, RecordData)>> {
pub fn array_map(record: &SmartModuleRecord) -> Result<Vec<(Option<RecordData>, RecordData)>> {
// Deserialize a RedditListing from JSON
let listing: RedditListing = serde_json::from_slice(record.value.as_ref())?;

Expand Down
4 changes: 2 additions & 2 deletions smartmodule/examples/filter/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use fluvio_smartmodule::{smartmodule, Record, Result};
use fluvio_smartmodule::{smartmodule, SmartModuleRecord, Result};

#[smartmodule(filter)]
pub fn filter(record: &Record) -> Result<bool> {
pub fn filter(record: &SmartModuleRecord) -> Result<bool> {
let string = std::str::from_utf8(record.value.as_ref())?;
Ok(string.contains('a'))
}
6 changes: 3 additions & 3 deletions smartmodule/examples/filter_hashset/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,19 @@ use std::{
};

use fluvio_smartmodule::{
smartmodule, Record, Result, dataplane::smartmodule::SmartModuleExtraParams, eyre,
smartmodule, SmartModuleRecord, Result, dataplane::smartmodule::SmartModuleExtraParams, eyre,
};

static SET: OnceLock<BoundedHashSet<String>> = OnceLock::new();

#[smartmodule(filter)]
pub fn filter(record: &Record) -> Result<bool> {
pub fn filter(record: &SmartModuleRecord) -> Result<bool> {
let string = std::str::from_utf8(record.value.as_ref())?;
Ok(get_mut_set()?.insert(string.to_owned()))
}

#[smartmodule(look_back)]
pub fn look_back(record: &Record) -> Result<()> {
pub fn look_back(record: &SmartModuleRecord) -> Result<()> {
let string = std::str::from_utf8(record.value.as_ref())?;
get_mut_set()?.insert(string.to_owned());
Ok(())
Expand Down
4 changes: 2 additions & 2 deletions smartmodule/examples/filter_init/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::sync::OnceLock;

use fluvio_smartmodule::{
smartmodule, Record, Result, eyre,
smartmodule, SmartModuleRecord, Result, eyre,
dataplane::smartmodule::{SmartModuleExtraParams, SmartModuleInitError},
};

Expand All @@ -19,7 +19,7 @@ fn init(params: SmartModuleExtraParams) -> Result<()> {
}

#[smartmodule(filter)]
pub fn filter(record: &Record) -> Result<bool> {
pub fn filter(record: &SmartModuleRecord) -> Result<bool> {
let string = std::str::from_utf8(record.value.as_ref())?;
Ok(string.contains(CRITERIA.get().unwrap()))
}
4 changes: 2 additions & 2 deletions smartmodule/examples/filter_json/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
//! {"level":"error","message":"Unable to connect to database"}
//! ```
use fluvio_smartmodule::{smartmodule, Record, Result};
use fluvio_smartmodule::{smartmodule, SmartModuleRecord, Result};

#[derive(PartialEq, Eq, PartialOrd, Ord, serde::Deserialize)]
#[serde(rename_all = "lowercase")]
Expand All @@ -64,7 +64,7 @@ struct StructuredLog {
}

#[smartmodule(filter)]
pub fn filter_log_level(record: &Record) -> Result<bool> {
pub fn filter_log_level(record: &SmartModuleRecord) -> Result<bool> {
let log = serde_json::from_slice::<StructuredLog>(record.value.as_ref())?;
Ok(log.level > LogLevel::Debug)
}
6 changes: 3 additions & 3 deletions smartmodule/examples/filter_look_back/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::sync::atomic::{AtomicI32, Ordering::SeqCst};

use fluvio_smartmodule::{smartmodule, Record, Result};
use fluvio_smartmodule::{smartmodule, SmartModuleRecord, Result};

static PREV: AtomicI32 = AtomicI32::new(0);

#[smartmodule(filter)]
pub fn filter(record: &Record) -> Result<bool> {
pub fn filter(record: &SmartModuleRecord) -> Result<bool> {
let string = std::str::from_utf8(record.value.as_ref())?;
let current: i32 = string.parse()?;
let last = PREV.load(SeqCst);
Expand All @@ -18,7 +18,7 @@ pub fn filter(record: &Record) -> Result<bool> {
}

#[smartmodule(look_back)]
pub fn look_back(record: &Record) -> Result<()> {
pub fn look_back(record: &SmartModuleRecord) -> Result<()> {
let string = std::str::from_utf8(record.value.as_ref())?;
let last: i32 = string.parse()?;
PREV.store(last, SeqCst);
Expand Down
4 changes: 2 additions & 2 deletions smartmodule/examples/filter_map/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
//! This SmartModule filters out all odd numbers, and divides all even numbers by 2.
use fluvio_smartmodule::{smartmodule, Record, RecordData, Result};
use fluvio_smartmodule::{smartmodule, SmartModuleRecord, RecordData, Result};

#[smartmodule(filter_map)]
pub fn filter_map(record: &Record) -> Result<Option<(Option<RecordData>, RecordData)>> {
pub fn filter_map(record: &SmartModuleRecord) -> Result<Option<(Option<RecordData>, RecordData)>> {
let key = record.key.clone();
let string = String::from_utf8_lossy(record.value.as_ref()).to_string();
let int: i32 = string.parse()?;
Expand Down
4 changes: 2 additions & 2 deletions smartmodule/examples/filter_odd/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
//! filter_odd/src/lib.rs:45:38
//! ```
use fluvio_smartmodule::{smartmodule, Record, Result};
use fluvio_smartmodule::{smartmodule, SmartModuleRecord, Result};

#[derive(Debug, thiserror::Error)]
pub enum SecondErrorWrapper {
Expand All @@ -37,7 +37,7 @@ pub enum FirstErrorWrapper {
}

#[smartmodule(filter)]
pub fn filter(record: &Record) -> Result<bool> {
pub fn filter(record: &SmartModuleRecord) -> Result<bool> {
let string = std::str::from_utf8(record.value.as_ref())?;
let int = string
.parse::<i32>()
Expand Down
4 changes: 2 additions & 2 deletions smartmodule/examples/filter_regex/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use fluvio_smartmodule::{smartmodule, Record, Result};
use fluvio_smartmodule::{smartmodule, SmartModuleRecord, Result};
use regex::Regex;

#[smartmodule(filter)]
pub fn filter(record: &Record) -> Result<bool> {
pub fn filter(record: &SmartModuleRecord) -> Result<bool> {
let string = std::str::from_utf8(record.value.as_ref())?;

// Check whether the Record contains a Social Security number
Expand Down
4 changes: 2 additions & 2 deletions smartmodule/examples/filter_with_param/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::sync::OnceLock;

use fluvio_smartmodule::{
smartmodule, Record, Result, eyre,
smartmodule, SmartModuleRecord, Result, eyre,
dataplane::smartmodule::{SmartModuleExtraParams},
};

Expand All @@ -22,7 +22,7 @@ fn init(params: SmartModuleExtraParams) -> Result<()> {
}

#[smartmodule(filter)]
pub fn filter(record: &Record) -> Result<bool> {
pub fn filter(record: &SmartModuleRecord) -> Result<bool> {
let string = std::str::from_utf8(record.value.as_ref())?;
Ok(string.contains(CRITERIA.get().unwrap()))
}
4 changes: 2 additions & 2 deletions smartmodule/examples/map/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use fluvio_smartmodule::{smartmodule, Record, RecordData, Result};
use fluvio_smartmodule::{smartmodule, SmartModuleRecord, RecordData, Result};

#[smartmodule(map)]
pub fn map(record: &Record) -> Result<(Option<RecordData>, RecordData)> {
pub fn map(record: &SmartModuleRecord) -> Result<(Option<RecordData>, RecordData)> {
let key = record.key.clone();
let mut value = Vec::from(record.value.as_ref());

Expand Down
4 changes: 2 additions & 2 deletions smartmodule/examples/map_double/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use fluvio_smartmodule::{smartmodule, Record, RecordData, Result};
use fluvio_smartmodule::{smartmodule, SmartModuleRecord, RecordData, Result};

#[smartmodule(map)]
pub fn map(record: &Record) -> Result<(Option<RecordData>, RecordData)> {
pub fn map(record: &SmartModuleRecord) -> Result<(Option<RecordData>, RecordData)> {
let key = record.key.clone();

let string = std::str::from_utf8(record.value.as_ref())?;
Expand Down
4 changes: 2 additions & 2 deletions smartmodule/examples/map_json/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use fluvio_smartmodule::{smartmodule, Record, RecordData, Result};
use fluvio_smartmodule::{smartmodule, SmartModuleRecord, RecordData, Result};

#[smartmodule(map)]
pub fn map(record: &Record) -> Result<(Option<RecordData>, RecordData)> {
pub fn map(record: &SmartModuleRecord) -> Result<(Option<RecordData>, RecordData)> {
let json = serde_json::from_slice::<serde_json::Value>(record.value.as_ref())?;
let yaml_bytes = serde_yaml::to_string(&json)?.into_bytes();

Expand Down
4 changes: 2 additions & 2 deletions smartmodule/examples/map_regex/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use regex::Regex;
use once_cell::sync::Lazy;
use fluvio_smartmodule::{smartmodule, Result, Record, RecordData};
use fluvio_smartmodule::{smartmodule, Result, SmartModuleRecord, RecordData};

static SSN_RE: Lazy<Regex> = Lazy::new(|| Regex::new(r"\d{3}-\d{2}-\d{4}").unwrap());

#[smartmodule(map)]
pub fn map(record: &Record) -> Result<(Option<RecordData>, RecordData)> {
pub fn map(record: &SmartModuleRecord) -> Result<(Option<RecordData>, RecordData)> {
let key = record.key.clone();

let string = std::str::from_utf8(record.value.as_ref())?;
Expand Down
4 changes: 2 additions & 2 deletions smartmodule/regex-filter/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use regex::Regex;


use fluvio_smartmodule::{
smartmodule, Record, Result, eyre,
smartmodule, SmartModuleRecord, Result, eyre,
dataplane::smartmodule::{SmartModuleExtraParams, SmartModuleInitError},
};

Expand All @@ -22,7 +22,7 @@ fn init(params: SmartModuleExtraParams) -> Result<()> {
}

#[smartmodule(filter)]
pub fn filter(record: &Record) -> Result<bool> {
pub fn filter(record: &SmartModuleRecord) -> Result<bool> {
let string = std::str::from_utf8(record.value.as_ref())?;
Ok(REGEX.get().unwrap().is_match(string))
}

0 comments on commit 3985fb9

Please sign in to comment.