Skip to content

Commit

Permalink
Added a healthcheck server, which is optional. Will simplify k8s heal…
Browse files Browse the repository at this point in the history
…thchecks
  • Loading branch information
Christopher Woods authored and Christopher Woods committed Oct 24, 2024
1 parent e9ce09b commit 99e7fae
Show file tree
Hide file tree
Showing 23 changed files with 240 additions and 21 deletions.
1 change: 1 addition & 0 deletions bridge/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ async fn main() -> Result<()> {
Some("ws://localhost:8044".to_owned()),
Some("127.0.0.1".to_owned()),
Some(8044),
None,
Some("http://localhost:3000".to_owned()),
Some("127.0.0.1".to_owned()),
Some(3000),
Expand Down
1 change: 1 addition & 0 deletions cluster/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ async fn main() -> Result<()> {
Some("ws://localhost:8045".to_owned()),
Some("127.0.0.1".to_owned()),
Some(8045),
None,
Some(AgentType::Platform),
);

Expand Down
1 change: 1 addition & 0 deletions docs/cmdline/cluster/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ async fn main() -> Result<()> {
Some("ws://localhost:8091".to_owned()),
Some("127.0.0.1".to_owned()),
Some(8091),
None,
Some(AgentType::Instance),
);

Expand Down
1 change: 1 addition & 0 deletions docs/cmdline/portal/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ async fn main() -> Result<()> {
Some("ws://localhost:8090".to_owned()),
Some("127.0.0.1".to_owned()),
Some(8090),
None,
Some(AgentType::Portal),
);

Expand Down
11 changes: 8 additions & 3 deletions docs/echo/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,13 @@ async fn run_client(invitation: &Path) -> Result<(), Error> {
// create the echo-client service - note that the url, ip and
// port aren't used, as this service won't be listening for any
// connecting clients
let mut service: ServiceConfig =
ServiceConfig::new("echo-client", "http://localhost:6502", "127.0.0.1", &6502)?;
let mut service: ServiceConfig = ServiceConfig::new(
"echo-client",
"http://localhost:6502",
"127.0.0.1",
&6502,
&None,
)?;

// now give the invitation to connect to the server to the client
service.add_server(invite)?;
Expand Down Expand Up @@ -257,7 +262,7 @@ async fn run_server(
invitation: &Path,
) -> Result<(), Error> {
// create the echo-server service
let mut service = ServiceConfig::new("echo-server", url, ip, port)?;
let mut service = ServiceConfig::new("echo-server", url, ip, port, &None)?;

let invite = service.add_client("echo-client", range)?;

Expand Down
11 changes: 8 additions & 3 deletions docs/job/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,13 @@ async fn run_cluster(invitation: &Path) -> Result<(), Error> {
// create the paddington service for the cluster agent
// - note that the url, ip and port aren't used, as this
// agent won't be listening for any connecting clients
let mut service: ServiceConfig =
ServiceConfig::new("cluster", "http://localhost:6502", "127.0.0.1", &6502)?;
let mut service: ServiceConfig = ServiceConfig::new(
"cluster",
"http://localhost:6502",
"127.0.0.1",
&6502,
&None,
)?;

// now give the invitation to connect to the server to the client
service.add_server(invite)?;
Expand Down Expand Up @@ -255,7 +260,7 @@ async fn run_portal(
invitation: &Path,
) -> Result<(), Error> {
// create a paddington service configuration for the portal agent
let mut service = ServiceConfig::new("portal", url, ip, port)?;
let mut service = ServiceConfig::new("portal", url, ip, port, &None)?;

// add the cluster to the portal, returning an invitation
let invite = service.add_client("cluster", range)?;
Expand Down
1 change: 1 addition & 0 deletions filesystem/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ async fn main() -> Result<()> {
Some("ws://localhost:8047".to_owned()),
Some("127.0.0.1".to_owned()),
Some(8047),
None,
Some(AgentType::Filesystem),
);

Expand Down
1 change: 1 addition & 0 deletions freeipa/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ async fn main() -> Result<()> {
Some("ws://localhost:8046".to_owned()),
Some("127.0.0.1".to_owned()),
Some(8046),
None,
Some(AgentType::Account),
);

Expand Down
1 change: 1 addition & 0 deletions paddington/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ built = { version = "0.7", default-features = false, features = ["git2"] }

[dependencies]
anyhow = { version="1.0.86", features = ["backtrace"] }
axum = { version = "0.7", features = ["tracing", "query"] }
dirs = "5.0.1"
futures = "0.3.30"
futures-channel = "0.3.30"
Expand Down
6 changes: 6 additions & 0 deletions paddington/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::config::{PeerConfig, ServiceConfig};
use crate::connection::Connection;
use crate::error::Error;
use crate::exchange;
use crate::healthcheck;

pub async fn run_once(config: ServiceConfig, peer: PeerConfig) -> Result<(), Error> {
let service_name = config.name();
Expand Down Expand Up @@ -44,6 +45,11 @@ pub async fn run(config: ServiceConfig, peer: PeerConfig) -> Result<(), Error> {
// set the name of the service in the exchange
exchange::set_name(&config.name()).await?;

if let Some(healthcheck_port) = config.healthcheck_port() {
// spawn the health check server
healthcheck::spawn(config.ip(), healthcheck_port).await?;
}

loop {
match run_once(config.clone(), peer.clone()).await {
Ok(_) => {
Expand Down
44 changes: 33 additions & 11 deletions paddington/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ pub struct Defaults {
url: String,
ip: String,
port: u16,
healthcheck_port: Option<u16>,
}

impl Defaults {
Expand All @@ -82,6 +83,7 @@ impl Defaults {
url: Option<String>,
ip: Option<String>,
port: Option<u16>,
healthcheck_port: Option<u16>,
) -> Self {
let config_file = config_file.unwrap_or(
dirs::config_local_dir()
Expand All @@ -99,6 +101,7 @@ impl Defaults {
url: url.unwrap_or("http://localhost:8000".to_owned()),
ip: ip.unwrap_or("127.0.0.1".to_owned()),
port: port.unwrap_or(8042),
healthcheck_port,
}
}

Expand All @@ -121,6 +124,10 @@ impl Defaults {
pub fn port(&self) -> u16 {
self.port
}

pub fn healthcheck_port(&self) -> Option<u16> {
self.healthcheck_port
}
}

#[derive(Serialize, Deserialize, Clone, Debug)]
Expand Down Expand Up @@ -412,21 +419,29 @@ pub struct ServiceConfig {
url: String,
ip: IpAddr,
port: u16,
heathcheck_port: Option<u16>,

servers: Vec<ServerConfig>,
clients: Vec<ClientConfig>,
encryption: Option<EncryptionScheme>,
}

impl ServiceConfig {
pub fn new(name: &str, url: &str, ip: &str, port: &u16) -> Result<Self, Error> {
pub fn new(
name: &str,
url: &str,
ip: &str,
port: &u16,
healthcheck_port: &Option<u16>,
) -> Result<Self, Error> {
Ok(ServiceConfig {
name: name.to_string(),
url: create_websocket_url(url)?,
ip: ip
.parse()
.with_context(|| format!("Could not parse IP address: {}", ip))?,
port: *port,
heathcheck_port: *healthcheck_port,
servers: Vec::new(),
clients: Vec::new(),
encryption: None,
Expand Down Expand Up @@ -493,6 +508,10 @@ impl ServiceConfig {
self.port
}

pub fn healthcheck_port(&self) -> Option<u16> {
self.heathcheck_port
}

pub fn add_client(&mut self, name: &str, ip: &str) -> Result<Invite, Error> {
let ip = IpOrRange::new(ip)
.with_context(|| format!("Could not parse into an IP address or IP range: {}", ip))?;
Expand Down Expand Up @@ -573,6 +592,7 @@ impl ServiceConfig {
url: String,
ip: IpAddr,
port: u16,
healthcheck_port: &Option<u16>,
) -> Result<ServiceConfig, Error> {
// see if this config_dir exists - return an error if it does
let config_file = path::absolute(config_file).with_context(|| {
Expand All @@ -586,7 +606,7 @@ impl ServiceConfig {
return Err(Error::NotExists(config_file.to_string_lossy().to_string()));
}

let config = ServiceConfig::new(&name, &url, &ip.to_string(), &port)?;
let config = ServiceConfig::new(&name, &url, &ip.to_string(), &port, healthcheck_port)?;
save::<ServiceConfig>(config.clone(), &config_file)?;

// check we can read the config and return it
Expand Down Expand Up @@ -649,15 +669,17 @@ mod tests {

#[test]
fn test_invitations() {
let mut primary = ServiceConfig::new("primary", "http://localhost", "127.0.0.1", &5544)
.unwrap_or_else(|e| {
unreachable!("Cannot create service config: {}", e);
});

let mut secondary = ServiceConfig::new("secondary", "http://localhost", "127.0.0.1", &5545)
.unwrap_or_else(|e| {
unreachable!("Cannot create service config: {}", e);
});
let mut primary =
ServiceConfig::new("primary", "http://localhost", "127.0.0.1", &5544, &None)
.unwrap_or_else(|e| {
unreachable!("Cannot create service config: {}", e);
});

let mut secondary =
ServiceConfig::new("secondary", "http://localhost", "127.0.0.1", &5545, &None)
.unwrap_or_else(|e| {
unreachable!("Cannot create service config: {}", e);
});

// introduce the secondary to the primary
let invite = primary
Expand Down
3 changes: 2 additions & 1 deletion paddington/src/eventloop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ mod tests {
async fn test_run() -> Result<()> {
// this tests that the service can be configured and will run
// (it will exit immediately as there are no clients or servers)
let config = ServiceConfig::new("test_server", "http://localhost", "127.0.0.1", &5544)?;
let config =
ServiceConfig::new("test_server", "http://localhost", "127.0.0.1", &5544, &None)?;
run(config).await?;

Ok(())
Expand Down
118 changes: 118 additions & 0 deletions paddington/src/healthcheck.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// SPDX-FileCopyrightText: © 2024 Christopher Woods <[email protected]>
// SPDX-License-Identifier: MIT

use crate::Error;

use anyhow::Result;
use axum::{
extract::Json,
http::StatusCode,
response::{IntoResponse, Response},
routing::get,
Router,
};
use once_cell::sync::Lazy;
use serde_json::json;
use std::net::IpAddr;
use std::sync::RwLock;
use tokio::net::TcpListener;

//
// Health check endpoint for the web API
//
#[tracing::instrument(skip_all)]
async fn health() -> Result<Json<serde_json::Value>, AppError> {
Ok(Json(json!({"status": "ok"})))
}

///
/// Function spawned to run the API server in a background thread
///
async fn run_server(app: Router, listener: TcpListener) -> Result<()> {
match axum::serve(listener, app).await {
Ok(_) => {
tracing::info!("Server ran successfully");
}
Err(e) => {
tracing::error!("Error starting server: {}", e);
}
}

Ok(())
}

static IS_RUNNING: Lazy<RwLock<bool>> = Lazy::new(|| RwLock::new(false));

///
/// Spawn a small http server that responds to health checks
///
pub async fn spawn(ip: IpAddr, port: u16) -> Result<(), Error> {
// check if the server is already running
match IS_RUNNING.read() {
Ok(guard) => {
if *guard {
// already running
return Ok(());
}
}
Err(e) => {
// not running?
tracing::error!("Error getting read lock: {}", e);
return Ok(());
}
}

// set the flag to indicate the server is running
match IS_RUNNING.write() {
Ok(mut guard) => {
if *guard {
// someone else set it first
return Ok(());
}

*guard = true;
}
Err(e) => {
// not running?
tracing::error!("Error getting write lock: {}", e);
return Ok(());
}
}

tracing::info!("Starting health check server on {}:{}/health", ip, port);

// create the web API
let app = Router::new().route("/health", get(health));

// create a TCP listener on the specified port
let listener = tokio::net::TcpListener::bind(&std::net::SocketAddr::new(ip, port)).await?;

// spawn a new task to run the web server to listen for requests
tokio::spawn(run_server(app, listener));

Ok(())
}

// Errors

#[derive(Debug)]
struct AppError(anyhow::Error, Option<axum::http::StatusCode>);

impl IntoResponse for AppError {
fn into_response(self) -> Response {
(
self.1.unwrap_or(StatusCode::INTERNAL_SERVER_ERROR),
Json(json!({"message":format!("Something went wrong: {:?}", self.0)})),
)
.into_response()
}
}

impl<E> From<E> for AppError
where
E: Into<anyhow::Error>,
{
fn from(err: E) -> Self {
Self(err.into(), None)
}
}
1 change: 1 addition & 0 deletions paddington/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ mod crypto;
mod error;
mod eventloop;
mod exchange;
mod healthcheck;
mod server;

// public API
Expand Down
Loading

0 comments on commit 99e7fae

Please sign in to comment.