Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: clone smdk test to fluvio sm test #3559

Merged
merged 3 commits into from
Nov 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 7 additions & 2 deletions crates/fluvio-cli-common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,32 @@ path = "src/lib.rs"
default = ["fluvio-types"]
file-records = ["fluvio-protocol/record", "fluvio-protocol/api"]
version-cmd = ["dep:current_platform", "dep:clap", "dep:sysinfo"]
smartmodule-test = ["file-records", "dep:fluvio-sc-schema", "dep:fluvio-smartmodule", "dep:fluvio", "dep:fluvio-smartengine", "dep:clap"]

[dependencies]
anyhow = { workspace = true }
current_platform = { workspace = true, optional = true }
clap = { workspace = true, optional = true }
bytes = { workspace = true }
chrono = { workspace = true }
futures = { workspace = true, features = ["std", "io-compat"]}
home = { workspace = true }
hex = { workspace = true }
http = { workspace = true }


semver = { workspace = true }
sha2 = { workspace = true }
sysinfo = { workspace = true, optional = true }
tempfile = { workspace = true }
thiserror = { workspace = true }
tracing = { workspace = true }

fluvio = { path = "../fluvio", optional = true, default-features = false }
fluvio-package-index = { workspace = true, features = ["http_agent"] }
fluvio-types = { workspace = true , optional = true }
fluvio-protocol = { workspace = true, optional = true }
fluvio-sc-schema = { path = "../fluvio-sc-schema", optional = true }
fluvio-smartmodule = { path = "../fluvio-smartmodule", optional = true, default-features = false }
fluvio-smartengine = { path = "../fluvio-smartengine", optional = true, features = ["transformation"] }

[target.'cfg(not(target_os = "macos"))'.dependencies]
isahc = { version = "1.7", default-features = false, features = ["static-curl"] }
Expand Down
4 changes: 4 additions & 0 deletions crates/fluvio-cli-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,13 @@ pub mod error;

#[cfg(feature = "file-records")]
pub mod user_input;

#[cfg(feature = "version-cmd")]
pub mod version_cmd;

#[cfg(feature = "smartmodule-test")]
pub mod smartmodule;

// Environment vars for Channels
pub const FLUVIO_RELEASE_CHANNEL: &str = "FLUVIO_RELEASE_CHANNEL";
pub const FLUVIO_EXTENSIONS_DIR: &str = "FLUVIO_EXTENSIONS_DIR";
Expand Down
266 changes: 266 additions & 0 deletions crates/fluvio-cli-common/src/smartmodule.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,266 @@
use std::convert::TryInto;
use std::fmt::Debug;
use std::io;
use std::path::PathBuf;

use anyhow::{Result, Context, anyhow};
use bytes::Bytes;
use clap::Args;
use chrono::Utc;
use tracing::debug;

use fluvio::FluvioConfig;
use fluvio_sc_schema::smartmodule::SmartModuleApiClient;
use fluvio_smartengine::DEFAULT_SMARTENGINE_VERSION;
use fluvio_smartengine::metrics::SmartModuleChainMetrics;
use fluvio_smartengine::transformation::TransformationConfig;
use fluvio_smartengine::{
SmartEngine, SmartModuleChainBuilder, SmartModuleConfig, SmartModuleChainInstance, Lookback,
};
use fluvio_smartmodule::dataplane::smartmodule::SmartModuleInput;
use fluvio_protocol::record::Record;
use crate::user_input::{UserInputRecords, UserInputType};

