Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refact: Apply strict mode #16

Merged
merged 4 commits into from
Apr 9, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 12 additions & 7 deletions lib/redis-stream-client.core-module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,13 @@ export class RedisStreamClientCoreModule {
);
}

if (options.useExisting || options.useFactory) {
return [this.createAsyncClientProvider(options)];
const providers: Provider[] = [this.createAsyncClientProvider(options)];

if (!options.useExisting && !options.useFactory && options.useClass) {
providers.push({ provide: options.useClass, useClass: options.useClass });
}

return [
this.createAsyncClientProvider(options),
{ provide: options.useClass, useClass: options.useClass },
];
return providers
}

/* createAsyncOptionsProvider */
Expand All @@ -86,12 +85,18 @@ export class RedisStreamClientCoreModule {
};
}

const inject = options.useClass
? [options.useClass]
: options.useExisting
? [options.useExisting]
: []

return {
provide: REDIS_STREAM_CLIENT_MODULE_OPTIONS,
useFactory: async (
optionsFactory: RedisStreamClientModuleOptionsFactory,
) => optionsFactory.createRedisStreamClientModuleOptions(),
inject: [options.useClass || options.useExisting],
inject,
};
}
}
30 changes: 19 additions & 11 deletions lib/redis.client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ import { firstValueFrom, share } from 'rxjs';
export class RedisStreamClient extends ClientProxy {
protected readonly logger = new Logger(RedisStreamClient.name);

private redis: RedisInstance; // server instance for listening on response streams.
private redis: RedisInstance | null = null; // server instance for listening on response streams.

private client: RedisInstance; // client instance for publishing streams.
private client: RedisInstance | null = null; // client instance for publishing streams.

protected connection: Promise<any>; // client connection logic is required by framework.
protected connection: Promise<any> | null = null; // client connection logic is required by framework.

private streamsToListenOn: string[] = []; // response streams to listen on.

Expand All @@ -40,9 +40,9 @@ export class RedisStreamClient extends ClientProxy {
this.logger.log(
'Redis Client Responses Listener connected successfully on ' +
(this.options.connection?.url ??
this.options.connection.host +
this.options.connection?.host +
':' +
this.options.connection.port),
this.options.connection?.port),
);

this.initListener();
Expand Down Expand Up @@ -102,6 +102,8 @@ export class RedisStreamClient extends ClientProxy {

public async handleXadd(stream: string, serializedPayloadArray: any[]) {
try {
if (!this.client) throw new Error('Redis client instance not found.');

let response = await this.client.xadd(
stream,
'*',
Expand Down Expand Up @@ -222,12 +224,14 @@ export class RedisStreamClient extends ClientProxy {

private async createConsumerGroup(stream: string, consumerGroup: string) {
try {
if (!this.redis) throw new Error('Redis instance not found.');

await this.redis.xgroup('CREATE', stream, consumerGroup, '$', 'MKSTREAM');

return true;
} catch (error) {
// if group exist for this stream. log debug.
if (error?.message.includes('BUSYGROUP')) {
if (error instanceof Error && error?.message.includes('BUSYGROUP')) {
this.logger.debug(
'Consumer Group "' +
consumerGroup +
Expand All @@ -242,14 +246,16 @@ export class RedisStreamClient extends ClientProxy {
}
}

private async listenOnStreams() {
private async listenOnStreams(): Promise<void> {
try {
if (!this.redis) throw new Error('Redis instance not found.');

let results: any[];

results = await this.redis.xreadgroup(
'GROUP',
this.options?.streams?.consumerGroup || undefined,
this.options?.streams?.consumer || undefined, // need to make it throw an error.
this.options?.streams?.consumerGroup || '',
this.options?.streams?.consumer || '', // need to make it throw an error.
'BLOCK',
this.options?.streams?.block || 0,
'STREAMS',
Expand Down Expand Up @@ -315,8 +321,8 @@ export class RedisStreamClient extends ClientProxy {

// after message
private async deliverToHandler(
correlationId,
parsedPayload,
correlationId: string,
parsedPayload: any,
ctx: RedisStreamContext,
) {
try {
Expand Down Expand Up @@ -369,6 +375,8 @@ export class RedisStreamClient extends ClientProxy {

private async handleAck(inboundContext: RedisStreamContext) {
try {
if (!this.client) throw new Error('Redis client instance not found.');

await this.client.xack(
inboundContext.getStream(),
inboundContext.getConsumerGroup(),
Expand Down
30 changes: 19 additions & 11 deletions lib/redis.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ export class RedisStreamStrategy
extends Server
implements CustomTransportStrategy
{
private streamHandlerMap = {};
private streamHandlerMap: { [key: string]: any } = {};

private redis: RedisInstance;

private client: RedisInstance;
private redis: RedisInstance | null = null;
private client: RedisInstance | null = null;

constructor(private readonly options: ConstructorOptions) {
super();
Expand All @@ -40,7 +40,7 @@ export class RedisStreamStrategy
this.logger.log(
'Redis connected successfully on ' +
(this.options.connection?.url ??
this.options.connection.host + ':' + this.options.connection.port),
this.options.connection?.host + ':' + this.options.connection?.port),
);

this.bindHandlers();
Expand Down Expand Up @@ -85,20 +85,22 @@ export class RedisStreamStrategy
return true;
} catch (error) {
// JSON.parse will throw error, if is not parsable.
this.logger.debug(error + '. Handler Pattern is: ' + pattern);
this.logger.debug!(error + '. Handler Pattern is: ' + pattern);
return false;
}
}

private async createConsumerGroup(stream: string, consumerGroup: string) {
try {
if (!this.redis) throw new Error('Redis instance not found.');

await this.redis.xgroup('CREATE', stream, consumerGroup, '$', 'MKSTREAM');

return true;
} catch (error) {
// if group exist for this stream. log debug.
if (error?.message.includes('BUSYGROUP')) {
this.logger.debug(
if (error instanceof Error && error.message.includes('BUSYGROUP')) {
this.logger.debug!(
'Consumer Group "' +
consumerGroup +
'" already exists for stream: ' +
Expand Down Expand Up @@ -134,6 +136,8 @@ export class RedisStreamStrategy
);
}

if (!this.client) throw new Error('Redis client instance not found.');

await this.client.xadd(responseObj.stream, '*', ...serializedEntries);
}),
);
Expand All @@ -147,6 +151,8 @@ export class RedisStreamStrategy

private async handleAck(inboundContext: RedisStreamContext) {
try {
if (!this.client) throw new Error('Redis client instance not found.');

await this.client.xack(
inboundContext.getStream(),
inboundContext.getConsumerGroup(),
Expand Down Expand Up @@ -248,14 +254,16 @@ export class RedisStreamStrategy
}
}

private async listenOnStreams() {
private async listenOnStreams(): Promise<void> {
try {
if (!this.redis) throw new Error('Redis instance not found.');

let results: any[];

results = await this.redis.xreadgroup(
'GROUP',
this.options?.streams?.consumerGroup || undefined,
this.options?.streams?.consumer || undefined, // need to make it throw an error.
this.options?.streams?.consumerGroup || '',
this.options?.streams?.consumer || '', // need to make it throw an error.
'BLOCK',
this.options?.streams?.block || 0,
'STREAMS',
Expand Down
3 changes: 2 additions & 1 deletion lib/redis.utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ export function createRedisConnection(
if (connection?.url) {
return new Redis(connection?.url, connection);
} else {
return new Redis(connection);
// TODO appropriate verification is required when the connection is undefined.
tamimaj marked this conversation as resolved.
Show resolved Hide resolved
return new Redis(connection!);
}
}
10 changes: 5 additions & 5 deletions lib/requests-map.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
export class RequestsMap<T, S> {
private map = {};
export class RequestsMap<T extends string | number | symbol, S> {
private map: Record<T, S> = {} as Record<T, S>;

constructor() {}

public addEntry(requestId, handler) {
public addEntry(requestId: T, handler: S) {
this.map[requestId] = handler;
return true;
}

public getEntry(requestId) {
public getEntry(requestId: T) {
return this.map[requestId];
}

public removeEntry(requestId) {
public removeEntry(requestId: T) {
delete this.map[requestId];
return true;
}
Expand Down
4 changes: 2 additions & 2 deletions lib/streams.utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ export async function serialize(
return stringifiedResponse;
} catch (error) {
logger.error(error);
return null;
return [];
}
}

Expand All @@ -61,7 +61,7 @@ export async function parseJson(data: string): Promise<any> {
export function parseRawMessage(rawMessage: any): any {
let payload = rawMessage[1];

let obj = {};
let obj: {[key: string]: any} = {};

for (let i = 0; i < payload.length; i += 2) {
obj[payload[i]] = payload[i + 1];
Expand Down
61 changes: 55 additions & 6 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading