Skip to content

Commit

Permalink
feat: add support for custom metadata in cluster configuration (#3667)
Browse files Browse the repository at this point in the history
* Add support for cluster annotations

* Add metadata to cluster

* Bump crate version

* Add cluster metadata path methods

* Simplify metadata implementation
  • Loading branch information
matheus-consoli authored Nov 9, 2023
1 parent 087119e commit f4b9622
Show file tree
Hide file tree
Showing 5 changed files with 317 additions and 41 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions crates/fluvio/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "fluvio"
version = "0.21.1"
version = "0.21.2"
edition = "2021"
license = "Apache-2.0"
authors = ["Fluvio Contributors <[email protected]>"]
Expand Down Expand Up @@ -44,7 +44,7 @@ pin-project = { workspace = true }
siphasher = { workspace = true }


toml = { workspace = true, features = ["display"] }
toml = { workspace = true, features = ["display", "preserve_order"] }
tracing = { workspace = true }

# Fluvio dependencies
Expand Down
277 changes: 276 additions & 1 deletion crates/fluvio/src/config/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@
//! Stores configuration parameter retrieved from the default or custom profile file.
//!
use serde::{Serialize, Deserialize};
use toml::Table as Metadata;

use crate::{config::TlsPolicy, FluvioError};

use super::ConfigFile;

/// Fluvio Cluster Target Configuration
/// This is part of profile
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[non_exhaustive]
pub struct FluvioConfig {
/// The address to connect to the Fluvio cluster
Expand All @@ -29,6 +30,10 @@ pub struct FluvioConfig {
#[serde(default)]
pub tls: TlsPolicy,

/// Cluster custom metadata
#[serde(default = "Metadata::new", skip_serializing_if = "Metadata::is_empty")]
metadata: Metadata,

/// This is not part of profile and doesn't persist.
/// It is purely to override client id when creating ClientConfig
#[serde(skip)]
Expand All @@ -49,6 +54,7 @@ impl FluvioConfig {
endpoint: addr.into(),
use_spu_local_address: false,
tls: TlsPolicy::Disabled,
metadata: Metadata::new(),
client_id: None,
}
}
Expand All @@ -58,6 +64,33 @@ impl FluvioConfig {
self.tls = tls.into();
self
}

pub fn query_metadata_by_name<'de, T>(&self, name: &str) -> Option<T>
where
T: Deserialize<'de>,
{
let metadata = self.metadata.get(name)?;

T::deserialize(metadata.clone()).ok()
}

pub fn update_metadata_by_name<S>(&mut self, name: &str, data: S) -> anyhow::Result<()>
where
S: Serialize,
{
use toml::{Value, map::Entry};

match self.metadata.entry(name) {
Entry::Vacant(entry) => {
entry.insert(Value::try_from(data)?);
}
Entry::Occupied(mut entry) => {
*entry.get_mut() = Value::try_from(data)?;
}
}

Ok(())
}
}

impl TryFrom<FluvioConfig> for fluvio_socket::ClientConfig {
Expand All @@ -71,3 +104,245 @@ impl TryFrom<FluvioConfig> for fluvio_socket::ClientConfig {
))
}
}