/// Test SmartModule
#[derive(Debug, Args)]
pub struct BaseTestCmd {
/// Provide test input with this flag
#[arg(long, group = "TestInput")]
pub text: Option<String>,

/// Read the test input from the StdIn (e.g. Unix piping)
#[arg(long, group = "TestInput")]
pub stdin: bool,

/// Path to test file. Default: Read file line by line
#[arg(long, groups = ["TestInput", "TestFile"])]
pub file: Option<PathBuf>,

/// Read the file as single record
#[arg(long, requires = "TestFile")]
pub raw: bool,

/// Key to use with the test record(s)
pub key: Option<String>,

/// Print records in "[key] value" format, with "[null]" for no key
#[arg(short, long)]
pub key_value: bool,

/// (Optional) Extra input parameters passed to the smartmodule module.
/// They should be passed using key=value format
/// Eg. fluvio consume topic-name --filter filter.wasm -e foo=bar -e key=value -e one=1
#[arg(
short = 'e',
long= "params",
value_parser=parse_key_val,
num_args = 1,
conflicts_with_all = ["transforms_file", "transform"]
)]
pub params: Vec<(String, String)>,

/// (Optional) File path to transformation specification.
#[arg(long, group = "TestSmartModule")]
pub transforms_file: Option<PathBuf>,

/// (Optional) Pass transformation specification as JSON formatted string.
/// E.g. smdk test --text '{}' --transform='{"uses":"infinyon/[email protected]","with":{"spec":"[{\"operation\":\"default\",\"spec\":{\"source\":\"test\"}}]"}}'
#[arg(long, short, group = "TestSmartModule")]
pub transform: Vec<String>,

/// verbose output
#[arg(short = 'v', long = "verbose")]
pub verbose: bool,

/// Records which act as existing in the topic before the SmartModule starts processing. Useful
/// for testing `lookback`. Multiple values are allowed.
#[arg(long, short)]
pub record: Vec<String>,

/// Sets the lookback parameter to the last N records.
#[arg(long, short)]
pub lookback_last: Option<u64>,
}

fn parse_key_val(s: &str) -> Result<(String, String)> {
let pos = s
.find('=')
.ok_or_else(|| anyhow::anyhow!(format!("invalid KEY=value: no `=` found in `{s}`")))?;
Ok((s[..pos].parse()?, s[pos + 1..].parse()?))
}

impl BaseTestCmd {
pub async fn process<F>(self, with_chain_builder: WithChainBuilder<F>) -> Result<()>
where
F: FnOnce(Option<Lookback>, Vec<(String, String)>) -> Result<SmartModuleChainBuilder>,
{
debug!("starting smartmodule test");

let chain_builder = with_chain_builder
.build(
self.lookback_last,
self.transforms_file,
self.transform,
self.params,
)
.await?;

let engine = SmartEngine::new();
debug!("SmartModule chain created");

let mut chain = chain_builder.initialize(&engine)?;
look_back(&mut chain, self.record).await?;

let key = self.key.map(Bytes::from);

let test_data: UserInputRecords = if let Some(data) = self.text {
UserInputRecords::try_from(UserInputType::Text {
key,
data: Bytes::from(data),
})?
} else if let Some(test_file_path) = &self.file {
let path = test_file_path.to_path_buf();
if self.raw {
UserInputRecords::try_from(UserInputType::File { key, path })?
} else {
UserInputRecords::try_from(UserInputType::FileByLine { key, path })?
}
} else if self.stdin {
let mut buf = String::new();
io::stdin().read_line(&mut buf)?;
UserInputRecords::try_from(UserInputType::StdIn {
key,
data: buf.into(),
})?
} else {
return Err(anyhow::anyhow!("No valid input provided"));
};
debug!(len = &test_data.len(), "input data");

let metrics = SmartModuleChainMetrics::default();

let test_records: Vec<Record> = test_data.into();
let mut sm_input =
SmartModuleInput::try_from_records(test_records, DEFAULT_SMARTENGINE_VERSION)?;

sm_input.set_base_timestamp(Utc::now().timestamp_millis());

let output = chain.process(sm_input, &metrics)?;

if self.verbose {
println!("{:?} records outputed", output.successes.len());
}
for output_record in output.successes {
let output_value = if self.key_value {
format!(
"[{formatted_key}] {value}",
formatted_key = if let Some(key) = output_record.key() {
key.to_string()
} else {
"null".to_string()
},
value = output_record.value.as_str()?
)
} else {
output_record.value.as_str()?.to_string()
};

println!("{output_value}");
}

Ok(())
}
}

