Skip to content

Commit

Permalink
feat: add support for cluster metadata (#3628)
Browse files Browse the repository at this point in the history
Add support for custom metadata in the cluster configuration.

```toml
[cluster.my_favorite_cluster]
endpoint = "127.0.0.1:9003"

[cluster.my_favorite_cluster.metadata.installation]
"fluvio.io/cluster-type" = "local-k8"
```

This will be important for other tasks, such as #3620.

These changes are retrocompatible.

Closes #3627
  • Loading branch information
matheus-consoli committed Nov 3, 2023
1 parent 4d6b2fe commit b0f846d
Show file tree
Hide file tree
Showing 5 changed files with 427 additions and 40 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/fluvio/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "fluvio"
version = "0.21.1"
version = "0.21.2"
edition = "2021"
license = "Apache-2.0"
authors = ["Fluvio Contributors <[email protected]>"]
Expand Down
326 changes: 325 additions & 1 deletion crates/fluvio/src/config/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use super::ConfigFile;

/// Fluvio Cluster Target Configuration
/// This is part of profile
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[non_exhaustive]
pub struct FluvioConfig {
/// The address to connect to the Fluvio cluster
Expand All @@ -29,6 +29,9 @@ pub struct FluvioConfig {
#[serde(default)]
pub tls: TlsPolicy,

/// Cluster custom metadata
pub metadata: Option<toml::Value>,

/// This is not part of profile and doesn't persist.
/// It is purely to override client id when creating ClientConfig
#[serde(skip)]
Expand All @@ -49,6 +52,7 @@ impl FluvioConfig {
endpoint: addr.into(),
use_spu_local_address: false,
tls: TlsPolicy::Disabled,
metadata: None,
client_id: None,
}
}
Expand All @@ -58,6 +62,74 @@ impl FluvioConfig {
self.tls = tls.into();
self
}

pub fn query_metadata_path<'de, T>(&self, path: &str) -> Option<T>
where
T: Deserialize<'de>,
{
let mut metadata = self.metadata.as_ref().unwrap();

let (path, key) = {
let mut split = path.split(&['[', ']']);
(split.next().unwrap_or(path), split.next())
};

for part in path.split('.') {
if let toml::Value::Table(table) = metadata {
metadata = table.get(part)?;
}
}

if let Some(key) = key {
metadata = metadata.as_table()?.get(key)?;
}

T::deserialize(metadata.clone()).ok()
}

pub fn update_metadata_path<S>(&mut self, path: &str, data: S) -> anyhow::Result<()>
where
S: Serialize,
{
use toml::{Value, map::Map};

let (path, key) = {
let mut split = path.split(&['[', ']']);
(split.next().unwrap_or(path), split.next())
};

if let Some(mut metadata) = self.metadata.as_mut() {
for part in path.split('.') {
let Value::Table(table) = metadata else {
break;
};
let nested = table
.entry(part)
.or_insert_with(|| Value::Table(Map::new()));
metadata = nested;
}

if let Some(key) = key {
metadata = metadata
.as_table_mut()
.expect("metadata should be a table at this point")
.get_mut(key)
.ok_or_else(|| anyhow::anyhow!("key does not exist"))?;
}

*metadata = Value::try_from(data)?;
} else {
// insert new metadata
if path.contains(|c| c == '.' || c == '[') {
return Err(anyhow::anyhow!("not supported"));
}

let table = Map::from_iter([(path.to_string(), Value::try_from(data)?)]).into();
self.metadata = Some(table);
}

Ok(())
}
}

impl TryFrom<FluvioConfig> for fluvio_socket::ClientConfig {
Expand All @@ -71,3 +143,255 @@ impl TryFrom<FluvioConfig> for fluvio_socket::ClientConfig {
))
}
}

