Skip to content

Commit

Permalink
Merge pull request #9 from nais/carlfoo
Browse files Browse the repository at this point in the history
A little bit of cleanup, full-on clippy + refactor action types
  • Loading branch information
Reasonable-Solutions authored Aug 14, 2023
2 parents 231f27e + 5b32718 commit cd59a05
Show file tree
Hide file tree
Showing 6 changed files with 178 additions and 245 deletions.
93 changes: 28 additions & 65 deletions src/actions.rs
Original file line number Diff line number Diff line change
@@ -1,73 +1,36 @@
use std::collections::BTreeMap;

use hyper::http::Method;
use hyper::Uri;
use std::{collections::BTreeMap};
/// Generate the action `BTreeMap`
///
/// Modify this function to add or remove sidecar definitions and their associated shutdown procedures.
pub fn generate() -> BTreeMap<String, Action> {
let mut actions = BTreeMap::new();

actions.exec("cloudsql-proxy", "kill -s INT 1");
actions.exec("vks-sidecar", "/bin/kill -s INT 1");
actions.exec("secure-logs-configmap-reload", "/bin/killall configmap-reload");
actions.portforward("linkerd-proxy", "POST", "/shutdown", 4191);
actions.portforward("secure-logs-fluentd", "GET", "/api/processes.killWorkers", 24444);
actions
BTreeMap::from([
(
"cloudsql-proxy".into(),
Action::Exec("kill -s INT 1".split(' ').map(String::from).collect()),
),
(
"vks-sidecar".into(),
Action::Exec("/bin/kill -s INT 1".split(' ').map(String::from).collect()),
),
(
"secure-logs-configmap-reload".into(),
Action::Exec("/bin/killall configmap-reload".split(' ').map(String::from).collect()),
),
(
"linkerd-proxy".into(),
Action::Portforward(Method::POST, "/shutdown".parse::<Uri>().unwrap(), 4191),
),
(
"secure-logs-fluentd".into(),
Action::Portforward(Method::GET, "/api/processes.killWorkers".parse::<Uri>().unwrap(), 24444),
),
])
}

#[derive(Debug)]
pub enum ActionType {
Portforward,
Exec,
None,
}

impl Default for ActionType {
fn default() -> Self {
ActionType::None
}
}

#[derive(Default, Debug)]
pub struct Action {
pub action_type: ActionType,
pub method: Option<String>,
pub path: Option<String>,
pub port: Option<u16>,
pub command: Option<String>,
}

/// Helper trait for inserting different `Action`s with different `ActionType`s into a `BTreeMap`
///
/// Using this trait will allow us to catch simple misconfigurations in Actions during compile time.
trait ActionInsertions {
/// Inserts an action with `ActionType::Exec` into a `BTreeMap`
fn exec(&mut self, target_container: &str, command: &str);
/// Inserts an action with `ActionType::Portforward` into a `BTreeMap`
fn portforward(&mut self, target_container: &str, method: &str, path: &str, port: u16);
}

