Skip to content

Commit

Permalink
New global mode for rabbitmq events
Browse files Browse the repository at this point in the history
  • Loading branch information
DavidsonGomes committed May 24, 2024
1 parent 68dcc1c commit f0d40ee
Show file tree
Hide file tree
Showing 10 changed files with 298 additions and 51 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
### Feature

* Now in the manager, when logging in with the client's apikey, the listing only shows the instance corresponding to the provided apikey (only with MongoDB)
* New global mode for rabbitmq events

# 1.7.5 (2024-05-21 08:50)

Expand Down
28 changes: 26 additions & 2 deletions Docker/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,33 @@ DATABASE_SAVE_DATA_CONTACTS=false
DATABASE_SAVE_DATA_CHATS=false

RABBITMQ_ENABLED=false
RABBITMQ_RABBITMQ_MODE=global
RABBITMQ_EXCHANGE_NAME=evolution_exchange
RABBITMQ_URI=amqp://guest:guest@rabbitmq:5672
RABBITMQ_EXCHANGE_NAME=evolution_exchange
RABBITMQ_GLOBAL_ENABLED=false
RABBITMQ_EVENTS_APPLICATION_STARTUP=false
RABBITMQ_EVENTS_QRCODE_UPDATED=true
RABBITMQ_EVENTS_MESSAGES_SET=true
RABBITMQ_EVENTS_MESSAGES_UPSERT=true
RABBITMQ_EVENTS_MESSAGES_UPDATE=true
RABBITMQ_EVENTS_MESSAGES_DELETE=true
RABBITMQ_EVENTS_SEND_MESSAGE=true
RABBITMQ_EVENTS_CONTACTS_SET=true
RABBITMQ_EVENTS_CONTACTS_UPSERT=true
RABBITMQ_EVENTS_CONTACTS_UPDATE=true
RABBITMQ_EVENTS_PRESENCE_UPDATE=true
RABBITMQ_EVENTS_CHATS_SET=true
RABBITMQ_EVENTS_CHATS_UPSERT=true
RABBITMQ_EVENTS_CHATS_UPDATE=true
RABBITMQ_EVENTS_CHATS_DELETE=true
RABBITMQ_EVENTS_GROUPS_UPSERT=true
RABBITMQ_EVENTS_GROUPS_UPDATE=true
RABBITMQ_EVENTS_GROUP_PARTICIPANTS_UPDATE=true
RABBITMQ_EVENTS_CONNECTION_UPDATE=true
RABBITMQ_EVENTS_LABELS_EDIT=true
RABBITMQ_EVENTS_LABELS_ASSOCIATION=true
RABBITMQ_EVENTS_CALL=true
RABBITMQ_EVENTS_TYPEBOT_START=false
RABBITMQ_EVENTS_TYPEBOT_CHANGE_STATUS=false

WEBSOCKET_ENABLED=false
WEBSOCKET_GLOBAL_EVENTS=false
Expand Down
32 changes: 29 additions & 3 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
FROM node:20.7.0-alpine AS builder

LABEL version="1.7.5" description="Api to control whatsapp features through http requests."
LABEL version="1.8.0" description="Api to control whatsapp features through http requests."
LABEL maintainer="Davidson Gomes" git="https://github.com/DavidsonGomes"
LABEL contact="[email protected]"

Expand Down Expand Up @@ -59,9 +59,35 @@ ENV DATABASE_SAVE_DATA_CONTACTS=false
ENV DATABASE_SAVE_DATA_CHATS=false

