diff --git a/lib/adapters/stomp.js b/lib/adapters/stomp.js index 191c1a6..1d32f93 100644 --- a/lib/adapters/stomp.js +++ b/lib/adapters/stomp.js @@ -110,6 +110,7 @@ var PREFIX = { pull: 'queue', pub: 'topic', sub: 'topic', + tempPull: 'temp-queue' }; function stompOpen(self, type, connection, name) { @@ -143,7 +144,7 @@ function topicSelector(pattern) { return 'topickey LIKE \''+pattern+'.%\' OR topickey = \''+pattern+'\''; } -function stompSubscribe(self, pattern, callback) { +function stompSubscribe(self, pattern, callback, stompHeaders) { var selector = topicSelector(pattern); if (callback) { @@ -151,7 +152,7 @@ function stompSubscribe(self, pattern, callback) { } c(self)._doWhenReady(function pullSubscribe(done) { - var headers = {}; + var headers = stompHeaders || {}; dbg('pull subscribe start+done'); @@ -208,9 +209,9 @@ function decode(body, contentType) { return JSON.parse(body); } -function stompPublish(self, msg, topic) { +function stompPublish(self, msg, topic, stompHeaders) { c(self)._doWhenReady(function pushPublish(done) { - var headers = {}; + var headers = stompHeaders || {}; dbg(self.type+' publish start', msg); var encoding = encode(msg); @@ -241,8 +242,8 @@ function PushStomp(connection, name) { util.inherits(PushStomp, events.EventEmitter); -PushStomp.prototype.publish = function(msg) { - return stompPublish(this, msg); +PushStomp.prototype.publish = function(msg, headers) { + return stompPublish(this, msg, null, headers); }; PushStomp.prototype.close = stompClose; @@ -257,8 +258,8 @@ function PullStomp(connection, name) { util.inherits(PullStomp, events.EventEmitter); -PullStomp.prototype.subscribe = function(callback) { - return stompSubscribe(this, null, callback); +PullStomp.prototype.subscribe = function(callback, headers) { + return stompSubscribe(this, null, callback, headers); }; PullStomp.prototype.close = stompClose; @@ -267,6 +268,23 @@ CreateStomp.prototype.createPullQueue = function(name, callback) { return new PullStomp(this, name, callback); }; +//-- Pull temp-queue + +function PullTempStomp(connection, name) { + stompOpen(this, 'tempPull', connection, name); +} + +util.inherits(PullTempStomp, events.EventEmitter); + +PullTempStomp.prototype.subscribe = function(callback, headers) { + return stompSubscribe(this, null, callback, headers); +}; + +PullTempStomp.prototype.close = stompClose; + +CreateStomp.prototype.createTempPullQueue = function(name, callback) { + return new PullTempStomp(this, name, callback); +}; //-- Pub/Sub Queue