Skip to content

Commit

Permalink
Merge pull request #72 from Runnable/assert-on-publish
Browse files Browse the repository at this point in the history
add assert to publishers
  • Loading branch information
anandkumarpatel authored Aug 18, 2016
2 parents 4b3fc37 + 415e37a commit e364cc3
Show file tree
Hide file tree
Showing 9 changed files with 366 additions and 253 deletions.
10 changes: 6 additions & 4 deletions interfaces/modules/joi.js.flow
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
declare class Joi<T> {
static alternatives(): T;
static array(): T;
static assert(opts: Object, schema: T): T;
static bool(): T;
static func(): T;
static number(): T;
static object(): T;
static string(): T;
static required(): T;
static number(): T;
static bool(): T;
static assert(opts: Object, schema: T): T;
static string(): T;
static validate(opts: Object, schema: T, opts: Object): JoiValidate;
}

Expand Down
168 changes: 123 additions & 45 deletions src/rabbitmq.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/* @flow */
/* global Bluebird$Promise RabbitMQChannel RabbitMQConfirmChannel RabbitMQConnection SubscribeObject RabbitMQOptions */
/* global Bluebird$Promise RabbitMQChannel RabbitMQConfirmChannel RabbitMQConnection SubscribeObject RabbitMQOptions QueueObject*/
'use strict'

const amqplib = require('amqplib')
Expand All @@ -9,11 +9,39 @@ const Immutable = require('immutable')
const isFunction = require('101/is-function')
const isObject = require('101/is-object')
const isString = require('101/is-string')
const joi = require('joi')
const Promise = require('bluebird')
const uuid = require('uuid')

const logger = require('./logger')

const tasksSchema = joi.alternatives().try(joi.string(), joi.object({
name: joi.func().required(),
exclusive: joi.bool(),
durable: joi.bool(),
autoDelete: joi.bool()
}))

const eventsSchema = joi.alternatives().try(joi.string(), joi.object({
name: joi.func().required(),
internal: joi.bool(),
durable: joi.bool(),
autoDelete: joi.bool(),
alternateExchange: joi.bool()
}))

const optsSchema = joi.object({
name: joi.string(),
hostname: joi.string(),
port: joi.number(),
username: joi.string(),
password: joi.string(),
log: joi.object().type(logger.constructor, 'Bunyan Logger'),
channelOpts: joi.object(),
tasks: joi.array().items(tasksSchema),
events: joi.array().items(eventsSchema)
}).or('tasks', 'events').required()

