From e31fc040661bfc48caa3eb5f95bd7e9b908e4d09 Mon Sep 17 00:00:00 2001 From: Matheus Consoli Date: Tue, 26 Sep 2023 04:16:17 -0300 Subject: [PATCH 1/3] Clone `smdk test` to `fluvio sm test` --- Cargo.lock | 9 +- crates/fluvio-cli-common/Cargo.toml | 6 + crates/fluvio-cli-common/src/lib.rs | 3 + crates/fluvio-cli-common/src/smartmodule.rs | 266 ++++++++++++++++++ .../fluvio-cli/src/client/smartmodule/mod.rs | 10 +- .../fluvio-cli/src/client/smartmodule/test.rs | 29 ++ crates/smartmodule-development-kit/Cargo.toml | 4 - .../smartmodule-development-kit/src/test.rs | 245 ++-------------- 8 files changed, 336 insertions(+), 236 deletions(-) create mode 100644 crates/fluvio-cli-common/src/smartmodule.rs create mode 100644 crates/fluvio-cli/src/client/smartmodule/test.rs diff --git a/Cargo.lock b/Cargo.lock index 8a8a30d3b6..d71154c70c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2644,11 +2644,17 @@ name = "fluvio-cli-common" version = "0.0.0" dependencies = [ "anyhow", + "bytes 1.5.0", + "chrono", "clap", "current_platform", + "fluvio", "fluvio-future", "fluvio-package-index", "fluvio-protocol", + "fluvio-sc-schema", + "fluvio-smartengine", + "fluvio-smartmodule", "fluvio-types", "futures 0.3.29", "hex", @@ -6970,10 +6976,8 @@ name = "smartmodule-development-kit" version = "0.0.0" dependencies = [ "anyhow", - "bytes 1.5.0", "cargo-builder", "cargo-generate", - "chrono", "clap", "current_platform", "dirs 5.0.1", @@ -6987,7 +6991,6 @@ dependencies = [ "fluvio-protocol", "fluvio-sc-schema", "fluvio-smartengine", - "fluvio-smartmodule", "include_dir", "lib-cargo-crate", "tempfile", diff --git a/crates/fluvio-cli-common/Cargo.toml b/crates/fluvio-cli-common/Cargo.toml index e2b6bda20f..bcf491a866 100644 --- a/crates/fluvio-cli-common/Cargo.toml +++ b/crates/fluvio-cli-common/Cargo.toml @@ -21,6 +21,8 @@ version-cmd = ["dep:current_platform", "dep:clap", "dep:sysinfo"] anyhow = { workspace = true } current_platform = { workspace = true, optional = true } clap = { workspace = true, optional = true } +bytes = { workspace = true } +chrono = { workspace = true } futures = { workspace = true, features = ["std", "io-compat"]} home = { workspace = true } hex = { workspace = true } @@ -34,9 +36,13 @@ tempfile = { workspace = true } thiserror = { workspace = true } tracing = { workspace = true } +fluvio = { path = "../fluvio", default-features = false } fluvio-package-index = { workspace = true, features = ["http_agent"] } fluvio-types = { workspace = true , optional = true } fluvio-protocol = { workspace = true, optional = true } +fluvio-sc-schema = { path = "../fluvio-sc-schema" } +fluvio-smartmodule = { path = "../fluvio-smartmodule", default-features = false } +fluvio-smartengine = { path = "../fluvio-smartengine", features = ["transformation"] } [target.'cfg(not(target_os = "macos"))'.dependencies] isahc = { version = "1.7", default-features = false, features = ["static-curl"] } diff --git a/crates/fluvio-cli-common/src/lib.rs b/crates/fluvio-cli-common/src/lib.rs index 57c7619edb..42034dfa98 100644 --- a/crates/fluvio-cli-common/src/lib.rs +++ b/crates/fluvio-cli-common/src/lib.rs @@ -4,9 +4,12 @@ pub mod error; #[cfg(feature = "file-records")] pub mod user_input; + #[cfg(feature = "version-cmd")] pub mod version_cmd; +pub mod smartmodule; + // Environment vars for Channels pub const FLUVIO_RELEASE_CHANNEL: &str = "FLUVIO_RELEASE_CHANNEL"; pub const FLUVIO_EXTENSIONS_DIR: &str = "FLUVIO_EXTENSIONS_DIR"; diff --git a/crates/fluvio-cli-common/src/smartmodule.rs b/crates/fluvio-cli-common/src/smartmodule.rs new file mode 100644 index 0000000000..28bb6d78d6 --- /dev/null +++ b/crates/fluvio-cli-common/src/smartmodule.rs @@ -0,0 +1,266 @@ +use std::convert::TryInto; +use std::fmt::Debug; +use std::io; +use std::path::PathBuf; + +use anyhow::{Result, Context, anyhow}; +use bytes::Bytes; +use clap::Args; +use chrono::Utc; +use tracing::debug; + +use fluvio::FluvioConfig; +use fluvio_sc_schema::smartmodule::SmartModuleApiClient; +use fluvio_smartengine::DEFAULT_SMARTENGINE_VERSION; +use fluvio_smartengine::metrics::SmartModuleChainMetrics; +use fluvio_smartengine::transformation::TransformationConfig; +use fluvio_smartengine::{ + SmartEngine, SmartModuleChainBuilder, SmartModuleConfig, SmartModuleChainInstance, Lookback, +}; +use fluvio_smartmodule::dataplane::smartmodule::SmartModuleInput; +use fluvio_protocol::record::Record; +use crate::user_input::{UserInputRecords, UserInputType}; + +/// Test SmartModule +#[derive(Debug, Args)] +pub struct BaseTestCmd { + /// Provide test input with this flag + #[arg(long, group = "TestInput")] + pub text: Option, + + /// Read the test input from the StdIn (e.g. Unix piping) + #[arg(long, group = "TestInput")] + pub stdin: bool, + + /// Path to test file. Default: Read file line by line + #[arg(long, groups = ["TestInput", "TestFile"])] + pub file: Option, + + /// Read the file as single record + #[arg(long, requires = "TestFile")] + pub raw: bool, + + /// Key to use with the test record(s) + pub key: Option, + + /// Print records in "[key] value" format, with "[null]" for no key + #[arg(short, long)] + pub key_value: bool, + + /// (Optional) Extra input parameters passed to the smartmodule module. + /// They should be passed using key=value format + /// Eg. fluvio consume topic-name --filter filter.wasm -e foo=bar -e key=value -e one=1 + #[arg( + short = 'e', + long= "params", + value_parser=parse_key_val, + num_args = 1, + conflicts_with_all = ["transforms_file", "transform"] + )] + pub params: Vec<(String, String)>, + + /// (Optional) File path to transformation specification. + #[arg(long, group = "TestSmartModule")] + pub transforms_file: Option, + + /// (Optional) Pass transformation specification as JSON formatted string. + /// E.g. smdk test --text '{}' --transform='{"uses":"infinyon/jolt@0.1.0","with":{"spec":"[{\"operation\":\"default\",\"spec\":{\"source\":\"test\"}}]"}}' + #[arg(long, short, group = "TestSmartModule")] + pub transform: Vec, + + /// verbose output + #[arg(short = 'v', long = "verbose")] + pub verbose: bool, + + /// Records which act as existing in the topic before the SmartModule starts processing. Useful + /// for testing `lookback`. Multiple values are allowed. + #[arg(long, short)] + pub record: Vec, + + /// Sets the lookback parameter to the last N records. + #[arg(long, short)] + pub lookback_last: Option, +} + +fn parse_key_val(s: &str) -> Result<(String, String)> { + let pos = s + .find('=') + .ok_or_else(|| anyhow::anyhow!(format!("invalid KEY=value: no `=` found in `{s}`")))?; + Ok((s[..pos].parse()?, s[pos + 1..].parse()?)) +} + +impl BaseTestCmd { + pub async fn process(self, with_chain_builder: WithChainBuilder) -> Result<()> + where + F: FnOnce(Option, Vec<(String, String)>) -> Result, + { + debug!("starting smartmodule test"); + + let chain_builder = with_chain_builder + .build( + self.lookback_last, + self.transforms_file, + self.transform, + self.params, + ) + .await?; + + let engine = SmartEngine::new(); + debug!("SmartModule chain created"); + + let mut chain = chain_builder.initialize(&engine)?; + look_back(&mut chain, self.record).await?; + + let key = self.key.map(Bytes::from); + + let test_data: UserInputRecords = if let Some(data) = self.text { + UserInputRecords::try_from(UserInputType::Text { + key, + data: Bytes::from(data), + })? + } else if let Some(test_file_path) = &self.file { + let path = test_file_path.to_path_buf(); + if self.raw { + UserInputRecords::try_from(UserInputType::File { key, path })? + } else { + UserInputRecords::try_from(UserInputType::FileByLine { key, path })? + } + } else if self.stdin { + let mut buf = String::new(); + io::stdin().read_line(&mut buf)?; + UserInputRecords::try_from(UserInputType::StdIn { + key, + data: buf.into(), + })? + } else { + return Err(anyhow::anyhow!("No valid input provided")); + }; + debug!(len = &test_data.len(), "input data"); + + let metrics = SmartModuleChainMetrics::default(); + + let test_records: Vec = test_data.into(); + let mut sm_input = + SmartModuleInput::try_from_records(test_records, DEFAULT_SMARTENGINE_VERSION)?; + + sm_input.set_base_timestamp(Utc::now().timestamp_millis()); + + let output = chain.process(sm_input, &metrics)?; + + if self.verbose { + println!("{:?} records outputed", output.successes.len()); + } + for output_record in output.successes { + let output_value = if self.key_value { + format!( + "[{formatted_key}] {value}", + formatted_key = if let Some(key) = output_record.key() { + key.to_string() + } else { + "null".to_string() + }, + value = output_record.value.as_str()? + ) + } else { + output_record.value.as_str()?.to_string() + }; + + println!("{output_value}"); + } + + Ok(()) + } +} + +async fn look_back(chain: &mut SmartModuleChainInstance, records: Vec) -> Result<()> { + let records: Vec = records + .into_iter() + .map(|r| Record::new(r.as_str())) + .collect(); + chain + .look_back( + |lookback| { + let n = match lookback { + fluvio_smartengine::Lookback::Last(n) => n, + fluvio_smartengine::Lookback::Age { age: _, last } => last, + }; + let res = Ok(records + .clone() + .into_iter() + .rev() + .take(n as usize) + .rev() + .collect()); + async { res } + }, + &Default::default(), + ) + .await +} + +#[derive(Debug)] +pub struct WithChainBuilder { + func: Option, +} + +impl Default for WithChainBuilder { + fn default() -> Self { + Self { + func: Default::default(), + } + } +} + +impl WithChainBuilder +where + F: FnOnce(Option, Vec<(String, String)>) -> Result, +{ + async fn build( + self, + lookback_last: Option, + transforms_file: Option, + transform: Vec, + params: Vec<(String, String)>, + ) -> Result { + let lookback = lookback_last.map(Lookback::Last); + if let Some(transforms_file) = transforms_file { + let config = TransformationConfig::from_file(transforms_file) + .context("unable to read transformation config")?; + build_chain(config, lookback).await + } else if !transform.is_empty() { + let config = + TransformationConfig::try_from(transform).context("unable to parse transform")?; + build_chain(config, lookback).await + } else { + debug_assert!(self.func.is_some(), "unknown condition"); + self.func.map(|f| f(lookback, params)).unwrap() + } + } + + pub fn extra_cond(mut self, func: F) -> Self { + self.func = Some(func); + self + } +} + +async fn build_chain( + config: TransformationConfig, + lookback: Option, +) -> Result { + let client_config = FluvioConfig::load()?.try_into()?; + let api_client = SmartModuleApiClient::connect_with_config(client_config).await?; + let mut chain_builder = SmartModuleChainBuilder::default(); + for transform in config.transforms { + debug!(?transform, "fetching"); + let wasm = api_client + .get(transform.uses.clone()) + .await? + .ok_or_else(|| anyhow!("smartmodule {} not found", &transform.uses))? + .wasm + .as_raw_wasm()?; + let mut config = SmartModuleConfig::from(transform); + config.set_lookback(lookback); + chain_builder.add_smart_module(config, wasm); + } + Ok(chain_builder) +} diff --git a/crates/fluvio-cli/src/client/smartmodule/mod.rs b/crates/fluvio-cli/src/client/smartmodule/mod.rs index e71354ea3d..b3a5d97b67 100644 --- a/crates/fluvio-cli/src/client/smartmodule/mod.rs +++ b/crates/fluvio-cli/src/client/smartmodule/mod.rs @@ -2,6 +2,7 @@ mod create; mod list; mod delete; mod watch; +mod test; pub use cmd::SmartModuleCmd; @@ -11,7 +12,7 @@ mod cmd { use std::fmt::Debug; use async_trait::async_trait; - use clap::Parser; + use clap::Subcommand; use anyhow::Result; use fluvio::Fluvio; @@ -23,15 +24,17 @@ mod cmd { use super::create::CreateSmartModuleOpt; use super::list::ListSmartModuleOpt; use super::delete::DeleteSmartModuleOpt; + use super::test::TestSmartModuleOpt; use super::watch::WatchSmartModuleOpt; - #[derive(Debug, Parser)] + #[derive(Debug, Subcommand)] pub enum SmartModuleCmd { Create(CreateSmartModuleOpt), List(ListSmartModuleOpt), Watch(WatchSmartModuleOpt), /// Delete one or more SmartModules with the given name(s) Delete(DeleteSmartModuleOpt), + Test(TestSmartModuleOpt), } #[async_trait] @@ -54,6 +57,9 @@ mod cmd { Self::Watch(opt) => { opt.process(out, target).await?; } + Self::Test(opt) => { + opt.process(out, target).await?; + } } Ok(()) } diff --git a/crates/fluvio-cli/src/client/smartmodule/test.rs b/crates/fluvio-cli/src/client/smartmodule/test.rs new file mode 100644 index 0000000000..29672f78e3 --- /dev/null +++ b/crates/fluvio-cli/src/client/smartmodule/test.rs @@ -0,0 +1,29 @@ +use std::fmt::Debug; +use std::sync::Arc; + +use clap::Parser; +use fluvio::Fluvio; +use fluvio_cli_common::smartmodule::{BaseTestCmd, WithChainBuilder}; +use fluvio_extension_common::Terminal; + +use crate::client::cmd::ClientCmd; + +#[derive(Debug, Parser)] +#[command(arg_required_else_help = true)] +pub struct TestSmartModuleOpt { + #[clap(flatten)] + base: BaseTestCmd, +} + +#[async_trait::async_trait] +impl ClientCmd for TestSmartModuleOpt { + async fn process_client( + self, + _out: Arc, + _fluvio: &Fluvio, + ) -> anyhow::Result<()> { + self.base + .process:: _>(WithChainBuilder::default()) + .await + } +} diff --git a/crates/smartmodule-development-kit/Cargo.toml b/crates/smartmodule-development-kit/Cargo.toml index 364d6ef8c5..9d3e547441 100644 --- a/crates/smartmodule-development-kit/Cargo.toml +++ b/crates/smartmodule-development-kit/Cargo.toml @@ -14,11 +14,8 @@ path = "src/main.rs" doc = false [dependencies] -chrono = { workspace = true } tracing = { workspace = true } -thiserror = { workspace = true } anyhow = { workspace = true } -bytes = { workspace = true } clap = { workspace = true, features = ["std", "derive", "help", "usage", "error-context", "env", "wrap_help", "suggestions"], default-features = false } current_platform = { workspace = true } dirs = { workspace = true } @@ -36,7 +33,6 @@ fluvio-protocol = { path = "../fluvio-protocol", features=["record","api"] } fluvio-future = { workspace = true, features = ["subscriber"]} fluvio-smartengine = { path = "../fluvio-smartengine", features = ["transformation"] } fluvio-extension-common = { path = "../fluvio-extension-common", features = ["target"] } -fluvio-smartmodule = { path = "../fluvio-smartmodule", default-features = false } fluvio-controlplane-metadata = { path = "../fluvio-controlplane-metadata", features = ["smartmodule"] } fluvio-sc-schema = { path = "../fluvio-sc-schema" } fluvio-cli-common = { path = "../fluvio-cli-common", features = ["file-records", "version-cmd"]} diff --git a/crates/smartmodule-development-kit/src/test.rs b/crates/smartmodule-development-kit/src/test.rs index 6c1e190a72..c5cbd7ec00 100644 --- a/crates/smartmodule-development-kit/src/test.rs +++ b/crates/smartmodule-development-kit/src/test.rs @@ -1,104 +1,23 @@ -use std::collections::BTreeMap; -use std::convert::TryInto; use std::fmt::Debug; -use std::io; use std::path::PathBuf; -use anyhow::{Result, Context, anyhow}; -use bytes::Bytes; +use anyhow::Result; use cargo_builder::package::PackageInfo; use clap::Parser; -use chrono::Utc; -use tracing::debug; - -use fluvio::FluvioConfig; use fluvio_future::task::run_block_on; -use fluvio_sc_schema::smartmodule::SmartModuleApiClient; -use fluvio_smartengine::DEFAULT_SMARTENGINE_VERSION; -use fluvio_smartengine::metrics::SmartModuleChainMetrics; -use fluvio_smartengine::transformation::TransformationConfig; -use fluvio_smartengine::{ - SmartEngine, SmartModuleChainBuilder, SmartModuleConfig, SmartModuleChainInstance, Lookback, -}; -use fluvio_smartmodule::dataplane::smartmodule::SmartModuleInput; -use fluvio_protocol::record::Record; -use fluvio_cli_common::user_input::{UserInputRecords, UserInputType}; - +use fluvio_smartengine::{SmartModuleChainBuilder, SmartModuleConfig, Lookback}; use crate::cmd::PackageCmd; -/// Test SmartModule +use fluvio_cli_common::smartmodule::{BaseTestCmd, WithChainBuilder}; #[derive(Debug, Parser)] +#[command(arg_required_else_help = true)] pub struct TestCmd { - /// Provide test input with this flag - #[arg(long, group = "TestInput")] - text: Option, - - /// Read the test input from the StdIn (e.g. Unix piping) - #[arg(long, group = "TestInput")] - stdin: bool, - - /// Path to test file. Default: Read file line by line - #[arg(long, groups = ["TestInput", "TestFile"])] - file: Option, - - /// Read the file as single record - #[arg(long, requires = "TestFile")] - raw: bool, - - /// Key to use with the test record(s) - key: Option, - - /// Print records in "[key] value" format, with "[null]" for no key - #[arg(short, long)] - key_value: bool, - + #[clap(flatten)] + base_test_cmd: BaseTestCmd, #[clap(flatten)] package: PackageCmd, - - /// Optional wasm file path #[arg(long, group = "TestSmartModule")] wasm_file: Option, - - /// (Optional) Extra input parameters passed to the smartmodule module. - /// They should be passed using key=value format - /// Eg. fluvio consume topic-name --filter filter.wasm -e foo=bar -e key=value -e one=1 - #[arg( - short = 'e', - long= "params", - value_parser=parse_key_val, - num_args = 1, - conflicts_with_all = ["transforms_file", "transform"] - )] - params: Vec<(String, String)>, - - /// (Optional) File path to transformation specification. - #[arg(long, group = "TestSmartModule")] - transforms_file: Option, - - /// (Optional) Pass transformation specification as JSON formatted string. - /// E.g. smdk test --text '{}' --transform='{"uses":"infinyon/jolt@0.1.0","with":{"spec":"[{\"operation\":\"default\",\"spec\":{\"source\":\"test\"}}]"}}' - #[arg(long, short, group = "TestSmartModule")] - transform: Vec, - - /// verbose output - #[arg(short = 'v', long = "verbose")] - verbose: bool, - - /// Records which act as existing in the topic before the SmartModule starts processing. Useful - /// for testing `lookback`. Multiple values are allowed. - #[arg(long, short)] - record: Vec, - - /// Sets the lookback parameter to the last N records. - #[arg(long, short)] - lookback_last: Option, -} - -fn parse_key_val(s: &str) -> Result<(String, String)> { - let pos = s - .find('=') - .ok_or_else(|| anyhow::anyhow!(format!("invalid KEY=value: no `=` found in `{s}`")))?; - Ok((s[..pos].parse()?, s[pos + 1..].parse()?)) } impl TestCmd { @@ -107,147 +26,18 @@ impl TestCmd { } async fn process_async(self) -> Result<()> { - debug!("starting smartmodule test"); - - let lookback: Option = self.lookback_last.map(Lookback::Last); - - let chain_builder = if let Some(transforms_file) = self.transforms_file { - let config = TransformationConfig::from_file(transforms_file) - .context("unable to read transformation config")?; - build_chain(config, lookback).await? - } else if !self.transform.is_empty() { - let config = TransformationConfig::try_from(self.transform) - .context("unable to parse transform")?; - build_chain(config, lookback).await? - } else if let Some(wasm_file) = self.wasm_file { - build_chain_ad_hoc( - crate::read_bytes_from_path(&wasm_file)?, - self.params, - lookback, - )? - } else { - let package_info = PackageInfo::from_options(&self.package.as_opt())?; - let wasm_file = package_info.target_wasm32_path()?; - build_chain_ad_hoc( - crate::read_bytes_from_path(&wasm_file)?, - self.params, - lookback, - )? - }; - - let engine = SmartEngine::new(); - debug!("SmartModule chain created"); - - let mut chain = chain_builder.initialize(&engine)?; - look_back(&mut chain, self.record).await?; - - let key = self.key.map(Bytes::from); - - let test_data: UserInputRecords = if let Some(data) = self.text { - UserInputRecords::try_from(UserInputType::Text { - key, - data: Bytes::from(data), - })? - } else if let Some(test_file_path) = &self.file { - let path = test_file_path.to_path_buf(); - if self.raw { - UserInputRecords::try_from(UserInputType::File { key, path })? - } else { - UserInputRecords::try_from(UserInputType::FileByLine { key, path })? - } - } else if self.stdin { - let mut buf = String::new(); - io::stdin().read_line(&mut buf)?; - UserInputRecords::try_from(UserInputType::StdIn { - key, - data: buf.into(), - })? - } else { - return Err(anyhow::anyhow!("No valid input provided")); - }; - debug!(len = &test_data.len(), "input data"); - - let metrics = SmartModuleChainMetrics::default(); - - let test_records: Vec = test_data.into(); - let mut sm_input = - SmartModuleInput::try_from_records(test_records, DEFAULT_SMARTENGINE_VERSION)?; - - sm_input.set_base_timestamp(Utc::now().timestamp_millis()); - - let output = chain.process(sm_input, &metrics)?; - - if self.verbose { - println!("{:?} records outputed", output.successes.len()); - } - for output_record in output.successes { - let output_value = if self.key_value { - format!( - "[{formatted_key}] {value}", - formatted_key = if let Some(key) = output_record.key() { - key.to_string() - } else { - "null".to_string() - }, - value = output_record.value.as_str()? - ) - } else { - output_record.value.as_str()?.to_string() - }; - - println!("{output_value}"); - } - - Ok(()) - } -} - -async fn look_back(chain: &mut SmartModuleChainInstance, records: Vec) -> Result<()> { - let records: Vec = records - .into_iter() - .map(|r| Record::new(r.as_str())) - .collect(); - chain - .look_back( - |lookback| { - let n = match lookback { - fluvio_smartengine::Lookback::Last(n) => n, - fluvio_smartengine::Lookback::Age { age: _, last } => last, + self.base_test_cmd + .process(WithChainBuilder::default().extra_cond(|lookback, params| { + let wasm_file = if let Some(wasm_file) = self.wasm_file { + wasm_file + } else { + let package_info = PackageInfo::from_options(&self.package.as_opt())?; + package_info.target_wasm32_path()? }; - let res = Ok(records - .clone() - .into_iter() - .rev() - .take(n as usize) - .rev() - .collect()); - async { res } - }, - &Default::default(), - ) - .await -} - -async fn build_chain( - config: TransformationConfig, - lookback: Option, -) -> Result { - let client_config = FluvioConfig::load()?.try_into()?; - let api_client = SmartModuleApiClient::connect_with_config(client_config).await?; - let mut chain_builder = SmartModuleChainBuilder::default(); - for transform in config.transforms { - debug!(?transform, "fetching"); - let wasm = api_client - .get(transform.uses.clone()) - .await? - .ok_or_else(|| anyhow!("smartmodule {} not found", &transform.uses))? - .wasm - .as_raw_wasm()?; - let mut config = SmartModuleConfig::from(transform); - config.set_lookback(lookback); - chain_builder.add_smart_module(config, wasm); + build_chain_ad_hoc(crate::read_bytes_from_path(&wasm_file)?, params, lookback) + })) + .await } - Ok(chain_builder) } fn build_chain_ad_hoc( @@ -255,7 +45,8 @@ fn build_chain_ad_hoc( params: Vec<(String, String)>, lookback: Option, ) -> Result { - let params: BTreeMap = params.into_iter().collect(); + use std::collections::BTreeMap; + let params: BTreeMap<_, _> = params.into_iter().collect(); Ok(SmartModuleChainBuilder::from(( SmartModuleConfig::builder() .params(params.into()) From 18389a8ac7592e7fa482337a6975ba0bf42969a9 Mon Sep 17 00:00:00 2001 From: Matheus Consoli Date: Tue, 26 Sep 2023 16:47:01 -0300 Subject: [PATCH 2/3] Make smartmodule-test common an optional feature --- crates/fluvio-cli-common/Cargo.toml | 9 +++++---- crates/fluvio-cli-common/src/lib.rs | 1 + crates/fluvio-cli/Cargo.toml | 2 +- crates/smartmodule-development-kit/Cargo.toml | 2 +- tests/cli/fluvio_smoke_tests/smartmodule-basic.bats | 13 +++++++++++++ 5 files changed, 21 insertions(+), 6 deletions(-) diff --git a/crates/fluvio-cli-common/Cargo.toml b/crates/fluvio-cli-common/Cargo.toml index bcf491a866..f66736a1c6 100644 --- a/crates/fluvio-cli-common/Cargo.toml +++ b/crates/fluvio-cli-common/Cargo.toml @@ -16,6 +16,7 @@ path = "src/lib.rs" default = ["fluvio-types"] file-records = ["fluvio-protocol/record", "fluvio-protocol/api"] version-cmd = ["dep:current_platform", "dep:clap", "dep:sysinfo"] +smartmodule-test = ["file-records", "dep:fluvio-sc-schema", "dep:fluvio-smartmodule", "dep:fluvio", "dep:fluvio-smartengine", "dep:clap"] [dependencies] anyhow = { workspace = true } @@ -36,13 +37,13 @@ tempfile = { workspace = true } thiserror = { workspace = true } tracing = { workspace = true } -fluvio = { path = "../fluvio", default-features = false } +fluvio = { path = "../fluvio", optional = true, default-features = false } fluvio-package-index = { workspace = true, features = ["http_agent"] } fluvio-types = { workspace = true , optional = true } fluvio-protocol = { workspace = true, optional = true } -fluvio-sc-schema = { path = "../fluvio-sc-schema" } -fluvio-smartmodule = { path = "../fluvio-smartmodule", default-features = false } -fluvio-smartengine = { path = "../fluvio-smartengine", features = ["transformation"] } +fluvio-sc-schema = { path = "../fluvio-sc-schema", optional = true } +fluvio-smartmodule = { path = "../fluvio-smartmodule", optional = true, default-features = false } +fluvio-smartengine = { path = "../fluvio-smartengine", optional = true, features = ["transformation"] } [target.'cfg(not(target_os = "macos"))'.dependencies] isahc = { version = "1.7", default-features = false, features = ["static-curl"] } diff --git a/crates/fluvio-cli-common/src/lib.rs b/crates/fluvio-cli-common/src/lib.rs index 42034dfa98..c7efd8ec37 100644 --- a/crates/fluvio-cli-common/src/lib.rs +++ b/crates/fluvio-cli-common/src/lib.rs @@ -8,6 +8,7 @@ pub mod user_input; #[cfg(feature = "version-cmd")] pub mod version_cmd; +#[cfg(feature = "smartmodule-test")] pub mod smartmodule; // Environment vars for Channels diff --git a/crates/fluvio-cli/Cargo.toml b/crates/fluvio-cli/Cargo.toml index bcdc356e39..9133d24f93 100644 --- a/crates/fluvio-cli/Cargo.toml +++ b/crates/fluvio-cli/Cargo.toml @@ -88,8 +88,8 @@ fluvio-command = { workspace = true } fluvio-package-index = { workspace = true } fluvio-extension-common = { workspace = true, features = ["target"] } fluvio-channel = { workspace = true } -fluvio-cli-common = { workspace = true } fluvio-hub-util = { workspace = true, features = ["connector-cmds"] } +fluvio-cli-common = { workspace = true, features = ["smartmodule-test"] } fluvio-smartengine = { workspace = true, features = ["transformation"]} fluvio-protocol = { workspace = true, features=["record","api"] } fluvio-smartmodule = { workspace = true } diff --git a/crates/smartmodule-development-kit/Cargo.toml b/crates/smartmodule-development-kit/Cargo.toml index 9d3e547441..c860de2b72 100644 --- a/crates/smartmodule-development-kit/Cargo.toml +++ b/crates/smartmodule-development-kit/Cargo.toml @@ -35,5 +35,5 @@ fluvio-smartengine = { path = "../fluvio-smartengine", features = ["transformati fluvio-extension-common = { path = "../fluvio-extension-common", features = ["target"] } fluvio-controlplane-metadata = { path = "../fluvio-controlplane-metadata", features = ["smartmodule"] } fluvio-sc-schema = { path = "../fluvio-sc-schema" } -fluvio-cli-common = { path = "../fluvio-cli-common", features = ["file-records", "version-cmd"]} +fluvio-cli-common = { path = "../fluvio-cli-common", features = ["file-records", "version-cmd", "smartmodule-test"] } cargo-builder = { path = "../cargo-builder"} diff --git a/tests/cli/fluvio_smoke_tests/smartmodule-basic.bats b/tests/cli/fluvio_smoke_tests/smartmodule-basic.bats index 3e644b2de2..5e3304b42c 100644 --- a/tests/cli/fluvio_smoke_tests/smartmodule-basic.bats +++ b/tests/cli/fluvio_smoke_tests/smartmodule-basic.bats @@ -68,3 +68,16 @@ setup_file() { assert_failure assert_output --partial "SmartModule not found" } + +# fix CI authentication to hub service first: +# https://github.com/infinyon/fluvio/issues/3634 +# +# Download smartmodule from hub and test it without creating a new project +# @test "Test external smartmodule" { +# run timeout 15s "$FLUVIO_BIN" hub sm download "infinyon/regex-filter@0.1.0" +# assert_success +# run timeout 15s "$FLUVIO_BIN" sm test \ +# -t '{"uses":"infinyon/regex-filter@0.1.0", "with": {"regex": "^f"}}' \ +# --text 'fluvio' +# assert_output --partial fluvio +# } From a772ba363e9fc012e1b8a34581e919878f7438e9 Mon Sep 17 00:00:00 2001 From: Matheus Consoli Date: Fri, 3 Nov 2023 13:56:11 -0300 Subject: [PATCH 3/3] Hide test command in arm target architectures For testing a smartmodule we need cranelift as a dependency, but cranelift does not have support for arm target architectures --- Cargo.lock | 1 - crates/fluvio-cli-common/Cargo.toml | 2 -- crates/fluvio-cli/Cargo.toml | 5 ++++- crates/fluvio-cli/src/client/smartmodule/mod.rs | 10 +++++++--- 4 files changed, 11 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d71154c70c..61fb17db1d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6994,7 +6994,6 @@ dependencies = [ "include_dir", "lib-cargo-crate", "tempfile", - "thiserror", "toml 0.8.6", "tracing", ] diff --git a/crates/fluvio-cli-common/Cargo.toml b/crates/fluvio-cli-common/Cargo.toml index f66736a1c6..0d681e77b5 100644 --- a/crates/fluvio-cli-common/Cargo.toml +++ b/crates/fluvio-cli-common/Cargo.toml @@ -28,8 +28,6 @@ futures = { workspace = true, features = ["std", "io-compat"]} home = { workspace = true } hex = { workspace = true } http = { workspace = true } - - semver = { workspace = true } sha2 = { workspace = true } sysinfo = { workspace = true, optional = true } diff --git a/crates/fluvio-cli/Cargo.toml b/crates/fluvio-cli/Cargo.toml index 9133d24f93..02faa88f13 100644 --- a/crates/fluvio-cli/Cargo.toml +++ b/crates/fluvio-cli/Cargo.toml @@ -89,7 +89,7 @@ fluvio-package-index = { workspace = true } fluvio-extension-common = { workspace = true, features = ["target"] } fluvio-channel = { workspace = true } fluvio-hub-util = { workspace = true, features = ["connector-cmds"] } -fluvio-cli-common = { workspace = true, features = ["smartmodule-test"] } +fluvio-cli-common = { workspace = true } fluvio-smartengine = { workspace = true, features = ["transformation"]} fluvio-protocol = { workspace = true, features=["record","api"] } fluvio-smartmodule = { workspace = true } @@ -101,6 +101,9 @@ fluvio-future = { workspace = true, features = ["fs", "io", "subscriber", "nativ fluvio-sc-schema = { workspace = true, features = ["use_serde"], optional = true } fluvio-spu-schema = { workspace = true, optional = true } +# smartmodule depends on cranelift, which is not available for `arm` +[target.'cfg(not(target_arch = "arm"))'.dependencies] +fluvio-cli-common = { workspace = true, features = ["smartmodule-test"] } [dev-dependencies] fluvio-future = { workspace = true, features = ["fixture"] } diff --git a/crates/fluvio-cli/src/client/smartmodule/mod.rs b/crates/fluvio-cli/src/client/smartmodule/mod.rs index b3a5d97b67..c3b265683c 100644 --- a/crates/fluvio-cli/src/client/smartmodule/mod.rs +++ b/crates/fluvio-cli/src/client/smartmodule/mod.rs @@ -2,12 +2,15 @@ mod create; mod list; mod delete; mod watch; + +// testing a smartmodule depends on cranelift +// but cranelift is not available for arm architectures +#[cfg(not(target_arch = "arm"))] mod test; pub use cmd::SmartModuleCmd; mod cmd { - use std::sync::Arc; use std::fmt::Debug; @@ -24,7 +27,6 @@ mod cmd { use super::create::CreateSmartModuleOpt; use super::list::ListSmartModuleOpt; use super::delete::DeleteSmartModuleOpt; - use super::test::TestSmartModuleOpt; use super::watch::WatchSmartModuleOpt; #[derive(Debug, Subcommand)] @@ -34,7 +36,8 @@ mod cmd { Watch(WatchSmartModuleOpt), /// Delete one or more SmartModules with the given name(s) Delete(DeleteSmartModuleOpt), - Test(TestSmartModuleOpt), + #[cfg(not(target_arch = "arm"))] + Test(super::test::TestSmartModuleOpt), } #[async_trait] @@ -57,6 +60,7 @@ mod cmd { Self::Watch(opt) => { opt.process(out, target).await?; } + #[cfg(not(target_arch = "arm"))] Self::Test(opt) => { opt.process(out, target).await?; }