diff --git a/README.md b/README.md index f74aecd..0fa4db5 100644 --- a/README.md +++ b/README.md @@ -97,6 +97,7 @@ async function bootstrap() { block: 5000, consumer: 'users-1', consumerGroup: 'users', + deleteMessagesAfterAck: true, // optional: delete message from stream }, // optional. See our example main.ts file for more details... // serialization: {}, diff --git a/lib/interfaces.ts b/lib/interfaces.ts index a3b6646..6c7bdc4 100644 --- a/lib/interfaces.ts +++ b/lib/interfaces.ts @@ -15,6 +15,7 @@ interface RedisStreamOptionsXreadGroup { block?: number; consumerGroup: string; consumer: string; + deleteMessagesAfterAck?: boolean; } export type RedisStreamOptions = RedisStreamOptionsXreadGroup; diff --git a/lib/redis.server.ts b/lib/redis.server.ts index e776709..82f4f1a 100644 --- a/lib/redis.server.ts +++ b/lib/redis.server.ts @@ -153,6 +153,13 @@ export class RedisStreamStrategy inboundContext.getMessageId(), ); + if (true === this.options?.streams?.deleteMessagesAfterAck) { + await this.client.xdel( + inboundContext.getStream(), + inboundContext.getMessageId(), + ); + } + return true; } catch (error) { this.logger.error(error);