From f4b9622d616fcf8612ad553428ec5ecda9b3eba5 Mon Sep 17 00:00:00 2001 From: Consoli Date: Wed, 8 Nov 2023 22:04:40 -0300 Subject: [PATCH] feat: add support for custom metadata in cluster configuration (#3667) * Add support for cluster annotations * Add metadata to cluster * Bump crate version * Add cluster metadata path methods * Simplify metadata implementation --- Cargo.lock | 2 +- crates/fluvio/Cargo.toml | 4 +- crates/fluvio/src/config/cluster.rs | 277 ++++++++++++++++++- crates/fluvio/src/config/config.rs | 14 +- crates/fluvio/test-data/profiles/config.toml | 61 ++-- 5 files changed, 317 insertions(+), 41 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1cc6c3f2c4..8a8a30d3b6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2459,7 +2459,7 @@ dependencies = [ [[package]] name = "fluvio" -version = "0.21.1" +version = "0.21.2" dependencies = [ "anyhow", "async-channel", diff --git a/crates/fluvio/Cargo.toml b/crates/fluvio/Cargo.toml index 39b8a4abf7..eb99ee29f7 100644 --- a/crates/fluvio/Cargo.toml +++ b/crates/fluvio/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "fluvio" -version = "0.21.1" +version = "0.21.2" edition = "2021" license = "Apache-2.0" authors = ["Fluvio Contributors "] @@ -44,7 +44,7 @@ pin-project = { workspace = true } siphasher = { workspace = true } -toml = { workspace = true, features = ["display"] } +toml = { workspace = true, features = ["display", "preserve_order"] } tracing = { workspace = true } # Fluvio dependencies diff --git a/crates/fluvio/src/config/cluster.rs b/crates/fluvio/src/config/cluster.rs index 2a58ec433f..88f84f330c 100644 --- a/crates/fluvio/src/config/cluster.rs +++ b/crates/fluvio/src/config/cluster.rs @@ -4,6 +4,7 @@ //! Stores configuration parameter retrieved from the default or custom profile file. //! use serde::{Serialize, Deserialize}; +use toml::Table as Metadata; use crate::{config::TlsPolicy, FluvioError}; @@ -11,7 +12,7 @@ use super::ConfigFile; /// Fluvio Cluster Target Configuration /// This is part of profile -#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] #[non_exhaustive] pub struct FluvioConfig { /// The address to connect to the Fluvio cluster @@ -29,6 +30,10 @@ pub struct FluvioConfig { #[serde(default)] pub tls: TlsPolicy, + /// Cluster custom metadata + #[serde(default = "Metadata::new", skip_serializing_if = "Metadata::is_empty")] + metadata: Metadata, + /// This is not part of profile and doesn't persist. /// It is purely to override client id when creating ClientConfig #[serde(skip)] @@ -49,6 +54,7 @@ impl FluvioConfig { endpoint: addr.into(), use_spu_local_address: false, tls: TlsPolicy::Disabled, + metadata: Metadata::new(), client_id: None, } } @@ -58,6 +64,33 @@ impl FluvioConfig { self.tls = tls.into(); self } + + pub fn query_metadata_by_name<'de, T>(&self, name: &str) -> Option + where + T: Deserialize<'de>, + { + let metadata = self.metadata.get(name)?; + + T::deserialize(metadata.clone()).ok() + } + + pub fn update_metadata_by_name(&mut self, name: &str, data: S) -> anyhow::Result<()> + where + S: Serialize, + { + use toml::{Value, map::Entry}; + + match self.metadata.entry(name) { + Entry::Vacant(entry) => { + entry.insert(Value::try_from(data)?); + } + Entry::Occupied(mut entry) => { + *entry.get_mut() = Value::try_from(data)?; + } + } + + Ok(()) + } } impl TryFrom for fluvio_socket::ClientConfig { @@ -71,3 +104,245 @@ impl TryFrom for fluvio_socket::ClientConfig { )) } } + +#[cfg(test)] +mod test_metadata { + use serde::{Deserialize, Serialize}; + use crate::config::{Config, ConfigFile}; + + #[test] + fn test_get_metadata_path() { + let toml = r#"version = "2" +[profile.local] +cluster = "local" + +[cluster.local] +endpoint = "127.0.0.1:9003" + +[cluster.local.metadata.custom] +name = "foo" +"#; + let profile: Config = toml::de::from_str(toml).unwrap(); + let config = profile.cluster("local").unwrap(); + + #[derive(Deserialize, Debug, PartialEq)] + struct Custom { + name: String, + } + + let custom: Option = config.query_metadata_by_name("custom"); + + assert_eq!( + custom, + Some(Custom { + name: "foo".to_owned() + }) + ); + } + + #[test] + fn test_create_metadata() { + let toml = r#"version = "2" +[profile.local] +cluster = "local" + +[cluster.local] +endpoint = "127.0.0.1:9003" +"#; + let mut profile: Config = toml::de::from_str(toml).unwrap(); + let config = profile.cluster_mut("local").unwrap(); + + #[derive(Serialize, Deserialize, Debug, PartialEq, Clone)] + struct Preference { + connection: String, + } + + let preference = Preference { + connection: "wired".to_owned(), + }; + + config + .update_metadata_by_name("preference", preference.clone()) + .expect("failed to add metadata"); + + let metadata = config.query_metadata_by_name("preference").expect(""); + + assert_eq!(preference, metadata); + } + + #[test] + fn test_update_old_metadata() { + let toml = r#"version = "2" +[profile.local] +cluster = "local" + +[cluster.local] +endpoint = "127.0.0.1:9003" + +[cluster.local.metadata.installation] +type = "local" +"#; + let mut profile: Config = toml::de::from_str(toml).unwrap(); + let config = profile.cluster_mut("local").unwrap(); + + #[derive(Debug, Serialize, Deserialize, PartialEq)] + struct Installation { + #[serde(rename = "type")] + typ: String, + } + + let mut install = config + .query_metadata_by_name::("installation") + .expect("message"); + + assert_eq!( + install, + Installation { + typ: "local".to_owned() + } + ); + + install.typ = "cloud".to_owned(); + + config + .update_metadata_by_name("installation", install) + .expect("failed to add metadata"); + + let metadata = config + .query_metadata_by_name::("installation") + .expect("could not get Installation metadata"); + + assert_eq!("cloud", metadata.typ); + } + + #[test] + fn test_update_with_new_metadata() { + let toml = r#"version = "2" +[profile.local] +cluster = "local" + +[cluster.local] +endpoint = "127.0.0.1:9003" + +[cluster.local.metadata.installation] +type = "local" +"#; + let mut profile: Config = toml::de::from_str(toml).unwrap(); + let config = profile.cluster_mut("local").unwrap(); + + #[derive(Serialize, Deserialize, Debug, PartialEq, Clone)] + struct Preference { + connection: String, + } + + let preference = Preference { + connection: "wired".to_owned(), + }; + + config + .update_metadata_by_name("preference", preference.clone()) + .expect("failed to add metadata"); + + #[derive(Debug, Serialize, Deserialize, PartialEq)] + struct Installation { + #[serde(rename = "type")] + typ: String, + } + + let installation: Installation = config + .query_metadata_by_name("installation") + .expect("could not get installation metadata"); + assert_eq!(installation.typ, "local"); + + let preference: Preference = config + .query_metadata_by_name("preference") + .expect("could not get preference metadata"); + assert_eq!(preference.connection, "wired"); + } + + #[test] + fn test_profile_with_metadata() { + let config_file = ConfigFile::load(Some("test-data/profiles/config.toml".to_owned())) + .expect("could not parse config file"); + let config = config_file.config(); + + let cluster = config + .cluster("extra") + .expect("could not find `extra` cluster in test file"); + + let table = toml::toml! { + [deep.nesting.example] + key = "custom field" + + [installation] + type = "local" + } + .into(); + + assert_eq!(cluster.metadata, table); + } + + #[test] + fn test_save_updated_metadata() { + #[derive(Serialize, Deserialize, Debug, PartialEq, Clone)] + struct Installation { + #[serde(rename = "type")] + typ: String, + } + + let mut config_file = ConfigFile::load(Some("test-data/profiles/config.toml".to_owned())) + .expect("could not parse config file"); + let config = config_file.mut_config(); + + let cluster = config + .cluster_mut("updated") + .expect("could not find `updated` cluster in test file"); + + let table: toml::Table = toml::toml! { + [installation] + type = "local" + } + .into(); + assert_eq!(cluster.metadata, table); + + cluster + .update_metadata_by_name( + "installation", + Installation { + typ: "cloud".to_owned(), + }, + ) + .expect("should have updated key"); + + let updated_table: toml::Table = toml::toml! { + [installation] + type = "cloud" + } + .into(); + + assert_eq!(cluster.metadata, updated_table.clone()); + + config_file.save().expect("failed to save config file"); + + let mut config_file = ConfigFile::load(Some("test-data/profiles/config.toml".to_owned())) + .expect("could not parse config file"); + let config = config_file.mut_config(); + let cluster = config + .cluster_mut("updated") + .expect("could not find `updated` cluster in test file"); + assert_eq!(cluster.metadata, updated_table); + + cluster + .update_metadata_by_name( + "installation", + Installation { + typ: "local".to_owned(), + }, + ) + .expect("teardown: failed to set installation type back to local"); + + config_file + .save() + .expect("teardown: failed to set installation type back to local"); + } +} diff --git a/crates/fluvio/src/config/config.rs b/crates/fluvio/src/config/config.rs index 5be5c3fe42..0cbf7190a6 100644 --- a/crates/fluvio/src/config/config.rs +++ b/crates/fluvio/src/config/config.rs @@ -137,7 +137,7 @@ impl ConfigFile { &mut self.config } - // save to file + /// Save to file pub fn save(&self) -> Result<(), FluvioError> { create_dir_all(self.path.parent().unwrap()) .map_err(|e| config_file_error(&format!("parent {:?}", self.path), e))?; @@ -191,7 +191,7 @@ impl ConfigFile { pub const LOCAL_PROFILE: &str = "local"; const CONFIG_VERSION: &str = "2.0"; -#[derive(Debug, Default, Eq, PartialEq, Serialize, Deserialize)] +#[derive(Debug, Default, Serialize, Deserialize)] pub struct Config { version: String, current_profile: Option, @@ -250,7 +250,7 @@ impl Config { /// current profile pub fn current_profile_name(&self) -> Option<&str> { - self.current_profile.as_ref().map(|c| c.as_ref()) + self.current_profile.as_deref() } /// set current profile, if profile doesn't exists return false @@ -559,14 +559,6 @@ pub mod test { ); } - /* - #[test] - fn test_topic_config() { - let conf_file = ConfigFile::load(Some("test-data/profiles/config.toml".to_owned())).expect("parse failed"); - let config = conf_file.config().resolve_replica_config("test3",0); - } - */ - #[test] fn test_local_cluster() { let config = Config::new_with_local_cluster("localhost:9003".to_owned()); diff --git a/crates/fluvio/test-data/profiles/config.toml b/crates/fluvio/test-data/profiles/config.toml index e651c450d2..d165691a11 100644 --- a/crates/fluvio/test-data/profiles/config.toml +++ b/crates/fluvio/test-data/profiles/config.toml @@ -1,42 +1,51 @@ version = "1.0" - current_profile = "local" -[cluster.local] -endpoint = "127.0.0.1:9003" - - -[cluster.ec2] -endpoint = "sandbox.xxxx.eksctl.io:9003" - +[profile.local3] +cluster = "local" +topic = "test3" +partition = 3 -# no default topic [profile.local] cluster = "local" -# use topic test3 as default [profile.local2] cluster = "local" topic = "test3" -# use topic test3 and partition 3 -[profile.local3] -cluster = "local" -topic = "test3" -partition = 3 -client_id = "local" +[cluster.updated] +endpoint = "127.0.0.1" +use_spu_local_address = false + +[cluster.updated.tls] +tls_policy = "disabled" + +[cluster.updated.metadata.installation] +type = "local" + +[cluster.extra] +endpoint = "127.0.0.1:9003" +use_spu_local_address = false +[cluster.extra.tls] +tls_policy = "disabled" -# default for all topics, this has lowest precedent -[topic."*"] -isolation = "uncommitted" -fetch_max_bytes = 10000 +[cluster.extra.metadata.deep.nesting.example] +key = "custom field" -# apply for all partition of test3 -[topic.test3] -isolation = "read_committed" +[cluster.extra.metadata.installation] +type = "local" +[cluster.local] +endpoint = "127.0.0.1:9003" +use_spu_local_address = false + +[cluster.local.tls] +tls_policy = "disabled" + +[cluster.ec2] +endpoint = "sandbox.xxxx.eksctl.io:9003" +use_spu_local_address = false -# only for topic test=3,replication=2 -[topic.test4.2] -isolation = "uncommitted" +[cluster.ec2.tls] +tls_policy = "disabled"