Skip to content

Commit

Permalink
Merge pull request #251 from thanhdanh27600/dev
Browse files Browse the repository at this point in the history
support multiple queue platform
  • Loading branch information
thanhdanh27600 authored May 24, 2024
2 parents 9bd1496 + 22f58d8 commit 3e82c70
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 79 deletions.
18 changes: 14 additions & 4 deletions next.config.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
/** @type {import('next').NextConfig} */
const { queuePlatform } = require('./src/services/queue/utils');
const { i18n } = require('./next-i18next.config');
const { cronJob } = require('./src/services/crons');
const { queueReceiver } = require('./src/services/queue');
const { queueReceiver } = require('./src/services/queue/azure');
const { sendMessageToRabbitQueue, consumerMessagesRabbit } = require('./src/services/queue/rabbit');
const { PHASE_DEVELOPMENT_SERVER, PHASE_PRODUCTION_SERVER } = require('next/constants');
const isProduction = process.env.NEXT_PUBLIC_BUILD_ENV === 'production';
Expand All @@ -18,9 +19,18 @@ module.exports = async (phase, { defaultConfig }) => {
// if (!isProduction) shouldRunQueue = false;
if (process.env.NEXT_PUBLIC_SHORT_DOMAIN === 'true') shouldRunQueue = false;
if (shouldRunQueue) {
// queueReceiver();
sendMessageToRabbitQueue({ subject: 'health', body: 'Queue is starting...' });
consumerMessagesRabbit();
console.log('queuePlatform', queuePlatform);
switch (queuePlatform) {
case 'AZURE':
queueReceiver()
break;
case 'RABBIT':
default:
sendMessageToRabbitQueue({ subject: 'health', body: 'Queue is starting...' });
consumerMessagesRabbit();
break;
}

}
// cronJob();
return nextConfig;
Expand Down
7 changes: 3 additions & 4 deletions src/controllers/forward.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { clone, isEmpty } from 'ramda';
import { redis } from '../redis';
import { shortenCacheService } from '../services/cache';
import { forwardCacheService } from '../services/cache/forward.service';
import { sendMessageToRabbitQueue } from '../services/queue/rabbit';
import { sendMessageToQueue } from '../services/queue';
import { shortenService } from '../services/shorten';
import { REDIS_KEY, getRedisKey } from '../types/constants';
import { Forward, ForwardMeta } from '../types/forward';
Expand Down Expand Up @@ -54,8 +54,7 @@ export const handler = api<Forward>(
// cache hit
valid = shortenService.verifyToken(shortenedUrlCache, token);
if (!valid) return res.send({ errorCode: HttpStatusCode.UNAUTHORIZED, errorMessage: 'UNAUTHORIZED' });
sendMessageToRabbitQueue({ subject: 'forward', body: data });
// sendMessageToQueue([{ subject: 'forward', body: data }]);
sendMessageToQueue({ subject: 'forward', body: data });
return successHandler(res, { history: shortenedUrlCache, token: encryptS(shortenedUrlCache.id.toString()) });
}
// cache missed, fetch and write back to cache
Expand All @@ -67,7 +66,7 @@ export const handler = api<Forward>(
valid = shortenService.verifyToken(history, token);
if (!valid) return res.send({ errorCode: HttpStatusCode.UNAUTHORIZED, errorMessage: 'UNAUTHORIZED' });

sendMessageToRabbitQueue({ subject: 'forward', body: data });
sendMessageToQueue({ subject: 'forward', body: data });
shortenCacheService.postShortenHash(clone(history));

if (history?.email) history.email = '';
Expand Down
69 changes: 69 additions & 0 deletions src/services/queue/azure/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
const { postProcessForward } = require('../postProcessForward');
const { ServiceBusClient } = require('@azure/service-bus');
const { connectionString, queueName, logger } = require('../utils');

/**
* An array of objects representing message types.
* @typedef {Object} MessageType
* @property {string} subject - The subject of the message.
* @property {*} body - The body of the message, which can be of any type.
*/

/**
* Process a single message.
*
* @param {MessageType} message - The message to be processed.
* @returns {Promise<void>} A Promise that resolves when the processing is complete.
*
* @throws {Error} Throws an error if the message processing fails.
*/
async function myMessageHandler(message) {
try {
// console.log(`Processing message ${message.subject} with content: ${JSON.stringify(message.body)}`);
switch (message.subject) {
case 'forward':
await postProcessForward(message.body);
break;
default:
break;
}
} catch (error) {
logger.error(error);
throw error;
}
}

let sbClient;
let receiver;

async function main() {
console.log('Starting Queue Receiver');
// create a Service Bus client using the connection string to the Service Bus namespace
sbClient = new ServiceBusClient(connectionString);

// createReceiver() can also be used to create a receiver for a subscription.
receiver = sbClient.createReceiver(queueName);

// function to handle any errors
const myErrorHandler = async (error) => {
console.log('Error Queue Handler', error);
};

// subscribe and specify the message and error handlers
receiver.subscribe({
processMessage: myMessageHandler,
processError: myErrorHandler,
});
}

const queueReceiver = () => {
main().catch((err) => {
console.log('Error Queue Receiver: ', err);
});
// .finally(async () => {
// await receiver.close();
// await sbClient.close();
// });
};

module.exports = { queueReceiver };
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
const { ServiceBusClient } = require('@azure/service-bus');
const { connectionString, queueName, isTest, logger } = require('./utils');
const { sendMessageToRabbitQueue } = require('./rabbit')
const { connectionString, queueName, isTest, isLocal, logger } = require('../utils');

/**
* An array of objects representing message types.
Expand All @@ -19,7 +18,7 @@ const { sendMessageToRabbitQueue } = require('./rabbit')
* @param {MessageTypesArray} messages - An array of message types.
* @returns {Promise<void>} A Promise that resolves when the processing is complete.
*/
async function sendMessageToQueue(messages) {
async function sendMessageToAzureQueue(messages) {
if (isTest) return;
try {
// create a Service Bus client using the connection string to the Service Bus namespace
Expand Down Expand Up @@ -55,7 +54,7 @@ async function sendMessageToQueue(messages) {
}

// Send the last created batch of messages to the queue
// console.log(`Sending a batch of messages to the queue: ${queueName}`);
if (isLocal) console.log(`Sending a batch of messages ${JSON.stringify(messages)} to the queue: ${queueName}`);
await sender.sendMessages(batch);

// Close the sender
Expand All @@ -67,4 +66,4 @@ async function sendMessageToQueue(messages) {
}
}

module.exports = { sendMessageToQueue };
module.exports = { sendMessageToAzureQueue };
80 changes: 14 additions & 66 deletions src/services/queue/index.js
Original file line number Diff line number Diff line change
@@ -1,69 +1,17 @@
const { postProcessForward } = require('./postProcessForward');
const { ServiceBusClient } = require('@azure/service-bus');
const { connectionString, queueName, logger } = require('./utils');

/**
* An array of objects representing message types.
* @typedef {Object} MessageType
* @property {string} subject - The subject of the message.
* @property {*} body - The body of the message, which can be of any type.
*/

/**
* Process a single message.
*
* @param {MessageType} message - The message to be processed.
* @returns {Promise<void>} A Promise that resolves when the processing is complete.
*
* @throws {Error} Throws an error if the message processing fails.
*/
async function myMessageHandler(message) {
try {
// console.log(`Processing message ${message.subject} with content: ${JSON.stringify(message.body)}`);
switch (message.subject) {
case 'forward':
await postProcessForward(message.body);
break;
default:
break;
const { sendMessageToAzureQueue } = require("./azure/sendMessageToAzureQueue");
const { sendMessageToRabbitQueue } = require("./rabbit");
const { queuePlatform } = require("./utils");

const sendMessageToQueue = async (message) => {
switch (queuePlatform) {
case 'AZURE':
await sendMessageToAzureQueue([message])
break;
case 'RABBIT':
default:
await sendMessageToRabbitQueue(message);
break;
}
} catch (error) {
logger.error(error);
throw error;
}
}

let sbClient;
let receiver;

async function main() {
console.log('Starting Queue Receiver');
// create a Service Bus client using the connection string to the Service Bus namespace
sbClient = new ServiceBusClient(connectionString);

// createReceiver() can also be used to create a receiver for a subscription.
receiver = sbClient.createReceiver(queueName);

// function to handle any errors
const myErrorHandler = async (error) => {
console.log('Error Queue Handler', error);
};

// subscribe and specify the message and error handlers
receiver.subscribe({
processMessage: myMessageHandler,
processError: myErrorHandler,
});
}

const queueReceiver = () => {
main().catch((err) => {
console.log('Error Queue Receiver: ', err);
});
// .finally(async () => {
// await receiver.close();
// await sbClient.close();
// });
};

module.exports = { queueReceiver };
module.exports = { sendMessageToQueue };
2 changes: 2 additions & 0 deletions src/services/queue/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ const queueName = process.env.AZURE_BUS_QUEUE_NAME || '';

const isTest = process.env.NODE_ENV === 'test';
const isLocal = process.env.NEXT_PUBLIC_BUILD_ENV === 'local';
const queuePlatform = process.env.QUEUE_PLATFORM;

const logger = pino(
{
Expand All @@ -30,5 +31,6 @@ module.exports = {
queueName,
isTest,
isLocal,
queuePlatform,
logger,
};

0 comments on commit 3e82c70

Please sign in to comment.