From f0d40eea1594a10db63505f8be3d24c7f3d4f3ae Mon Sep 17 00:00:00 2001 From: Davidson Gomes Date: Fri, 24 May 2024 18:48:54 -0300 Subject: [PATCH] New global mode for rabbitmq events --- CHANGELOG.md | 1 + Docker/.env.example | 28 +++- Dockerfile | 32 +++- package.json | 2 +- .../integrations/rabbitmq/libs/amqp.server.ts | 35 +++++ src/api/services/channel.service.ts | 145 +++++++++++++----- src/config/env.config.ts | 66 +++++++- src/dev-env.yml | 30 +++- src/docs/swagger.yaml | 2 +- src/main.ts | 8 +- 10 files changed, 298 insertions(+), 51 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d30abd4a5..a4f2197c7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ### Feature * Now in the manager, when logging in with the client's apikey, the listing only shows the instance corresponding to the provided apikey (only with MongoDB) +* New global mode for rabbitmq events # 1.7.5 (2024-05-21 08:50) diff --git a/Docker/.env.example b/Docker/.env.example index 04dd08057..3bee33caf 100644 --- a/Docker/.env.example +++ b/Docker/.env.example @@ -47,9 +47,33 @@ DATABASE_SAVE_DATA_CONTACTS=false DATABASE_SAVE_DATA_CHATS=false RABBITMQ_ENABLED=false -RABBITMQ_RABBITMQ_MODE=global -RABBITMQ_EXCHANGE_NAME=evolution_exchange RABBITMQ_URI=amqp://guest:guest@rabbitmq:5672 +RABBITMQ_EXCHANGE_NAME=evolution_exchange +RABBITMQ_GLOBAL_ENABLED=false +RABBITMQ_EVENTS_APPLICATION_STARTUP=false +RABBITMQ_EVENTS_QRCODE_UPDATED=true +RABBITMQ_EVENTS_MESSAGES_SET=true +RABBITMQ_EVENTS_MESSAGES_UPSERT=true +RABBITMQ_EVENTS_MESSAGES_UPDATE=true +RABBITMQ_EVENTS_MESSAGES_DELETE=true +RABBITMQ_EVENTS_SEND_MESSAGE=true +RABBITMQ_EVENTS_CONTACTS_SET=true +RABBITMQ_EVENTS_CONTACTS_UPSERT=true +RABBITMQ_EVENTS_CONTACTS_UPDATE=true +RABBITMQ_EVENTS_PRESENCE_UPDATE=true +RABBITMQ_EVENTS_CHATS_SET=true +RABBITMQ_EVENTS_CHATS_UPSERT=true +RABBITMQ_EVENTS_CHATS_UPDATE=true +RABBITMQ_EVENTS_CHATS_DELETE=true +RABBITMQ_EVENTS_GROUPS_UPSERT=true +RABBITMQ_EVENTS_GROUPS_UPDATE=true +RABBITMQ_EVENTS_GROUP_PARTICIPANTS_UPDATE=true +RABBITMQ_EVENTS_CONNECTION_UPDATE=true +RABBITMQ_EVENTS_LABELS_EDIT=true +RABBITMQ_EVENTS_LABELS_ASSOCIATION=true +RABBITMQ_EVENTS_CALL=true +RABBITMQ_EVENTS_TYPEBOT_START=false +RABBITMQ_EVENTS_TYPEBOT_CHANGE_STATUS=false WEBSOCKET_ENABLED=false WEBSOCKET_GLOBAL_EVENTS=false diff --git a/Dockerfile b/Dockerfile index 2dac29c02..9db93aebd 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,6 +1,6 @@ FROM node:20.7.0-alpine AS builder -LABEL version="1.7.5" description="Api to control whatsapp features through http requests." +LABEL version="1.8.0" description="Api to control whatsapp features through http requests." LABEL maintainer="Davidson Gomes" git="https://github.com/DavidsonGomes" LABEL contact="contato@agenciadgcode.com" @@ -59,9 +59,35 @@ ENV DATABASE_SAVE_DATA_CONTACTS=false ENV DATABASE_SAVE_DATA_CHATS=false ENV RABBITMQ_ENABLED=false -ENV RABBITMQ_MODE=global -ENV RABBITMQ_EXCHANGE_NAME=evolution_exchange ENV RABBITMQ_URI=amqp://guest:guest@rabbitmq:5672 +ENV RABBITMQ_EXCHANGE_NAME=evolution_exchange +ENV RABBITMQ_GLOBAL_ENABLED=false +ENV RABBITMQ_EVENTS_APPLICATION_STARTUP=false +ENV RABBITMQ_EVENTS_INSTANCE_CREATE=false +ENV RABBITMQ_EVENTS_INSTANCE_DELETE=false +ENV RABBITMQ_EVENTS_QRCODE_UPDATED=true +ENV RABBITMQ_EVENTS_MESSAGES_SET=true +ENV RABBITMQ_EVENTS_MESSAGES_UPSERT=true +ENV RABBITMQ_EVENTS_MESSAGES_UPDATE=true +ENV RABBITMQ_EVENTS_MESSAGES_DELETE=true +ENV RABBITMQ_EVENTS_SEND_MESSAGE=true +ENV RABBITMQ_EVENTS_CONTACTS_SET=true +ENV RABBITMQ_EVENTS_CONTACTS_UPSERT=true +ENV RABBITMQ_EVENTS_CONTACTS_UPDATE=true +ENV RABBITMQ_EVENTS_PRESENCE_UPDATE=true +ENV RABBITMQ_EVENTS_CHATS_SET=true +ENV RABBITMQ_EVENTS_CHATS_UPSERT=true +ENV RABBITMQ_EVENTS_CHATS_UPDATE=true +ENV RABBITMQ_EVENTS_CHATS_DELETE=true +ENV RABBITMQ_EVENTS_GROUPS_UPSERT=true +ENV RABBITMQ_EVENTS_GROUPS_UPDATE=true +ENV RABBITMQ_EVENTS_GROUP_PARTICIPANTS_UPDATE=true +ENV RABBITMQ_EVENTS_CONNECTION_UPDATE=true +ENV RABBITMQ_EVENTS_LABELS_EDIT=true +ENV RABBITMQ_EVENTS_LABELS_ASSOCIATION=true +ENV RABBITMQ_EVENTS_CALL=true +ENV RABBITMQ_EVENTS_TYPEBOT_START=false +ENV RABBITMQ_EVENTS_TYPEBOT_CHANGE_STATUS=false ENV WEBSOCKET_ENABLED=false ENV WEBSOCKET_GLOBAL_EVENTS=false diff --git a/package.json b/package.json index af6198c7c..b6f784808 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "evolution-api", - "version": "1.7.5", + "version": "1.8.0", "description": "Rest api for communication with WhatsApp", "main": "./dist/src/main.js", "scripts": { diff --git a/src/api/integrations/rabbitmq/libs/amqp.server.ts b/src/api/integrations/rabbitmq/libs/amqp.server.ts index 5628fac9e..99c10f661 100644 --- a/src/api/integrations/rabbitmq/libs/amqp.server.ts +++ b/src/api/integrations/rabbitmq/libs/amqp.server.ts @@ -42,6 +42,41 @@ export const getAMQP = (): amqp.Channel | null => { return amqpChannel; }; +export const initGlobalQueues = () => { + logger.info('Initializing global queues'); + const events = configService.get('RABBITMQ').EVENTS; + + if (!events) { + logger.warn('No events to initialize on AMQP'); + return; + } + + const eventKeys = Object.keys(events); + + eventKeys.forEach((event) => { + if (events[event] === false) return; + + const queueName = `${event.replace(/_/g, '.').toLowerCase()}`; + const amqp = getAMQP(); + const exchangeName = 'evolution_exchange'; + + amqp.assertExchange(exchangeName, 'topic', { + durable: true, + autoDelete: false, + }); + + amqp.assertQueue(queueName, { + durable: true, + autoDelete: false, + arguments: { + 'x-queue-type': 'quorum', + }, + }); + + amqp.bindQueue(queueName, exchangeName, event); + }); +}; + export const initQueues = (instanceName: string, events: string[]) => { if (!events || !events.length) return; diff --git a/src/api/services/channel.service.ts b/src/api/services/channel.service.ts index 0a35154c8..d42c5b1af 100644 --- a/src/api/services/channel.service.ts +++ b/src/api/services/channel.service.ts @@ -13,6 +13,7 @@ import { Database, HttpServer, Log, + Rabbitmq, Sqs, Webhook, Websocket, @@ -688,6 +689,9 @@ export class ChannelStartupService { const rabbitmqLocal = this.localRabbitmq.events; const sqsLocal = this.localSqs.events; const serverUrl = this.configService.get('SERVER').URL; + const rabbitmqEnabled = this.configService.get('RABBITMQ').ENABLED; + const rabbitmqGlobal = this.configService.get('RABBITMQ').GLOBAL_ENABLED; + const rabbitmqEvents = this.configService.get('RABBITMQ').EVENTS; const we = event.replace(/[.-]/gm, '_').toUpperCase(); const transformedWe = we.replace(/_/gm, '-').toLowerCase(); const tzoffset = new Date().getTimezoneOffset() * 60000; //offset in milliseconds @@ -698,67 +702,134 @@ export class ChannelStartupService { const tokenStore = await this.repository.auth.find(this.instanceName); const instanceApikey = tokenStore?.apikey || 'Apikey not found'; - if (this.localRabbitmq.enabled) { + if (rabbitmqEnabled) { const amqp = getAMQP(); - - if (amqp) { + if (this.localRabbitmq.enabled && amqp) { if (Array.isArray(rabbitmqLocal) && rabbitmqLocal.includes(we)) { const exchangeName = this.instanceName ?? 'evolution_exchange'; - // await amqp.assertExchange(exchangeName, 'topic', { - // durable: true, - // autoDelete: false, - // }); + let retry = 0; - await this.assertExchangeAsync(amqp, exchangeName, 'topic', { - durable: true, - autoDelete: false, - }); + while (retry < 3) { + try { + await amqp.assertExchange(exchangeName, 'topic', { + durable: true, + autoDelete: false, + }); - const queueName = `${this.instanceName}.${event}`; + const queueName = `${this.instanceName}.${event}`; - await amqp.assertQueue(queueName, { - durable: true, - autoDelete: false, - arguments: { - 'x-queue-type': 'quorum', - }, - }); + await amqp.assertQueue(queueName, { + durable: true, + autoDelete: false, + arguments: { + 'x-queue-type': 'quorum', + }, + }); - await amqp.bindQueue(queueName, exchangeName, event); + await amqp.bindQueue(queueName, exchangeName, event); - const message = { - event, - instance: this.instance.name, - data, - server_url: serverUrl, - date_time: now, - sender: this.wuid, - }; + const message = { + event, + instance: this.instance.name, + data, + server_url: serverUrl, + date_time: now, + sender: this.wuid, + }; - if (expose && instanceApikey) { - message['apikey'] = instanceApikey; + if (expose && instanceApikey) { + message['apikey'] = instanceApikey; + } + + await amqp.publish(exchangeName, event, Buffer.from(JSON.stringify(message))); + + if (this.configService.get('LOG').LEVEL.includes('WEBHOOKS')) { + const logData = { + local: ChannelStartupService.name + '.sendData-RabbitMQ', + event, + instance: this.instance.name, + data, + server_url: serverUrl, + apikey: (expose && instanceApikey) || null, + date_time: now, + sender: this.wuid, + }; + + if (expose && instanceApikey) { + logData['apikey'] = instanceApikey; + } + + this.logger.log(logData); + } + break; + } catch (error) { + retry++; + } } + } + } + + if (rabbitmqGlobal && rabbitmqEvents[we] && amqp) { + const exchangeName = 'evolution_exchange'; + + let retry = 0; + + while (retry < 3) { + try { + await amqp.assertExchange(exchangeName, 'topic', { + durable: true, + autoDelete: false, + }); + + const queueName = transformedWe; - await amqp.publish(exchangeName, event, Buffer.from(JSON.stringify(message))); + await amqp.assertQueue(queueName, { + durable: true, + autoDelete: false, + arguments: { + 'x-queue-type': 'quorum', + }, + }); - if (this.configService.get('LOG').LEVEL.includes('WEBHOOKS')) { - const logData = { - local: ChannelStartupService.name + '.sendData-RabbitMQ', + await amqp.bindQueue(queueName, exchangeName, event); + + const message = { event, instance: this.instance.name, data, server_url: serverUrl, - apikey: (expose && instanceApikey) || null, date_time: now, sender: this.wuid, }; if (expose && instanceApikey) { - logData['apikey'] = instanceApikey; + message['apikey'] = instanceApikey; + } + await amqp.publish(exchangeName, event, Buffer.from(JSON.stringify(message))); + + if (this.configService.get('LOG').LEVEL.includes('WEBHOOKS')) { + const logData = { + local: ChannelStartupService.name + '.sendData-RabbitMQ-Global', + event, + instance: this.instance.name, + data, + server_url: serverUrl, + apikey: (expose && instanceApikey) || null, + date_time: now, + sender: this.wuid, + }; + + if (expose && instanceApikey) { + logData['apikey'] = instanceApikey; + } + + this.logger.log(logData); } - this.logger.log(logData); + break; + } catch (error) { + retry++; } } } diff --git a/src/config/env.config.ts b/src/config/env.config.ts index 4f37b0901..eab883f7f 100644 --- a/src/config/env.config.ts +++ b/src/config/env.config.ts @@ -63,11 +63,42 @@ export type Database = { SAVE_DATA: SaveData; }; +export type EventsRabbitmq = { + APPLICATION_STARTUP: boolean; + INSTANCE_CREATE: boolean; + INSTANCE_DELETE: boolean; + QRCODE_UPDATED: boolean; + MESSAGES_SET: boolean; + MESSAGES_UPSERT: boolean; + MESSAGES_UPDATE: boolean; + MESSAGES_DELETE: boolean; + SEND_MESSAGE: boolean; + CONTACTS_SET: boolean; + CONTACTS_UPDATE: boolean; + CONTACTS_UPSERT: boolean; + PRESENCE_UPDATE: boolean; + CHATS_SET: boolean; + CHATS_UPDATE: boolean; + CHATS_DELETE: boolean; + CHATS_UPSERT: boolean; + CONNECTION_UPDATE: boolean; + LABELS_EDIT: boolean; + LABELS_ASSOCIATION: boolean; + GROUPS_UPSERT: boolean; + GROUP_UPDATE: boolean; + GROUP_PARTICIPANTS_UPDATE: boolean; + CALL: boolean; + NEW_JWT_TOKEN: boolean; + TYPEBOT_START: boolean; + TYPEBOT_CHANGE_STATUS: boolean; +}; + export type Rabbitmq = { ENABLED: boolean; - MODE: string; // global, single, isolated - EXCHANGE_NAME: string; // available for global and single, isolated mode will use instance name as exchange URI: string; + EXCHANGE_NAME: string; + GLOBAL_ENABLED: boolean; + EVENTS: EventsRabbitmq; }; export type Sqs = { @@ -276,9 +307,38 @@ export class ConfigService { }, RABBITMQ: { ENABLED: process.env?.RABBITMQ_ENABLED === 'true', - MODE: process.env?.RABBITMQ_MODE || 'isolated', + GLOBAL_ENABLED: process.env?.RABBITMQ_GLOBAL_ENABLED === 'true', EXCHANGE_NAME: process.env?.RABBITMQ_EXCHANGE_NAME || 'evolution_exchange', URI: process.env.RABBITMQ_URI || '', + EVENTS: { + APPLICATION_STARTUP: process.env?.RABBITMQ_EVENTS_APPLICATION_STARTUP === 'true', + INSTANCE_CREATE: process.env?.RABBITMQ_EVENTS_INSTANCE_CREATE === 'true', + INSTANCE_DELETE: process.env?.RABBITMQ_EVENTS_INSTANCE_DELETE === 'true', + QRCODE_UPDATED: process.env?.RABBITMQ_EVENTS_QRCODE_UPDATED === 'true', + MESSAGES_SET: process.env?.RABBITMQ_EVENTS_MESSAGES_SET === 'true', + MESSAGES_UPSERT: process.env?.RABBITMQ_EVENTS_MESSAGES_UPSERT === 'true', + MESSAGES_UPDATE: process.env?.RABBITMQ_EVENTS_MESSAGES_UPDATE === 'true', + MESSAGES_DELETE: process.env?.RABBITMQ_EVENTS_MESSAGES_DELETE === 'true', + SEND_MESSAGE: process.env?.RABBITMQ_EVENTS_SEND_MESSAGE === 'true', + CONTACTS_SET: process.env?.RABBITMQ_EVENTS_CONTACTS_SET === 'true', + CONTACTS_UPDATE: process.env?.RABBITMQ_EVENTS_CONTACTS_UPDATE === 'true', + CONTACTS_UPSERT: process.env?.RABBITMQ_EVENTS_CONTACTS_UPSERT === 'true', + PRESENCE_UPDATE: process.env?.RABBITMQ_EVENTS_PRESENCE_UPDATE === 'true', + CHATS_SET: process.env?.RABBITMQ_EVENTS_CHATS_SET === 'true', + CHATS_UPDATE: process.env?.RABBITMQ_EVENTS_CHATS_UPDATE === 'true', + CHATS_UPSERT: process.env?.RABBITMQ_EVENTS_CHATS_UPSERT === 'true', + CHATS_DELETE: process.env?.RABBITMQ_EVENTS_CHATS_DELETE === 'true', + CONNECTION_UPDATE: process.env?.RABBITMQ_EVENTS_CONNECTION_UPDATE === 'true', + LABELS_EDIT: process.env?.RABBITMQ_EVENTS_LABELS_EDIT === 'true', + LABELS_ASSOCIATION: process.env?.RABBITMQ_EVENTS_LABELS_ASSOCIATION === 'true', + GROUPS_UPSERT: process.env?.RABBITMQ_EVENTS_GROUPS_UPSERT === 'true', + GROUP_UPDATE: process.env?.RABBITMQ_EVENTS_GROUPS_UPDATE === 'true', + GROUP_PARTICIPANTS_UPDATE: process.env?.RABBITMQ_EVENTS_GROUP_PARTICIPANTS_UPDATE === 'true', + CALL: process.env?.RABBITMQ_EVENTS_CALL === 'true', + NEW_JWT_TOKEN: process.env?.RABBITMQ_EVENTS_NEW_JWT_TOKEN === 'true', + TYPEBOT_START: process.env?.RABBITMQ_EVENTS_TYPEBOT_START === 'true', + TYPEBOT_CHANGE_STATUS: process.env?.RABBITMQ_EVENTS_TYPEBOT_CHANGE_STATUS === 'true', + }, }, SQS: { ENABLED: process.env?.SQS_ENABLED === 'true', diff --git a/src/dev-env.yml b/src/dev-env.yml index e7f0fae34..d1f638f6d 100644 --- a/src/dev-env.yml +++ b/src/dev-env.yml @@ -79,9 +79,35 @@ DATABASE: RABBITMQ: ENABLED: false - MODE: "global" - EXCHANGE_NAME: "evolution_exchange" URI: "amqp://guest:guest@localhost:5672" + EXCHANGE_NAME: evolution_exchange + GLOBAL_ENABLED: true + EVENTS: + APPLICATION_STARTUP: false + INSTANCE_CREATE: false + INSTANCE_DELETE: false + QRCODE_UPDATED: false + MESSAGES_SET: false + MESSAGES_UPSERT: true + MESSAGES_UPDATE: true + MESSAGES_DELETE: false + SEND_MESSAGE: false + CONTACTS_SET: false + CONTACTS_UPSERT: false + CONTACTS_UPDATE: false + PRESENCE_UPDATE: false + CHATS_SET: false + CHATS_UPSERT: false + CHATS_UPDATE: false + CHATS_DELETE: false + GROUPS_UPSERT: true + GROUP_UPDATE: true + GROUP_PARTICIPANTS_UPDATE: true + CONNECTION_UPDATE: true + CALL: false + # This events is used with Typebot + TYPEBOT_START: false + TYPEBOT_CHANGE_STATUS: false SQS: ENABLED: true diff --git a/src/docs/swagger.yaml b/src/docs/swagger.yaml index 0bcab52d2..59b252d3e 100644 --- a/src/docs/swagger.yaml +++ b/src/docs/swagger.yaml @@ -25,7 +25,7 @@ info: [![Run in Postman](https://run.pstmn.io/button.svg)](https://god.gw.postman.com/run-collection/26869335-5546d063-156b-4529-915f-909dd628c090?action=collection%2Ffork&source=rip_markdown&collection-url=entityId%3D26869335-5546d063-156b-4529-915f-909dd628c090%26entityType%3Dcollection%26workspaceId%3D339a4ee7-378b-45c9-b5b8-fd2c0a9c2442) - version: 1.7.5 + version: 1.8.0 contact: name: DavidsonGomes email: contato@agenciadgcode.com diff --git a/src/main.ts b/src/main.ts index 942c725f4..815e2a111 100644 --- a/src/main.ts +++ b/src/main.ts @@ -6,7 +6,7 @@ import cors from 'cors'; import express, { json, NextFunction, Request, Response, urlencoded } from 'express'; import { join } from 'path'; -import { initAMQP } from './api/integrations/rabbitmq/libs/amqp.server'; +import { initAMQP, initGlobalQueues } from './api/integrations/rabbitmq/libs/amqp.server'; import { initSQS } from './api/integrations/sqs/libs/sqs.server'; import { initIO } from './api/integrations/websocket/libs/socket.server'; import { HttpStatus, router } from './api/routes/index.router'; @@ -128,7 +128,11 @@ function bootstrap() { initIO(server); - if (configService.get('RABBITMQ')?.ENABLED) initAMQP(); + if (configService.get('RABBITMQ')?.ENABLED) { + initAMQP().then(() => { + if (configService.get('RABBITMQ')?.GLOBAL_ENABLED) initGlobalQueues(); + }); + } if (configService.get('SQS')?.ENABLED) initSQS();