Skip to content

Commit

Permalink
Merge pull request #1 from jmspring/master
Browse files Browse the repository at this point in the history
Update to latest
  • Loading branch information
noodlefrenzy committed Dec 2, 2014
2 parents 73e00c2 + 4457d2a commit 594ae99
Show file tree
Hide file tree
Showing 4 changed files with 240 additions and 10 deletions.
99 changes: 93 additions & 6 deletions lib/eventhub.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ var qpid = require('qpid'),
crypto = require('crypto'),
utf8 = require('utf8'),
xml2js = require('xml2js'),
request = require('request');
request = require('request')
storage = require('./storage');

function createSharedAccessToken(namespace, hubName, saName, saKey) {
if (!namespace || !hubName || !saName || !saKey) {
Expand Down Expand Up @@ -57,8 +58,29 @@ module.exports.create_instance = function(ns, name, user, pass) {
var baseamqpuri = 'amqps://' + encodeURIComponent(username) + ':' + encodeURIComponent(password) +
'@' + ehnamespace + '.servicebus.windows.net/' + ehname + '/';
var partitions = null;
var statestore = null;
var storagename = null;
var storagekey = null;
var storageTimer = null;
var stateUpdateFrequency = 30000; // udpate state every thirty seconds

function update_partition_state(partitionId, key, value) {
if(partitions) {
for(var i = 0; i < partitions.length; i++) {
if(partitions[i].id == partitionId) {
if(!partitions[i].state) {
partitions[i].state = {};
}
partitions[i].state[key] = value;
break;
}
}
}
}

var instance = {
var instance = {
consumergroup: null,

info: function(callback) {
request({
'uri': basehttpuri + 'ConsumerGroups/$Default/Partitions',
Expand All @@ -72,7 +94,26 @@ module.exports.create_instance = function(ns, name, user, pass) {
if(partitions == null) {
partitions = result;
}
callback(null, result);
if(statestore && consumergroup) {

statestore.retrieve_eventhub_state(ehnamespace, ehname, consumergroup, result, function(storeresult) {
if(storeresult) {
for(var i = 0; i < storeresult.length; i++) {
var id = storeresult[i].id;
var state = storeresult[i].state;
for(var j = 0; j < partitions.length; j++) {
if(partitions[j].id == id) {
partitions[j]["state"] = state;
break;
}
}
}
}
callback(null, result);
});
} else {
callback(null, result);
}
});
} else {
var code = null;
Expand All @@ -89,8 +130,20 @@ module.exports.create_instance = function(ns, name, user, pass) {
var messenger = new qpid.proton.Messenger();
var message_callback = message_cb || null;
var subscribe_callback = subscribe_cb || null;
consumergroup = group;

messenger.on('message', function(message, subscription) {
var partitionId = subscription.substring(subscription.lastIndexOf("/") + 1)
if(message.annotations && (message.annotations.length == 2)) {
var annotationMap = message.annotations[1];
for(var i = 0; i < annotationMap.length / 2; i++) {
if(annotationMap[2 * i][1] == 'x-opt-offset') {
update_partition_state(partitionId, 'x-opt-offset', annotationMap[2 * i + 1][1]);
break;
}
}
}

if(message_callback) {
message_callback(message, subscription);
}
Expand All @@ -101,18 +154,39 @@ module.exports.create_instance = function(ns, name, user, pass) {
subscribe_callback(url);
}
});

function process_events() {
// subscribe to each partition
for(var i = 0; i < partitions.length; i++) {
var partitionuri = consumerbaseuri + i;
messenger.subscribe(partitionuri, { }, subscribe_callback);
var partitionuri = consumerbaseuri + partitions[i].id;
var filter = { };
if(partitions[i].state && partitions[i].state["x-opt-offset"]) {
filter = [ [ "symbol", "apache.org:selector-filter:string" ], [ "described", [ "symbol", "apache.org:selector-filter:string" ], ["string", "amqp.annotation.x-opt-offset > '" + partitions[i].state["x-opt-offset"] + "'" ] ] ];
}
messenger.subscribe(partitionuri, { sourceFilter: filter });
}

// receiving
messenger.receive();
}

function update_storage_state() {
if(statestore && partitions && partitions.length > 0) {
var update = false;
for(var i = 0; i < partitions.length; i++) {
if(partitions[i].state) {
update = true;
break;
}
}
if(update) {
statestore.store_eventhub_state(ehnamespace, ehname, consumergroup, partitions, function(result) {
// TODO -- do something?
})
}
}
}

var processor_instance = {
process: function() {
// we need to get the partition information
Expand All @@ -131,6 +205,19 @@ module.exports.create_instance = function(ns, name, user, pass) {

message_handler: function(cb) {
message_callback = cb;
},

add_store: function(storename, storekey) {
storagename = storename;
storagekey = storekey;
statestore = storage.azure_store(storename, storekey);
storageTimer = setInterval(function() {
update_storage_state();
}, stateUpdateFrequency);
},

state: function() {
return partitions;
}
}

Expand Down
89 changes: 89 additions & 0 deletions lib/storage.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
var azure = require('azure-storage'),
uuid = require('node-uuid'),
crypto = require('crypto'),
async = require('async');

module.exports.azure_store = function(name, key) {
var accountUrl = name + ".table.core.windows.net";
var accountName = name;
var accountKey = key;

function sha1_hash(val) {
var shasum = crypto.createHash('sha1');
shasum.update(val);
return shasum.digest('hex');
}

function tablename(ehnamespace, ehname) {
return "tbl" + sha1_hash(ehnamespace + ":" + ehname);
}

function partitionkey(consumergroup) {
return "pk" + sha1_hash(consumergroup);
}

function rowkey(partition) {
return "rk" + sha1_hash("partition:" + partition);
}

function retrieve_partition_state(tablesvc, ehnamespace, ehname, consumergroup, partitionid, callback) {
tablesvc.retrieveEntity(tablename(ehnamespace, ehname), partitionkey(consumergroup), rowkey(partitionid), function(error, result, response) {
callback(error, result, response);
});
}

var store = {
store_eventhub_state: function(ehnamespace, ehname, consumergroup, partitioninfo, callback) {
var tablesvc = azure.createTableService(accountName, accountKey);
console.log("HERE 1");
if(tablesvc) {
var batch = new azure.TableBatch();
var entityGen = azure.TableUtilities.entityGenerator;
for(var i = 0; i < partitioninfo.length; i++) {
var entity = {
PartitionKey: entityGen.String(partitionkey(consumergroup)),
RowKey: entityGen.String(rowkey(partitioninfo[i].id)),
State: entityGen.String(JSON.stringify(partitioninfo[i].state))
};
batch.insertOrReplaceEntity(entity);
}
tablesvc.executeBatch(tablename(ehnamespace, ehname), batch, function(error, result, response) {
callback(error, result, response);
});
} else {
// TODO -- error out
}
},

retrieve_eventhub_state: function(ehnamespace, ehname, consumergroup, partitioninfo, callback) {
var tablesvc = azure.createTableService(accountName, accountKey);
if(tablesvc) {
var asyncTasks = [];
var partitionResults = [];
partitioninfo.forEach(function(partition) {
var partitionid = partition.id;
asyncTasks.push(function(cb) {
retrieve_partition_state(tablesvc, ehnamespace, ehname, consumergroup, partitionid, function(error, result, response) {
var partitionState = { id: partitionid };
if(!error) {
partitionState["state"] = JSON.parse(result.State["_"]);
} else {
if(error.statusCode && error.statusCode == 404) {
partitionState["state"] = {};
} else {
partitionState["error"] = error;
}
}
cb(null, partitionState);
});
});
});
async.parallel(asyncTasks, function(err, results) {
callback(results);
});
}
}
};

return store;
}
10 changes: 6 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "node-sbus",
"version": "0.0.1",
"version": "0.0.2",
"description": "A library for using Azure Service Bus",
"main": "index.js",
"scripts": {
Expand All @@ -23,11 +23,13 @@
"url": "https://github.com/jmspring/node-sbus/issues"
},
"dependencies": {
"node-qpid": "git://github.com/jmspring/node-qpid#qpid_proton_0.8_support",
"node-qpid": "git://github.com/jmspring/node-qpid#eventhub-base-support-0.1",
"azure-storage": "*",
"async": "*",
"node-uuid": "*",
"request": "*",
"utf8": "*",
"xml2js": "*",
"eyes": "*",
"minimist": "*"
"optimist": "*"
}
}
52 changes: 52 additions & 0 deletions test/test_eventhub.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
#!/usr/bin/env node
var eventhub = require("..").eventhub;
var optimist = require('optimist')
.options('e', { alias : 'eventhub' })
.options('n', { alias : 'namespace' })
.options('a', { alias : 'accessuser' })
.options('p', { alias : 'accesspass' })
.options('g', { alias : 'consumergroup', default : '$default' })
.options('s', { alias : 'storagetable', default : null })
.options('k', { alias : 'storagekey', default : null })
.demand(['e', 'n', 'a', 'p'])
.usage("$0 -e eventhub -n eventhub_namespace -a eventhub_username -p eventhub_password [ -g consumergroup ] [ -s storagetable -k storagekey ]")
;

var ehnamespace = optimist.argv.namespace;
var ehname = optimist.argv.eventhub;
var access_user = optimist.argv.accessuser;
var access_pass = optimist.argv.accesspass;
var consumer_group = optimist.argv.consumergroup;

var eh = eventhub.create_instance(ehnamespace, ehname, access_user, access_pass);

var ehp = eh.event_processor_instance(consumer_group, function(message, sub) {
console.log("subscription -- " + sub);
if(message.body) {
console.log(message.body);
}
if(message.properties) {
console.log(message.properties);
}
if(message.annotations) {
console.log(message.annotations);
}
});

// indicate which partitions were subscribed to
ehp.subscribe_handler(function(url) {
console.log("subscribed to -- " + url);
});

// if specified, add a storage account
if(optimist.argv.storagetable && optimist.argv.storagekey) {
ehp.add_store(optimist.argv.storagetable, optimist.argv.storagekey);
}

ehp.process();

// show state of event processor every 30 seconds
setInterval(function() {
var state = ehp.state();
console.log(JSON.stringify(state, null, 2));
}, 30000);

0 comments on commit 594ae99

Please sign in to comment.