ENV RABBITMQ_ENABLED=false
ENV RABBITMQ_MODE=global
ENV RABBITMQ_EXCHANGE_NAME=evolution_exchange
ENV RABBITMQ_URI=amqp://guest:guest@rabbitmq:5672
ENV RABBITMQ_EXCHANGE_NAME=evolution_exchange
ENV RABBITMQ_GLOBAL_ENABLED=false
ENV RABBITMQ_EVENTS_APPLICATION_STARTUP=false
ENV RABBITMQ_EVENTS_INSTANCE_CREATE=false
ENV RABBITMQ_EVENTS_INSTANCE_DELETE=false
ENV RABBITMQ_EVENTS_QRCODE_UPDATED=true
ENV RABBITMQ_EVENTS_MESSAGES_SET=true
ENV RABBITMQ_EVENTS_MESSAGES_UPSERT=true
ENV RABBITMQ_EVENTS_MESSAGES_UPDATE=true
ENV RABBITMQ_EVENTS_MESSAGES_DELETE=true
ENV RABBITMQ_EVENTS_SEND_MESSAGE=true
ENV RABBITMQ_EVENTS_CONTACTS_SET=true
ENV RABBITMQ_EVENTS_CONTACTS_UPSERT=true
ENV RABBITMQ_EVENTS_CONTACTS_UPDATE=true
ENV RABBITMQ_EVENTS_PRESENCE_UPDATE=true
ENV RABBITMQ_EVENTS_CHATS_SET=true
ENV RABBITMQ_EVENTS_CHATS_UPSERT=true
ENV RABBITMQ_EVENTS_CHATS_UPDATE=true
ENV RABBITMQ_EVENTS_CHATS_DELETE=true
ENV RABBITMQ_EVENTS_GROUPS_UPSERT=true
ENV RABBITMQ_EVENTS_GROUPS_UPDATE=true
ENV RABBITMQ_EVENTS_GROUP_PARTICIPANTS_UPDATE=true
ENV RABBITMQ_EVENTS_CONNECTION_UPDATE=true
ENV RABBITMQ_EVENTS_LABELS_EDIT=true
ENV RABBITMQ_EVENTS_LABELS_ASSOCIATION=true
ENV RABBITMQ_EVENTS_CALL=true
ENV RABBITMQ_EVENTS_TYPEBOT_START=false
ENV RABBITMQ_EVENTS_TYPEBOT_CHANGE_STATUS=false