#[cfg(test)]
mod test_metadata {
use serde::{Deserialize, Serialize};
use crate::config::{Config, ConfigFile};

#[test]
fn test_get_metadata_path() {
let toml = r#"version = "2"
[profile.local]
cluster = "local"
[cluster.local]
endpoint = "127.0.0.1:9003"
[cluster.local.metadata.custom]
name = "foo"
"#;
let profile: Config = toml::de::from_str(toml).unwrap();
let config = profile.cluster("local").unwrap();

#[derive(Deserialize, Debug, PartialEq)]
struct Custom {
name: String,
}

let custom: Option<Custom> = config.query_metadata_by_name("custom");

assert_eq!(
custom,
Some(Custom {
name: "foo".to_owned()
})
);
}

#[test]
fn test_create_metadata() {
let toml = r#"version = "2"
[profile.local]
cluster = "local"
[cluster.local]
endpoint = "127.0.0.1:9003"
"#;
let mut profile: Config = toml::de::from_str(toml).unwrap();
let config = profile.cluster_mut("local").unwrap();

#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
struct Preference {
connection: String,
}

let preference = Preference {
connection: "wired".to_owned(),
};

config
.update_metadata_by_name("preference", preference.clone())
.expect("failed to add metadata");

let metadata = config.query_metadata_by_name("preference").expect("");

assert_eq!(preference, metadata);
}

#[test]
fn test_update_old_metadata() {
let toml = r#"version = "2"
[profile.local]
cluster = "local"
[cluster.local]
endpoint = "127.0.0.1:9003"
[cluster.local.metadata.installation]
type = "local"
"#;
let mut profile: Config = toml::de::from_str(toml).unwrap();
let config = profile.cluster_mut("local").unwrap();

#[derive(Debug, Serialize, Deserialize, PartialEq)]
struct Installation {
#[serde(rename = "type")]
typ: String,
}

let mut install = config
.query_metadata_by_name::<Installation>("installation")
.expect("message");

assert_eq!(
install,
Installation {
typ: "local".to_owned()
}
);

install.typ = "cloud".to_owned();

config
.update_metadata_by_name("installation", install)
.expect("failed to add metadata");

let metadata = config
.query_metadata_by_name::<Installation>("installation")
.expect("could not get Installation metadata");

assert_eq!("cloud", metadata.typ);
}

#[test]
fn test_update_with_new_metadata() {
let toml = r#"version = "2"
[profile.local]
cluster = "local"
[cluster.local]
endpoint = "127.0.0.1:9003"
[cluster.local.metadata.installation]
type = "local"
"#;
let mut profile: Config = toml::de::from_str(toml).unwrap();
let config = profile.cluster_mut("local").unwrap();

#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
struct Preference {
connection: String,
}

let preference = Preference {
connection: "wired".to_owned(),
};

config
.update_metadata_by_name("preference", preference.clone())
.expect("failed to add metadata");

#[derive(Debug, Serialize, Deserialize, PartialEq)]
struct Installation {
#[serde(rename = "type")]
typ: String,
}

let installation: Installation = config
.query_metadata_by_name("installation")
.expect("could not get installation metadata");
assert_eq!(installation.typ, "local");

let preference: Preference = config
.query_metadata_by_name("preference")
.expect("could not get preference metadata");
assert_eq!(preference.connection, "wired");
}

#[test]
fn test_profile_with_metadata() {
let config_file = ConfigFile::load(Some("test-data/profiles/config.toml".to_owned()))
.expect("could not parse config file");
let config = config_file.config();

let cluster = config
.cluster("extra")
.expect("could not find `extra` cluster in test file");

let table = toml::toml! {
[deep.nesting.example]
key = "custom field"

[installation]
type = "local"
}
.into();

assert_eq!(cluster.metadata, table);
}

#[test]
fn test_save_updated_metadata() {
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
struct Installation {
#[serde(rename = "type")]
typ: String,
}

let mut config_file = ConfigFile::load(Some("test-data/profiles/config.toml".to_owned()))
.expect("could not parse config file");
let config = config_file.mut_config();

let cluster = config
.cluster_mut("updated")
.expect("could not find `updated` cluster in test file");

let table: toml::Table = toml::toml! {
[installation]
type = "local"
}
.into();
assert_eq!(cluster.metadata, table);

cluster
.update_metadata_by_name(
"installation",
Installation {
typ: "cloud".to_owned(),
},
)
.expect("should have updated key");

let updated_table: toml::Table = toml::toml! {
[installation]
type = "cloud"
}
.into();

assert_eq!(cluster.metadata, updated_table.clone());

config_file.save().expect("failed to save config file");

let mut config_file = ConfigFile::load(Some("test-data/profiles/config.toml".to_owned()))
.expect("could not parse config file");
let config = config_file.mut_config();
let cluster = config
.cluster_mut("updated")
.expect("could not find `updated` cluster in test file");
assert_eq!(cluster.metadata, updated_table);

cluster
.update_metadata_by_name(
"installation",
Installation {
typ: "local".to_owned(),
},
)
.expect("teardown: failed to set installation type back to local");

config_file
.save()
.expect("teardown: failed to set installation type back to local");
}
}
14 changes: 3 additions & 11 deletions crates/fluvio/src/config/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ impl ConfigFile {
&mut self.config
}

// save to file
/// Save to file
pub fn save(&self) -> Result<(), FluvioError> {
create_dir_all(self.path.parent().unwrap())
.map_err(|e| config_file_error(&format!("parent {:?}", self.path), e))?;
Expand Down Expand Up @@ -191,7 +191,7 @@ impl ConfigFile {
pub const LOCAL_PROFILE: &str = "local";
const CONFIG_VERSION: &str = "2.0";

#[derive(Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct Config {
version: String,
current_profile: Option<String>,
Expand Down Expand Up @@ -250,7 +250,7 @@ impl Config {

/// current profile
pub fn current_profile_name(&self) -> Option<&str> {
self.current_profile.as_ref().map(|c| c.as_ref())
self.current_profile.as_deref()
}

/// set current profile, if profile doesn't exists return false
Expand Down Expand Up @@ -559,14 +559,6 @@ pub mod test {
);
}

/*
#[test]
fn test_topic_config() {
let conf_file = ConfigFile::load(Some("test-data/profiles/config.toml".to_owned())).expect("parse failed");
let config = conf_file.config().resolve_replica_config("test3",0);
}
*/

#[test]
fn test_local_cluster() {
let config = Config::new_with_local_cluster("localhost:9003".to_owned());
Expand Down
Loading

0 comments on commit f4b9622

Please sign in to comment.