Skip to content

Commit

Permalink
Remove the client from subscription manager on a disconnect (#2170)
Browse files Browse the repository at this point in the history
  • Loading branch information
jsdt authored Jan 24, 2025
1 parent 85ffb28 commit 729dbb1
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 1 deletion.
1 change: 1 addition & 0 deletions crates/core/src/host/module_host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,7 @@ impl ModuleHost {
}

pub async fn disconnect_client(&self, client_id: ClientActorId) {
log::trace!("disconnecting client {}", client_id);
let this = self.clone();
let _ = tokio::task::spawn_blocking(move || {
this.subscriptions().remove_subscriber(client_id);
Expand Down
10 changes: 9 additions & 1 deletion crates/core/src/subscription/module_subscription_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,11 @@ impl SubscriptionManager {
self.queries.contains_key(hash)
}

#[cfg(test)]
fn contains_client(&self, subscriber: &ClientId) -> bool {
self.clients.contains_key(subscriber)
}

#[cfg(test)]
fn contains_legacy_subscription(&self, subscriber: &ClientId, query: &QueryHash) -> bool {
self.queries
Expand Down Expand Up @@ -284,7 +289,7 @@ impl SubscriptionManager {
#[tracing::instrument(level = "trace", skip_all)]
pub fn remove_all_subscriptions(&mut self, client: &ClientId) {
self.remove_legacy_subscriptions(client);
let Some(client_info) = self.clients.get(client) else {
let Some(client_info) = self.clients.remove(client) else {
return;
};
debug_assert!(client_info.legacy_subscriptions.is_empty());
Expand Down Expand Up @@ -688,14 +693,17 @@ mod tests {
.map(|client| (client.id.identity, client.id.address))
.collect::<Vec<_>>();
subscriptions.remove_all_subscriptions(&client_ids[0]);
assert!(!subscriptions.contains_client(&client_ids[0]));
// There are still two left.
assert!(subscriptions.query_reads_from_table(&hash, &table_id));
subscriptions.remove_all_subscriptions(&client_ids[1]);
// There is still one left.
assert!(subscriptions.query_reads_from_table(&hash, &table_id));
assert!(!subscriptions.contains_client(&client_ids[1]));
subscriptions.remove_all_subscriptions(&client_ids[2]);
// Now there are no subscribers.
assert!(!subscriptions.query_reads_from_table(&hash, &table_id));
assert!(!subscriptions.contains_client(&client_ids[2]));

Ok(())
}
Expand Down

1 comment on commit 729dbb1

@github-actions
Copy link

@github-actions github-actions bot commented on 729dbb1 Jan 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Benchmarking failed. Please check the workflow run for details.

Please sign in to comment.