Skip to content

Commit

Permalink
fix: subscription failures when multiple clients are attempting to st…
Browse files Browse the repository at this point in the history
…ream

Fix subscription failures when multiple clients are attempting to stream
  • Loading branch information
nakajima authored Sep 21, 2023
2 parents c50ec6f + 87a7571 commit e412cd0
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 27 deletions.
28 changes: 18 additions & 10 deletions android/src/main/java/expo/modules/xmtpreactnativesdk/XMTPModule.kt
Original file line number Diff line number Diff line change
Expand Up @@ -441,14 +441,14 @@ class XMTPModule : Module() {
)
}

Function("unsubscribeFromConversations") {
Function("unsubscribeFromConversations") { clientAddress: String ->
logV("unsubscribeFromConversations")
subscriptions["conversations"]?.cancel()
subscriptions[getConversationsKey(clientAddress)]?.cancel()
}

Function("unsubscribeFromAllMessages") {
Function("unsubscribeFromAllMessages") { clientAddress: String ->
logV("unsubscribeFromAllMessages")
subscriptions["messages"]?.cancel()
subscriptions[getMessagesKey(clientAddress)]?.cancel()
}

AsyncFunction("unsubscribeFromMessages") { clientAddress: String, topic: String ->
Expand Down Expand Up @@ -517,8 +517,8 @@ class XMTPModule : Module() {
private fun subscribeToConversations(clientAddress: String) {
val client = clients[clientAddress] ?: throw XMTPException("No client")

subscriptions["conversations"]?.cancel()
subscriptions["conversations"] = CoroutineScope(Dispatchers.IO).launch {
subscriptions[getConversationsKey(clientAddress)]?.cancel()
subscriptions[getConversationsKey(clientAddress)] = CoroutineScope(Dispatchers.IO).launch {
try {
client.conversations.stream().collect { conversation ->
sendEvent(
Expand All @@ -531,16 +531,16 @@ class XMTPModule : Module() {
}
} catch (e: Exception) {
Log.e("XMTPModule", "Error in conversations subscription: $e")
subscriptions["conversations"]?.cancel()
subscriptions[getConversationsKey(clientAddress)]?.cancel()
}
}
}

private fun subscribeToAllMessages(clientAddress: String) {
val client = clients[clientAddress] ?: throw XMTPException("No client")

subscriptions["messages"]?.cancel()
subscriptions["messages"] = CoroutineScope(Dispatchers.IO).launch {
subscriptions[getMessagesKey(clientAddress)]?.cancel()
subscriptions[getMessagesKey(clientAddress)] = CoroutineScope(Dispatchers.IO).launch {
try {
client.conversations.streamAllMessages().collect { message ->
sendEvent(
Expand All @@ -553,7 +553,7 @@ class XMTPModule : Module() {
}
} catch (e: Exception) {
Log.e("XMTPModule", "Error in all messages subscription: $e")
subscriptions["messages"]?.cancel()
subscriptions[getMessagesKey(clientAddress)]?.cancel()
}
}
}
Expand Down Expand Up @@ -584,6 +584,14 @@ class XMTPModule : Module() {
}
}

private fun getMessagesKey(clientAddress: String): String {
return "messages:$clientAddress"
}

private fun getConversationsKey(clientAddress: String): String {
return "conversations:$clientAddress"
}

private fun unsubscribeFromMessages(
clientAddress: String,
topic: String,
Expand Down
30 changes: 19 additions & 11 deletions ios/XMTPModule.swift
Original file line number Diff line number Diff line change
Expand Up @@ -474,12 +474,12 @@ public class XMTPModule: Module {
try await subscribeToMessages(clientAddress: clientAddress, topic: topic)
}

AsyncFunction("unsubscribeFromConversations") {
await subscriptionsManager.getSubscription(key: "conversations")?.cancel()
AsyncFunction("unsubscribeFromConversations") { (clientAddress: String) in
await subscriptionsManager.getSubscription(key: getConversationsKey(clientAddress: clientAddress))?.cancel()
}

AsyncFunction("unsubscribeFromAllMessages") {
await subscriptionsManager.getSubscription(key: "messages")?.cancel()
AsyncFunction("unsubscribeFromAllMessages") { (clientAddress: String) in
await subscriptionsManager.getSubscription(key: getMessagesKey(clientAddress: clientAddress))?.cancel()
}

AsyncFunction("unsubscribeFromMessages") { (clientAddress: String, topic: String) in
Expand Down Expand Up @@ -570,18 +570,18 @@ public class XMTPModule: Module {
return
}

await subscriptionsManager.getSubscription(key: "conversations")?.cancel()
await subscriptionsManager.updateSubscription(key: "conversations", task: Task {
await subscriptionsManager.getSubscription(key: getConversationsKey(clientAddress: clientAddress))?.cancel()
await subscriptionsManager.updateSubscription(key: getConversationsKey(clientAddress: clientAddress), task: Task {
do {
for try await conversation in try await client.conversations.stream() {
for try await conversation in await client.conversations.stream() {
try sendEvent("conversation", [
"clientAddress": clientAddress,
"conversation": ConversationWrapper.encodeToObj(conversation, client: client),
])
}
} catch {
print("Error in conversations subscription: \(error)")
await subscriptionsManager.getSubscription(key: "conversations")?.cancel()
await subscriptionsManager.getSubscription(key: getConversationsKey(clientAddress: clientAddress))?.cancel()
}
})
}
Expand All @@ -591,8 +591,8 @@ public class XMTPModule: Module {
return
}

await subscriptionsManager.getSubscription(key: "messages")?.cancel()
await subscriptionsManager.updateSubscription(key: "messages", task: Task {
await subscriptionsManager.getSubscription(key: getMessagesKey(clientAddress: clientAddress))?.cancel()
await subscriptionsManager.updateSubscription(key: getMessagesKey(clientAddress: clientAddress), task: Task {
do {
for try await message in try await client.conversations.streamAllMessages() {
do {
Expand All @@ -606,7 +606,7 @@ public class XMTPModule: Module {
}
} catch {
print("Error in all messages subscription: \(error)")
await subscriptionsManager.getSubscription(key: "messages")?.cancel()
await subscriptionsManager.getSubscription(key: getMessagesKey(clientAddress: clientAddress))?.cancel()
}
})
}
Expand Down Expand Up @@ -643,4 +643,12 @@ public class XMTPModule: Module {

await subscriptionsManager.getSubscription(key: conversation.cacheKey(clientAddress))?.cancel()
}

func getMessagesKey(clientAddress: String) -> String {
return "messages:\(clientAddress)"
}

func getConversationsKey(clientAddress: String) -> String {
return "conversations:\(clientAddress)"
}
}
8 changes: 4 additions & 4 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -228,12 +228,12 @@ export async function subscribeToMessages(
return await XMTPModule.subscribeToMessages(clientAddress, topic);
}

export function unsubscribeFromConversations() {
return XMTPModule.unsubscribeFromConversations();
export function unsubscribeFromConversations(clientAddress: string) {
return XMTPModule.unsubscribeFromConversations(clientAddress);
}

export function unsubscribeFromAllMessages() {
return XMTPModule.unsubscribeFromAllMessages();
export function unsubscribeFromAllMessages(clientAddress: string) {
return XMTPModule.unsubscribeFromAllMessages(clientAddress);
}

export async function unsubscribeFromMessages(
Expand Down
4 changes: 2 additions & 2 deletions src/lib/Conversations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,10 @@ export default class Conversations {
}

cancelStream() {
XMTPModule.unsubscribeFromConversations();
XMTPModule.unsubscribeFromConversations(this.client.address);
}

cancelStreamAllMessages() {
XMTPModule.unsubscribeFromAllMessages();
XMTPModule.unsubscribeFromAllMessages(this.client.address);
}
}

0 comments on commit e412cd0

Please sign in to comment.