From ce2f7b9c4672efad1130c136f8f32c0485fd66f8 Mon Sep 17 00:00:00 2001 From: Tamim <66521954+tamimaj@users.noreply.github.com> Date: Thu, 29 Feb 2024 17:22:11 +0100 Subject: [PATCH] Fix: Process multiple received streams concurrently. (#13) --- lib/redis.client.ts | 7 ++++--- lib/redis.server.ts | 7 ++++--- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/lib/redis.client.ts b/lib/redis.client.ts index b988480..c3844c2 100644 --- a/lib/redis.client.ts +++ b/lib/redis.client.ts @@ -260,9 +260,10 @@ export class RedisStreamClient extends ClientProxy { // if BLOCK time ended, and results are null, listen again. if (!results) return this.listenOnStreams(); - const [key, messages] = results[0]; - - await this.notifyHandlers(key, messages); + for (let result of results) { + let [stream, messages] = result; + await this.notifyHandlers(stream, messages); + } return this.listenOnStreams(); } catch (error) { diff --git a/lib/redis.server.ts b/lib/redis.server.ts index 82f4f1a..6d3e254 100644 --- a/lib/redis.server.ts +++ b/lib/redis.server.ts @@ -268,9 +268,10 @@ export class RedisStreamStrategy // if BLOCK time ended, and results are null, listen again. if (!results) return this.listenOnStreams(); - const [key, messages] = results[0]; - - await this.notifyHandlers(key, messages); + for (let result of results) { + let [stream, messages] = result; + await this.notifyHandlers(stream, messages); + } return this.listenOnStreams(); } catch (error) {