Skip to content

Commit

Permalink
feat: ext protocol pongs
Browse files Browse the repository at this point in the history
  • Loading branch information
palkan committed Aug 10, 2023
1 parent cc50ef0 commit ac3f8ef
Show file tree
Hide file tree
Showing 9 changed files with 148 additions and 11 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## master

- `core`: Add PONGs support to the extended protocol and allow passing protocol options via `protocolOptions`.

## 0.7.3 (2023-08-09)

- `core`: Handle WebSocket error messages on close gracefully.
Expand Down
26 changes: 25 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,16 @@ export default createCable({protocol: 'actioncable-v1-ext-json'})

#### Loading initial history on client initialization

To catch up messages broadcasted during the initial page load (or client-side application initialization), you can specify the `historyTimestamp` option to retrieve messages after the specified time along with subscription requests. The value must be a UTC timestamp (the number of seconds).
To catch up messages broadcasted during the initial page load (or client-side application initialization), you can specify the `historyTimestamp` option to retrieve messages after the specified time along with subscription requests. The value must be a UTC timestamp (the number of seconds). For example:

```js
export default createCable({
protocol: 'actioncable-v1-ext-json',
protocolOptions: {
historyTimestamp: 1614556800 // 2021-03-01 00:00:00 UTC
}
})
```

By default, we use the current time (`Date.now() / 1000`). For web applications, you can specify the value using a meta tag with the name "action-cable-history-timestamp" (or "cable-history-timestamp"). For example, in Rails, you can add the following to your application layout

Expand All @@ -305,6 +314,21 @@ This is a recommended way to use this feature with Hotwire applications, where i

You can also disable retrieving history since the specified time completely by setting the `historyTimestamp` option to `false`.

#### PONGs support

The extended protocol also support sending `pong` commands in response to `ping` messages. A server (AnyCable-Go) keeps track of pongs and disconnect the client if no pongs received in time. This helps to identify broken connections quicker.

You must opt-in to use this feature by setting the `pongs` option to `true`:

```js
export default createCable({
protocol: 'actioncable-v1-ext-json',
protocolOptions: {
pongs: true
}
})
```

### Refreshing authentication tokens

If you use a token-based authentication with expirable tokens (e.g., like [AnyCable PRO JWT identification](https://docs.anycable.io/anycable-go/jwt_identification)), you need a mechanism to refresh tokens for a long-lived clients (to let them reconnect in case of a connection failure).
Expand Down
2 changes: 2 additions & 0 deletions packages/core/action_cable_ext/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ export type ExtendedOptions =
| Partial<{
// Initial timestamp to use when requesting stream history during subscription
historyTimestamp: number | false
// Enable PONGs
pongs: boolean
}>
| Options

Expand Down
12 changes: 12 additions & 0 deletions packages/core/action_cable_ext/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ export class ActionCableExtendedProtocol extends ActionCableProtocol {
this.restoreSince = opts.historyTimestamp
if (this.restoreSince === undefined) this.restoreSince = now()
this.sessionId = undefined
this.sendPongs = opts.pongs
}

receive(msg) {
Expand Down Expand Up @@ -50,6 +51,11 @@ export class ActionCableExtendedProtocol extends ActionCableProtocol {
if (!this.restoreSince === false) {
this.restoreSince = now()
}

if (this.sendPongs) {
this.sendPong()
}

return this.cable.keepalive(msg.message)
}

Expand Down Expand Up @@ -139,4 +145,10 @@ export class ActionCableExtendedProtocol extends ActionCableProtocol {

return { stream, epoch, offset }
}

// Send pongs asynchrounously—no need to block the main thread
async sendPong() {
await new Promise(resolve => setTimeout(resolve, 0))
this.cable.send({ command: 'pong' })
}
}
23 changes: 23 additions & 0 deletions packages/core/action_cable_ext/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,29 @@ describe('receive', () => {
expect(logger.warnings).toHaveLength(1)
expect(logger.warnings[0].message).toEqual('unknown message type: custom')
})

describe('pongs', () => {
beforeEach(() => {
protocol = new ActionCableExtendedProtocol({
logger,
historyTimestamp: false,
pongs: true
})
protocol.attached(cable)
})

it('sends pong in response to ping', async () => {
protocol.receive({ type: 'ping', message: '42' })
expect(cable.lastPingedAt).toEqual(42)

await new Promise(resolve => setTimeout(resolve, 0))

expect(cable.mailbox).toHaveLength(1)
expect(cable.mailbox[0]).toMatchObject({
command: 'pong'
})
})
})
})