async fn look_back(chain: &mut SmartModuleChainInstance, records: Vec<String>) -> Result<()> {
let records: Vec<Record> = records
.into_iter()
.map(|r| Record::new(r.as_str()))
.collect();
chain
.look_back(
|lookback| {
let n = match lookback {
fluvio_smartengine::Lookback::Last(n) => n,
fluvio_smartengine::Lookback::Age { age: _, last } => last,
};
let res = Ok(records
.clone()
.into_iter()
.rev()
.take(n as usize)
.rev()
.collect());
async { res }
},
&Default::default(),
)
.await
}

#[derive(Debug)]
pub struct WithChainBuilder<F> {
func: Option<F>,
}

impl<F> Default for WithChainBuilder<F> {
fn default() -> Self {
Self {
func: Default::default(),
}
}
}

impl<F> WithChainBuilder<F>
where
F: FnOnce(Option<Lookback>, Vec<(String, String)>) -> Result<SmartModuleChainBuilder>,
{
async fn build(
self,
lookback_last: Option<u64>,
transforms_file: Option<PathBuf>,
transform: Vec<String>,
params: Vec<(String, String)>,
) -> Result<SmartModuleChainBuilder> {
let lookback = lookback_last.map(Lookback::Last);
if let Some(transforms_file) = transforms_file {
let config = TransformationConfig::from_file(transforms_file)
.context("unable to read transformation config")?;
build_chain(config, lookback).await
} else if !transform.is_empty() {
let config =
TransformationConfig::try_from(transform).context("unable to parse transform")?;
build_chain(config, lookback).await
} else {
debug_assert!(self.func.is_some(), "unknown condition");
self.func.map(|f| f(lookback, params)).unwrap()
}
}

pub fn extra_cond(mut self, func: F) -> Self {
self.func = Some(func);
self
}
}

async fn build_chain(
config: TransformationConfig,
lookback: Option<Lookback>,
) -> Result<SmartModuleChainBuilder> {
let client_config = FluvioConfig::load()?.try_into()?;
let api_client = SmartModuleApiClient::connect_with_config(client_config).await?;
let mut chain_builder = SmartModuleChainBuilder::default();
for transform in config.transforms {
debug!(?transform, "fetching");
let wasm = api_client
.get(transform.uses.clone())
.await?
.ok_or_else(|| anyhow!("smartmodule {} not found", &transform.uses))?
.wasm
.as_raw_wasm()?;
let mut config = SmartModuleConfig::from(transform);
config.set_lookback(lookback);
chain_builder.add_smart_module(config, wasm);
}
Ok(chain_builder)
}
5 changes: 4 additions & 1 deletion crates/fluvio-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ fluvio-command = { workspace = true }
fluvio-package-index = { workspace = true }
fluvio-extension-common = { workspace = true, features = ["target"] }
fluvio-channel = { workspace = true }
fluvio-cli-common = { workspace = true }
fluvio-hub-util = { workspace = true, features = ["connector-cmds"] }
fluvio-cli-common = { workspace = true }
fluvio-smartengine = { workspace = true, features = ["transformation"]}
fluvio-protocol = { workspace = true, features=["record","api"] }
fluvio-smartmodule = { workspace = true }
Expand All @@ -101,6 +101,9 @@ fluvio-future = { workspace = true, features = ["fs", "io", "subscriber", "nativ
fluvio-sc-schema = { workspace = true, features = ["use_serde"], optional = true }
fluvio-spu-schema = { workspace = true, optional = true }

# smartmodule depends on cranelift, which is not available for `arm`
[target.'cfg(not(target_arch = "arm"))'.dependencies]
fluvio-cli-common = { workspace = true, features = ["smartmodule-test"] }

[dev-dependencies]
fluvio-future = { workspace = true, features = ["fixture"] }
Loading
Loading