diff --git a/README.md b/README.md index b0950d1..34b5508 100644 --- a/README.md +++ b/README.md @@ -164,6 +164,7 @@ export class StartConsumer implements IConsumer { A Producer is a loopback service for producing message for a particular topic, you can inject a producer using the `@producer(TOPIC_NAME)` decorator. Note: The topic name passed to decorator must be first configured in the Component configuration's topic property - +If you want to produce a raw message without any event type, you can use the `@genericProducer(TOPIC_NAME)` decorator, note that in this case, the topic name must be passed in the genericTopics property of the component configuration. #### Example diff --git a/src/__tests__/acceptance/application.test.ts b/src/__tests__/acceptance/application.test.ts index 0294b63..d19c944 100644 --- a/src/__tests__/acceptance/application.test.ts +++ b/src/__tests__/acceptance/application.test.ts @@ -13,6 +13,7 @@ import { setupConsumerApplication, setupProducerApplication, } from './test-helper'; +import {GenericProducerService} from './fixtures/producer/generic-producer.service'; describe('end-to-end', () => { let consumerApp: Application; @@ -93,6 +94,18 @@ describe('end-to-end', () => { ); }); + it('should produce from a generic producer without events for a single topic', async () => { + const producerService = producerApp.getSync( + `services.GenericProducerService`, + ); + const message = 'message'; + await producerService.produceMessage(message); + sinon.assert.called(genericHandler); + expect(genericHandler.getCalls()[0].args[0]).to.be.deepEqual({ + data: message, + }); + }); + it('should consume from a generic consumer without events for a single topic', async () => { const producerInstance = producerApp.getSync>( producerKey(Topics.Generic), diff --git a/src/__tests__/acceptance/fixtures/producer/generic-producer.service.ts b/src/__tests__/acceptance/fixtures/producer/generic-producer.service.ts new file mode 100644 index 0000000..26ccf52 --- /dev/null +++ b/src/__tests__/acceptance/fixtures/producer/generic-producer.service.ts @@ -0,0 +1,19 @@ +import {genericProducer} from '../../../../decorators/generic-producer.decorator'; +import {GenericProducer} from '../../../../types'; +import {GenericStream} from '../stream'; +import {Topics} from '../topics.enum'; + +export class GenericProducerService { + constructor( + @genericProducer(Topics.Generic) + private producer: GenericProducer, + ) {} + + async produceMessage(message: string): Promise { + await this.producer.send([ + { + data: message, + }, + ]); + } +} diff --git a/src/__tests__/acceptance/fixtures/producer/producer-app.ts b/src/__tests__/acceptance/fixtures/producer/producer-app.ts index 881ff58..5bfc55f 100644 --- a/src/__tests__/acceptance/fixtures/producer/producer-app.ts +++ b/src/__tests__/acceptance/fixtures/producer/producer-app.ts @@ -7,6 +7,7 @@ import {KafkaClientComponent} from '../../../../component'; import {KafkaClientBindings} from '../../../../keys'; import {KafkaClientStub} from '../../../stubs'; import {Topics} from '../topics.enum'; +import {GenericProducerService} from './generic-producer.service'; export class ProducerApp extends BootMixin( ServiceMixin(RepositoryMixin(RestApplication)), @@ -16,11 +17,13 @@ export class ProducerApp extends BootMixin( this.configure(KafkaClientBindings.Component).to({ topics: Object.values(Topics) as string[], + genericTopics: [Topics.Generic], }); this.bind(KafkaClientBindings.KafkaClient).to( options.client, ); this.component(KafkaClientComponent); + this.service(GenericProducerService); this.projectRoot = __dirname; // Customize @loopback/boot Booter Conventions here diff --git a/src/__tests__/acceptance/fixtures/stream.ts b/src/__tests__/acceptance/fixtures/stream.ts index ae3a68b..148844d 100644 --- a/src/__tests__/acceptance/fixtures/stream.ts +++ b/src/__tests__/acceptance/fixtures/stream.ts @@ -17,6 +17,6 @@ export interface TestStream extends IStreamDefinition { export interface GenericStream extends IStreamDefinition { topic: Topics.Generic; messages: { - [Events.close]: {}; + data: string; }; } diff --git a/src/component.ts b/src/component.ts index 02d875a..b75cdec 100644 --- a/src/component.ts +++ b/src/component.ts @@ -10,9 +10,12 @@ import { } from '@loopback/core'; import {LoggerExtensionComponent} from '@sourceloop/core'; import {Kafka} from 'kafkajs'; -import {KafkaClientBindings, producerKey} from './keys'; +import {genericProducerKey, KafkaClientBindings, producerKey} from './keys'; import {KafkaObserver} from './observers'; -import {KafkaProducerFactoryProvider} from './providers'; +import { + GenericKafkaProducerFactoryProvider, + KafkaProducerFactoryProvider, +} from './providers'; import {KafkaConsumerService} from './services/kafka-consumer.service'; import {KafkaClientOptions} from './types'; @@ -39,6 +42,11 @@ export class KafkaClientComponent implements Component { .toProvider(KafkaProducerFactoryProvider) .inScope(BindingScope.SINGLETON); + app + .bind(KafkaClientBindings.GenericProducerFactor) + .toProvider(GenericKafkaProducerFactoryProvider) + .inScope(BindingScope.SINGLETON); + app.service(KafkaConsumerService); if (configuration?.topics) { @@ -50,6 +58,18 @@ export class KafkaClientComponent implements Component { .inScope(BindingScope.SINGLETON); }); } + + if (configuration?.genericTopics) { + const genericProducerFactory = app.getSync( + KafkaClientBindings.GenericProducerFactor, + ); + configuration.genericTopics.forEach(topic => { + app + .bind(genericProducerKey(topic)) + .to(genericProducerFactory(topic)) + .inScope(BindingScope.SINGLETON); + }); + } if (configuration?.initObservers) { app.lifeCycleObserver(KafkaObserver); } diff --git a/src/decorators/generic-producer.decorator.ts b/src/decorators/generic-producer.decorator.ts new file mode 100644 index 0000000..abb596b --- /dev/null +++ b/src/decorators/generic-producer.decorator.ts @@ -0,0 +1,6 @@ +import {inject} from '@loopback/core'; +import {genericProducerKey} from '../keys'; + +export function genericProducer(topic: string) { + return inject(genericProducerKey(topic)); +} diff --git a/src/keys.ts b/src/keys.ts index a5c0be9..38d8f6c 100644 --- a/src/keys.ts +++ b/src/keys.ts @@ -4,6 +4,8 @@ import {KafkaClientComponent} from './component'; import {KafkaConsumerService} from './services/kafka-consumer.service'; import { ConsumerConfig, + GenericProducer, + GenericProducerFactoryType, IStreamDefinition, Producer, ProducerFactoryType, @@ -30,6 +32,9 @@ export namespace KafkaClientBindings { export const ProducerFactory = BindingKey.create< ProducerFactoryType >(`${KafkaNamespace}.ProducerFactory`); + export const GenericProducerFactor = BindingKey.create< + GenericProducerFactoryType + >(`${KafkaNamespace}.GenericProducerFactory`); export const LifeCycleGroup = `${KafkaNamespace}.KAFKA_OBSERVER_GROUP`; } @@ -38,6 +43,11 @@ export const producerKey = (topic: string) => `${KafkaNamespace}.producer.${topic}`, ); +export const genericProducerKey = (topic: string) => + BindingKey.create>( + `${KafkaNamespace}.generic.producer.${topic}`, + ); + export const eventHandlerKey = < Stream extends IStreamDefinition, K extends keyof Stream['messages'], diff --git a/src/providers/generic-kafka-producer-factory.provider.ts b/src/providers/generic-kafka-producer-factory.provider.ts new file mode 100644 index 0000000..fbb9955 --- /dev/null +++ b/src/providers/generic-kafka-producer-factory.provider.ts @@ -0,0 +1,49 @@ +import {inject, Provider} from '@loopback/core'; +import {ILogger, LOGGER} from '@sourceloop/core'; +import {CompressionTypes, Kafka, ProducerConfig} from 'kafkajs'; +import {KafkaErrorKeys} from '../error-keys'; +import {GenericProducerFactoryType, IStreamDefinition} from '../types'; +import {KafkaClientBindings} from '../keys'; + +/* The class `GenericKafkaProducerFactoryProvider` is a TypeScript class that provides a factory for creating +Kafka producers to send messages to specified topics without events. */ +export class GenericKafkaProducerFactoryProvider + implements Provider> +{ + constructor( + @inject(KafkaClientBindings.KafkaClient) + private client: Kafka, + @inject(LOGGER.LOGGER_INJECT) private readonly logger: ILogger, + @inject(KafkaClientBindings.ProducerConfiguration, {optional: true}) + private configuration?: ProducerConfig, + ) {} + + value(): GenericProducerFactoryType { + return (topic: string) => { + return { + send: async (payload: T['messages'][], key?: string): Promise => { + const producer = this.client.producer(this.configuration); + + try { + await producer.connect(); + await producer.send({ + topic: topic, + compression: CompressionTypes.GZIP, + messages: payload.map(message => ({ + key, + value: JSON.stringify(message), + })), + }); + await producer.disconnect(); + } catch (e) { + this.logger.error( + `${KafkaErrorKeys.PublishFailed}: ${JSON.stringify(e)}`, + ); + await producer.disconnect(); + throw e; + } + }, + }; + }; + } +} diff --git a/src/providers/index.ts b/src/providers/index.ts index 2ed770c..4f91654 100644 --- a/src/providers/index.ts +++ b/src/providers/index.ts @@ -1 +1,2 @@ export * from './kafka-producer-factory.provider'; +export * from './generic-kafka-producer-factory.provider'; diff --git a/src/types.ts b/src/types.ts index 47cffeb..9cdfced 100644 --- a/src/types.ts +++ b/src/types.ts @@ -4,6 +4,7 @@ export interface KafkaClientOptions { connection: KafkaConfig; topics?: string[]; initObservers?: boolean; + genericTopics?: string[]; } export type ConsumerConfig = { @@ -81,10 +82,18 @@ export interface Producer { ): Promise; } +export interface GenericProducer { + send(payload: Stream['messages'][], key?: string): Promise; +} + export type ProducerFactoryType = ( topic: Stream['topic'], ) => Producer; +export type GenericProducerFactoryType = ( + topic: Stream['topic'], +) => GenericProducer; + export type StreamHandler< Stream extends IStreamDefinition, K extends EventsInStream,