describe('history', () => {
Expand Down
22 changes: 21 additions & 1 deletion packages/core/create-cable/errors.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { createCable, createConsumer } from './index.js'
import { ActionCableExtendedProtocol } from '../action_cable_ext/index.js'
import { CreateOptions, createCable, createConsumer } from './index.js'

interface MyMessage {
payload: {
Expand All @@ -17,3 +18,22 @@ consumer.subscriptions.create('TestChannel', { received })

// THROWS Type '(_message: boolean) => void' is not assignable to type '(data: Message) => void'
consumer.subscriptions.create('TestChannel', { received: receivedIncompatible })

const options: Partial<CreateOptions<'actioncable-v1-json'>> = {
protocol: 'actioncable-v1-json',
// THROWS Type '{ pongs: true; }' is not assignable
protocolOptions: { pongs: true }
}

const options2: Partial<CreateOptions<ActionCableExtendedProtocol>> = {
protocol: new ActionCableExtendedProtocol(),
// THROWS Type '{ pongs: true; }' is not assignable to type 'undefined'.
protocolOptions: { pongs: true }
}

const extOptions: Partial<CreateOptions<'actioncable-v1-ext-json'>> = {
protocol: 'actioncable-v1-ext-json',
protocolOptions: { pongs: true, historyTimestamp: false }
}

createCable('ws://whatever.com', options)
18 changes: 15 additions & 3 deletions packages/core/create-cable/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,31 @@ import { Monitor, ReconnectStrategy } from '../monitor/index.js'
import { Cable } from '../cable/index.js'
import { Protocol } from '../protocol/index.js'
import { Channel, Message, ChannelParamsMap } from '../channel/index.js'
import { Options } from '../action_cable/index.js'
import { ExtendedOptions } from '../action_cable_ext/index.js'

export type ExtendedProtocolID = 'actioncable-v1-ext-json'

export type ProtocolID =
| 'actioncable-v1-json'
| 'actioncable-v1-ext-json'
| 'actioncable-v1-msgpack'
| 'actioncable-v1-protobuf'
| ExtendedProtocolID

export type TokenRefresher = (transport: Transport) => Promise<void>

export interface CreateOptions {
protocol: ProtocolID | Protocol
type ProtocolOptions<T extends Protocol | ProtocolID> = T extends Protocol
? never
: T extends ExtendedProtocolID
? ExtendedOptions
: Options

export interface CreateOptions<P extends ProtocolID | Protocol> {
protocol: P
subprotocol: string

protocolOptions: ProtocolOptions<P>

transport: Transport
/* eslint-disable @typescript-eslint/no-explicit-any */
websocketImplementation: any
Expand Down
13 changes: 10 additions & 3 deletions packages/core/create-cable/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ export function createCable(url, opts) {
maxReconnectAttempts,
subprotocol,
tokenRefresher,
historyTimestamp
historyTimestamp,
protocolOptions
} = opts

logger = logger || new NoopLogger(logLevel)
Expand All @@ -60,10 +61,16 @@ export function createCable(url, opts) {
let protocolName = protocol.substring(0, protocol.lastIndexOf('-'))
let protocolEncoderName = protocol.substring(protocol.lastIndexOf('-') + 1)

protocolOptions = protocolOptions || {}

if (protocolName === 'actioncable-v1') {
protocol = new ActionCableProtocol({ logger })
protocol = new ActionCableProtocol({ logger, ...protocolOptions })
} else if (protocolName === 'actioncable-v1-ext') {
protocol = new ActionCableExtendedProtocol({ logger, historyTimestamp })
protocol = new ActionCableExtendedProtocol({
logger,
historyTimestamp,
...protocolOptions
})
} else {
throw Error(`Protocol is not supported yet: ${protocol}`)
}
Expand Down
41 changes: 38 additions & 3 deletions packages/core/create-cable/integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ import {
CreateOptions
} from '../index.js'
import { TestTransport } from '../transport/testing'
import { ProtocolID } from './index.js'

class CableTransport extends TestTransport {
pingTid?: any
subscriptions: any
pongsCount: number = 0

constructor(url: string) {
super(url)
Expand All @@ -39,7 +41,9 @@ class CableTransport extends TestTransport {
let identifier = msg.identifier
let command = msg.command

if (command === 'subscribe') {
if (command === 'pong') {
this.pongsCount++
} else if (command === 'subscribe') {
if (!identifier) {
this.sendLater({ type: 'reject_subscription', identifier })
return
Expand Down Expand Up @@ -104,13 +108,14 @@ let waitSec = (val?: number) => {
}

describe('Action Cable protocol communication', () => {
let transport: TestTransport
let transport: CableTransport
let cable: Cable
let opts: Partial<CreateOptions<ProtocolID>>

beforeEach(() => {
transport = new CableTransport('ws://anycable.test')

let opts: Partial<CreateOptions> = {
opts = {
transport
}

Expand Down Expand Up @@ -156,6 +161,36 @@ describe('Action Cable protocol communication', () => {
await keepalivePromise
})

it('responds with pongs when enabled', async () => {
let extOpts: Partial<CreateOptions<'actioncable-v1-ext-json'>> = {
protocol: 'actioncable-v1-ext-json',
protocolOptions: { pongs: true },
transport: opts.transport,
logger: opts.logger
}

cable = createCable('ws://example', extOpts)

let keepalivePromise = new Promise<void>((resolve, reject) => {
let tid = setTimeout(() => {
reject(Error('Timed out to received pings'))
}, 1000)

cable.on('keepalive', async () => {
clearTimeout(tid)

await waitSec(0)
resolve()
})
})

cable.connect()

await keepalivePromise

expect(transport.pongsCount).toBeGreaterThan(0)
})

describe('basic race conditions', () => {
it('subscribed - unsubscribe + subscribe + usubscribe', async () => {
cable.connect()
Expand Down

0 comments on commit ac3f8ef

Please sign in to comment.