diff --git a/lib/amqp-client.provider.ts b/lib/amqp-client.provider.ts deleted file mode 100644 index 14d8198..0000000 --- a/lib/amqp-client.provider.ts +++ /dev/null @@ -1,54 +0,0 @@ -import { Logger, Provider } from '@nestjs/common' -import amqpConnectionManager from 'amqp-connection-manager' - -import { AMQP_CLIENT, AMQP_MODULE_OPTIONS } from './amqp.constants' -import { AMQPModuleOptions, ClientTuple } from './amqp.interface' - -export interface AMQPClient { - defaultKey: string - clients: Map - clientOptions: Map - size: number -} - -export const createClient = (): Provider => ({ - provide: AMQP_CLIENT, - useFactory: async (options: AMQPModuleOptions): Promise => { - const logger = new Logger('AMQPModule') - - const clients = new Map() - const clientOptions = new Map() - - let defaultKey = 'default' - - if (options.name && options.name.length !== 0) { - defaultKey = options.name - } - - const connection = amqpConnectionManager.connect(options) - - connection.on('connect', () => { - logger.log(`Connected to RabbitMQ broker.`) - }) - - connection.on('disconnect', ({ err }) => { - logger.error(`Lost connection to RabbitMQ broker.\n${err.stack}`) - }) - - const channel = connection.createChannel() - - clients.set(defaultKey, { - channel, - connection, - }) - clientOptions.set(defaultKey, options) - - return { - defaultKey, - clients, - clientOptions, - size: clients.size, - } - }, - inject: [AMQP_MODULE_OPTIONS], -}) diff --git a/lib/amqp-core.module.ts b/lib/amqp-core.module.ts index 9f176a7..02669f8 100644 --- a/lib/amqp-core.module.ts +++ b/lib/amqp-core.module.ts @@ -5,56 +5,205 @@ import { Inject, OnApplicationShutdown, Logger, + Provider, + Type, } from '@nestjs/common' -import { ModuleRef } from '@nestjs/core' -import { AMQPClient } from './amqp-client.provider' -import { createClient } from './amqp-client.provider' -import { AMQP_CLIENT, AMQP_MODULE_OPTIONS } from './amqp.constants' -import { AMQPModuleOptions, ClientTuple } from './amqp.interface' -import { AMQPService } from './amqp.service' +import { DiscoveryModule, ModuleRef } from '@nestjs/core' +import { Channel, Options } from 'amqplib' +import amqpConnectionManager, { AmqpConnectionManager } from 'amqp-connection-manager' + +import { getAMQPChannelToken, getAMQPConnectionToken } from './amqp.utils' +import { AMQPModuleOptions, AMQPModuleAsyncOptions, AMQPOptionsFactory } from './amqp.interface' +import { AMQP_CONNECTION_NAME, AMQP_MODULE_OPTIONS } from './amqp.constants' +import { AMQPExplorer } from './amqp.explorer' +import { AMQPMetadataAccessor } from './amqp-metadata.accessor' @Global() @Module({ - providers: [AMQPService], - exports: [AMQPService], + imports: [DiscoveryModule], + providers: [AMQPExplorer, AMQPMetadataAccessor], }) export class AMQPCoreModule implements OnApplicationShutdown { constructor( - @Inject(AMQP_MODULE_OPTIONS) - private readonly options: AMQPModuleOptions, + @Inject(AMQP_CONNECTION_NAME) + private readonly connectionName: string, private readonly moduleRef: ModuleRef, ) {} - static register(options: AMQPModuleOptions | AMQPModuleOptions[]): DynamicModule { + static forRoot(connection: string | Options.Connect, options: AMQPModuleOptions): DynamicModule { + const logger = new Logger('AMQPModule') + + const amqpConnectionName = getAMQPConnectionToken(options.name) + + const AMQPConnectionProvider: Provider = { + provide: amqpConnectionName, + useFactory: (): AmqpConnectionManager => { + const amqpConnection = amqpConnectionManager.connect(connection) + + amqpConnection.on('connect', () => { + logger.log(`Connected to RabbitMQ broker.`) + }) + + amqpConnection.on('disconnect', ({ err }) => { + logger.error(`Lost connection to RabbitMQ broker.\n${err.stack}`) + console.log('lost', options.name) + }) + + return amqpConnection + }, + } + + const AMQPChannelProvider: Provider = { + provide: getAMQPChannelToken(options.name), + useFactory: async (connection: AmqpConnectionManager) => { + const channel = connection.createChannel() + + channel.addSetup((channel: Channel) => { + if (options.exchange && options.exchange.assert && options.exchange.type) { + channel.assertExchange(options.exchange.name, options.exchange.type) + } else if (options.exchange && options.exchange.assert && !options.exchange.type) { + throw new Error("Can't assert an exchange without specifying the type") + } + }) + + await channel.waitForConnect() + return channel + }, + inject: [amqpConnectionName], + } + + const AMQPConfigProvider: Provider = { + provide: AMQP_MODULE_OPTIONS, + useValue: options, + } + + const AMQPConnectionNameProvider: Provider = { + provide: AMQP_CONNECTION_NAME, + useValue: amqpConnectionName, + } + return { module: AMQPCoreModule, - providers: [createClient(), { provide: AMQP_MODULE_OPTIONS, useValue: options }], - exports: [AMQPService], + providers: [ + AMQPConnectionProvider, + AMQPChannelProvider, + AMQPConfigProvider, + AMQPConnectionNameProvider, + ], + exports: [AMQPChannelProvider], + imports: [], } } - onApplicationShutdown(): void { + static forRootAsync(options: AMQPModuleAsyncOptions): DynamicModule { const logger = new Logger('AMQPModule') - const closeConnection = - ({ clients, defaultKey }) => - (options) => { - const connectionName = options.name || defaultKey - const client: ClientTuple = clients.get(connectionName) + const amqpConnectionName = getAMQPConnectionToken(options.name) + + const AMQPConnectionProvider: Provider = { + provide: amqpConnectionName, + useFactory: async (amqpModuleOptions: AMQPModuleOptions): Promise => { + const amqpConnection = amqpConnectionManager.connect(amqpModuleOptions.uri) + + amqpConnection.on('connect', () => { + logger.log(`Connected to RabbitMQ broker.`) + }) + + amqpConnection.on('disconnect', ({ err }) => { + logger.error(`Lost connection to RabbitMQ broker.\n${err.stack}`) + }) + + return amqpConnection + }, + inject: [AMQP_MODULE_OPTIONS], + } + + const AMQPChannelProvider: Provider = { + provide: getAMQPChannelToken(options.name), + useFactory: async ( + connection: AmqpConnectionManager, + amqpModuleOptions: AMQPModuleOptions, + ) => { + const channel = connection.createChannel() + + channel.addSetup((channel: Channel) => { + if ( + amqpModuleOptions.exchange && + amqpModuleOptions.exchange.assert && + amqpModuleOptions.exchange.type + ) { + channel.assertExchange(amqpModuleOptions.exchange.name, amqpModuleOptions.exchange.type) + } else if ( + amqpModuleOptions.exchange && + amqpModuleOptions.exchange.assert && + !amqpModuleOptions.exchange.type + ) { + throw new Error("Can't assert an exchange without specifying the type") + } + }) - if (client.connection) { - logger.log('Disconnected from RabbitMQ broker') - client.connection.close() - } + await channel.waitForConnect() + return channel + }, + inject: [amqpConnectionName, AMQP_MODULE_OPTIONS], + } + + const AMQPConnectionNameProvider: Provider = { + provide: AMQP_CONNECTION_NAME, + useValue: amqpConnectionName, + } + + const asyncProviders = this.createAsyncProviders(options) + + return { + module: AMQPCoreModule, + imports: options.imports, + providers: [ + ...asyncProviders, + AMQPConnectionProvider, + AMQPChannelProvider, + AMQPConnectionNameProvider, + ], + exports: [AMQPChannelProvider], + } + } + + async onApplicationShutdown() { + const connection = this.moduleRef.get(this.connectionName) + + connection && (await connection.close()) + } + + private static createAsyncProviders(options: AMQPModuleAsyncOptions): Provider[] { + if (options.useExisting || options.useFactory) { + return [this.createAsyncOptionsProvider(options)] + } + const useClass = options.useClass as Type + return [ + this.createAsyncOptionsProvider(options), + { + provide: useClass, + useClass, + }, + ] + } + + private static createAsyncOptionsProvider(options: AMQPModuleAsyncOptions): Provider { + if (options.useFactory) { + return { + provide: AMQP_MODULE_OPTIONS, + useFactory: options.useFactory, + inject: options.inject || [], } + } - const amqpClient = this.moduleRef.get(AMQP_CLIENT) - const closeClientConnection = closeConnection(amqpClient) + const inject = [(options.useClass || options.useExisting) as Type] - if (Array.isArray(this.options)) { - this.options.forEach(closeClientConnection) - } else { - closeClientConnection(this.options) + return { + provide: AMQP_MODULE_OPTIONS, + useFactory: async (optionsFactory: AMQPOptionsFactory) => + await optionsFactory.createAMQPOptions(), + inject, } } } diff --git a/lib/amqp-metadata.accessor.ts b/lib/amqp-metadata.accessor.ts index fde7ceb..99a1436 100644 --- a/lib/amqp-metadata.accessor.ts +++ b/lib/amqp-metadata.accessor.ts @@ -1,8 +1,9 @@ import { Injectable, Type } from '@nestjs/common' import { Controller } from '@nestjs/common/interfaces' import { Reflector } from '@nestjs/core' + import { AMQP_QUEUE_CONSUMER, AMQP_CONTROLLER } from './amqp.constants' -import { AMQPMetadataConfiguration, ControllerMetadata } from './amqp.interface' +import { AMQPHandlerMetadata, ControllerMetadata } from './amqp.interface' @Injectable() export class AMQPMetadataAccessor { @@ -23,7 +24,7 @@ export class AMQPMetadataAccessor { instancePrototype: Controller, methodKey: string, controllerMetadata: ControllerMetadata, - ): AMQPMetadataConfiguration { + ): AMQPHandlerMetadata { const targetCallback = instancePrototype[methodKey] const metadata = Reflect.getMetadata(AMQP_QUEUE_CONSUMER, targetCallback) diff --git a/lib/amqp.constants.ts b/lib/amqp.constants.ts index c6bbf0e..06a95bf 100644 --- a/lib/amqp.constants.ts +++ b/lib/amqp.constants.ts @@ -1,4 +1,8 @@ +export const DEFAULT_AMQP_CONNECTION = 'default-AMQP_CONNECTION' +export const DEFAULT_AMQP_CHANNEL = 'default-AMQP_CHANNEL' +export const DEFAULT_AMQP_CONFIG = 'default-AMQP_CONFIG' +// TODO: Delete AMQP_MODULE_OPTIONS export const AMQP_MODULE_OPTIONS = 'AMQP_MODULE_OPTIONS' -export const AMQP_CLIENT = 'AMQP_CLIENT' export const AMQP_QUEUE_CONSUMER = '__amqp_module_queue_consumer' export const AMQP_CONTROLLER = '__amqp_module_controller' +export const AMQP_CONNECTION_NAME = 'AMQP_CONNECTION_NAME' diff --git a/lib/amqp.explorer.ts b/lib/amqp.explorer.ts index 108bad3..055015e 100644 --- a/lib/amqp.explorer.ts +++ b/lib/amqp.explorer.ts @@ -1,19 +1,32 @@ -import { Injectable } from '@nestjs/common' +import { Inject, Injectable, Logger, OnModuleInit, Options } from '@nestjs/common' import { MetadataScanner } from '@nestjs/core/metadata-scanner' -import { DiscoveryService } from '@nestjs/core' +import { DiscoveryService, ModuleRef } from '@nestjs/core' import { InstanceWrapper } from '@nestjs/core/injector/instance-wrapper' -import { AMQPMetadataConfiguration } from './amqp.interface' +import { ChannelWrapper } from 'amqp-connection-manager' + import { AMQPMetadataAccessor } from './amqp-metadata.accessor' +import { getAMQPChannelToken } from './amqp.utils' +import { AMQPHandlerMetadata, AMQPModuleOptions } from './amqp.interface' +import { AMQP_MODULE_OPTIONS } from './amqp.constants' @Injectable() -export class AMQPExplorer { +export class AMQPExplorer implements OnModuleInit { + private readonly logger = new Logger('AMQPModule') + constructor( + /* @Inject(AMQP_MODULE_OPTIONS) + private readonly amqpModuleOptions: AMQPModuleOptions, */ + private readonly moduleRef: ModuleRef, private readonly metadataScanner: MetadataScanner, private readonly discoveryService: DiscoveryService, private readonly metadataAccessor: AMQPMetadataAccessor, ) {} - explore(): AMQPMetadataConfiguration[] { + onModuleInit() { + this.explore() + } + + explore(): void { const controllers: InstanceWrapper[] = this.discoveryService .getControllers() .filter((wrapper: InstanceWrapper) => @@ -21,10 +34,10 @@ export class AMQPExplorer { ) if (!controllers) { - return [] + return } - return controllers + const handlers = controllers .map((wrapper: InstanceWrapper) => { const { instance, metatype } = wrapper @@ -46,5 +59,55 @@ export class AMQPExplorer { return prev.concat(curr) }, []) .filter((handler) => handler.queueName) + + /* handlers.forEach((handler: AMQPHandlerMetadata) => { + const channelToken = getAMQPChannelToken(handler.connectionName) + const channel = this.getChannel(channelToken) + + let serviceName = '' + if (this.amqpModuleOptions.serviceName) { + serviceName = `-${this.amqpModuleOptions.serviceName}` + } + + // This is something we want to do on every (re)connection + channel.addSetup(() => { + if (this.amqpModuleOptions.assertQueues === true) { + // TODO: Assert with options like durable + channel.assertQueue(`${handler.queueName}${serviceName}`) + this.logger.log(`Asserted queue: ${handler.queueName}`) + } + + channel.bindQueue( + `${handler.queueName}${serviceName}`, + this.amqpModuleOptions?.exchange?.name || '', + `${handler.queueName}`, + ) + }) + + channel.consume(`${handler.queueName}${serviceName}`, async (msg) => { + const f = await handler.callback( + Buffer.isBuffer(msg?.content) ? msg?.content.toString() : msg?.content, + ) + + // if noAck, the broker won’t expect an acknowledgement of messages delivered to this consumer + if (!handler?.noAck && (await f) !== false && msg) { + channel.ack(msg) + } + }) + this.logger.log( + `Registered function handler ${handler.methodName.toString()} for queue ${ + handler.queueName + }`, + ) + }) */ + } + + getChannel(connectionToken: string) { + try { + return this.moduleRef.get(connectionToken, { strict: false }) + } catch (err) { + this.logger.error(`No Channel found for connection "${connectionToken}"`) + throw err + } } } diff --git a/lib/amqp.interface.ts b/lib/amqp.interface.ts index 49f26c7..1ab6f9a 100644 --- a/lib/amqp.interface.ts +++ b/lib/amqp.interface.ts @@ -1,7 +1,8 @@ -import { ChannelWrapper, AmqpConnectionManager } from 'amqp-connection-manager' +import { ModuleMetadata, Type } from '@nestjs/common' import { Options } from 'amqplib' -export interface AMQPModuleOptions extends Partial { +export interface AMQPModuleOptions { + uri?: string name?: string assertQueues?: boolean exchange?: AMQPExchange @@ -14,12 +15,11 @@ export interface EventMetadata { callback: any } -export interface AMQPMetadataConfiguration extends Partial { +export interface AMQPHandlerMetadata extends Partial { + connectionName?: string queueName: string - target: any methodName: string | symbol - callback: any - prefix: string + callback: Function } interface AMQPExchange { @@ -32,7 +32,18 @@ export interface ControllerMetadata { patternPrefix: string } -export interface ClientTuple { - connection: AmqpConnectionManager - channel: ChannelWrapper +export interface ConsumerOptions extends Partial { + connectionName?: string +} + +export interface AMQPOptionsFactory { + createAMQPOptions(): Promise | AMQPModuleOptions +} + +export interface AMQPModuleAsyncOptions extends Pick { + name?: string + useExisting?: Type + useClass?: Type + useFactory?: (...args: any[]) => Promise | AMQPModuleOptions + inject?: any[] } diff --git a/lib/amqp.module.ts b/lib/amqp.module.ts index 7d02f16..bf14d82 100644 --- a/lib/amqp.module.ts +++ b/lib/amqp.module.ts @@ -1,91 +1,49 @@ -import { DynamicModule, Module, OnModuleInit } from '@nestjs/common' +import { DynamicModule, Module, Provider } from '@nestjs/common' +import { isFunction } from '@nestjs/common/utils/shared.utils' import { DiscoveryModule } from '@nestjs/core' -import { Logger } from '@nestjs/common/services/logger.service' +import { Options } from 'amqplib' import { AMQPCoreModule } from './amqp-core.module' -import { AMQPModuleOptions } from './amqp.interface' -import { AMQPService } from './amqp.service' -import { AMQPExplorer } from './amqp.explorer' import { AMQPMetadataAccessor } from './amqp-metadata.accessor' -import { isFunction } from '@nestjs/common/utils/shared.utils' -import { Channel } from 'amqplib' - -@Module({ - imports: [DiscoveryModule], - providers: [AMQPExplorer, AMQPMetadataAccessor], -}) -export class AMQPModule implements OnModuleInit { - private readonly logger = new Logger('AMQPModule') - - constructor(private readonly amqpService: AMQPService, private readonly explorer: AMQPExplorer) {} +import { AMQPExplorer } from './amqp.explorer' +import { AMQPModuleOptions } from './amqp.interface' +import { createChannelProvider } from './amqp.providers' +import { getAMQPConnectionToken } from './amqp.utils' + +@Module({}) +export class AMQPModule { + static forRoot(connection: string | Options.Connect, options: AMQPModuleOptions): DynamicModule { + const AMQPConfigProvider: Provider = { + provide: getAMQPConnectionToken, + useValue: options, + } - static forRoot(options: AMQPModuleOptions | AMQPModuleOptions[]): DynamicModule { return { + global: true, module: AMQPModule, - imports: [AMQPCoreModule.register(options)], + providers: [AMQPConfigProvider], + exports: [AMQPConfigProvider], } } - async onModuleInit(): Promise { - const { consumers, amqp, options } = { - consumers: this.explorer.explore(), - amqp: this.amqpService.getChannel(), - options: this.amqpService.getConnectionOptions(), + static registerHandlers(): DynamicModule { + return { + module: AMQPModule, + imports: [AMQPModule.registerCore()], + providers: [], } + } - await amqp.addSetup(async (channel: Channel) => { - if (options.exchange && options.exchange.assert && options.exchange.type) { - await channel.assertExchange(options.exchange.name, options.exchange.type) - } else if (options.exchange && options.exchange.assert && !options.exchange.type) { - throw new Error("Can't assert an exchange without specifying the type") - } - - for (const consumer of consumers) { - this.logger.log( - `Mapped function ${consumer.methodName.toString()} with queue ${consumer.queueName}`, - ) - - let serviceName = '' - if (options.serviceName) { - serviceName = `-${options.serviceName}` - } - - if (options.assertQueues === true) { - await channel.assertQueue(`${consumer.queueName}${serviceName}`) - } - - /** - * bind queue to defined exchange in options, else bind to default exchange ('') - * - * The default exchange is a direct exchange with no name (empty string) pre-declared by the broker - * https://www.rabbitmq.com/tutorials/amqp-concepts.html#exchange-default - */ - await channel.bindQueue( - `${consumer.queueName}${serviceName}`, - options?.exchange?.name || '', - `${consumer.queueName}`, - ) - - await channel.consume( - `${consumer.queueName}${serviceName}`, - async (msg) => { - const f = this.transformToResult( - await consumer.callback( - Buffer.isBuffer(msg?.content) ? msg?.content.toString() : msg?.content, - ), - ) - - // if noAck, the broker won’t expect an acknowledgement of messages delivered to this consumer - if (!consumer?.noAck && (await f) !== false && msg) { - channel.ack(msg) - } - }, - consumer, - ) - } - }) + private static registerCore() { + return { + global: true, + module: AMQPModule, + imports: [DiscoveryModule], + providers: [AMQPExplorer, AMQPMetadataAccessor], + } } + // TODO: Put this in the right place. I think its on the explorer now. private async transformToResult(resultOrDeferred: any) { if (resultOrDeferred && isFunction(resultOrDeferred.subscribe)) { return resultOrDeferred.toPromise() diff --git a/lib/amqp.providers.ts b/lib/amqp.providers.ts new file mode 100644 index 0000000..2b28f8d --- /dev/null +++ b/lib/amqp.providers.ts @@ -0,0 +1,31 @@ +import { Logger, Provider } from '@nestjs/common' +import { Options } from 'amqplib' +import * as amqpConnectionManager from 'amqp-connection-manager' +import { AMQPModuleOptions } from '../dist/amqp.interface' +import { getAMQPConnectionToken, getAMQPOptionsToken } from './amqp.utils' + +const logger = new Logger('AMQPModule') + +export function createChannelProvider( + connection: string | Options.Connect, + options: AMQPModuleOptions, +): Provider { + return { + provide: getAMQPConnectionToken(options.name), + useFactory: (o: AMQPModuleOptions) => { + const amqpConnection = amqpConnectionManager.connect(connection) + + amqpConnection.on('connect', () => { + logger.log(`Connected to RabbitMQ broker.`) + }) + + amqpConnection.on('disconnect', ({ err }) => { + logger.error(`Lost connection to RabbitMQ broker.\n${err.stack}`) + console.log('lost', options.name) + }) + + return amqpConnection + }, + inject: [getAMQPOptionsToken(options.name)], + } +} diff --git a/lib/amqp.service.ts b/lib/amqp.service.ts deleted file mode 100644 index 0ca1b06..0000000 --- a/lib/amqp.service.ts +++ /dev/null @@ -1,68 +0,0 @@ -import { Injectable, Inject } from '@nestjs/common' -import { AMQP_CLIENT } from './amqp.constants' -import { ChannelWrapper } from 'amqp-connection-manager' -import { AMQPClient } from './amqp-client.provider' -import { AMQPModuleOptions } from './amqp.interface' -import { Connection } from 'amqplib' - -@Injectable() -export class AMQPService { - constructor( - @Inject(AMQP_CLIENT) - private readonly amqpClient: AMQPClient, - ) {} - - getChannel(connectionName?: string): ChannelWrapper { - if (!connectionName) { - connectionName = this.amqpClient.defaultKey - } - - if (!this.amqpClient.clients.has(connectionName)) { - throw new Error(`client ${connectionName} does not exist`) - } - - const connectionTuple = this.amqpClient.clients.get(connectionName) - - if (!connectionTuple) { - throw new Error(`Connection ${connectionName} does not exist`) - } - - return connectionTuple.channel - } - - getConnection(connectionName?: string): Connection { - if (!connectionName) { - connectionName = this.amqpClient.defaultKey - } - - if (!this.amqpClient.clients.has(connectionName)) { - throw new Error(`client ${connectionName} does not exist`) - } - - const connectionTuple = this.amqpClient.clients.get(connectionName) - - if (!connectionTuple || !connectionTuple.connection || !connectionTuple.connection.connection) { - throw new Error(`Connection ${connectionName} does not exist`) - } - - return connectionTuple.connection.connection - } - - getConnectionOptions(connectionName?: string): AMQPModuleOptions { - if (!connectionName) { - connectionName = this.amqpClient.defaultKey - } - - if (!this.amqpClient.clients.has(connectionName)) { - throw new Error(`client ${connectionName} does not exist`) - } - - const channel = this.amqpClient.clientOptions.get(connectionName) - - if (!channel) { - throw new Error(`channel ${connectionName} does not exist`) - } - - return channel - } -} diff --git a/lib/amqp.utils.ts b/lib/amqp.utils.ts new file mode 100644 index 0000000..208da1a --- /dev/null +++ b/lib/amqp.utils.ts @@ -0,0 +1,17 @@ +import { + DEFAULT_AMQP_CONNECTION, + DEFAULT_AMQP_CHANNEL, + DEFAULT_AMQP_CONFIG, +} from './amqp.constants' + +export function getAMQPConnectionToken(connectionName?: string) { + return connectionName ? `${connectionName}-AMQP_CONNECTION` : DEFAULT_AMQP_CONNECTION +} + +export function getAMQPChannelToken(connectionName?: string) { + return connectionName ? `${connectionName}-AMQP_CHANNEL` : DEFAULT_AMQP_CHANNEL +} + +export function getAMQPOptionsToken(connectionName?: string) { + return connectionName ? `${connectionName}-AMQP_CONFIG` : DEFAULT_AMQP_CONFIG +} diff --git a/lib/decorators/consume.decorator.ts b/lib/decorators/consume.decorator.ts index d335b3f..15499b4 100644 --- a/lib/decorators/consume.decorator.ts +++ b/lib/decorators/consume.decorator.ts @@ -1,19 +1,11 @@ import { SetMetadata } from '@nestjs/common' -import { Options } from 'amqplib' -import { AMQP_QUEUE_CONSUMER } from '../amqp.constants' -interface ConsumerOptions extends Partial { - queueName: string -} +import { AMQP_QUEUE_CONSUMER } from '../amqp.constants' +import { ConsumerOptions } from '../amqp.interface' -export const Consume = (queueNameOrOptions: string | ConsumerOptions): MethodDecorator => { +export const Consume = (queueName: string, options?: ConsumerOptions): MethodDecorator => { return (target: object, propertyKey: string | symbol, descriptor: PropertyDescriptor) => { - const options = - typeof queueNameOrOptions === 'string' - ? { queueName: queueNameOrOptions } - : queueNameOrOptions - - SetMetadata(AMQP_QUEUE_CONSUMER, { ...options, methodName: propertyKey })( + SetMetadata(AMQP_QUEUE_CONSUMER, { ...options, queueName, methodName: propertyKey })( target, propertyKey, descriptor, diff --git a/lib/decorators/consumer.decorator.ts b/lib/decorators/consumer.decorator.ts index 4a50cbd..8a92ea8 100644 --- a/lib/decorators/consumer.decorator.ts +++ b/lib/decorators/consumer.decorator.ts @@ -1,4 +1,5 @@ import { SetMetadata } from '@nestjs/common' + import { AMQP_CONTROLLER } from '../amqp.constants' export const Consumer = (patternPrefix?: string): ClassDecorator => { diff --git a/lib/decorators/index.ts b/lib/decorators/index.ts index dc8cd5a..e703bb4 100644 --- a/lib/decorators/index.ts +++ b/lib/decorators/index.ts @@ -1,2 +1,3 @@ export * from './consume.decorator' export * from './consumer.decorator' +export * from './inject-channel.decorator' diff --git a/lib/decorators/inject-channel.decorator.ts b/lib/decorators/inject-channel.decorator.ts new file mode 100644 index 0000000..3c3f771 --- /dev/null +++ b/lib/decorators/inject-channel.decorator.ts @@ -0,0 +1,6 @@ +import { Inject } from '@nestjs/common' + +import { getAMQPChannelToken } from '../amqp.utils' + +export const InjectAMQPChannel = (connectionName?: string): ParameterDecorator => + Inject(getAMQPChannelToken(connectionName)) diff --git a/lib/index.ts b/lib/index.ts index f1d23ea..1546292 100644 --- a/lib/index.ts +++ b/lib/index.ts @@ -1,3 +1,2 @@ export * from './amqp.module' -export * from './amqp.service' export * from './decorators' diff --git a/tests/src/app.module.ts b/tests/src/app.module.ts index ec750ee..e1db791 100644 --- a/tests/src/app.module.ts +++ b/tests/src/app.module.ts @@ -4,16 +4,20 @@ import { JobsModule } from './jobs/jobs.module' @Module({ imports: [ - AMQPModule.forRoot({ - hostname: 'localhost', - port: 5672, - assertQueues: true, - exchange: { - assert: true, - type: 'topic', - name: 'test_exchange', + AMQPModule.forRoot( + { + hostname: 'localhost', + port: 5672, }, - }), + { + assertQueues: true, + exchange: { + assert: true, + type: 'topic', + name: 'test_exchange', + }, + }, + ), JobsModule, ], }) diff --git a/tests/src/jobs/jobs.controller.ts b/tests/src/jobs/jobs.controller.ts index 227030e..bc0a41e 100644 --- a/tests/src/jobs/jobs.controller.ts +++ b/tests/src/jobs/jobs.controller.ts @@ -1,6 +1,5 @@ import { Controller } from '@nestjs/common' import { Consume, Consumer } from '../../../lib' -import { JobsService } from './jobs.service' @Controller() @Consumer() diff --git a/tests/src/jobs/jobs.service.ts b/tests/src/jobs/jobs.service.ts index 3433ed2..559bf8c 100644 --- a/tests/src/jobs/jobs.service.ts +++ b/tests/src/jobs/jobs.service.ts @@ -1,13 +1,15 @@ import { Injectable } from '@nestjs/common' -import { AMQPService } from '../../../lib' +import { Channel } from 'amqplib' +import { InjectAMQPChannel } from '../../../lib' @Injectable() export class JobsService { - constructor(private amqpService: AMQPService) {} + constructor( + @InjectAMQPChannel() + private amqpChannel: Channel, + ) {} async publishMessage(exchange: string, routingKey: string, message: string): Promise { - const ch = this.amqpService.getChannel() - - return ch.publish(exchange, routingKey, Buffer.from(message)) + return this.amqpChannel.publish(exchange, routingKey, Buffer.from(message)) } }