From 2c924160abbca9c9f18607c2d209a003be2db8fd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicol=C3=A1s=20Alonso?= <32513992+nicoalonso@users.noreply.github.com> Date: Thu, 4 Jan 2024 16:44:34 +0100 Subject: [PATCH] Allow delete messages option after acknowledge (#12) * Allow delete message after acknowledge * Update Readme * Create node package * Revert name of package * Change option name to deleteMessagesAfterAck * Update Readme --- README.md | 1 + lib/interfaces.ts | 1 + lib/redis.server.ts | 7 +++++++ 3 files changed, 9 insertions(+) diff --git a/README.md b/README.md index f74aecd..0fa4db5 100644 --- a/README.md +++ b/README.md @@ -97,6 +97,7 @@ async function bootstrap() { block: 5000, consumer: 'users-1', consumerGroup: 'users', + deleteMessagesAfterAck: true, // optional: delete message from stream }, // optional. See our example main.ts file for more details... // serialization: {}, diff --git a/lib/interfaces.ts b/lib/interfaces.ts index a3b6646..6c7bdc4 100644 --- a/lib/interfaces.ts +++ b/lib/interfaces.ts @@ -15,6 +15,7 @@ interface RedisStreamOptionsXreadGroup { block?: number; consumerGroup: string; consumer: string; + deleteMessagesAfterAck?: boolean; } export type RedisStreamOptions = RedisStreamOptionsXreadGroup; diff --git a/lib/redis.server.ts b/lib/redis.server.ts index e776709..82f4f1a 100644 --- a/lib/redis.server.ts +++ b/lib/redis.server.ts @@ -153,6 +153,13 @@ export class RedisStreamStrategy inboundContext.getMessageId(), ); + if (true === this.options?.streams?.deleteMessagesAfterAck) { + await this.client.xdel( + inboundContext.getStream(), + inboundContext.getMessageId(), + ); + } + return true; } catch (error) { this.logger.error(error);