Skip to content

Commit

Permalink
fix: Mux lock being blocked on shared parent RefContext
Browse files Browse the repository at this point in the history
  • Loading branch information
zach-robinson-dev committed Jul 16, 2024
1 parent 1564a4b commit 9d6abd1
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 85 deletions.
7 changes: 1 addition & 6 deletions src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use kube::{
Api, Client, Resource, ResourceExt,
};
use serde_json::json;
use tokio_context::context::RefContext;
#[allow(unused_imports)]
use tracing::{debug, error, info, warn};

Expand Down Expand Up @@ -236,10 +235,8 @@ pub async fn run(client: Client) -> Result<()> {
.applied_objects()
.predicate_filter(predicates::generation);

let (ctx, handle) = RefContext::new();

let (remote_watcher_manager, remote_objects_trigger) =
RemoteWatcherManager::new(ctx, client.clone());
RemoteWatcherManager::new(client.clone());

Controller::for_stream(resource_syncs, reader)
.reconcile_on(remote_objects_trigger)
Expand All @@ -256,8 +253,6 @@ pub async fn run(client: Client) -> Result<()> {
.for_each(|_| futures::future::ready(()))
.await;

handle.cancel();

Ok(())
}

Expand Down
143 changes: 72 additions & 71 deletions src/remote_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use kube::Resource;
use kubert::client::Client;
use tokio::sync::mpsc::UnboundedSender;
use tokio::time::sleep;
use tokio_context::context::{Context, RefContext};
use tokio_context::context::Context;
use tracing::{debug, error};

use crate::filters::Filterable;
Expand All @@ -28,7 +28,6 @@ pub struct RemoteWatcherKey {
pub struct RemoteWatcher {
key: RemoteWatcherKey,
sender: UnboundedSender<Result<ObjectRef<ResourceSync>, watcher::Error>>,
ctx: RefContext,
client: Client,
}

Expand Down Expand Up @@ -56,13 +55,11 @@ impl RemoteWatcher {
pub fn new(
key: RemoteWatcherKey,
sender: UnboundedSender<Result<ObjectRef<ResourceSync>, watcher::Error>>,
ctx: RefContext,
client: Client,
) -> Self {
Self {
key,
sender,
ctx,
client,
}
}
Expand All @@ -78,15 +75,14 @@ impl RemoteWatcher {
self.send_reconcile();
}

pub async fn run(self) {
pub async fn run(self, mut ctx: Context) {
let mut backoff = DefaultBackoff::default();

let (mut ctx, handle) = Context::with_parent(&self.ctx, None);

loop {
tokio::select! {
biased;

_ = ctx.done() => {
handle.cancel();
return;
},
Err(err) = self.start(&mut backoff) => {
Expand Down Expand Up @@ -140,77 +136,82 @@ impl RemoteWatcher {
resource_version: &str,
backoff: &mut DefaultBackoff,
) -> Result<()> {
let (mut ctx, handle) = Context::with_parent(&self.ctx, None);

let watch_params = WatchParams::default().fields(&format!("metadata.name={}", object_name));
let mut resource_version = resource_version.to_string();

loop {
tokio::select! {
_ = ctx.done() => {
handle.cancel();
return Ok(());
},
stream = api.watch(&watch_params, &resource_version) => {
debug!("Started watch at ResourceVersion {:#?} for remote object: {:#?}", resource_version, self.key);

let mut stream = stream?.boxed();

while let Some(event) = stream.try_next().await? {
resource_version = match event {
WatchEvent::Deleted(obj) => {
let event_rv = rv_for!(obj);

debug!("Sending reconcile on watch event at ResourceVersion {:#?} for deleted object: {:#?}", event_rv, self.key);
self.send_reconcile_on_success(backoff);

event_rv
}
WatchEvent::Added(obj) | WatchEvent::Modified(obj) => {
let event_rv = rv_for!(obj);

match obj.was_last_modified_by(&ResourceSync::group(&())) {
None => {
debug!("Sending reconcile on watch event at ResourceVersion {:#?} because it is impossible to determine if the object was last modified by us for object: {:#?}", event_rv, self.key);
self.send_reconcile_on_success(backoff);
}
Some(was_last_modified_by_us) if !was_last_modified_by_us => {
debug!("Sending reconcile on watch event at ResourceVersion {:#?} for externally modified object: {:#?}", event_rv, self.key);
self.send_reconcile_on_success(backoff);
}
_ => {
debug!("Ignoring watch event at ResourceVersion {:#?} for object modified by us: {:#?}", event_rv, self.key);
}
}

event_rv
}
WatchEvent::Bookmark(bookmark) => {
let bookmark_rv = bookmark.metadata.resource_version.clone();

debug!("Bookmark event received at ResourceVersion {:#?} for object: {:#?}", bookmark_rv, self.key);
backoff.reset();

bookmark_rv
}
WatchEvent::Error(err) if err.code == 410 => {
debug!("ResourceVersion {:#?} is expired, so we restart from the beginning for object: {:#?}", resource_version, self.key);
"0".to_string()
}
WatchEvent::Error(err) => {
send_reconcile_on_fail!(
self,
backoff,
"Error watching remote object: {}",
err
);

resource_version
}
resource_version = self
.listen(api, resource_version, &watch_params, backoff)
.await?;
}
}

async fn listen(
&self,
api: &NamespacedApi,
mut resource_version: String,
watch_params: &WatchParams,
backoff: &mut DefaultBackoff,
) -> Result<String> {
debug!(
"Started watch at ResourceVersion {:#?} for remote object: {:#?}",
resource_version, self.key
);

let mut stream = api.watch(watch_params, &resource_version).await?.boxed();

while let Some(event) = stream.try_next().await? {
resource_version = match event {
WatchEvent::Deleted(obj) => {
let event_rv = rv_for!(obj);

debug!("Sending reconcile on watch event at ResourceVersion {:#?} for deleted object: {:#?}", event_rv, self.key);
self.send_reconcile_on_success(backoff);

event_rv
}
WatchEvent::Added(obj) | WatchEvent::Modified(obj) => {
let event_rv = rv_for!(obj);

match obj.was_last_modified_by(&ResourceSync::group(&())) {
None => {
debug!("Sending reconcile on watch event at ResourceVersion {:#?} because it is impossible to determine if the object was last modified by us for object: {:#?}", event_rv, self.key);
self.send_reconcile_on_success(backoff);
}
Some(was_last_modified_by_us) if !was_last_modified_by_us => {
debug!("Sending reconcile on watch event at ResourceVersion {:#?} for externally modified object: {:#?}", event_rv, self.key);
self.send_reconcile_on_success(backoff);
}
_ => {
debug!("Ignoring watch event at ResourceVersion {:#?} for object modified by us: {:#?}", event_rv, self.key);
}
}

event_rv
}
WatchEvent::Bookmark(bookmark) => {
let bookmark_rv = bookmark.metadata.resource_version.clone();

debug!(
"Bookmark event received at ResourceVersion {:#?} for object: {:#?}",
bookmark_rv, self.key
);
backoff.reset();

bookmark_rv
}
WatchEvent::Error(err) if err.code == 410 => {
debug!("ResourceVersion {:#?} is expired, so we restart from the beginning for object: {:#?}", resource_version, self.key);
"0".to_string()
}
WatchEvent::Error(err) => {
send_reconcile_on_fail!(self, backoff, "Error watching remote object: {}", err);

resource_version
}
}
}

Ok(resource_version)
}
}
12 changes: 4 additions & 8 deletions src/remote_watcher_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use kubert::client::Client;
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::{mpsc, Mutex};
use tokio::task::JoinHandle;
use tokio_context::context::{Handle, RefContext};
use tokio_context::context::{Context, Handle};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::{debug, error};

Expand All @@ -18,23 +18,20 @@ type ContextAndThreadHandle = (Handle, JoinHandle<()>);
type SyncMap<K, V> = Arc<Mutex<HashMap<K, V>>>;

pub struct RemoteWatcherManager {
ctx: RefContext,
watchers: SyncMap<RemoteWatcherKey, ContextAndThreadHandle>,
sender: UnboundedSender<Result<ObjectRef<ResourceSync>, watcher::Error>>,
client: Client,
}

impl RemoteWatcherManager {
pub fn new(
ctx: RefContext,
client: Client,
) -> (
Self,
UnboundedReceiverStream<Result<ObjectRef<ResourceSync>, watcher::Error>>,
) {
let (sender, receiver) = mpsc::unbounded_channel();
let manager = RemoteWatcherManager {
ctx,
watchers: Arc::new(Mutex::new(HashMap::new())),
sender,
client,
Expand All @@ -52,11 +49,10 @@ impl RemoteWatcherManager {

debug!("Starting remote watcher for: {:#?}", key);

let (ctx, handle) = RefContext::with_parent(&self.ctx, None);
let watcher =
RemoteWatcher::new(key.clone(), self.sender.clone(), ctx, self.client.clone());
let (ctx, handle) = Context::new();
let watcher = RemoteWatcher::new(key.clone(), self.sender.clone(), self.client.clone());

let join_handle = tokio::spawn(watcher.run());
let join_handle = tokio::spawn(watcher.run(ctx));

watchers.insert(key.clone(), (handle, join_handle));
}
Expand Down

0 comments on commit 9d6abd1

Please sign in to comment.