From 9c65613f71e018dbf11481062d16914707e3025f Mon Sep 17 00:00:00 2001 From: Vladimir Dementyev Date: Tue, 17 Oct 2023 19:07:03 -0700 Subject: [PATCH] feat: treat any incoming message as hearbeat --- packages/core/CHANGELOG.md | 4 ++++ packages/core/action_cable/index.js | 7 +++++- packages/core/action_cable_ext/index.js | 23 +++++++++++-------- .../core/create-cable/integration.test.ts | 9 +++++--- 4 files changed, 29 insertions(+), 14 deletions(-) diff --git a/packages/core/CHANGELOG.md b/packages/core/CHANGELOG.md index 799caf8..581d8ef 100644 --- a/packages/core/CHANGELOG.md +++ b/packages/core/CHANGELOG.md @@ -2,6 +2,10 @@ ## master +- Treat any incoming message as keepalive. ([@palkan][]) + + See the corresponding [Rails PR](https://github.com/rails/rails/pull/49168). + ## 0.7.9 (2023-10-13) - Set `cable.sessionId` when using Action Cable (base) protocol. ([@palkan][]) diff --git a/packages/core/action_cable/index.js b/packages/core/action_cable/index.js index 3973e19..a7b5255 100644 --- a/packages/core/action_cable/index.js +++ b/packages/core/action_cable/index.js @@ -150,7 +150,12 @@ export class ActionCableProtocol { let { type, identifier, message, reason, reconnect } = msg - if (type === 'ping') return this.cable.keepalive(msg.message) + if (type === 'ping') { + return this.cable.keepalive(msg.message) + } else { + // Any incoming message may be considered as a heartbeat + this.cable.keepalive() + } if (type === 'welcome') { let sessionId = msg.sid diff --git a/packages/core/action_cable_ext/index.js b/packages/core/action_cable_ext/index.js index 476938b..464a002 100644 --- a/packages/core/action_cable_ext/index.js +++ b/packages/core/action_cable_ext/index.js @@ -37,16 +37,6 @@ export class ActionCableExtendedProtocol extends ActionCableProtocol { return super.receive(msg) } - if (type === 'confirm_history') { - this.logger.debug('history result received', msg) - return - } - - if (type === 'reject_history') { - this.logger.warn('failed to retrieve history', msg) - return - } - if (type === 'ping') { if (!this.restoreSince === false) { this.restoreSince = now() @@ -57,6 +47,19 @@ export class ActionCableExtendedProtocol extends ActionCableProtocol { } return this.cable.keepalive(msg.message) + } else { + // Any incoming message may be considered as a heartbeat + this.cable.keepalive() + } + + if (type === 'confirm_history') { + this.logger.debug('history result received', msg) + return + } + + if (type === 'reject_history') { + this.logger.warn('failed to retrieve history', msg) + return } if (type === 'welcome') { diff --git a/packages/core/create-cable/integration.test.ts b/packages/core/create-cable/integration.test.ts index 7330988..8eac976 100644 --- a/packages/core/create-cable/integration.test.ts +++ b/packages/core/create-cable/integration.test.ts @@ -28,7 +28,7 @@ class CableTransport extends TestTransport { this.subscriptions = {} this.sendLater({ type: 'welcome' }) this.pingTid = setInterval(() => { - this.sendLater({ type: 'ping' }) + this.sendLater({ type: 'ping', message: Date.now() }) }, 500) return promise } @@ -150,7 +150,8 @@ describe('Action Cable protocol communication', () => { reject(Error('Timed out to received pings')) }, 1000) - cable.on('keepalive', () => { + cable.on('keepalive', msg => { + if (!msg) return clearTimeout(tid) resolve() }) @@ -176,7 +177,9 @@ describe('Action Cable protocol communication', () => { reject(Error('Timed out to received pings')) }, 1000) - cable.on('keepalive', async () => { + cable.on('keepalive', async msg => { + if (!msg) return + clearTimeout(tid) await waitSec(0)