Skip to content

Commit

Permalink
dekaf: Gracefully handle invalid connection attempts without shutting…
Browse files Browse the repository at this point in the history
… down the whole server
  • Loading branch information
jshearer committed Sep 19, 2024
1 parent 8c314dd commit ee15c0b
Showing 1 changed file with 11 additions and 4 deletions.
15 changes: 11 additions & 4 deletions crates/dekaf/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,8 @@ async fn main() -> anyhow::Result<()> {
tls_cfg.certificate_file.clone().unwrap(),
tls_cfg.certificate_key_file.clone().unwrap(),
)
.await?;
.await
.context("failed to open or read certificate or certificate key file")?;

let schema_server_task = axum_server::bind_rustls(schema_addr, axum_rustls_config.clone())
.serve(schema_router.into_make_service());
Expand Down Expand Up @@ -195,8 +196,12 @@ async fn main() -> anyhow::Result<()> {
let acceptor = acceptor.clone();
tokio::select! {
accept = kafka_listener.accept() => {
let (socket, addr) = accept?;
let socket = acceptor.accept(socket).await?;
let Ok((socket, addr)) = accept else {
continue
};
let Ok(socket) = acceptor.accept(socket).await else {
continue
};

tokio::spawn(serve(Session::new(app.clone(), cli.encryption_secret.to_owned()), socket, addr, stop.clone()));
}
Expand All @@ -214,7 +219,9 @@ async fn main() -> anyhow::Result<()> {
loop {
tokio::select! {
accept = kafka_listener.accept() => {
let (socket, addr) = accept?;
let Ok((socket, addr)) = accept else {
continue
};
socket.set_nodelay(true)?;

tokio::spawn(serve(Session::new(app.clone(), cli.encryption_secret.to_owned()), socket, addr, stop.clone()));
Expand Down

0 comments on commit ee15c0b

Please sign in to comment.