From 106894b4640b23c7854002e31b0bf04c250ccf01 Mon Sep 17 00:00:00 2001 From: Angelin01 Date: Sat, 4 May 2024 10:20:00 -0300 Subject: [PATCH 1/3] Add tolerations to config --- src/config.rs | 34 +++++++++++++++++++++++++++++----- 1 file changed, 29 insertions(+), 5 deletions(-) diff --git a/src/config.rs b/src/config.rs index c22fb36..2f75b2f 100644 --- a/src/config.rs +++ b/src/config.rs @@ -6,6 +6,7 @@ use anyhow::{Error, Result}; use axum_server::tls_rustls::RustlsConfig; use figment::{error, Figment, providers::{Env, Format, Yaml}}; use figment::providers::Serialized; +use k8s_openapi::api::core::v1::Toleration; use serde::{Deserialize, Serialize}; use crate::error::ConfigError; @@ -87,7 +88,7 @@ impl Default for ServerConfig { pub struct GroupConfig { pub node_selector: Option>, pub affinity: Option>, - pub tolerations: Option>, + pub tolerations: Option>, #[serde(default)] pub on_conflict: Conflict, } @@ -98,6 +99,7 @@ mod tests { use figment::Jail; use indoc::indoc; + use k8s_openapi::api::core::v1::Toleration; use super::{Config, Conflict, DEFAULT_CONFIG_FILE, ENV_CONFIG_FILE, GroupConfig}; @@ -112,12 +114,20 @@ mod tests { b: "2" c: "3" bar: - tolerations: ["1", "2"] + tolerations: + - key: foo + operator: Equals + value: bar + effect: NoSchedule bazz: affinity: [] all: nodeSelector: {"a": "1", "b": "2", "c": "3"} - tolerations: ["1", "2"] + tolerations: + - key: foo + operator: Equals + value: bar + effect: NoSchedule affinity: [] onConflict: Override "# })?; @@ -138,7 +148,14 @@ mod tests { groups.insert("bar".into(), GroupConfig { node_selector: None, affinity: None, - tolerations: Some(vec!["1".into(), "2".into()]), + tolerations: Some(vec![Toleration { + effect: Some("NoSchedule".into()), + key: Some("foo".into()), + operator: Some("Equals".into()), + toleration_seconds: None, + value: Some("bar".into()), + } + ]), on_conflict: Default::default(), }); groups.insert("bazz".into(), GroupConfig { @@ -154,7 +171,14 @@ mod tests { ("c".into(), "3".into()), ])), affinity: Some(vec![]), - tolerations: Some(vec!["1".into(), "2".into()]), + tolerations: Some(vec![Toleration { + effect: Some("NoSchedule".into()), + key: Some("foo".into()), + operator: Some("Equals".into()), + toleration_seconds: None, + value: Some("bar".into()), + } + ]), on_conflict: Conflict::Override, }); From 826d11e1907cb9fd47660c54d3d7a232af29b612 Mon Sep 17 00:00:00 2001 From: Angelin01 Date: Sat, 4 May 2024 11:05:42 -0300 Subject: [PATCH 2/3] Implement support for tolerations --- src/handler/mutate.rs | 219 +++++++++++++++++++++++++++++++++++++++++- src/test_utils/pod.rs | 25 ++--- src/utils/patch.rs | 33 ++++++- 3 files changed, 256 insertions(+), 21 deletions(-) diff --git a/src/handler/mutate.rs b/src/handler/mutate.rs index 82e9d07..83f512e 100644 --- a/src/handler/mutate.rs +++ b/src/handler/mutate.rs @@ -35,9 +35,11 @@ pub async fn mutate( }; let mut patches = Vec::new(); + let pod = request.object.as_ref().expect("Request object is missing"); + if let Some(node_selector_config) = &group_config.node_selector { let node_selector_patches = patch::calculate_node_selector_patches( - request.object.as_ref().expect("Request object is missing"), + pod, node_selector_config, &group_config.on_conflict, ); @@ -53,6 +55,11 @@ pub async fn mutate( } } + if let Some(tolerations_config) = &group_config.tolerations { + let toleration_patches = patch::calculate_toleration_patches(pod, tolerations_config); + patches.extend(toleration_patches); + } + Ok(Json( AdmissionResponse::from(&request) .with_patch(json_patch::Patch(patches)) @@ -68,6 +75,7 @@ mod tests { use axum::body::Body; use axum::http::{Request, StatusCode}; use axum::response::Response; + use k8s_openapi::api::core::v1::Toleration; use serde_json::json; use tower::ServiceExt; @@ -413,4 +421,213 @@ mod tests { let result = ParsedResponse::from_response(response).await; assert_eq!(result.status, StatusCode::INTERNAL_SERVER_ERROR); } + + #[tokio::test] + async fn when_pod_has_no_tolerations_should_insert_tolerations_and_pd_tolerations() { + let mut config = Config::default(); + let group_config = GroupConfig { + node_selector: None, + affinity: None, + tolerations: Some(vec![Toleration { + key: Some("some-key".into()), + value: Some("some-value".into()), + operator: Some("Equals".into()), + effect: Some("NoSchedule".into()), + toleration_seconds: None, + }]), + on_conflict: Conflict::Reject, + }; + config.groups = HashMap::from([ + ("bar".into(), group_config) + ]); + + let mut state = TestAppState::new(config); + state.kubernetes.set_namespace_group("foo", "bar"); + + let body = PodCreateRequestBuilder::new() + .with_namespace("foo") + .build(); + + let response = mutate_request(state, body).await; + let result = ParsedResponse::from_response(response).await; + assert_eq!(result.status, StatusCode::OK); + assert_eq!(result.admission_response.allowed, true); + + let expected_patches = vec![ + patch::add("/spec/tolerations".into(), json!([])), + patch::add("/spec/tolerations/-".into(), json!({ + "key": "some-key", + "value": "some-value", + "operator": "Equals", + "effect": "NoSchedule" + })), + ]; + + assert_eq!(result.patches, expected_patches); + } + + #[tokio::test] + async fn when_pod_has_existing_tolerations_not_matching_config_should_only_insert_pd_tolerations() { + let mut config = Config::default(); + let group_config = GroupConfig { + node_selector: None, + affinity: None, + tolerations: Some(vec![Toleration { + key: Some("some-key".into()), + value: Some("some-value".into()), + operator: Some("Equals".into()), + effect: Some("NoSchedule".into()), + toleration_seconds: None, + }]), + on_conflict: Conflict::Reject, + }; + config.groups = HashMap::from([ + ("bar".into(), group_config) + ]); + + let mut state = TestAppState::new(config); + state.kubernetes.set_namespace_group("foo", "bar"); + + let body = PodCreateRequestBuilder::new() + .with_namespace("foo") + .with_toleration(Toleration { + effect: Some("NoExecute".into()), + key: Some("other".into()), + operator: Some("Exists".into()), + toleration_seconds: None, + value: None, + }) + .build(); + + let response = mutate_request(state, body).await; + let result = ParsedResponse::from_response(response).await; + assert_eq!(result.status, StatusCode::OK); + assert_eq!(result.admission_response.allowed, true); + + let expected_patches = vec![ + patch::add("/spec/tolerations/-".into(), json!({ + "key": "some-key", + "value": "some-value", + "operator": "Equals", + "effect": "NoSchedule" + })), + ]; + + assert_eq!(result.patches, expected_patches); + } + + #[tokio::test] + async fn when_pod_has_existing_tolerations_with_some_matching_config_should_only_insert_necessary_tolerations() { + let mut config = Config::default(); + let group_config = GroupConfig { + node_selector: None, + affinity: None, + tolerations: Some(vec![ + Toleration { + key: Some("some-key".into()), + value: Some("some-value".into()), + operator: Some("Equals".into()), + effect: Some("NoSchedule".into()), + toleration_seconds: None, + }, + Toleration { + effect: Some("NoExecute".into()), + key: Some("other".into()), + operator: Some("Exists".into()), + toleration_seconds: None, + value: None, + }, + ]), + on_conflict: Conflict::Reject, + }; + config.groups = HashMap::from([ + ("bar".into(), group_config) + ]); + + let mut state = TestAppState::new(config); + state.kubernetes.set_namespace_group("foo", "bar"); + + let body = PodCreateRequestBuilder::new() + .with_namespace("foo") + .with_toleration(Toleration { + effect: Some("NoExecute".into()), + key: Some("other".into()), + operator: Some("Exists".into()), + toleration_seconds: None, + value: None, + }) + .build(); + + let response = mutate_request(state, body).await; + let result = ParsedResponse::from_response(response).await; + assert_eq!(result.status, StatusCode::OK); + assert_eq!(result.admission_response.allowed, true); + + let expected_patches = vec![ + patch::add("/spec/tolerations/-".into(), json!({ + "key": "some-key", + "value": "some-value", + "operator": "Equals", + "effect": "NoSchedule" + })), + ]; + + assert_eq!(result.patches, expected_patches); + } + + #[tokio::test] + async fn when_pod_has_existing_tolerations_with_perfect_matching_config_should_should_do_nothing() { + let mut config = Config::default(); + let group_config = GroupConfig { + node_selector: None, + affinity: None, + tolerations: Some(vec![ + Toleration { + key: Some("some-key".into()), + value: Some("some-value".into()), + operator: Some("Equals".into()), + effect: Some("NoSchedule".into()), + toleration_seconds: None, + }, + Toleration { + effect: Some("NoExecute".into()), + key: Some("other".into()), + operator: Some("Exists".into()), + toleration_seconds: None, + value: None, + }, + ]), + on_conflict: Conflict::Reject, + }; + config.groups = HashMap::from([ + ("bar".into(), group_config) + ]); + + let mut state = TestAppState::new(config); + state.kubernetes.set_namespace_group("foo", "bar"); + + let body = PodCreateRequestBuilder::new() + .with_namespace("foo") + .with_toleration(Toleration { + effect: Some("NoExecute".into()), + key: Some("other".into()), + operator: Some("Exists".into()), + toleration_seconds: None, + value: None, + }). + with_toleration(Toleration { + key: Some("some-key".into()), + value: Some("some-value".into()), + operator: Some("Equals".into()), + effect: Some("NoSchedule".into()), + toleration_seconds: None, + }) + .build(); + + let response = mutate_request(state, body).await; + let result = ParsedResponse::from_response(response).await; + assert_eq!(result.status, StatusCode::OK); + assert_eq!(result.admission_response.allowed, true); + assert!(result.patches.is_empty()); + } } diff --git a/src/test_utils/pod.rs b/src/test_utils/pod.rs index 8018ba1..b0bb820 100644 --- a/src/test_utils/pod.rs +++ b/src/test_utils/pod.rs @@ -1,15 +1,17 @@ use std::collections::BTreeMap; use axum::body::Body; +use k8s_openapi::api::core::v1::Toleration; use serde_json::json; pub struct PodCreateRequestBuilder { namespace: Option, node_selector: Option>, + tolerations: Option> } impl PodCreateRequestBuilder { pub fn new() -> Self { - Self { namespace: None, node_selector: None } + Self { namespace: None, node_selector: None, tolerations: None } } pub fn with_namespace>(mut self, namespace: S) -> Self { @@ -23,6 +25,12 @@ impl PodCreateRequestBuilder { self } + pub fn with_toleration(mut self, toleration: Toleration) -> Self { + self.tolerations.get_or_insert_with(Vec::new) + .push(toleration); + self + } + pub fn build(self) -> Body { let data = json!({ "apiVersion": "admission.k8s.io/v1", @@ -103,20 +111,7 @@ impl PodCreateRequestBuilder { "serviceAccount": "default", "serviceAccountName": "default", "terminationGracePeriodSeconds": 30, - "tolerations": [ - { - "effect": "NoExecute", - "key": "node.kubernetes.io/not-ready", - "operator": "Exists", - "tolerationSeconds": 300 - }, - { - "effect": "NoExecute", - "key": "node.kubernetes.io/unreachable", - "operator": "Exists", - "tolerationSeconds": 300 - } - ], + "tolerations": self.tolerations, "volumes": [] }, "status": {} diff --git a/src/utils/patch.rs b/src/utils/patch.rs index ad3f3ca..346a462 100644 --- a/src/utils/patch.rs +++ b/src/utils/patch.rs @@ -1,7 +1,7 @@ use std::collections::HashMap; use json_patch::PatchOperation; -use k8s_openapi::api::core::v1::Pod; +use k8s_openapi::api::core::v1::{Pod, Toleration}; use serde_json::{json, Value}; use crate::config::Conflict; @@ -37,10 +37,10 @@ pub fn calculate_node_selector_patches<'a>( let mut patches = Vec::new(); let maybe_node_selector = pod.spec.as_ref() - .map_or(None, |spec| spec.node_selector.as_ref()); + .and_then(|s| s.node_selector.as_ref()); if let Some(node_selector) = maybe_node_selector { - for (k, v) in node_selector_config.iter() { + for (k, v) in node_selector_config { match node_selector.get(k) { None => patches.push(add(format!("/spec/nodeSelector/{k}"), json!(v))), Some(existing_value) if existing_value == v => continue, @@ -59,10 +59,33 @@ pub fn calculate_node_selector_patches<'a>( } } else { patches.push(add("/spec/nodeSelector".into(), json!({}))); - node_selector_config.iter().for_each(|(k, v)| { + for (k, v) in node_selector_config { patches.push(add(format!("/spec/nodeSelector/{k}"), json!(v))); - }); + }; } PatchResult::Allow(patches) } + +pub fn calculate_toleration_patches(pod: &Pod, tolerations_config: &[Toleration]) -> Vec { + let mut patches = Vec::new(); + + let maybe_tolerations = pod.spec.as_ref() + .and_then(|s| s.tolerations.as_ref()); + + if let Some(tolerations) = maybe_tolerations { + tolerations_config.iter() + .filter(|t| !tolerations.contains(t)) + .for_each(|t| patches.push( + add("/spec/tolerations/-".into(), json!(t)) + )); + } + else { + patches.push(add("/spec/tolerations".into(), json!([]))); + for t in tolerations_config { + patches.push(add("/spec/tolerations/-".into(), json!(t))) + } + } + + patches +} From 149775f5cfccac0215e29a12dcc8e225b11af70a Mon Sep 17 00:00:00 2001 From: Angelin01 Date: Sat, 4 May 2024 11:08:37 -0300 Subject: [PATCH 3/3] Extract pod spec logic outside patch logic --- src/handler/mutate.rs | 10 +++++++--- src/utils/patch.rs | 12 +++++------- 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/src/handler/mutate.rs b/src/handler/mutate.rs index 83f512e..2edbd80 100644 --- a/src/handler/mutate.rs +++ b/src/handler/mutate.rs @@ -34,12 +34,16 @@ pub async fn mutate( }), }; + let pod_spec = request.object.as_ref() + .expect("Request object is missing") + .spec.as_ref() + .expect("Pod spec is missing"); + let mut patches = Vec::new(); - let pod = request.object.as_ref().expect("Request object is missing"); if let Some(node_selector_config) = &group_config.node_selector { let node_selector_patches = patch::calculate_node_selector_patches( - pod, + pod_spec, node_selector_config, &group_config.on_conflict, ); @@ -56,7 +60,7 @@ pub async fn mutate( } if let Some(tolerations_config) = &group_config.tolerations { - let toleration_patches = patch::calculate_toleration_patches(pod, tolerations_config); + let toleration_patches = patch::calculate_toleration_patches(pod_spec, tolerations_config); patches.extend(toleration_patches); } diff --git a/src/utils/patch.rs b/src/utils/patch.rs index 346a462..898963a 100644 --- a/src/utils/patch.rs +++ b/src/utils/patch.rs @@ -1,7 +1,7 @@ use std::collections::HashMap; use json_patch::PatchOperation; -use k8s_openapi::api::core::v1::{Pod, Toleration}; +use k8s_openapi::api::core::v1::{PodSpec, Toleration}; use serde_json::{json, Value}; use crate::config::Conflict; @@ -30,14 +30,13 @@ pub enum PatchResult<'a> { } pub fn calculate_node_selector_patches<'a>( - pod: &'a Pod, + pod_spec: &'a PodSpec, node_selector_config: &'a HashMap, conflict_config: &'a Conflict, ) -> PatchResult<'a> { let mut patches = Vec::new(); - let maybe_node_selector = pod.spec.as_ref() - .and_then(|s| s.node_selector.as_ref()); + let maybe_node_selector = pod_spec.node_selector.as_ref(); if let Some(node_selector) = maybe_node_selector { for (k, v) in node_selector_config { @@ -67,11 +66,10 @@ pub fn calculate_node_selector_patches<'a>( PatchResult::Allow(patches) } -pub fn calculate_toleration_patches(pod: &Pod, tolerations_config: &[Toleration]) -> Vec { +pub fn calculate_toleration_patches(pod_spec: &PodSpec, tolerations_config: &[Toleration]) -> Vec { let mut patches = Vec::new(); - let maybe_tolerations = pod.spec.as_ref() - .and_then(|s| s.tolerations.as_ref()); + let maybe_tolerations = pod_spec.tolerations.as_ref(); if let Some(tolerations) = maybe_tolerations { tolerations_config.iter()