diff --git a/android/src/main/java/expo/modules/xmtpreactnativesdk/XMTPModule.kt b/android/src/main/java/expo/modules/xmtpreactnativesdk/XMTPModule.kt index 69c9a5d12..7bc4df43b 100644 --- a/android/src/main/java/expo/modules/xmtpreactnativesdk/XMTPModule.kt +++ b/android/src/main/java/expo/modules/xmtpreactnativesdk/XMTPModule.kt @@ -441,14 +441,14 @@ class XMTPModule : Module() { ) } - Function("unsubscribeFromConversations") { + Function("unsubscribeFromConversations") { clientAddress: String -> logV("unsubscribeFromConversations") - subscriptions["conversations"]?.cancel() + subscriptions[getConversationsKey(clientAddress)]?.cancel() } - Function("unsubscribeFromAllMessages") { + Function("unsubscribeFromAllMessages") { clientAddress: String -> logV("unsubscribeFromAllMessages") - subscriptions["messages"]?.cancel() + subscriptions[getMessagesKey(clientAddress)]?.cancel() } AsyncFunction("unsubscribeFromMessages") { clientAddress: String, topic: String -> @@ -517,8 +517,8 @@ class XMTPModule : Module() { private fun subscribeToConversations(clientAddress: String) { val client = clients[clientAddress] ?: throw XMTPException("No client") - subscriptions["conversations"]?.cancel() - subscriptions["conversations"] = CoroutineScope(Dispatchers.IO).launch { + subscriptions[getConversationsKey(clientAddress)]?.cancel() + subscriptions[getConversationsKey(clientAddress)] = CoroutineScope(Dispatchers.IO).launch { try { client.conversations.stream().collect { conversation -> sendEvent( @@ -531,7 +531,7 @@ class XMTPModule : Module() { } } catch (e: Exception) { Log.e("XMTPModule", "Error in conversations subscription: $e") - subscriptions["conversations"]?.cancel() + subscriptions[getConversationsKey(clientAddress)]?.cancel() } } } @@ -539,8 +539,8 @@ class XMTPModule : Module() { private fun subscribeToAllMessages(clientAddress: String) { val client = clients[clientAddress] ?: throw XMTPException("No client") - subscriptions["messages"]?.cancel() - subscriptions["messages"] = CoroutineScope(Dispatchers.IO).launch { + subscriptions[getMessagesKey(clientAddress)]?.cancel() + subscriptions[getMessagesKey(clientAddress)] = CoroutineScope(Dispatchers.IO).launch { try { client.conversations.streamAllMessages().collect { message -> sendEvent( @@ -553,7 +553,7 @@ class XMTPModule : Module() { } } catch (e: Exception) { Log.e("XMTPModule", "Error in all messages subscription: $e") - subscriptions["messages"]?.cancel() + subscriptions[getMessagesKey(clientAddress)]?.cancel() } } } @@ -584,6 +584,14 @@ class XMTPModule : Module() { } } + private fun getMessagesKey(clientAddress: String): String { + return "messages:$clientAddress" + } + + private fun getConversationsKey(clientAddress: String): String { + return "conversations:$clientAddress" + } + private fun unsubscribeFromMessages( clientAddress: String, topic: String, diff --git a/ios/XMTPModule.swift b/ios/XMTPModule.swift index 30df006fe..b3586a023 100644 --- a/ios/XMTPModule.swift +++ b/ios/XMTPModule.swift @@ -474,12 +474,12 @@ public class XMTPModule: Module { try await subscribeToMessages(clientAddress: clientAddress, topic: topic) } - AsyncFunction("unsubscribeFromConversations") { - await subscriptionsManager.getSubscription(key: "conversations")?.cancel() + AsyncFunction("unsubscribeFromConversations") { (clientAddress: String) in + await subscriptionsManager.getSubscription(key: getConversationsKey(clientAddress: clientAddress))?.cancel() } - AsyncFunction("unsubscribeFromAllMessages") { - await subscriptionsManager.getSubscription(key: "messages")?.cancel() + AsyncFunction("unsubscribeFromAllMessages") { (clientAddress: String) in + await subscriptionsManager.getSubscription(key: getMessagesKey(clientAddress: clientAddress))?.cancel() } AsyncFunction("unsubscribeFromMessages") { (clientAddress: String, topic: String) in @@ -570,10 +570,10 @@ public class XMTPModule: Module { return } - await subscriptionsManager.getSubscription(key: "conversations")?.cancel() - await subscriptionsManager.updateSubscription(key: "conversations", task: Task { + await subscriptionsManager.getSubscription(key: getConversationsKey(clientAddress: clientAddress))?.cancel() + await subscriptionsManager.updateSubscription(key: getConversationsKey(clientAddress: clientAddress), task: Task { do { - for try await conversation in try await client.conversations.stream() { + for try await conversation in await client.conversations.stream() { try sendEvent("conversation", [ "clientAddress": clientAddress, "conversation": ConversationWrapper.encodeToObj(conversation, client: client), @@ -581,7 +581,7 @@ public class XMTPModule: Module { } } catch { print("Error in conversations subscription: \(error)") - await subscriptionsManager.getSubscription(key: "conversations")?.cancel() + await subscriptionsManager.getSubscription(key: getConversationsKey(clientAddress: clientAddress))?.cancel() } }) } @@ -591,8 +591,8 @@ public class XMTPModule: Module { return } - await subscriptionsManager.getSubscription(key: "messages")?.cancel() - await subscriptionsManager.updateSubscription(key: "messages", task: Task { + await subscriptionsManager.getSubscription(key: getMessagesKey(clientAddress: clientAddress))?.cancel() + await subscriptionsManager.updateSubscription(key: getMessagesKey(clientAddress: clientAddress), task: Task { do { for try await message in try await client.conversations.streamAllMessages() { do { @@ -606,7 +606,7 @@ public class XMTPModule: Module { } } catch { print("Error in all messages subscription: \(error)") - await subscriptionsManager.getSubscription(key: "messages")?.cancel() + await subscriptionsManager.getSubscription(key: getMessagesKey(clientAddress: clientAddress))?.cancel() } }) } @@ -643,4 +643,12 @@ public class XMTPModule: Module { await subscriptionsManager.getSubscription(key: conversation.cacheKey(clientAddress))?.cancel() } + + func getMessagesKey(clientAddress: String) -> String { + return "messages:\(clientAddress)" + } + + func getConversationsKey(clientAddress: String) -> String { + return "conversations:\(clientAddress)" + } } diff --git a/src/index.ts b/src/index.ts index dc7bdfd50..22ed13739 100644 --- a/src/index.ts +++ b/src/index.ts @@ -228,12 +228,12 @@ export async function subscribeToMessages( return await XMTPModule.subscribeToMessages(clientAddress, topic); } -export function unsubscribeFromConversations() { - return XMTPModule.unsubscribeFromConversations(); +export function unsubscribeFromConversations(clientAddress: string) { + return XMTPModule.unsubscribeFromConversations(clientAddress); } -export function unsubscribeFromAllMessages() { - return XMTPModule.unsubscribeFromAllMessages(); +export function unsubscribeFromAllMessages(clientAddress: string) { + return XMTPModule.unsubscribeFromAllMessages(clientAddress); } export async function unsubscribeFromMessages( diff --git a/src/lib/Conversations.ts b/src/lib/Conversations.ts index 0e471521a..ce99fb821 100644 --- a/src/lib/Conversations.ts +++ b/src/lib/Conversations.ts @@ -92,10 +92,10 @@ export default class Conversations { } cancelStream() { - XMTPModule.unsubscribeFromConversations(); + XMTPModule.unsubscribeFromConversations(this.client.address); } cancelStreamAllMessages() { - XMTPModule.unsubscribeFromAllMessages(); + XMTPModule.unsubscribeFromAllMessages(this.client.address); } }