From 9d6abd10bd7b55f1cb422137c08192f3131769ef Mon Sep 17 00:00:00 2001 From: Zach Robinson Date: Tue, 16 Jul 2024 12:30:46 -0500 Subject: [PATCH] fix: Mux lock being blocked on shared parent RefContext --- src/controller.rs | 7 +- src/remote_watcher.rs | 143 +++++++++++++++++----------------- src/remote_watcher_manager.rs | 12 +-- 3 files changed, 77 insertions(+), 85 deletions(-) diff --git a/src/controller.rs b/src/controller.rs index 0dd0c10..932bb20 100644 --- a/src/controller.rs +++ b/src/controller.rs @@ -14,7 +14,6 @@ use kube::{ Api, Client, Resource, ResourceExt, }; use serde_json::json; -use tokio_context::context::RefContext; #[allow(unused_imports)] use tracing::{debug, error, info, warn}; @@ -236,10 +235,8 @@ pub async fn run(client: Client) -> Result<()> { .applied_objects() .predicate_filter(predicates::generation); - let (ctx, handle) = RefContext::new(); - let (remote_watcher_manager, remote_objects_trigger) = - RemoteWatcherManager::new(ctx, client.clone()); + RemoteWatcherManager::new(client.clone()); Controller::for_stream(resource_syncs, reader) .reconcile_on(remote_objects_trigger) @@ -256,8 +253,6 @@ pub async fn run(client: Client) -> Result<()> { .for_each(|_| futures::future::ready(())) .await; - handle.cancel(); - Ok(()) } diff --git a/src/remote_watcher.rs b/src/remote_watcher.rs index 05ba1ea..a945e54 100644 --- a/src/remote_watcher.rs +++ b/src/remote_watcher.rs @@ -11,7 +11,7 @@ use kube::Resource; use kubert::client::Client; use tokio::sync::mpsc::UnboundedSender; use tokio::time::sleep; -use tokio_context::context::{Context, RefContext}; +use tokio_context::context::Context; use tracing::{debug, error}; use crate::filters::Filterable; @@ -28,7 +28,6 @@ pub struct RemoteWatcherKey { pub struct RemoteWatcher { key: RemoteWatcherKey, sender: UnboundedSender, watcher::Error>>, - ctx: RefContext, client: Client, } @@ -56,13 +55,11 @@ impl RemoteWatcher { pub fn new( key: RemoteWatcherKey, sender: UnboundedSender, watcher::Error>>, - ctx: RefContext, client: Client, ) -> Self { Self { key, sender, - ctx, client, } } @@ -78,15 +75,14 @@ impl RemoteWatcher { self.send_reconcile(); } - pub async fn run(self) { + pub async fn run(self, mut ctx: Context) { let mut backoff = DefaultBackoff::default(); - let (mut ctx, handle) = Context::with_parent(&self.ctx, None); - loop { tokio::select! { + biased; + _ = ctx.done() => { - handle.cancel(); return; }, Err(err) = self.start(&mut backoff) => { @@ -140,77 +136,82 @@ impl RemoteWatcher { resource_version: &str, backoff: &mut DefaultBackoff, ) -> Result<()> { - let (mut ctx, handle) = Context::with_parent(&self.ctx, None); - let watch_params = WatchParams::default().fields(&format!("metadata.name={}", object_name)); let mut resource_version = resource_version.to_string(); loop { - tokio::select! { - _ = ctx.done() => { - handle.cancel(); - return Ok(()); - }, - stream = api.watch(&watch_params, &resource_version) => { - debug!("Started watch at ResourceVersion {:#?} for remote object: {:#?}", resource_version, self.key); - - let mut stream = stream?.boxed(); - - while let Some(event) = stream.try_next().await? { - resource_version = match event { - WatchEvent::Deleted(obj) => { - let event_rv = rv_for!(obj); - - debug!("Sending reconcile on watch event at ResourceVersion {:#?} for deleted object: {:#?}", event_rv, self.key); - self.send_reconcile_on_success(backoff); - - event_rv - } - WatchEvent::Added(obj) | WatchEvent::Modified(obj) => { - let event_rv = rv_for!(obj); - - match obj.was_last_modified_by(&ResourceSync::group(&())) { - None => { - debug!("Sending reconcile on watch event at ResourceVersion {:#?} because it is impossible to determine if the object was last modified by us for object: {:#?}", event_rv, self.key); - self.send_reconcile_on_success(backoff); - } - Some(was_last_modified_by_us) if !was_last_modified_by_us => { - debug!("Sending reconcile on watch event at ResourceVersion {:#?} for externally modified object: {:#?}", event_rv, self.key); - self.send_reconcile_on_success(backoff); - } - _ => { - debug!("Ignoring watch event at ResourceVersion {:#?} for object modified by us: {:#?}", event_rv, self.key); - } - } - - event_rv - } - WatchEvent::Bookmark(bookmark) => { - let bookmark_rv = bookmark.metadata.resource_version.clone(); - - debug!("Bookmark event received at ResourceVersion {:#?} for object: {:#?}", bookmark_rv, self.key); - backoff.reset(); - - bookmark_rv - } - WatchEvent::Error(err) if err.code == 410 => { - debug!("ResourceVersion {:#?} is expired, so we restart from the beginning for object: {:#?}", resource_version, self.key); - "0".to_string() - } - WatchEvent::Error(err) => { - send_reconcile_on_fail!( - self, - backoff, - "Error watching remote object: {}", - err - ); - - resource_version - } + resource_version = self + .listen(api, resource_version, &watch_params, backoff) + .await?; + } + } + + async fn listen( + &self, + api: &NamespacedApi, + mut resource_version: String, + watch_params: &WatchParams, + backoff: &mut DefaultBackoff, + ) -> Result { + debug!( + "Started watch at ResourceVersion {:#?} for remote object: {:#?}", + resource_version, self.key + ); + + let mut stream = api.watch(watch_params, &resource_version).await?.boxed(); + + while let Some(event) = stream.try_next().await? { + resource_version = match event { + WatchEvent::Deleted(obj) => { + let event_rv = rv_for!(obj); + + debug!("Sending reconcile on watch event at ResourceVersion {:#?} for deleted object: {:#?}", event_rv, self.key); + self.send_reconcile_on_success(backoff); + + event_rv + } + WatchEvent::Added(obj) | WatchEvent::Modified(obj) => { + let event_rv = rv_for!(obj); + + match obj.was_last_modified_by(&ResourceSync::group(&())) { + None => { + debug!("Sending reconcile on watch event at ResourceVersion {:#?} because it is impossible to determine if the object was last modified by us for object: {:#?}", event_rv, self.key); + self.send_reconcile_on_success(backoff); + } + Some(was_last_modified_by_us) if !was_last_modified_by_us => { + debug!("Sending reconcile on watch event at ResourceVersion {:#?} for externally modified object: {:#?}", event_rv, self.key); + self.send_reconcile_on_success(backoff); + } + _ => { + debug!("Ignoring watch event at ResourceVersion {:#?} for object modified by us: {:#?}", event_rv, self.key); } } + + event_rv + } + WatchEvent::Bookmark(bookmark) => { + let bookmark_rv = bookmark.metadata.resource_version.clone(); + + debug!( + "Bookmark event received at ResourceVersion {:#?} for object: {:#?}", + bookmark_rv, self.key + ); + backoff.reset(); + + bookmark_rv + } + WatchEvent::Error(err) if err.code == 410 => { + debug!("ResourceVersion {:#?} is expired, so we restart from the beginning for object: {:#?}", resource_version, self.key); + "0".to_string() + } + WatchEvent::Error(err) => { + send_reconcile_on_fail!(self, backoff, "Error watching remote object: {}", err); + + resource_version } } } + + Ok(resource_version) } } diff --git a/src/remote_watcher_manager.rs b/src/remote_watcher_manager.rs index f26d7f5..c381f6f 100644 --- a/src/remote_watcher_manager.rs +++ b/src/remote_watcher_manager.rs @@ -7,7 +7,7 @@ use kubert::client::Client; use tokio::sync::mpsc::UnboundedSender; use tokio::sync::{mpsc, Mutex}; use tokio::task::JoinHandle; -use tokio_context::context::{Handle, RefContext}; +use tokio_context::context::{Context, Handle}; use tokio_stream::wrappers::UnboundedReceiverStream; use tracing::{debug, error}; @@ -18,7 +18,6 @@ type ContextAndThreadHandle = (Handle, JoinHandle<()>); type SyncMap = Arc>>; pub struct RemoteWatcherManager { - ctx: RefContext, watchers: SyncMap, sender: UnboundedSender, watcher::Error>>, client: Client, @@ -26,7 +25,6 @@ pub struct RemoteWatcherManager { impl RemoteWatcherManager { pub fn new( - ctx: RefContext, client: Client, ) -> ( Self, @@ -34,7 +32,6 @@ impl RemoteWatcherManager { ) { let (sender, receiver) = mpsc::unbounded_channel(); let manager = RemoteWatcherManager { - ctx, watchers: Arc::new(Mutex::new(HashMap::new())), sender, client, @@ -52,11 +49,10 @@ impl RemoteWatcherManager { debug!("Starting remote watcher for: {:#?}", key); - let (ctx, handle) = RefContext::with_parent(&self.ctx, None); - let watcher = - RemoteWatcher::new(key.clone(), self.sender.clone(), ctx, self.client.clone()); + let (ctx, handle) = Context::new(); + let watcher = RemoteWatcher::new(key.clone(), self.sender.clone(), self.client.clone()); - let join_handle = tokio::spawn(watcher.run()); + let join_handle = tokio::spawn(watcher.run(ctx)); watchers.insert(key.clone(), (handle, join_handle)); }