Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Entire NestJS Module Revamp + Handling multiple connections #18

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 0 additions & 54 deletions lib/amqp-client.provider.ts

This file was deleted.

207 changes: 178 additions & 29 deletions lib/amqp-core.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,56 +5,205 @@ import {
Inject,
OnApplicationShutdown,
Logger,
Provider,
Type,
} from '@nestjs/common'
import { ModuleRef } from '@nestjs/core'
import { AMQPClient } from './amqp-client.provider'
import { createClient } from './amqp-client.provider'
import { AMQP_CLIENT, AMQP_MODULE_OPTIONS } from './amqp.constants'
import { AMQPModuleOptions, ClientTuple } from './amqp.interface'
import { AMQPService } from './amqp.service'
import { DiscoveryModule, ModuleRef } from '@nestjs/core'
import { Channel, Options } from 'amqplib'
import amqpConnectionManager, { AmqpConnectionManager } from 'amqp-connection-manager'

import { getAMQPChannelToken, getAMQPConnectionToken } from './amqp.utils'
import { AMQPModuleOptions, AMQPModuleAsyncOptions, AMQPOptionsFactory } from './amqp.interface'
import { AMQP_CONNECTION_NAME, AMQP_MODULE_OPTIONS } from './amqp.constants'
import { AMQPExplorer } from './amqp.explorer'
import { AMQPMetadataAccessor } from './amqp-metadata.accessor'

