From b0f846dfd618c3e7ece53918ca87391c217016fa Mon Sep 17 00:00:00 2001
From: Consoli
Date: Fri, 3 Nov 2023 16:26:12 +0000
Subject: [PATCH] feat: add support for cluster metadata (#3628)

Add support for custom metadata in the cluster configuration.

```toml
[cluster.my_favorite_cluster]
endpoint = "127.0.0.1:9003"

[cluster.my_favorite_cluster.metadata.installation]
"fluvio.io/cluster-type" = "local-k8"
```

This will be important for other tasks, such as #3620.

These changes are backward compatible: existing config files without a
`metadata` table keep deserializing as before.

Closes #3627
---
 Cargo.lock                                   |   2 +-
 crates/fluvio/Cargo.toml                     |   2 +-
 crates/fluvio/src/config/cluster.rs          | 326 ++++++++++++++++++-
 crates/fluvio/src/config/config.rs           |  76 ++++-
 crates/fluvio/test-data/profiles/config.toml |  61 ++--
 5 files changed, 427 insertions(+), 40 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 77127a2c21..d1c11ccff6 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2431,7 +2431,7 @@ dependencies = [
 
 [[package]]
 name = "fluvio"
-version = "0.21.1"
+version = "0.21.2"
 dependencies = [
  "anyhow",
  "async-channel",
diff --git a/crates/fluvio/Cargo.toml b/crates/fluvio/Cargo.toml
index 39b8a4abf7..193faaed05 100644
--- a/crates/fluvio/Cargo.toml
+++ b/crates/fluvio/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "fluvio"
-version = "0.21.1"
+version = "0.21.2"
 edition = "2021"
 license = "Apache-2.0"
 authors = ["Fluvio Contributors <team@fluvio.io>"]
diff --git a/crates/fluvio/src/config/cluster.rs b/crates/fluvio/src/config/cluster.rs
index 2a58ec433f..0bca363365 100644
--- a/crates/fluvio/src/config/cluster.rs
+++ b/crates/fluvio/src/config/cluster.rs
@@ -11,7 +11,7 @@ use super::ConfigFile;
 
 /// Fluvio Cluster Target Configuration
 /// This is part of profile
-#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
 #[non_exhaustive]
 pub struct FluvioConfig {
     /// The address to connect to the Fluvio cluster
@@ -29,6 +29,9 @@ pub struct FluvioConfig {
     #[serde(default)]
     pub tls: TlsPolicy,
 
+    /// Cluster custom metadata
+    pub metadata: Option<toml::Value>,
+
     /// This is not part of profile and doesn't persist.
     /// It is purely to override client id when creating ClientConfig
     #[serde(skip)]
@@ -49,6 +52,7 @@ impl FluvioConfig {
             endpoint: addr.into(),
             use_spu_local_address: false,
             tls: TlsPolicy::Disabled,
+            metadata: None,
             client_id: None,
         }
     }
@@ -58,6 +62,74 @@ impl FluvioConfig {
         self.tls = tls.into();
         self
     }
+
+    pub fn query_metadata_path<'de, T>(&self, path: &str) -> Option<T>
+    where
+        T: Deserialize<'de>,
+    {
+        let mut metadata = self.metadata.as_ref()?;
+
+        let (path, key) = {
+            let mut split = path.split(&['[', ']']);
+            (split.next().unwrap_or(path), split.next())
+        };
+
+        for part in path.split('.') {
+            if let toml::Value::Table(table) = metadata {
+                metadata = table.get(part)?;
+            }
+        }
+
+        if let Some(key) = key {
+            metadata = metadata.as_table()?.get(key)?;
+        }
+
+        T::deserialize(metadata.clone()).ok()
+    }
+
+    pub fn update_metadata_path<S>(&mut self, path: &str, data: S) -> anyhow::Result<()>
+    where
+        S: Serialize,
+    {
+        use toml::{Value, map::Map};
+
+        let (path, key) = {
+            let mut split = path.split(&['[', ']']);
+            (split.next().unwrap_or(path), split.next())
+        };
+
+        if let Some(mut metadata) = self.metadata.as_mut() {
+            for part in path.split('.') {
+                let Value::Table(table) = metadata else {
+                    break;
+                };
+                let nested = table
+                    .entry(part)
+                    .or_insert_with(|| Value::Table(Map::new()));
+                metadata = nested;
+            }
+
+            if let Some(key) = key {
+                metadata = metadata
+                    .as_table_mut()
+                    .expect("metadata should be a table at this point")
+                    .get_mut(key)
+                    .ok_or_else(|| anyhow::anyhow!("key does not exist"))?;
+            }
+
+            *metadata = Value::try_from(data)?;
+        } else {
+            // insert new metadata
+            if path.contains(|c| c == '.' || c == '[') {
+                return Err(anyhow::anyhow!("not supported"));
+            }
+
+            let table = Map::from_iter([(path.to_string(), Value::try_from(data)?)]).into();
+            self.metadata = Some(table);
+        }
+
+        Ok(())
+    }
 }
 
 impl TryFrom<FluvioConfig> for fluvio_socket::ClientConfig {
@@ -71,3 +143,255 @@ impl TryFrom<FluvioConfig> for fluvio_socket::ClientConfig {
         ))
     }
 }
+
+#[cfg(test)]
+mod test_metadata {
+    use serde::{Deserialize, Serialize};
+    use crate::config::Config;
+
+    #[test]
+    fn test_get_metadata_path() {
+        let toml = r#"version = "2"
+[profile.local]
+cluster = "name"
+
+[cluster.name]
+endpoint = "127.0.0.1:9003"
+
+[cluster.name.metadata.custom]
+name = "foo"
+"#;
+        let profile: Config = toml::de::from_str(toml).unwrap();
+        let config = profile.cluster("name").unwrap();
+
+        #[derive(Deserialize, Debug, PartialEq)]
+        struct Custom {
+            name: String,
+        }
+
+        let custom: Option<Custom> = config.query_metadata_path("custom");
+
+        assert_eq!(
+            custom,
+            Some(Custom {
+                name: "foo".to_owned()
+            })
+        );
+    }
+
+    #[test]
+    fn test_query_specific_field_of_metadata() {
+        let toml = r#"version = "2"
+[profile.local]
+cluster = "name"
+
+[cluster.name]
+endpoint = "127.0.0.1:9003"
+
+[cluster.name.metadata.deep.nested]
+name = "foo"
+"#;
+        let profile: Config = toml::de::from_str(toml).unwrap();
+        let config = profile.cluster("name").unwrap();
+
+        let custom: Option<String> = config.query_metadata_path("deep.nested[name]");
+
+        assert_eq!(custom, Some("foo".to_owned()));
+    }
+
+    #[test]
+    fn test_create_metadata() {
+        let toml = r#"version = "2"
+[profile.local]
+cluster = "name"
+
+[cluster.name]
+endpoint = "127.0.0.1:9003"
+"#;
+        let mut profile: Config = toml::de::from_str(toml).unwrap();
+        let config = profile.cluster_mut("name").unwrap();
+
+        #[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
+        struct Preference {
+            connection: String,
+        }
+
+        let preference = Preference {
+            connection: "wired".to_owned(),
+        };
+
+        config
+            .update_metadata_path("preference", preference.clone())
+            .expect("failed to add metadata");
+
+        let metadata = config.query_metadata_path("preference").expect("could not query new metadata");
+
+        assert_eq!(preference, metadata);
+    }
+
+    #[test]
+    fn test_update_old_metadata() {
+        let toml = r#"version = "2"
+[profile.local]
+cluster = "name"
+
+[cluster.name]
+endpoint = "127.0.0.1:9003"
+
+[cluster.name.metadata.installation]
+type = "local"
+"#;
+        let mut profile: Config = toml::de::from_str(toml).unwrap();
+        let config = profile.cluster_mut("name").unwrap();
+
+        #[derive(Debug, Serialize, Deserialize, PartialEq)]
+        struct Installation {
+            #[serde(rename = "type")]
+            typ: String,
+        }
+
+        let mut install = config
+            .query_metadata_path::<Installation>("installation")
+            .expect("could not query installation metadata");
+
+        assert_eq!(
+            install,
+            Installation {
+                typ: "local".to_owned()
+            }
+        );
+
+        install.typ = "cloud".to_owned();
+
+        config
+            .update_metadata_path("installation", install)
+            .expect("failed to update metadata");
+
+        let metadata = config
+            .query_metadata_path::<Installation>("installation")
+            .expect("could not query updated metadata");
+
+        assert_eq!("cloud", metadata.typ);
+    }
+
+    #[test]
+    fn test_update_with_new_metadata() {
+        let toml = r#"version = "2"
+[profile.local]
+cluster = "name"
+
+[cluster.name]
+endpoint = "127.0.0.1:9003"
+
+[cluster.name.metadata.installation]
+type = "local"
+"#;
+        let mut profile: Config = toml::de::from_str(toml).unwrap();
+        let config = profile.cluster_mut("name").unwrap();
+
+        #[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
+        struct Preference {
+            connection: String,
+        }
+
+        let preference = Preference {
+            connection: "wired".to_owned(),
+        };
+
+        config
+            .update_metadata_path("preference", preference.clone())
+            .expect("failed to add metadata");
+
+        let installation_type: String = config
+            .query_metadata_path("installation.type")
+            .expect("could not get installation metadata");
+        assert_eq!(installation_type, "local");
+
+        let preference_connection: String = config
+            .query_metadata_path("preference.connection")
+            .expect("could not get preference metadata");
+        assert_eq!(preference_connection, "wired");
+    }
+
+    #[test]
+    fn test_update_specific_field() {
+        let toml = r#"version = "2"
+[profile.local]
+cluster = "name"
+
+[cluster.name]
+endpoint = "127.0.0.1:9003"
+
+[cluster.name.metadata.installation]
+type = "local"
+"#;
+        let mut profile: Config = toml::de::from_str(toml).unwrap();
+        let config = profile.cluster_mut("name").unwrap();
+
+        config
+            .update_metadata_path("installation[type]", "cloud")
+            .expect("could not update installation type field");
+
+        let installation_type: String = config
+            .query_metadata_path("installation[type]")
+            .expect("could not find installation type field");
+        assert_eq!(installation_type, "cloud");
+    }
+
+    #[test]
+    fn test_create_dynamic_nested_field_errors() {
+        let toml = r#"version = "2"
+[profile.local]
+cluster = "name"
+
+[cluster.name]
+endpoint = "127.0.0.1:9003"
+"#;
+        let mut profile: Config = toml::de::from_str(toml).unwrap();
+        let config = profile.cluster_mut("name").unwrap();
+
+        let update = config.update_metadata_path("deep.nested[field]", "value");
+
+        assert!(update.is_err());
+    }
+
+    #[test]
+    fn test_update_partial_existent_path_errors() {
+        let toml = r#"version = "2"
+[profile.local]
+cluster = "name"
+
+[cluster.name]
+endpoint = "127.0.0.1:9003"
+
+[cluster.name.metadata.deep.nested]
+key = "value"
+"#;
+        let mut profile: Config = toml::de::from_str(toml).unwrap();
+        let config = profile.cluster_mut("name").unwrap();
+
+        let update =
+            config.update_metadata_path("deep.nonexistent[key]", "value");
+
+        assert!(update.is_err());
+    }
+
+    #[test]
+    fn test_update_path_with_no_key_errors() {
+        let toml = r#"version = "2"
+[profile.local]
+cluster = "name"
+
+[cluster.name]
+endpoint = "127.0.0.1:9003"
+
+[cluster.name.metadata.deep.nested]
+key = "value"
+"#;
+        let mut profile: Config = toml::de::from_str(toml).unwrap();
+        let config = profile.cluster_mut("name").unwrap();
+
+        let update = config.update_metadata_path("deep.nested[nonexistent]", "value");
+
+        assert!(update.is_err());
+    }
+}
diff --git a/crates/fluvio/src/config/config.rs b/crates/fluvio/src/config/config.rs
index 5be5c3fe42..36a57e3d50 100644
--- a/crates/fluvio/src/config/config.rs
+++ b/crates/fluvio/src/config/config.rs
@@ -137,7 +137,7 @@ impl ConfigFile {
         &mut self.config
     }
 
-    // save to file
+    /// Save to file
     pub fn save(&self) -> Result<(), FluvioError> {
         create_dir_all(self.path.parent().unwrap())
             .map_err(|e| config_file_error(&format!("parent {:?}", self.path), e))?;
@@ -191,7 +191,7 @@
 pub const LOCAL_PROFILE: &str = "local";
 const CONFIG_VERSION: &str = "2.0";
 
-#[derive(Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
+#[derive(Debug, Default, Serialize, Deserialize)]
 pub struct Config {
     version: String,
     current_profile: Option<String>,
@@ -250,7 +250,7 @@ impl Config {
 
     /// current profile
     pub fn current_profile_name(&self) -> Option<&str> {
-        self.current_profile.as_ref().map(|c| c.as_ref())
+        self.current_profile.as_deref()
     }
 
     /// set current profile, if profile doesn't exists return false
@@ -559,14 +559,6 @@ pub mod test {
         );
     }
 
-    /*
-    #[test]
-    fn test_topic_config() {
-        let conf_file = ConfigFile::load(Some("test-data/profiles/config.toml".to_owned())).expect("parse failed");
-        let config = conf_file.config().resolve_replica_config("test3",0);
-    }
-    */
-
     #[test]
     fn test_local_cluster() {
         let config = Config::new_with_local_cluster("localhost:9003".to_owned());
@@ -575,4 +567,66 @@ pub mod test {
         let cluster = config.current_cluster().expect("cluster should exists");
         assert_eq!(cluster.endpoint, "localhost:9003");
     }
+
+    #[test]
+    fn test_profile_with_metadata() {
+        let config_file = ConfigFile::load(Some("test-data/profiles/config.toml".to_owned()))
+            .expect("could not parse config file");
+        let config = config_file.config();
+
+        let cluster = config
+            .cluster("extra")
+            .expect("could not find `extra` cluster in test file");
+
+        let table = toml::toml! {
+            [deep.nesting.example]
+            key = "custom field"
+
+            [installation]
+            type = "local"
+        }
+        .into();
+
+        assert_eq!(cluster.metadata, Some(table));
+    }
+
+    #[test]
+    fn test_save_updated_metadata() {
+        let mut config_file = ConfigFile::load(Some("test-data/profiles/config.toml".to_owned()))
+            .expect("could not parse config file");
+        let config = config_file.mut_config();
+
+        let cluster = config
+            .cluster_mut("updated")
+            .expect("could not find `updated` cluster in test file");
+
+        let table: toml::Value = toml::toml! {
+            [installation]
+            type = "local"
+        }
+        .into();
+        assert_eq!(cluster.metadata, Some(table));
+
+        cluster
+            .update_metadata_path("installation[type]", "cloud")
+            .expect("should have updated key");
+
+        let updated_table: toml::Value = toml::toml!
{ + [installation] + type = "cloud" + } + .into(); + + assert_eq!(cluster.metadata, Some(updated_table.clone())); + + config_file.save().unwrap(); + + let config_file = ConfigFile::load(Some("test-data/profiles/config.toml".to_owned())) + .expect("could not parse config file"); + let config = config_file.config(); + let cluster = config + .cluster("updated") + .expect("could not find `updated` cluster in test file"); + assert_eq!(cluster.metadata, Some(updated_table)); + } } diff --git a/crates/fluvio/test-data/profiles/config.toml b/crates/fluvio/test-data/profiles/config.toml index e651c450d2..b7d33a0643 100644 --- a/crates/fluvio/test-data/profiles/config.toml +++ b/crates/fluvio/test-data/profiles/config.toml @@ -1,42 +1,51 @@ version = "1.0" - current_profile = "local" -[cluster.local] -endpoint = "127.0.0.1:9003" - - -[cluster.ec2] -endpoint = "sandbox.xxxx.eksctl.io:9003" - - -# no default topic -[profile.local] -cluster = "local" - -# use topic test3 as default [profile.local2] cluster = "local" topic = "test3" -# use topic test3 and partition 3 [profile.local3] cluster = "local" topic = "test3" partition = 3 -client_id = "local" +[profile.local] +cluster = "local" + +[cluster.updated] +endpoint = "127.0.0.1" +use_spu_local_address = false + +[cluster.updated.tls] +tls_policy = "disabled" + +[cluster.updated.metadata.installation] +type = "local" + +[cluster.extra] +endpoint = "127.0.0.1:9003" +use_spu_local_address = false + +[cluster.extra.tls] +tls_policy = "disabled" -# default for all topics, this has lowest precedent -[topic."*"] -isolation = "uncommitted" -fetch_max_bytes = 10000 +[cluster.extra.metadata.deep.nesting.example] +key = "custom field" -# apply for all partition of test3 -[topic.test3] -isolation = "read_committed" +[cluster.extra.metadata.installation] +type = "local" +[cluster.local] +endpoint = "127.0.0.1:9003" +use_spu_local_address = false + +[cluster.local.tls] +tls_policy = "disabled" + +[cluster.ec2] +endpoint = "sandbox.xxxx.eksctl.io:9003" +use_spu_local_address = false -# only for topic test=3,replication=2 -[topic.test4.2] -isolation = "uncommitted" +[cluster.ec2.tls] +tls_policy = "disabled"
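
---

For reviewers: a minimal end-to-end sketch of how the new metadata API is
meant to be used, pieced together from the tests above. The file path, the
`updated` cluster, and the `installation` table all come from the test data
and are illustrative only, not new public API:

```rust
use fluvio::config::ConfigFile;
use serde::{Deserialize, Serialize};

// Mirrors the `Installation` helper struct used in the tests above.
#[derive(Serialize, Deserialize, Debug)]
struct Installation {
    #[serde(rename = "type")]
    typ: String,
}

fn main() {
    // Load a profile file by path, exactly as the tests do.
    let mut config_file = ConfigFile::load(Some("test-data/profiles/config.toml".to_owned()))
        .expect("could not load config file");
    let config = config_file.mut_config();

    // `updated` is one of the clusters defined in the test data.
    let cluster = config
        .cluster_mut("updated")
        .expect("cluster not found");

    // Read a whole metadata table into a typed struct...
    if let Some(install) = cluster.query_metadata_path::<Installation>("installation") {
        println!("installation type: {}", install.typ);
    }

    // ...or update a single field in place using the `table[key]` syntax.
    cluster
        .update_metadata_path("installation[type]", "cloud")
        .expect("could not update metadata");

    // Persist the change back to disk.
    config_file.save().expect("could not save config file");
}
```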