Skip to content

Commit

Permalink
Implement healthcheck for cached api
Browse files Browse the repository at this point in the history
  • Loading branch information
Angelin01 committed Sep 6, 2024
1 parent 942b809 commit 239bdd2
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 7 deletions.
32 changes: 29 additions & 3 deletions src/handler/health.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
use axum::extract::State;
use axum::http::StatusCode;
use axum::Json;
use axum::response::IntoResponse;
use serde::Serialize;
use crate::server::AppState;
use crate::service::KubernetesService;

pub async fn health() -> impl IntoResponse {
Json(HealthResponse { status: "ok".into() })
pub async fn health<S: AppState>(State(app_state): State<S>) -> impl IntoResponse {
match app_state.kubernetes().healthy().await {
true => (StatusCode::OK, Json(HealthResponse { status: "ok".into() })),
false => (StatusCode::INTERNAL_SERVER_ERROR, Json(HealthResponse { status: "err".into() })),
}
}

#[derive(Serialize)]
Expand All @@ -24,7 +31,7 @@ mod tests {
use crate::server::state::tests::TestAppState;

#[tokio::test]
async fn health_test() {
async fn health_ok_test() {
let app = server::build_app(TestAppState::new(Config::default()));

let request = Request::builder().uri("/health").body(Body::empty()).unwrap();
Expand All @@ -39,4 +46,23 @@ mod tests {
let body: Value = serde_json::from_slice(&body_bytes).unwrap();
assert_eq!(body, json!({"status": "ok"}));
}

#[tokio::test]
async fn health_err_test() {
let mut app_state = TestAppState::new(Config::default());
app_state.kubernetes.set_error(true);
let app = server::build_app(app_state);

let request = Request::builder().uri("/health").body(Body::empty()).unwrap();

let response = app
.oneshot(request)
.await
.unwrap();

assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR);
let body_bytes = response.into_body().collect().await.unwrap().to_bytes();
let body: Value = serde_json::from_slice(&body_bytes).unwrap();
assert_eq!(body, json!({"status": "err"}));
}
}
2 changes: 1 addition & 1 deletion src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub mod state;

pub fn build_app<S: AppState>(state: S) -> Router {
Router::new()
.route("/health", get(handler::health))
.route("/health", get(handler::health::<S>))
.route("/mutate", post(handler::mutate::<S>))
.with_state(state)
}
Expand Down
32 changes: 29 additions & 3 deletions src/service/kubernetes.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use axum::async_trait;
use k8s_openapi::api::core::v1::Namespace;
use kube::{Api, Client, ResourceExt};
Expand All @@ -8,12 +10,15 @@ use futures::{future, StreamExt};
#[async_trait]
pub trait KubernetesService: Send + Sync + Clone {
async fn namespace_group<S: AsRef<str> + Send + Sync>(&self, namespace: S) -> Option<String>;

async fn healthy(&self) -> bool;
}

#[derive(Clone)]
pub struct StandardKubernetesService {
store: Store<Namespace>,
group_label: String,
healthy: Arc<AtomicBool>,
}

impl StandardKubernetesService {
Expand All @@ -24,13 +29,22 @@ impl StandardKubernetesService {

let (reader, writer) = kube::runtime::reflector::store();

let healthy = Arc::new(AtomicBool::new(true));

let healthy_clone = Arc::clone(&healthy);
let stream = reflector(writer, watcher)
.default_backoff()
.touched_objects()
.for_each(|r| {
.for_each(move |r| {
let reflector_healthy = Arc::clone(&healthy_clone);
future::ready(match r {
Ok(o) => println!("Saw {}", o.name_any()),
Err(e) => println!("watcher error: {e}"),
Ok(_) => {
reflector_healthy.store(true, Ordering::Relaxed);
}
Err(e) => {
reflector_healthy.store(false, Ordering::Relaxed);
println!("watcher error: {e}")
}
})
});
tokio::spawn(stream);
Expand All @@ -40,6 +54,7 @@ impl StandardKubernetesService {
Ok(StandardKubernetesService {
store: reader,
group_label: group_label.as_ref().to_string(),
healthy,
})
}
}
Expand All @@ -60,6 +75,8 @@ impl KubernetesService for StandardKubernetesService {

result
}

async fn healthy(&self) -> bool { self.healthy.load(Ordering::Relaxed) }
}

#[cfg(test)]
Expand All @@ -71,23 +88,32 @@ pub mod tests {
#[derive(Clone)]
pub struct MockKubernetesService {
namespace_group_map: BTreeMap<String, String>,
is_error: bool,
}

impl MockKubernetesService {
pub fn new() -> Self {
MockKubernetesService {
namespace_group_map: BTreeMap::new(),
is_error: false,
}
}

pub fn set_namespace_group<S: AsRef<str>, R: AsRef<str>>(&mut self, namespace: S, group: R) {
self.namespace_group_map.insert(namespace.as_ref().into(), group.as_ref().into());
}

pub fn set_error(&mut self, is_erroring: bool) {
self.is_error = is_erroring;
}
}

#[async_trait]
impl KubernetesService for MockKubernetesService {
async fn namespace_group<S: AsRef<str> + Send + Sync>(&self, namespace: S) -> Option<String> {
self.namespace_group_map.get(namespace.as_ref()).map(String::to_owned)
}

async fn healthy(&self) -> bool { !self.is_error }
}
}

0 comments on commit 239bdd2

Please sign in to comment.