@Global()
@Module({
providers: [AMQPService],
exports: [AMQPService],
imports: [DiscoveryModule],
providers: [AMQPExplorer, AMQPMetadataAccessor],
})
export class AMQPCoreModule implements OnApplicationShutdown {
constructor(
@Inject(AMQP_MODULE_OPTIONS)
private readonly options: AMQPModuleOptions,
@Inject(AMQP_CONNECTION_NAME)
private readonly connectionName: string,
private readonly moduleRef: ModuleRef,
) {}

static register(options: AMQPModuleOptions | AMQPModuleOptions[]): DynamicModule {
static forRoot(connection: string | Options.Connect, options: AMQPModuleOptions): DynamicModule {
const logger = new Logger('AMQPModule')

const amqpConnectionName = getAMQPConnectionToken(options.name)

const AMQPConnectionProvider: Provider = {
provide: amqpConnectionName,
useFactory: (): AmqpConnectionManager => {
const amqpConnection = amqpConnectionManager.connect(connection)

amqpConnection.on('connect', () => {
logger.log(`Connected to RabbitMQ broker.`)
})

amqpConnection.on('disconnect', ({ err }) => {
logger.error(`Lost connection to RabbitMQ broker.\n${err.stack}`)
console.log('lost', options.name)
})

return amqpConnection
},
}

const AMQPChannelProvider: Provider = {
provide: getAMQPChannelToken(options.name),
useFactory: async (connection: AmqpConnectionManager) => {
const channel = connection.createChannel()

channel.addSetup((channel: Channel) => {
if (options.exchange && options.exchange.assert && options.exchange.type) {
channel.assertExchange(options.exchange.name, options.exchange.type)
} else if (options.exchange && options.exchange.assert && !options.exchange.type) {
throw new Error("Can't assert an exchange without specifying the type")
}
})

await channel.waitForConnect()
return channel
},
inject: [amqpConnectionName],
}

const AMQPConfigProvider: Provider = {
provide: AMQP_MODULE_OPTIONS,
useValue: options,
}

const AMQPConnectionNameProvider: Provider = {
provide: AMQP_CONNECTION_NAME,
useValue: amqpConnectionName,
}

return {
module: AMQPCoreModule,
providers: [createClient(), { provide: AMQP_MODULE_OPTIONS, useValue: options }],
exports: [AMQPService],
providers: [
AMQPConnectionProvider,
AMQPChannelProvider,
AMQPConfigProvider,
AMQPConnectionNameProvider,
],
exports: [AMQPChannelProvider],
imports: [],
}
}

onApplicationShutdown(): void {
static forRootAsync(options: AMQPModuleAsyncOptions): DynamicModule {
const logger = new Logger('AMQPModule')

const closeConnection =
({ clients, defaultKey }) =>
(options) => {
const connectionName = options.name || defaultKey
const client: ClientTuple = clients.get(connectionName)
const amqpConnectionName = getAMQPConnectionToken(options.name)

const AMQPConnectionProvider: Provider = {
provide: amqpConnectionName,
useFactory: async (amqpModuleOptions: AMQPModuleOptions): Promise<any> => {
const amqpConnection = amqpConnectionManager.connect(amqpModuleOptions.uri)

amqpConnection.on('connect', () => {
logger.log(`Connected to RabbitMQ broker.`)
})

amqpConnection.on('disconnect', ({ err }) => {
logger.error(`Lost connection to RabbitMQ broker.\n${err.stack}`)
})

return amqpConnection
},
inject: [AMQP_MODULE_OPTIONS],
}

const AMQPChannelProvider: Provider = {
provide: getAMQPChannelToken(options.name),
useFactory: async (
connection: AmqpConnectionManager,
amqpModuleOptions: AMQPModuleOptions,
) => {
const channel = connection.createChannel()

channel.addSetup((channel: Channel) => {
if (
amqpModuleOptions.exchange &&
amqpModuleOptions.exchange.assert &&
amqpModuleOptions.exchange.type
) {
channel.assertExchange(amqpModuleOptions.exchange.name, amqpModuleOptions.exchange.type)
} else if (
amqpModuleOptions.exchange &&
amqpModuleOptions.exchange.assert &&
!amqpModuleOptions.exchange.type
) {
throw new Error("Can't assert an exchange without specifying the type")
}
})

if (client.connection) {
logger.log('Disconnected from RabbitMQ broker')
client.connection.close()
}
await channel.waitForConnect()
return channel
},
inject: [amqpConnectionName, AMQP_MODULE_OPTIONS],
}

const AMQPConnectionNameProvider: Provider = {
provide: AMQP_CONNECTION_NAME,
useValue: amqpConnectionName,
}

const asyncProviders = this.createAsyncProviders(options)

return {
module: AMQPCoreModule,
imports: options.imports,
providers: [
...asyncProviders,
AMQPConnectionProvider,
AMQPChannelProvider,
AMQPConnectionNameProvider,
],
exports: [AMQPChannelProvider],
}
}

async onApplicationShutdown() {
const connection = this.moduleRef.get<AmqpConnectionManager>(this.connectionName)

connection && (await connection.close())
}

private static createAsyncProviders(options: AMQPModuleAsyncOptions): Provider[] {
if (options.useExisting || options.useFactory) {
return [this.createAsyncOptionsProvider(options)]
}
const useClass = options.useClass as Type<AMQPOptionsFactory>
return [
this.createAsyncOptionsProvider(options),
{
provide: useClass,
useClass,
},
]
}

private static createAsyncOptionsProvider(options: AMQPModuleAsyncOptions): Provider {
if (options.useFactory) {
return {
provide: AMQP_MODULE_OPTIONS,
useFactory: options.useFactory,
inject: options.inject || [],
}
}

const amqpClient = this.moduleRef.get<AMQPClient>(AMQP_CLIENT)
const closeClientConnection = closeConnection(amqpClient)
const inject = [(options.useClass || options.useExisting) as Type<AMQPOptionsFactory>]

if (Array.isArray(this.options)) {
this.options.forEach(closeClientConnection)
} else {
closeClientConnection(this.options)
return {
provide: AMQP_MODULE_OPTIONS,
useFactory: async (optionsFactory: AMQPOptionsFactory) =>
await optionsFactory.createAMQPOptions(),
inject,
}
}
}
5 changes: 3 additions & 2 deletions lib/amqp-metadata.accessor.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { Injectable, Type } from '@nestjs/common'
import { Controller } from '@nestjs/common/interfaces'
import { Reflector } from '@nestjs/core'

import { AMQP_QUEUE_CONSUMER, AMQP_CONTROLLER } from './amqp.constants'
import { AMQPMetadataConfiguration, ControllerMetadata } from './amqp.interface'
import { AMQPHandlerMetadata, ControllerMetadata } from './amqp.interface'

@Injectable()
export class AMQPMetadataAccessor {
Expand All @@ -23,7 +24,7 @@ export class AMQPMetadataAccessor {
instancePrototype: Controller,
methodKey: string,
controllerMetadata: ControllerMetadata,
): AMQPMetadataConfiguration {
): AMQPHandlerMetadata {
const targetCallback = instancePrototype[methodKey]

const metadata = Reflect.getMetadata(AMQP_QUEUE_CONSUMER, targetCallback)
Expand Down
6 changes: 5 additions & 1 deletion lib/amqp.constants.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
export const DEFAULT_AMQP_CONNECTION = 'default-AMQP_CONNECTION'
export const DEFAULT_AMQP_CHANNEL = 'default-AMQP_CHANNEL'
export const DEFAULT_AMQP_CONFIG = 'default-AMQP_CONFIG'
// TODO: Delete AMQP_MODULE_OPTIONS
export const AMQP_MODULE_OPTIONS = 'AMQP_MODULE_OPTIONS'
export const AMQP_CLIENT = 'AMQP_CLIENT'
export const AMQP_QUEUE_CONSUMER = '__amqp_module_queue_consumer'
export const AMQP_CONTROLLER = '__amqp_module_controller'
export const AMQP_CONNECTION_NAME = 'AMQP_CONNECTION_NAME'
Loading