From 5b3271824432216a1a3c7b51063b3e19f3f5ee8f Mon Sep 17 00:00:00 2001 From: Carl Hedgren Date: Thu, 13 Jul 2023 17:20:59 +0200 Subject: [PATCH] Refactor Action datatypes Plus some aggressive clippy ` cargo clippy -- -W clippy::pedantic -W clippy::nursery -W clippy::unwrap_used -W clippy::expect_used` Co-Authored-By: x10an14-nav --- src/actions.rs | 93 +++++++++-------------------- src/api.rs | 149 +++++++++++++++++++++++----------------------- src/main.rs | 15 +++-- src/pod.rs | 67 +++++++++------------ src/prometheus.rs | 9 ++- src/reconciler.rs | 90 ++++++++++------------------ 6 files changed, 178 insertions(+), 245 deletions(-) diff --git a/src/actions.rs b/src/actions.rs index 2f611b1..ec3f267 100644 --- a/src/actions.rs +++ b/src/actions.rs @@ -1,73 +1,36 @@ -use std::collections::BTreeMap; - +use hyper::http::Method; +use hyper::Uri; +use std::{collections::BTreeMap}; /// Generate the action `BTreeMap` /// /// Modify this function to add or remove sidecar definitions and their associated shutdown procedures. 
pub fn generate() -> BTreeMap { - let mut actions = BTreeMap::new(); - - actions.exec("cloudsql-proxy", "kill -s INT 1"); - actions.exec("vks-sidecar", "/bin/kill -s INT 1"); - actions.exec("secure-logs-configmap-reload", "/bin/killall configmap-reload"); - actions.portforward("linkerd-proxy", "POST", "/shutdown", 4191); - actions.portforward("secure-logs-fluentd", "GET", "/api/processes.killWorkers", 24444); - actions + BTreeMap::from([ + ( + "cloudsql-proxy".into(), + Action::Exec("kill -s INT 1".split(' ').map(String::from).collect()), + ), + ( + "vks-sidecar".into(), + Action::Exec("/bin/kill -s INT 1".split(' ').map(String::from).collect()), + ), + ( + "secure-logs-configmap-reload".into(), + Action::Exec("/bin/killall configmap-reload".split(' ').map(String::from).collect()), + ), + ( + "linkerd-proxy".into(), + Action::Portforward(Method::POST, "/shutdown".parse::().unwrap(), 4191), + ), + ( + "secure-logs-fluentd".into(), + Action::Portforward(Method::GET, "/api/processes.killWorkers".parse::().unwrap(), 24444), + ), + ]) } #[derive(Debug)] -pub enum ActionType { - Portforward, - Exec, - None, -} - -impl Default for ActionType { - fn default() -> Self { - ActionType::None - } -} - -#[derive(Default, Debug)] -pub struct Action { - pub action_type: ActionType, - pub method: Option, - pub path: Option, - pub port: Option, - pub command: Option, -} - -/// Helper trait for inserting different `Action`s with different `ActionType`s into a `BTreeMap` -/// -/// Using this trait will allow us to catch simple misconfigurations in Actions during compile time. 
-trait ActionInsertions { - /// Inserts an action with `ActionType::Exec` into a `BTreeMap` - fn exec(&mut self, target_container: &str, command: &str); - /// Inserts an action with `ActionType::Portforward` into a `BTreeMap` - fn portforward(&mut self, target_container: &str, method: &str, path: &str, port: u16); -} - -impl ActionInsertions for BTreeMap { - fn exec(&mut self, target_container: &str, command: &str) { - self.insert( - target_container.into(), - Action { - action_type: ActionType::Exec, - command: Some(command.into()), - ..Default::default() - }, - ); - } - - fn portforward(&mut self, target_container: &str, method: &str, path: &str, port: u16) { - self.insert( - target_container.into(), - Action { - action_type: ActionType::Portforward, - method: Some(method.into()), - path: Some(path.into()), - port: Some(port.into()), - ..Default::default() - }, - ); - } +pub enum Action { + Portforward(Method, Uri, u16), + Exec(Vec), } diff --git a/src/api.rs b/src/api.rs index dc8f5ea..be1131c 100644 --- a/src/api.rs +++ b/src/api.rs @@ -1,10 +1,11 @@ #[cfg(test)] use mockall::automock; -use crate::actions::{Action, ActionType}; +use crate::actions::Action; use anyhow::anyhow; use async_trait::async_trait; -use hyper::{body, Body, Request}; +use hyper::http::Method; +use hyper::{body, Body, Request, Uri}; use k8s_openapi::api::core::v1::Pod; use kube::api::{Api, AttachParams}; use std::time::Duration; @@ -20,92 +21,92 @@ pub trait Destroyer { async fn shutdown(&self, action: &Action, pod_name: &str, container_name: &str) -> anyhow::Result<()>; } -/// Private trait for the actual business of shutting down pods -#[async_trait] -trait DestroyerActions { - /// Helper to shut down a container via `exec` - async fn shutdown_exec(&self, action: &Action, pod_name: &str, container_name: &str) -> anyhow::Result<()>; - /// Helper to shut down a container via `portforward` - async fn shutdown_portforward(&self, action: &Action, pod_name: &str, container_name: &str) -> 
anyhow::Result<()>; -} - #[async_trait] impl Destroyer for Api { async fn shutdown(&self, action: &Action, pod_name: &str, container_name: &str) -> anyhow::Result<()> { - match action.action_type { - ActionType::Exec => self.shutdown_exec(action, pod_name, container_name).await?, - ActionType::Portforward => self.shutdown_portforward(action, pod_name, container_name).await?, - _ => (), - }; - Ok(()) + shutdown_pod(self, action, pod_name, container_name).await } } -#[async_trait] -impl DestroyerActions for Api { - async fn shutdown_exec(&self, action: &Action, pod_name: &str, container_name: &str) -> anyhow::Result<()> { - let command: Vec<&str> = action.command.as_ref().unwrap().split(' ').collect(); - debug!("{pod_name}: running command: {command:?}"); - match self - .exec( - pod_name, - command.clone(), - &AttachParams::default().container(container_name).stdout(false), - ) - .await - { - Ok(_) => info!("{pod_name}: sent `{command:?}` to {container_name}",), - Err(err) => return Err(anyhow!(format!("{pod_name}: exec failed in {container_name}: {err}"))), - }; - Ok(()) +async fn shutdown_pod(pod: &Api, action: &Action, pod_name: &str, container_name: &str) -> anyhow::Result<()> { + match action { + Action::Exec(command) => shutdown_exec(pod, command, pod_name, container_name).await, + Action::Portforward(method, path, port) => { + shutdown_portforward(pod, method, path, *port, pod_name, container_name).await + } } +} + +async fn shutdown_exec( + pod: &Api, + command: &Vec, + pod_name: &str, + container_name: &str, +) -> anyhow::Result<()> { + debug!("{pod_name}: running command: {command:?}"); + match pod + .exec( + pod_name, + command, + &AttachParams::default().container(container_name).stdout(false), + ) + .await + { + Ok(_) => info!("{pod_name}: sent `{command:?}` to {container_name}",), + Err(err) => return Err(anyhow!(format!("{pod_name}: exec failed in {container_name}: {err}"))), + }; + Ok(()) +} - async fn shutdown_portforward(&self, action: &Action, 
pod_name: &str, container_name: &str) -> anyhow::Result<()> { - let port = action.port.unwrap(); - let mut pf = self.portforward(pod_name, &[port]).await?; - let stream = pf.take_stream(port).unwrap(); - let (mut sender, connection) = hyper::client::conn::handshake(stream).await?; +async fn shutdown_portforward( + pod: &Api, + method: &Method, + path: &Uri, + port: u16, + pod_name: &str, + container_name: &str, +) -> anyhow::Result<()> { + let mut pf = pod.portforward(pod_name, &[port]).await?; + let (mut sender, connection) = match pf.take_stream(port) { + None => return Err(anyhow!(format!("Unable to attach to port: {port}"))), + Some(s) => hyper::client::conn::handshake(s).await?, + }; - let inner_pod_name = pod_name.to_string(); - tokio::spawn(async move { - if let Err(e) = connection.await { - error!("{inner_pod_name}: error in portforward connection: {e}"); - } - }); + let inner_pod_name = pod_name.to_string(); + tokio::spawn(async move { + if let Err(e) = connection.await { + error!("{inner_pod_name}: error in portforward connection: {e}"); + } + }); - let method = action.method.as_ref().unwrap().as_str(); - let path = action.path.as_ref().unwrap().as_str(); - let req = Request::builder() - .uri(path) - .header("Connection", "close") - .header("Host", "127.0.0.1") - .method(method) - .body(Body::from("")) - .unwrap(); + let req = Request::builder() + .uri(path) + .header("Connection", "close") + .header("Host", "127.0.0.1") + .method(method) + .body(Body::from(""))?; - debug!("{pod_name}: sending HTTP request ({method} {path} at {port})"); + debug!("{pod_name}: sending HTTP request ({method} {path} at {port})"); - let req_future = sender.send_request(req); + let req_future = sender.send_request(req); - let (parts, body) = match tokio::time::timeout(Duration::from_secs(1), req_future).await { - Ok(req) => req?.into_parts(), - Err(_) => { - return Err(anyhow!(format!( - "{pod_name}: HTTP request ({method} {path} at port {port}) failed: request timeout" - ))) - 
} - }; - let status_code = parts.status; - debug!("{pod_name}: got status code {status_code}"); - if status_code != 200 { - let body_bytes = body::to_bytes(body).await?; - let body_str = std::str::from_utf8(&body_bytes)?; + let (parts, body) = match tokio::time::timeout(Duration::from_secs(1), req_future).await { + Ok(req) => req?.into_parts(), + Err(_) => { return Err(anyhow!(format!( - "{pod_name}: HTTP request ({method} {path} at port {port}) failed: code {status_code}: {body_str}" - ))); - } else { - info!("{pod_name}: sent HTTP request `{method} {path}` at port {port} to {container_name}",) + "{pod_name}: HTTP request ({method} {path} at port {port}) failed: request timeout" + ))) } - Ok(()) + }; + let status_code = parts.status; + debug!("{pod_name}: got status code {status_code}"); + if status_code != 200 { + let body_bytes = body::to_bytes(body).await?; + let body_str = std::str::from_utf8(&body_bytes)?; + return Err(anyhow!(format!( + "{pod_name}: HTTP request ({method} {path} at port {port}) failed: code {status_code}: {body_str}" + ))); } + info!("{pod_name}: sent HTTP request `{method} {path}` at port {port} to {container_name}",); + Ok(()) } diff --git a/src/main.rs b/src/main.rs index ad7dfaf..04ac017 100644 --- a/src/main.rs +++ b/src/main.rs @@ -27,7 +27,9 @@ static PROMETHEUS_PORT: u16 = 8999; #[tokio::main] async fn main() -> anyhow::Result<()> { let rust_log_env = env::var("RUST_LOG").unwrap_or_else(|_| "hahaha=info,kube=warn".to_string()); - let filter_layer = tracing_subscriber::EnvFilter::builder().with_regex(false).parse_lossy(&rust_log_env); + let filter_layer = tracing_subscriber::EnvFilter::builder() + .with_regex(false) + .parse_lossy(&rust_log_env); let format_layer = tracing_subscriber::fmt::layer().json().flatten_event(true); tracing_subscriber::registry() .with(filter_layer) @@ -41,10 +43,7 @@ async fn main() -> anyhow::Result<()> { let lp = ListParams::default().labels("nais.io/naisjob=true"); let h = hostname::get()?; - let 
host_name = match h.to_str() { - Some(s) => s, - None => "hahaha-1337", // consider dying here, this should never happen after all. - }; + let host_name = h.to_str().unwrap_or("hahaha-1337"); let reporter = Reporter { controller: "hahaha".into(), @@ -64,7 +63,11 @@ async fn main() -> anyhow::Result<()> { .run( reconciler::reconcile, reconciler::error_policy, - Arc::new(reconciler::Data::new(client, reporter, actions)), + Arc::new(reconciler::Data { + client, + reporter, + actions, + }), ) .for_each(|res| async move { match res { diff --git a/src/pod.rs b/src/pod.rs index f485d66..b886149 100644 --- a/src/pod.rs +++ b/src/pod.rs @@ -19,7 +19,7 @@ trait SidecarStates { fn main_container(&self) -> Result; } -/// Extension trait for ContainerStatus +/// Extension trait for `ContainerStatus` trait ContainerState { /// Helper to determine if a Container is terminated fn is_terminated(&self) -> bool; @@ -28,7 +28,7 @@ trait ContainerState { impl Sidecars for Pod { fn sidecars(&self) -> anyhow::Result> { let sidecars = self.running_sidecars()?; - if sidecars.len() == 0 { + if sidecars.is_empty() { // if there's nothing to be found, we're probably still starting up. 
return Ok(sidecars); } @@ -40,15 +40,13 @@ impl Sidecars for Pod { } fn job_name(&self) -> anyhow::Result { - let labels = match &self.metadata.labels { - Some(l) => l, - None => return Err(anyhow!("no labels found on pod")), + let Some(labels) = &self.metadata.labels else { + return Err(anyhow!("no labels found on pod")); }; - let app_name = match labels.get("app") { - Some(name) => name, - None => return Err(anyhow!("no app name found on pod")), + let Some(app_name) = labels.get("app") else { + return Err(anyhow!("no app name found on pod")); }; - return Ok(app_name.into()); + Ok(app_name.into()) } } @@ -65,10 +63,10 @@ impl SidecarStates for Pod { fn main_container(&self) -> Result { let app = JobPod::from(self)?; - match app.statuses.iter().find(|c| c.name == app.name) { - Some(c) => Ok(c.clone()), - None => Err(anyhow!("couldn't determine main container")), - } + app.statuses + .iter() + .find(|c| c.name == app.name) + .map_or_else(|| Err(anyhow!("couldn't determine main container")), |c| Ok(c.clone())) } } @@ -78,27 +76,25 @@ struct JobPod { } impl JobPod { - pub fn from(pod: &Pod) -> Result { - let labels = match &pod.metadata.labels { - Some(l) => l, - None => return Err(anyhow!("no labels found on pod")), - }; - let app_name = match labels.get("app") { - Some(name) => name, - None => return Err(anyhow!("no app name found on pod")), - }; - let pod_status = match &pod.status { - Some(spec) => spec, - None => return Err(anyhow!("no spec found on pod")), - }; + pub fn from(pod: &Pod) -> Result { + let app_name = &pod + .metadata + .labels + .as_ref() + .ok_or_else(|| anyhow!("no labels found on pod"))? + .get("app") + .ok_or_else(|| anyhow!("no app name found on pod"))?; - let container_statuses: Vec = match &pod_status.container_statuses { - Some(status) => status.to_vec(), - None => Vec::new(), // if no container statuses are found, return an empty Vec. 
we're probably starting up - }; + let container_statuses: Vec = pod + .status + .as_ref() + .ok_or_else(|| anyhow!("no spec found on pod"))? + .container_statuses + .as_ref() + .map_or_else(Vec::new, Clone::clone); - Ok(JobPod { - name: app_name.into(), + Ok(Self { + name: (*app_name).to_string(), statuses: container_statuses, }) } @@ -108,11 +104,6 @@ impl ContainerState for ContainerStatus { // We could probably see if the termination reason is // a good one to avoid taking unnecessary measures.. but whatever fn is_terminated(&self) -> bool { - let last_state = match &self.state { - Some(state) => state, - None => return false, - }; - - last_state.terminated.is_some() + self.state.as_ref().map_or(false, |c| c.terminated.is_some()) } } diff --git a/src/prometheus.rs b/src/prometheus.rs index 3be5e26..0edf77e 100644 --- a/src/prometheus.rs +++ b/src/prometheus.rs @@ -31,15 +31,14 @@ lazy_static! { } /// The function which triggers on any request to the server (incl. any path) -async fn metric_service(_req: Request) -> hyper::Result> { +async fn metric_service(_req: Request) -> anyhow::Result> { let encoder = TextEncoder::new(); let mut buffer = vec![]; let mf = prometheus::gather(); - encoder.encode(&mf, &mut buffer).unwrap(); + encoder.encode(&mf, &mut buffer)?; Ok(Response::builder() .header(hyper::header::CONTENT_TYPE, encoder.format_type()) - .body(Body::from(buffer)) - .unwrap()) + .body(Body::from(buffer))?) 
} /// The function which spawns the prometheus server @@ -87,7 +86,7 @@ async fn server_functions_and_shuts_down_gracefully() { .unwrap(); let mut buffer = String::new(); while let Some(chunk) = res.body_mut().data().await { - buffer += &String::from_utf8_lossy(&chunk.unwrap().to_vec()); + buffer += &String::from_utf8_lossy(&chunk.unwrap()); } assert!(buffer.contains("hahaha_total_unsuccessful_event_posts 2")); diff --git a/src/reconciler.rs b/src/reconciler.rs index 3d3e244..dd9fbc4 100644 --- a/src/reconciler.rs +++ b/src/reconciler.rs @@ -3,8 +3,8 @@ use std::{collections::BTreeMap, sync::Arc, time::Duration}; use k8s_openapi::api::core::v1::Pod; use kube::{ runtime::{ - controller::{Action as ReconcilerAction}, - events::{Reporter, Recorder, Event, EventType}, + controller::Action as ReconcilerAction, + events::{Event, EventType, Recorder, Reporter}, }, Api, Client, Resource, ResourceExt, }; @@ -22,35 +22,18 @@ pub enum Error { } pub struct Data { - client: Client, - reporter: Reporter, - actions: BTreeMap, -} - -impl Data { - pub fn new(client: Client, reporter: Reporter, actions: BTreeMap) -> Data { - Data { - client, - reporter, - actions, - } - } + pub(crate) client: Client, + pub(crate) reporter: Reporter, + pub(crate) actions: BTreeMap, } pub async fn reconcile(pod: Arc, ctx: Arc) -> Result { - let namespace = match pod.namespace() { - Some(namespace) => namespace, - None => "default".into(), - }; + let namespace = pod.namespace().unwrap_or("default".into()); let api: Api = Api::namespaced(ctx.client.clone(), &namespace); reconcile_inner(api, pod, ctx).await } -pub async fn reconcile_inner( - api: impl Destroyer, - pod: Arc, - ctx: Arc, -) -> Result { +pub async fn reconcile_inner(api: impl Destroyer, pod: Arc, ctx: Arc) -> Result { let pod_name = pod.name_any(); let namespace = match pod.namespace() { Some(namespace) => namespace, @@ -68,11 +51,7 @@ pub async fn reconcile_inner( } // set up a recorder for publishing events to the Pod - let recorder = 
Recorder::new( - ctx.client.clone(), - ctx.reporter.clone(), - pod.object_ref(&()), - ); + let recorder = Recorder::new(ctx.client.clone(), ctx.reporter.clone(), pod.object_ref(&())); debug!("{pod_name}: needs help shutting down some residual containers"); @@ -88,26 +67,25 @@ pub async fn reconcile_inner( for sidecar in running_sidecars { let sidecar_name = sidecar.name; debug!("{pod_name}: found sidecar {sidecar_name}"); - let action = match ctx.actions.get(&sidecar_name) { - Some(action) => action, - None => { - warn!("{pod_name}: missing defined action: {sidecar_name}"); - UNSUPPORTED_SIDECARS - .with_label_values(&[&sidecar_name, &job_name, &namespace]) - .inc(); - continue; - } + let Some(action) = ctx.actions.get(&sidecar_name) else { + warn!("{pod_name}: missing defined action: {sidecar_name}"); + UNSUPPORTED_SIDECARS + .with_label_values(&[&sidecar_name, &job_name, &namespace]) + .inc(); + continue; }; - let res = api.shutdown(action, &pod_name, &sidecar_name).await; - if let Err(err) = res { + + let res = api.shutdown(action, &pod_name, &sidecar_name); + if let Err(err) = res.await { if let Err(e) = recorder .publish(Event { action: "Killing".into(), reason: "Killing".into(), - note: Some(format!("Unsuccessfully shut down container {sidecar_name}: {err}").into()), + note: Some(format!("Unsuccessfully shut down container {sidecar_name}: {err}")), type_: EventType::Warning, - secondary: None - }).await + secondary: None, + }) + .await { warn!("{pod_name}: couldn't publish Kubernetes Event: {e}"); TOTAL_UNSUCCESSFUL_EVENT_POSTS.inc(); @@ -117,13 +95,16 @@ pub async fn reconcile_inner( .inc(); return Err(Error::SidecarShutdownFailed(pod_name, sidecar_name, err)); } - if let Err(e) = recorder.publish(Event { - action: "Killing".into(), - reason: "Killing".into(), - note: Some(format!("Shut down container {sidecar_name}").into()), - type_: EventType::Normal, - secondary: None - }).await { + if let Err(e) = recorder + .publish(Event { + action: "Killing".into(), 
+ reason: "Killing".into(), + note: Some(format!("Shut down container {sidecar_name}")), + type_: EventType::Normal, + secondary: None, + }) + .await + { warn!("{pod_name}: couldn't publish Kubernetes Event: {e}"); TOTAL_UNSUCCESSFUL_EVENT_POSTS.inc(); } @@ -155,16 +136,11 @@ mod tests { apimachinery::pkg::apis::meta::v1::Time, chrono::Utc, }; - use kube::{ - api::ObjectMeta, - client::ConfigExt, - runtime::events::Reporter, - Client, Config, - }; + use kube::{api::ObjectMeta, client::ConfigExt, runtime::events::Reporter, Client, Config}; use tower::ServiceBuilder; /// creates a bogus kube client that doesn't connect anywhere useful - fn make_data() -> Data { + fn make_data() -> Data { let config = Config::new("/".parse::().unwrap()); let service = ServiceBuilder::new() .layer(config.base_uri_layer())