diff --git a/lib/agent.js b/lib/agent.js index f55f1f9f4..a3ee100d6 100644 --- a/lib/agent.js +++ b/lib/agent.js @@ -48,6 +48,10 @@ function Agent(backend, stream) { // request if the client disconnects ungracefully. This is a // map of channel -> id -> request this.presenceRequests = Object.create(null); + // Keep track of the latest known Doc version, so that we can avoid fetching + // ops to transform presence if not needed + this.latestDocVersionStreams = Object.create(null); + this.latestDocVersions = Object.create(null); // We need to track this manually to make sure we don't reply to messages // after the stream was closed. @@ -108,6 +112,12 @@ Agent.prototype._cleanup = function() { emitter.destroy(); } this.subscribedQueries = Object.create(null); + + for (var collection in this.latestDocVersionStreams) { + var streams = this.latestDocVersionStreams[collection]; + for (var id in streams) streams[id].destroy(); + } + this.latestDocVersionStreams = Object.create(null); }; /** @@ -115,17 +125,8 @@ Agent.prototype._cleanup = function() { * _sendOp() */ Agent.prototype._subscribeToStream = function(collection, id, stream) { - if (this.closed) return stream.destroy(); - - var streams = this.subscribedDocs[collection] || (this.subscribedDocs[collection] = Object.create(null)); - - // If already subscribed to this document, destroy the previously subscribed stream - var previous = streams[id]; - if (previous) previous.destroy(); - streams[id] = stream; - var agent = this; - stream.on('data', function(data) { + this._subscribeMapToStream(this.subscribedDocs, collection, id, stream, function(data) { if (data.error) { // Log then silently ignore errors in a subscription stream, since these // may not be the client's fault, and they were not the result of a @@ -135,13 +136,26 @@ Agent.prototype._subscribeToStream = function(collection, id, stream) { } agent._onOp(collection, id, data); }); +}; + +Agent.prototype._subscribeMapToStream = function(map, collection, id, stream, dataHandler) { + if (this.closed) return stream.destroy(); + + var streams = map[collection] || (map[collection] = Object.create(null)); + + // If already subscribed to this document, destroy the previously subscribed stream + var previous = streams[id]; + if (previous) previous.destroy(); + streams[id] = stream; + + stream.on('data', dataHandler); stream.on('end', function() { // The op stream is done sending, so release its reference - var streams = agent.subscribedDocs[collection]; + var streams = map[collection]; if (!streams || streams[id] !== stream) return; delete streams[id]; if (util.hasKeys(streams)) return; - delete agent.subscribedDocs[collection]; + delete map[collection]; }); }; @@ -794,25 +808,74 @@ Agent.prototype._broadcastPresence = function(presence, callback) { collection: presence.c }; var start = Date.now(); - backend.trigger(backend.MIDDLEWARE_ACTIONS.receivePresence, this, context, function(error) { + + var subscriptionUpdater = presence.p === null ? + this._unsubscribeDocVersion.bind(this) : + this._subscribeDocVersion.bind(this); + + subscriptionUpdater(presence.c, presence.d, function(error) { if (error) return callback(error); - var requests = presenceRequests[presence.ch] || (presenceRequests[presence.ch] = Object.create(null)); - var previousRequest = requests[presence.id]; - if (!previousRequest || previousRequest.pv < presence.pv) { - presenceRequests[presence.ch][presence.id] = presence; - } - backend.transformPresenceToLatestVersion(agent, presence, function(error, presence) { + backend.trigger(backend.MIDDLEWARE_ACTIONS.receivePresence, agent, context, function(error) { if (error) return callback(error); - var channel = agent._getPresenceChannel(presence.ch); - agent.backend.pubsub.publish([channel], presence, function(error) { - if (error) return callback(error); - backend.emit('timing', 'presence.broadcast', Date.now() - start, context); + var requests = presenceRequests[presence.ch] || (presenceRequests[presence.ch] = Object.create(null)); + var previousRequest = requests[presence.id]; + if (!previousRequest || previousRequest.pv < presence.pv) { + presenceRequests[presence.ch][presence.id] = presence; + } + + var transformer = function(agent, presence, callback) { callback(null, presence); + }; + + var latestDocVersion = util.dig(agent.latestDocVersions, presence.c, presence.d); + var presenceIsUpToDate = presence.v === latestDocVersion; + if (!presenceIsUpToDate) { + transformer = backend.transformPresenceToLatestVersion.bind(backend); + } + + transformer(agent, presence, function(error, presence) { + if (error) return callback(error); + var channel = agent._getPresenceChannel(presence.ch); + agent.backend.pubsub.publish([channel], presence, function(error) { + if (error) return callback(error); + backend.emit('timing', 'presence.broadcast', Date.now() - start, context); + callback(null, presence); + }); }); }); }); }; +Agent.prototype._subscribeDocVersion = function(collection, id, callback) { + if (!collection || !id) return callback(); + + var latestDocVersions = this.latestDocVersions; + var isSubscribed = util.dig(latestDocVersions, collection, id) !== undefined; + if (isSubscribed) return callback(); + + var agent = this; + this.backend.subscribe(this, collection, id, null, function(error, stream, snapshot) { + if (error) return callback(error); + + var versions = latestDocVersions[collection] || (latestDocVersions[collection] = Object.create(null)); + versions[id] = snapshot.v; + + agent._subscribeMapToStream(agent.latestDocVersionStreams, collection, id, stream, function(op) { + // op.v behind snapshot.v by 1 + latestDocVersions[collection][id] = op.v + 1; + }); + + callback(); + }); +}; + +Agent.prototype._unsubscribeDocVersion = function(collection, id, callback) { + var stream = util.dig(this.latestDocVersionStreams, collection, id); + if (stream) stream.destroy(); + util.digAndRemove(this.latestDocVersions, collection, id); + util.nextTick(callback); +}; + Agent.prototype._createPresence = function(request) { return { a: ACTIONS.presence, diff --git a/test/client/presence/doc-presence.js b/test/client/presence/doc-presence.js index 24425aa8a..45772b9aa 100644 --- a/test/client/presence/doc-presence.js +++ b/test/client/presence/doc-presence.js @@ -5,6 +5,7 @@ var types = require('../../../lib/types'); var presenceTestType = require('./presence-test-type'); var errorHandler = require('../../util').errorHandler; var PresencePauser = require('./presence-pauser'); +var sinon = require('sinon'); types.register(presenceTestType.type); describe('DocPresence', function() { @@ -297,6 +298,23 @@ describe('DocPresence', function() { ], done); }); + it('does not call getOps() when presence is already up-to-date', function(done) { + var localPresence1 = presence1.create('presence-1'); + + async.series([ + doc1.fetch.bind(doc1), // Ensure up-to-date + function(next) { + sinon.spy(Backend.prototype, 'getOps'); + next(); + }, + localPresence1.submit.bind(localPresence1, {index: 1}), + function(next) { + expect(Backend.prototype.getOps).not.to.have.been.called; + next(); + } + ], done); + }); + // This test case attempts to force us into a tight race condition corner case: // 1. doc1 sends presence, as well as submits an op // 2. doc2 receives the op first, followed by the presence, which is now out-of-date