/**
* RabbitMQ model. Can be used independently for publishing or other uses.
*
Expand All @@ -39,6 +67,7 @@ class RabbitMQ {
channelOpts: Object;
connection: RabbitMQConnection;
consuming: Map<string, string>;
events: Array<string|Object>;
hostname: string;
log: Object;
name: string;
Expand All @@ -47,13 +76,12 @@ class RabbitMQ {
publishChannel: RabbitMQConfirmChannel;
subscribed: Set<string>;
subscriptions: Map<string, Function>;
tasks: Array<string|Object>;
username: string;

constructor (opts: Object) {
this.name = opts.name || 'ponos'
this.hostname = opts.hostname ||
process.env.RABBITMQ_HOSTNAME ||
'localhost'
this.hostname = opts.hostname || process.env.RABBITMQ_HOSTNAME || 'localhost'
this.port = opts.port || parseInt(process.env.RABBITMQ_PORT, 10) || 5672
this.username = opts.username || process.env.RABBITMQ_USERNAME
this.password = opts.password || process.env.RABBITMQ_PASSWORD
Expand All @@ -65,7 +93,10 @@ class RabbitMQ {
'constructor documentation.'
)
}
this.tasks = opts.tasks || []
this.events = opts.events || []
this.log.trace({ opts: opts }, 'RabbitMQ constructor')
joi.assert(this, optsSchema)
this._setCleanState()
}

Expand Down Expand Up @@ -127,8 +158,34 @@ class RabbitMQ {
this.publishChannel = channel
this.publishChannel.on('error', this._channelErrorHandler.bind(this))
})
.then(() => {
return this._assertQueuesAndExchanges()
})
}

/**
* Asserts all passed queues and exchanges on channel
* @return {Promise} Promise resolved when everything is asserted
*/
_assertQueuesAndExchanges (): Bluebird$Promise<void> {
return Promise.each(this.events, (event) => {
if (typeof event === 'string') {
return this._assertExchange(event, 'fanout')
}

return this._assertExchange(event.name, 'fanout', event)
})
.then(() => {
return Promise.each(this.tasks, (task) => {
if (typeof task === 'string') {
return this._assertQueue(`${this.name}.${task}`)
}

return this._assertQueue(`${this.name}.${task.name}`, task)
})
})
.return()
}
/**
* Takes an object representing a message and sends it to a queue.
*
Expand Down Expand Up @@ -179,16 +236,21 @@ class RabbitMQ {

/**
* Takes an object representing a message and sends it to a task queue.
*
* appends passed in name to tasks
* @param {String} queue Task queue to receive the message.
* @param {Object} content Job to send.
* @return {Promise} Promise resolved when message is sent to queue.
*/
publishTask (queue: string, content: Object): Bluebird$Promise<void> {
return Promise.try(() => {
const queueName = `${this.name}.${queue}`
const bufferContent = this._validatePublish(queue, content)
this.log.trace({ queue: queueName, job: content }, 'Publishing job')
if (!~this.tasks.indexOf(queue)) {
throw new Error('Trying to publish task not defined in constructor')
}
return Promise.resolve(
this.publishChannel.sendToQueue(queue, bufferContent)
this.publishChannel.sendToQueue(queueName, bufferContent)
)
})
}
Expand All @@ -204,13 +266,52 @@ class RabbitMQ {
publishEvent (exchange: string, content: Object): Bluebird$Promise<void> {
return Promise.try(() => {
const bufferContent = this._validatePublish(exchange, content)
if (!~this.events.indexOf(exchange)) {
throw new Error('Trying to publish event not defined in constructor')
}
// events do not need a routing key (so we send '')
return Promise.resolve(
this.publishChannel.publish(exchange, '', bufferContent)
)
})
}

/**
* Asserts exchanges on the channel.
*
* @param {String} name Exchange Name
* @param {String} type Type of exchange [topic|fanout]
* @param {Object} opts extra options for exchange
* @return {Promise} Promise resolved when exchange is created.
* @resolves {QueueObject} asserted exchange
*/
_assertExchange (name: string, type: string, opts?: Object): Bluebird$Promise<void> {
return Promise.resolve(
this.channel.assertExchange(
name,
type,
defaults(opts, RabbitMQ.AMQPLIB_EXCHANGE_DEFAULTS)
)
)
}

/**
* Asserts queue on the channel.
*
* @param {String} name Queue Name
* @param {Object} opts extra options for queue
* @return {Promise} Promise resolved when queue is created.
* @resolves {QueueObject} asserted queue
*/
_assertQueue (name: string, opts?: Object): Bluebird$Promise<QueueObject> {
return Promise.resolve(
this.channel.assertQueue(
name,
defaults(opts, RabbitMQ.AMQPLIB_QUEUE_DEFAULTS)
)
)
}

/**
* Subscribe to a specific direct queue.
*
Expand All @@ -226,9 +327,10 @@ class RabbitMQ {
handler: Function,
queueOptions?: Object
): Bluebird$Promise<void> {
const queueName = `${this.name}.${queue}`
const log = this.log.child({
method: 'subscribeToQueue',
queue: queue
queue: queueName
})
log.info('subscribing to queue')
if (!this._isConnected()) {
Expand All @@ -237,25 +339,14 @@ class RabbitMQ {
if (!isFunction(handler)) {
log.error('handler must be a function')
return Promise.reject(
new Error(`handler for ${queue} must be a function`)
new Error(`handler for ${queueName} must be a function`)
)
}
if (this.subscribed.has(`queue:::${queue}`)) {
log.warn('already subscribed to queue')
return Promise.resolve()
}
return Promise
.resolve(
this.channel.assertQueue(
queue,
defaults(queueOptions, RabbitMQ.AMQPLIB_QUEUE_DEFAULTS)
)
)
.then(() => {
log.info('queue asserted, binding queue')
this.subscriptions = this.subscriptions.set(queue, handler)
this.subscribed = this.subscribed.add(`queue:::${queue}`)
})
return Promise.try(() => {
log.trace('binding to queue')
this.subscriptions = this.subscriptions.set(queueName, handler)
this.subscribed = this.subscribed.add(`queue:::${queueName}`)
})
}

/**
Expand Down Expand Up @@ -502,27 +593,14 @@ class RabbitMQ {
log.warn(`already subscribed to ${opts.type} exchange`)
return Promise.resolve()
}
return Promise
.resolve(
this.channel.assertExchange(
opts.exchange,
opts.type,
defaults(opts.exchangeOptions, RabbitMQ.AMQPLIB_EXCHANGE_DEFAULTS)
)
)
.then(() => {
log.info('exchange asserted')
let queueName = `${this.name}.${opts.exchange}`
if (opts.type === 'topic' && opts.routingKey) {
queueName = `${queueName}.${opts.routingKey}`
}
return Promise.resolve(
this.channel.assertQueue(
queueName,
defaults(opts.queueOptions, RabbitMQ.AMQPLIB_QUEUE_DEFAULTS)
)
)
})

log.trace('asserting queue for exchange')
let queueName = `${this.name}.${opts.exchange}`
if (opts.type === 'topic' && opts.routingKey) {
queueName = `${queueName}.${opts.routingKey}`
}

return this._assertQueue(queueName, opts.queueOptions)
.then((queueInfo) => {
const queue = queueInfo.queue
log.info({ queue: queue }, 'queue asserted')
Expand Down
17 changes: 9 additions & 8 deletions test/functional/basic.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,17 @@ const testWorkerEmitter = testWorker.emitter
describe('Basic Example', () => {
let server
let rabbitmq

const testQueue = 'ponos-test:one'
before(() => {
rabbitmq = new RabbitMQ({})
const tasks = {
'ponos-test:one': testWorker
}
const tasks = {}
tasks[testQueue] = testWorker
rabbitmq = new RabbitMQ({
tasks: Object.keys(tasks)
})
server = new ponos.Server({ tasks: tasks })
return server.start()
return rabbitmq.connect()
.then(() => {
return rabbitmq.connect()
return server.start()
})
})

Expand All @@ -42,6 +43,6 @@ describe('Basic Example', () => {
eventName: 'task',
message: 'hello world'
}
rabbitmq.publishTask('ponos-test:one', job)
rabbitmq.publishTask(testQueue, job)
})
})
4 changes: 3 additions & 1 deletion test/functional/failing.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@ describe('Basic Failing Task', () => {
before(() => {
sinon.spy(_Worker.prototype, 'run')
sinon.spy(_Worker.prototype, '_reportError')
rabbitmq = new RabbitMQ({})
const tasks = {
'ponos-test:one': testWorker
}
rabbitmq = new RabbitMQ({
tasks: Object.keys(tasks)
})
server = new ponos.Server({ tasks: tasks })
return server.start()
.then(() => {
Expand Down
4 changes: 3 additions & 1 deletion test/functional/retry-limit.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ describe('Basic Timeout Task', function () {
let testRecover
before(() => {
sinon.spy(_Worker.prototype, 'run')
rabbitmq = new RabbitMQ({})
const tasks = {
'ponos-test:one': {
task: () => {
Expand All @@ -34,6 +33,9 @@ describe('Basic Timeout Task', function () {
maxNumRetries: 5
}
}
rabbitmq = new RabbitMQ({
tasks: Object.keys(tasks)
})
server = new ponos.Server({ tasks: tasks })
return server.start()
.then(() => {
Expand Down
4 changes: 3 additions & 1 deletion test/functional/tid.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ describe('Basic Example', () => {
let rabbitmq

before(() => {
rabbitmq = new RabbitMQ({})
const tasks = {
'ponos-test:one': testWorker
}
rabbitmq = new RabbitMQ({
tasks: Object.keys(tasks)
})
server = new ponos.Server({ tasks: tasks })
return server.start()
.then(() => {
Expand Down
4 changes: 3 additions & 1 deletion test/functional/timeout.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@ describe('Basic Timeout Task', function () {
before(() => {
sinon.spy(_Worker.prototype, 'run')
sinon.spy(_Bunyan.prototype, 'warn')
rabbitmq = new RabbitMQ({})
const tasks = {
'ponos-test:one': testWorker
}
rabbitmq = new RabbitMQ({
tasks: Object.keys(tasks)
})
server = new ponos.Server({ tasks: tasks })
return server.start()
.then(() => {
Expand Down
Loading

0 comments on commit e364cc3

Please sign in to comment.