Skip to content

Commit

Permalink
xadd-maxlen-support
Browse files Browse the repository at this point in the history
  • Loading branch information
Zach Sherbondy authored and Zach Sherbondy committed Sep 13, 2024
1 parent f4920a6 commit 5aa6a80
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 3 deletions.
6 changes: 5 additions & 1 deletion lib/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@ interface RedisStreamOptionsXreadGroup {
deleteMessagesAfterAck?: boolean;
}

export type RedisStreamOptions = RedisStreamOptionsXreadGroup;
interface RedisStreamOptionsXadd {
maxLen?: number;
}

export type RedisStreamOptions = RedisStreamOptionsXreadGroup & RedisStreamOptionsXadd;

// [id, [key, value, key, value]]
export type RawStreamMessage = [id: string, payload: string[]];
Expand Down
10 changes: 9 additions & 1 deletion lib/redis.client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { RequestsMap } from './requests-map';
import { deserialize, generateCorrelationId, serialize } from './streams.utils';
import { RedisStreamContext } from './stream.context';
import { firstValueFrom, share } from 'rxjs';
import { RedisValue } from 'ioredis';

@Injectable()
export class RedisStreamClient extends ClientProxy {
Expand Down Expand Up @@ -104,9 +105,16 @@ export class RedisStreamClient extends ClientProxy {
try {
if (!this.client) throw new Error('Redis client instance not found.');

const commandArgs: RedisValue[] = [];
if(this.options.streams?.maxLen){
commandArgs.push("MAXLEN")
commandArgs.push("~")
commandArgs.push(this.options.streams.maxLen.toString())
}
commandArgs.push("*")
let response = await this.client.xadd(
stream,
'*',
...commandArgs,
...serializedPayloadArray,
);
return response;
Expand Down
15 changes: 14 additions & 1 deletion lib/redis.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import { CONNECT_EVENT, ERROR_EVENT } from '@nestjs/microservices/constants';
import { deserialize, serialize } from './streams.utils';
import { RedisStreamContext } from './stream.context';
import { Observable } from 'rxjs';
import { RedisValue } from 'ioredis';

export class RedisStreamStrategy
extends Server
Expand Down Expand Up @@ -140,7 +141,19 @@ export class RedisStreamStrategy

if (!this.client) throw new Error('Redis client instance not found.');

await this.client.xadd(responseObj.stream, '*', ...serializedEntries);
const commandArgs: RedisValue[] = [];
if(this.options.streams?.maxLen){
commandArgs.push("MAXLEN")
commandArgs.push("~")
commandArgs.push(this.options.streams.maxLen.toString())
}
commandArgs.push("*")

await this.client.xadd(
responseObj.stream,
...commandArgs,
...serializedEntries,
);
}),
);

Expand Down

0 comments on commit 5aa6a80

Please sign in to comment.