impl ActionInsertions for BTreeMap<String, Action> {
fn exec(&mut self, target_container: &str, command: &str) {
self.insert(
target_container.into(),
Action {
action_type: ActionType::Exec,
command: Some(command.into()),
..Default::default()
},
);
}

fn portforward(&mut self, target_container: &str, method: &str, path: &str, port: u16) {
self.insert(
target_container.into(),
Action {
action_type: ActionType::Portforward,
method: Some(method.into()),
path: Some(path.into()),
port: Some(port.into()),
..Default::default()
},
);
}
pub enum Action {
Portforward(Method, Uri, u16),
Exec(Vec<String>),
}
149 changes: 75 additions & 74 deletions src/api.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
#[cfg(test)]
use mockall::automock;

use crate::actions::{Action, ActionType};
use crate::actions::Action;
use anyhow::anyhow;
use async_trait::async_trait;
use hyper::{body, Body, Request};
use hyper::http::Method;
use hyper::{body, Body, Request, Uri};
use k8s_openapi::api::core::v1::Pod;
use kube::api::{Api, AttachParams};
use std::time::Duration;
Expand All @@ -20,92 +21,92 @@ pub trait Destroyer {
async fn shutdown(&self, action: &Action, pod_name: &str, container_name: &str) -> anyhow::Result<()>;
}

/// Private trait for the actual business of shutting down pods
#[async_trait]
trait DestroyerActions {
/// Helper to shut down a container via `exec`
async fn shutdown_exec(&self, action: &Action, pod_name: &str, container_name: &str) -> anyhow::Result<()>;
/// Helper to shut down a container via `portforward`
async fn shutdown_portforward(&self, action: &Action, pod_name: &str, container_name: &str) -> anyhow::Result<()>;
}

#[async_trait]
impl Destroyer for Api<Pod> {
async fn shutdown(&self, action: &Action, pod_name: &str, container_name: &str) -> anyhow::Result<()> {
match action.action_type {
ActionType::Exec => self.shutdown_exec(action, pod_name, container_name).await?,
ActionType::Portforward => self.shutdown_portforward(action, pod_name, container_name).await?,
_ => (),
};
Ok(())
shutdown_pod(self, action, pod_name, container_name).await
}
}

#[async_trait]
impl DestroyerActions for Api<Pod> {
async fn shutdown_exec(&self, action: &Action, pod_name: &str, container_name: &str) -> anyhow::Result<()> {
let command: Vec<&str> = action.command.as_ref().unwrap().split(' ').collect();
debug!("{pod_name}: running command: {command:?}");
match self
.exec(
pod_name,
command.clone(),
&AttachParams::default().container(container_name).stdout(false),
)
.await
{
Ok(_) => info!("{pod_name}: sent `{command:?}` to {container_name}",),
Err(err) => return Err(anyhow!(format!("{pod_name}: exec failed in {container_name}: {err}"))),
};
Ok(())
async fn shutdown_pod(pod: &Api<Pod>, action: &Action, pod_name: &str, container_name: &str) -> anyhow::Result<()> {
match action {
Action::Exec(command) => shutdown_exec(pod, command, pod_name, container_name).await,
Action::Portforward(method, path, port) => {
shutdown_portforward(pod, method, path, *port, pod_name, container_name).await
}
}
}

async fn shutdown_exec(
pod: &Api<Pod>,
command: &Vec<String>,
pod_name: &str,
container_name: &str,
) -> anyhow::Result<()> {
debug!("{pod_name}: running command: {command:?}");
match pod
.exec(
pod_name,
command,
&AttachParams::default().container(container_name).stdout(false),
)
.await
{
Ok(_) => info!("{pod_name}: sent `{command:?}` to {container_name}",),
Err(err) => return Err(anyhow!(format!("{pod_name}: exec failed in {container_name}: {err}"))),
};
Ok(())
}

async fn shutdown_portforward(&self, action: &Action, pod_name: &str, container_name: &str) -> anyhow::Result<()> {
let port = action.port.unwrap();
let mut pf = self.portforward(pod_name, &[port]).await?;
let stream = pf.take_stream(port).unwrap();
let (mut sender, connection) = hyper::client::conn::handshake(stream).await?;
async fn shutdown_portforward(
pod: &Api<Pod>,
method: &Method,
path: &Uri,
port: u16,
pod_name: &str,
container_name: &str,
) -> anyhow::Result<()> {
let mut pf = pod.portforward(pod_name, &[port]).await?;
let (mut sender, connection) = match pf.take_stream(port) {
None => return Err(anyhow!(format!("Unable to attach to port: {port}"))),
Some(s) => hyper::client::conn::handshake(s).await?,
};

let inner_pod_name = pod_name.to_string();
tokio::spawn(async move {
if let Err(e) = connection.await {
error!("{inner_pod_name}: error in portforward connection: {e}");
}
});
let inner_pod_name = pod_name.to_string();
tokio::spawn(async move {
if let Err(e) = connection.await {
error!("{inner_pod_name}: error in portforward connection: {e}");
}
});

let method = action.method.as_ref().unwrap().as_str();
let path = action.path.as_ref().unwrap().as_str();
let req = Request::builder()
.uri(path)
.header("Connection", "close")
.header("Host", "127.0.0.1")
.method(method)
.body(Body::from(""))
.unwrap();
let req = Request::builder()
.uri(path)
.header("Connection", "close")
.header("Host", "127.0.0.1")
.method(method)
.body(Body::from(""))?;

debug!("{pod_name}: sending HTTP request ({method} {path} at {port})");
debug!("{pod_name}: sending HTTP request ({method} {path} at {port})");

let req_future = sender.send_request(req);
let req_future = sender.send_request(req);

let (parts, body) = match tokio::time::timeout(Duration::from_secs(1), req_future).await {
Ok(req) => req?.into_parts(),
Err(_) => {
return Err(anyhow!(format!(
"{pod_name}: HTTP request ({method} {path} at port {port}) failed: request timeout"
)))
}
};
let status_code = parts.status;
debug!("{pod_name}: got status code {status_code}");
if status_code != 200 {
let body_bytes = body::to_bytes(body).await?;
let body_str = std::str::from_utf8(&body_bytes)?;
let (parts, body) = match tokio::time::timeout(Duration::from_secs(1), req_future).await {
Ok(req) => req?.into_parts(),
Err(_) => {
return Err(anyhow!(format!(
"{pod_name}: HTTP request ({method} {path} at port {port}) failed: code {status_code}: {body_str}"
)));
} else {
info!("{pod_name}: sent HTTP request `{method} {path}` at port {port} to {container_name}",)
"{pod_name}: HTTP request ({method} {path} at port {port}) failed: request timeout"
)))
}
Ok(())
};
let status_code = parts.status;
debug!("{pod_name}: got status code {status_code}");
if status_code != 200 {
let body_bytes = body::to_bytes(body).await?;
let body_str = std::str::from_utf8(&body_bytes)?;
return Err(anyhow!(format!(
"{pod_name}: HTTP request ({method} {path} at port {port}) failed: code {status_code}: {body_str}"
)));
}
info!("{pod_name}: sent HTTP request `{method} {path}` at port {port} to {container_name}",);
Ok(())
}
15 changes: 9 additions & 6 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ static PROMETHEUS_PORT: u16 = 8999;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let rust_log_env = env::var("RUST_LOG").unwrap_or_else(|_| "hahaha=info,kube=warn".to_string());
let filter_layer = tracing_subscriber::EnvFilter::builder().with_regex(false).parse_lossy(&rust_log_env);
let filter_layer = tracing_subscriber::EnvFilter::builder()
.with_regex(false)
.parse_lossy(&rust_log_env);
let format_layer = tracing_subscriber::fmt::layer().json().flatten_event(true);
tracing_subscriber::registry()
.with(filter_layer)
Expand All @@ -41,10 +43,7 @@ async fn main() -> anyhow::Result<()> {
let lp = ListParams::default().labels("nais.io/naisjob=true");

let h = hostname::get()?;
let host_name = match h.to_str() {
Some(s) => s,
None => "hahaha-1337", // consider dying here, this should never happen after all.
};
let host_name = h.to_str().unwrap_or("hahaha-1337");

let reporter = Reporter {
controller: "hahaha".into(),
Expand All @@ -64,7 +63,11 @@ async fn main() -> anyhow::Result<()> {
.run(
reconciler::reconcile,
reconciler::error_policy,
Arc::new(reconciler::Data::new(client, reporter, actions)),
Arc::new(reconciler::Data {
client,
reporter,
actions,
}),
)
.for_each(|res| async move {
match res {
Expand Down
Loading

0 comments on commit cd59a05

Please sign in to comment.