ENV WEBSOCKET_ENABLED=false
ENV WEBSOCKET_GLOBAL_EVENTS=false
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "evolution-api",
"version": "1.7.5",
"version": "1.8.0",
"description": "Rest api for communication with WhatsApp",
"main": "./dist/src/main.js",
"scripts": {
Expand Down
35 changes: 35 additions & 0 deletions src/api/integrations/rabbitmq/libs/amqp.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,41 @@ export const getAMQP = (): amqp.Channel | null => {
return amqpChannel;
};

export const initGlobalQueues = () => {
logger.info('Initializing global queues');
const events = configService.get<Rabbitmq>('RABBITMQ').EVENTS;

if (!events) {
logger.warn('No events to initialize on AMQP');
return;
}

const eventKeys = Object.keys(events);

eventKeys.forEach((event) => {
if (events[event] === false) return;

const queueName = `${event.replace(/_/g, '.').toLowerCase()}`;
const amqp = getAMQP();
const exchangeName = 'evolution_exchange';

amqp.assertExchange(exchangeName, 'topic', {
durable: true,
autoDelete: false,
});

amqp.assertQueue(queueName, {
durable: true,
autoDelete: false,
arguments: {
'x-queue-type': 'quorum',
},
});

amqp.bindQueue(queueName, exchangeName, event);
});
};

export const initQueues = (instanceName: string, events: string[]) => {
if (!events || !events.length) return;

Expand Down
145 changes: 108 additions & 37 deletions src/api/services/channel.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
Database,
HttpServer,
Log,
Rabbitmq,
Sqs,
Webhook,
Websocket,
Expand Down Expand Up @@ -688,6 +689,9 @@ export class ChannelStartupService {
const rabbitmqLocal = this.localRabbitmq.events;
const sqsLocal = this.localSqs.events;
const serverUrl = this.configService.get<HttpServer>('SERVER').URL;
const rabbitmqEnabled = this.configService.get<Rabbitmq>('RABBITMQ').ENABLED;
const rabbitmqGlobal = this.configService.get<Rabbitmq>('RABBITMQ').GLOBAL_ENABLED;
const rabbitmqEvents = this.configService.get<Rabbitmq>('RABBITMQ').EVENTS;
const we = event.replace(/[.-]/gm, '_').toUpperCase();
const transformedWe = we.replace(/_/gm, '-').toLowerCase();
const tzoffset = new Date().getTimezoneOffset() * 60000; //offset in milliseconds
Expand All @@ -698,67 +702,134 @@ export class ChannelStartupService {
const tokenStore = await this.repository.auth.find(this.instanceName);
const instanceApikey = tokenStore?.apikey || 'Apikey not found';

if (this.localRabbitmq.enabled) {
if (rabbitmqEnabled) {
const amqp = getAMQP();

if (amqp) {
if (this.localRabbitmq.enabled && amqp) {
if (Array.isArray(rabbitmqLocal) && rabbitmqLocal.includes(we)) {
const exchangeName = this.instanceName ?? 'evolution_exchange';

// await amqp.assertExchange(exchangeName, 'topic', {
// durable: true,
// autoDelete: false,
// });
let retry = 0;

await this.assertExchangeAsync(amqp, exchangeName, 'topic', {
durable: true,
autoDelete: false,
});
while (retry < 3) {
try {
await amqp.assertExchange(exchangeName, 'topic', {
durable: true,
autoDelete: false,
});

const queueName = `${this.instanceName}.${event}`;
const queueName = `${this.instanceName}.${event}`;

await amqp.assertQueue(queueName, {
durable: true,
autoDelete: false,
arguments: {
'x-queue-type': 'quorum',
},
});
await amqp.assertQueue(queueName, {
durable: true,
autoDelete: false,
arguments: {
'x-queue-type': 'quorum',
},
});

await amqp.bindQueue(queueName, exchangeName, event);
await amqp.bindQueue(queueName, exchangeName, event);

const message = {
event,
instance: this.instance.name,
data,
server_url: serverUrl,
date_time: now,
sender: this.wuid,
};
const message = {
event,
instance: this.instance.name,
data,
server_url: serverUrl,
date_time: now,
sender: this.wuid,
};

if (expose && instanceApikey) {
message['apikey'] = instanceApikey;
if (expose && instanceApikey) {
message['apikey'] = instanceApikey;
}

await amqp.publish(exchangeName, event, Buffer.from(JSON.stringify(message)));

if (this.configService.get<Log>('LOG').LEVEL.includes('WEBHOOKS')) {
const logData = {
local: ChannelStartupService.name + '.sendData-RabbitMQ',
event,
instance: this.instance.name,
data,
server_url: serverUrl,
apikey: (expose && instanceApikey) || null,
date_time: now,
sender: this.wuid,
};

if (expose && instanceApikey) {
logData['apikey'] = instanceApikey;
}

this.logger.log(logData);
}
break;
} catch (error) {
retry++;
}
}
}
}

if (rabbitmqGlobal && rabbitmqEvents[we] && amqp) {
const exchangeName = 'evolution_exchange';

let retry = 0;

while (retry < 3) {
try {
await amqp.assertExchange(exchangeName, 'topic', {
durable: true,
autoDelete: false,
});

const queueName = transformedWe;

await amqp.publish(exchangeName, event, Buffer.from(JSON.stringify(message)));
await amqp.assertQueue(queueName, {
durable: true,
autoDelete: false,
arguments: {
'x-queue-type': 'quorum',
},
});

if (this.configService.get<Log>('LOG').LEVEL.includes('WEBHOOKS')) {
const logData = {
local: ChannelStartupService.name + '.sendData-RabbitMQ',
await amqp.bindQueue(queueName, exchangeName, event);

const message = {
event,
instance: this.instance.name,
data,
server_url: serverUrl,
apikey: (expose && instanceApikey) || null,
date_time: now,
sender: this.wuid,
};

if (expose && instanceApikey) {
logData['apikey'] = instanceApikey;
message['apikey'] = instanceApikey;
}
await amqp.publish(exchangeName, event, Buffer.from(JSON.stringify(message)));

if (this.configService.get<Log>('LOG').LEVEL.includes('WEBHOOKS')) {
const logData = {
local: ChannelStartupService.name + '.sendData-RabbitMQ-Global',
event,
instance: this.instance.name,
data,
server_url: serverUrl,
apikey: (expose && instanceApikey) || null,
date_time: now,
sender: this.wuid,
};

if (expose && instanceApikey) {
logData['apikey'] = instanceApikey;
}

this.logger.log(logData);
}

this.logger.log(logData);
break;
} catch (error) {
retry++;
}
}
}
Expand Down
Loading

0 comments on commit f0d40ee

Please sign in to comment.