Skip to content

Commit

Permalink
Support inflight messages and test
Browse files Browse the repository at this point in the history
  • Loading branch information
avinoamr committed Feb 11, 2016
1 parent f6b9326 commit a24d87d
Show file tree
Hide file tree
Showing 3 changed files with 921 additions and 8 deletions.
81 changes: 74 additions & 7 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,12 @@ SQS.prototype.createQueue = function ( params, callback ) {
MaximumMessageSize: 262144,
MessageRetentionPeriod: 345600,
ReceiveMessageWaitTimeSeconds: 0,
VisibilityTimeout: 30
VisibilityTimeout: 30,
CreatedTimestamp: new Date().getTime()
}, params.Attributes );

queues[ qurl ] = {
name: qname,
attributes: attributes,
messages: []
}
Expand All @@ -55,6 +57,50 @@ SQS.prototype.createQueue = function ( params, callback ) {
})
}

SQS.prototype.getQueueAttributes = function ( params, callback ) {
callback = callback || function () {}
params = extend( {}, this.params, params );

var qurl = params.QueueUrl;
if ( !qurl ) {
callback( new Error( 'QueueUrl is required' ) )
return
}

var queue = queues[ qurl ];
if ( !queue ) {
callback( new Error( 'Queue doesn\'t exist for Url:' + qurl ) );
return;
}

// add the approximations
var now = new Date().getTime();
var visible = queue.messages.filter( function ( message ) {
return !message._inflight
|| message._inflight <= now;
}).length;
var invisible = queue.messages.length - visible;
queue.attributes.ApproximateNumberOfMessages = visible;
queue.attributes.ApproximateNumberOfMessagesNotVisible = invisible;

var names = params.AttributeNames || [ 'All' ];

var attributes = Object.keys( queue.attributes )
.filter( function ( key ) {
return names.indexOf( 'All' ) !== -1
|| names.indexOf( key ) !== -1
})
.reduce( function ( attributes, key ) {
attributes[ key ] = queue.attributes[ key ]
return attributes
}, {} );

setTimeout( function () {
callback( null, { Attributes: attributes } )
})

}

SQS.prototype.deleteQueue = function ( params, callback ) {
callback = callback || function () {}
params = extend( {}, this.params, params );
Expand All @@ -74,7 +120,8 @@ SQS.prototype.listQueues = function ( params, callback ) {
var prefix = params.QueueNamePrefix || '';
var urls = Object.keys( queues )
.filter( function ( qurl ) {
return qurl.indexOf( prefix ) === 0;
var name = queues[ qurl ].name;
return name.indexOf( prefix ) === 0;
})

setTimeout( function () {
Expand Down Expand Up @@ -138,7 +185,7 @@ SQS.prototype.sendMessage = function ( params, callback ) {
queue.messages.push({
MessageId: messageId,
Body: body,
MessageAttributes: extend( {}, params.MessageAttributes ) // copy
MessageAttributes: extend( {}, params.MessageAttributes ), // copy
Attributes: attributes
})

Expand All @@ -164,22 +211,40 @@ SQS.prototype.receiveMessage = function ( params, callback ) {
return;
}

var max = params.MaxNumberOfMessages || 1;
var max = params.MaxNumberOfMessages === undefined
? 1
: params.MaxNumberOfMessages;

if ( max > 10 || max < 1 ) {
callback( new Error( 'MaxNumberOfMessages out of range' ) );
return;
}

var vis = params.VisibilityTimeout === undefined
? queue.attributes.VisibilityTimeout
: params.VisibilityTimeout;

var now = new Date().getTime();
var inflight = now + ( vis * 1000 );

var messages = queue.messages;
var received = messages.slice( 0, max )
var received = messages
.filter( function ( message ) {
return !message._inflight
|| message._inflight <= now;
})
.slice( 0, max )
.map( function ( message ) {
message._inflight = inflight;
message.ReceiptHandle = uuid.v4()
message.Attributes.ApproximateReceiveCount += 1;
if ( !message.Attributes.ApproximateFirstReceiveTimestamp ) {
var timestamp = new Date().getTime();
message.Attributes.ApproximateFirstReceiveTimestamp = timestamp;
}

message = extend( {}, message );
delete message._inflight;
return message;
});

Expand Down Expand Up @@ -211,9 +276,11 @@ SQS.prototype.deleteMessage = function ( params, callback ) {
return;
}

queue.messages = queue.messages.filter( function ( message ) {
return message.ReceiptHandle !== receipt;
var message = queue.messages.filter( function ( message ) {
return message.ReceiptHandle === receipt;
})
var idx = queue.messages.indexOf( message );
queue.messages.splice( idx, 1 );

setTimeout( function () {
callback( null, {} )
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "mock-sqs",
"version": "1.0.3",
"version": "1.0.4",
"description": "Mocking library for Amazon's SQS Node.js SDK",
"main": "index.js",
"scripts": {
Expand Down
Loading

0 comments on commit a24d87d

Please sign in to comment.