#[cfg(test)]
mod test_metadata {
use serde::{Deserialize, Serialize};
use crate::config::Config;

#[test]
fn test_get_metadata_path() {
let toml = r#"version = "2"
[profile.local]
cluster = "name"
[cluster.name]
endpoint = "127.0.0.1:9003"
[cluster.name.metadata.custom]
name = "foo"
"#;
let profile: Config = toml::de::from_str(toml).unwrap();
let config = profile.cluster("name").unwrap();

#[derive(Deserialize, Debug, PartialEq)]
struct Custom {
name: String,
}

let custom: Option<Custom> = config.query_metadata_path("custom");

assert_eq!(
custom,
Some(Custom {
name: "foo".to_owned()
})
);
}

#[test]
fn test_query_specific_field_of_metadata() {
let toml = r#"version = "2"
[profile.local]
cluster = "name"
[cluster.name]
endpoint = "127.0.0.1:9003"
[cluster.name.metadata.deep.nested]
name = "foo"
"#;
let profile: Config = toml::de::from_str(toml).unwrap();
let config = profile.cluster("name").unwrap();

let custom: Option<String> = config.query_metadata_path("deep.nested[name]");

assert_eq!(custom, Some("foo".to_owned()));
}

#[test]
fn test_create_metadata() {
let toml = r#"version = "2"
[profile.local]
cluster = "name"
[cluster.name]
endpoint = "127.0.0.1:9003"
"#;
let mut profile: Config = toml::de::from_str(toml).unwrap();
let config = profile.cluster_mut("name").unwrap();

#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
struct Preference {
connection: String,
}

let preference = Preference {
connection: "wired".to_owned(),
};

config
.update_metadata_path("preference", preference.clone())
.expect("failed to add metadata");

let metadata = config.query_metadata_path("preference").expect("");

assert_eq!(preference, metadata);
}

#[test]
fn test_update_old_metadata() {
let toml = r#"version = "2"
[profile.local]
cluster = "name"
[cluster.name]
endpoint = "127.0.0.1:9003"
[cluster.name.metadata.installation]
type = "local"
"#;
let mut profile: Config = toml::de::from_str(toml).unwrap();
let config = profile.cluster_mut("name").unwrap();

#[derive(Debug, Serialize, Deserialize, PartialEq)]
struct Installation {
#[serde(rename = "type")]
typ: String,
}

let mut install = config
.query_metadata_path::<Installation>("installation")
.expect("message");

assert_eq!(
install,
Installation {
typ: "local".to_owned()
}
);

install.typ = "cloud".to_owned();

config
.update_metadata_path("installation", install)
.expect("failed to add metadata");

let metadata = config
.query_metadata_path::<Installation>("installation")
.expect("");

assert_eq!("cloud", metadata.typ);
}

#[test]
fn test_update_with_new_metadata() {
let toml = r#"version = "2"
[profile.local]
cluster = "name"
[cluster.name]
endpoint = "127.0.0.1:9003"
[cluster.name.metadata.installation]
type = "local"
"#;
let mut profile: Config = toml::de::from_str(toml).unwrap();
let config = profile.cluster_mut("name").unwrap();

#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
struct Preference {
connection: String,
}

let preference = Preference {
connection: "wired".to_owned(),
};

config
.update_metadata_path("preference", preference.clone())
.expect("failed to add metadata");

let installation_type: String = config
.query_metadata_path("installation.type")
.expect("could not get installation metadata");
assert_eq!(installation_type, "local");

let preference_connection: String = config
.query_metadata_path("preference.connection")
.expect("could not get preference metadata");
assert_eq!(preference_connection, "wired");
}

#[test]
fn test_update_specific_field() {
let toml = r#"version = "2"
[profile.local]
cluster = "name"
[cluster.name]
endpoint = "127.0.0.1:9003"
[cluster.name.metadata.installation]
type = "local"
"#;
let mut profile: Config = toml::de::from_str(toml).unwrap();
let config = profile.cluster_mut("name").unwrap();

config
.update_metadata_path("installation[type]", "cloud")
.expect("could not find installation type field");

let installation_type: String = config
.query_metadata_path("installation[type]")
.expect("could not find installation type field");
assert_eq!(installation_type, "cloud");
}

#[test]
fn test_create_dynamic_nested_field_errors() {
let toml = r#"version = "2"
[profile.local]
cluster = "name"
[cluster.name]
endpoint = "127.0.0.1:9003"
"#;
let mut profile: Config = toml::de::from_str(toml).unwrap();
let config = profile.cluster_mut("name").unwrap();

let update = config.update_metadata_path("deep.nested[field]", "value");

assert!(update.is_err());
}

#[test]
fn test_update_partial_existant_path_errors() {
let toml = r#"version = "2"
[profile.local]
cluster = "name"
[cluster.name]
endpoint = "127.0.0.1:9003"
[cluster.name.deep.nested]
key = "value"
"#;
let mut profile: Config = toml::de::from_str(toml).unwrap();
let config = profile.cluster_mut("name").unwrap();

let update = config.update_metadata_path("deep.nonexistent[key]", "value");

assert!(update.is_err());
}

#[test]
fn test_update_path_with_no_key_errors() {
let toml = r#"version = "2"
[profile.local]
cluster = "name"
[cluster.name]
endpoint = "127.0.0.1:9003"
[cluster.name.deep.nested]
key = "value"
"#;
let mut profile: Config = toml::de::from_str(toml).unwrap();
let config = profile.cluster_mut("name").unwrap();

let update = config.update_metadata_path("deep.nested[nonexistent]", "value");

assert!(update.is_err());
}
}
Loading

0 comments on commit b0f846d

Please sign in to comment.