Skip to content

Commit

Permalink
feat(turbo-stream): delayedUnsubscribe option
Browse files Browse the repository at this point in the history
  • Loading branch information
palkan committed Jun 4, 2024
1 parent 560d7c7 commit b15a426
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 2 deletions.
2 changes: 2 additions & 0 deletions packages/turbo-stream/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## master

- Add `delayedUnsubscribe` option. ([@palkan][])

## 0.6.0

- Require `@anycable/core` ^0.9.0.
Expand Down
13 changes: 13 additions & 0 deletions packages/turbo-stream/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,19 @@ One subtle but important difference is that **`@anycable/turbo-stream` does not

## Advanced configuration

### Delayed unsubscribe

When using Turbo Drive for navigation, it's common to have a stream source element attached to the same stream to appear in both old and new HTML. In order to avoid re-subscription to the underlying stream, we can keep the subscription during navigation by postponing the `unsubscribe` call (or more precisely, `channel.disconnect()`). Thus, we can avoid unnecessary Action Cable commands and avoid losing messages arrived in-between resubscription. You must opt-in to use this features:

```js
import { start } from "@anycable/turbo-stream"
import cable from "cable"

start(cable, { delayedUnsubscribe: true }) // default is 300ms

start(cable, { delayedUnsubscribe: 1000 }) // Custom number of milliseconds
```

### Attaching `X-Socket-ID` header to Turbo requests

You can automatically add a header to all Turbo requests with the current socket session ID. This can be used to perform **broadcasts to others** (see [Rails integration docs](https://docs.anycable.io/rails/getting_started?id=action-cable-extensions)):
Expand Down
3 changes: 3 additions & 0 deletions packages/turbo-stream/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ export interface StartOptions {
tagName: string
channelClass: typeof TurboChannel
requestSocketIDHeader: boolean | string
// Number of milliseconds to wait before unsubscribing from a channel after the element has been disconnected.
// If set to `true`, the default value (300ms) is used.
delayedUnsubscribe: boolean | number
}

export const DEFAULT_SOCKET_HEADER: string
Expand Down
74 changes: 73 additions & 1 deletion packages/turbo-stream/integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ const broadcast = (stream: string, message: string) => {
}

afterEach(() => {
document.documentElement.removeAttribute('data-turbo-preview')
wss.close()
for (let ws of wss.clients) {
ws.terminate()
Expand Down Expand Up @@ -128,7 +129,9 @@ describe('<turbo-stream-source>', () => {
`

let cable = createCable(`ws://localhost:${port}`)
start(cable, { tagName: 'turbo-cable-stream-source-connected' })
start(cable, {
tagName: 'turbo-cable-stream-source-connected'
})

let channel = cable.hub.channels[0]
expect(channel).not.toBeUndefined()
Expand Down Expand Up @@ -231,4 +234,73 @@ describe('<turbo-stream-source>', () => {
(event.detail.fetchOptions.headers as any)['X-TURBO-SOCKET']
).toEqual('42')
})

it('with delayedUnsubscribe', async () => {
document.body.innerHTML = `
<turbo-cable-stream-source-delayed signed-stream-name='test-delayed' channel='TurboChannel'></turbo-cable-stream-source-delayed>
`

let cable = createCable(`ws://localhost:${port}`)
start(cable, {
tagName: 'turbo-cable-stream-source-delayed',
delayedUnsubscribe: true
})

let channel = cable.hub.channels[0]
expect(channel).not.toBeUndefined()

await new Promise((resolve, reject) => {
setTimeout(() => {
reject(Error('no connect received'))
}, 2000)
channel.on('connect', resolve)
})

await new Promise(resolve => setTimeout(resolve, 300))

expect(
document.body
.querySelector('turbo-cable-stream-source-delayed')
?.getAttribute('connected')
).toEqual('')

// One subscribe command was sent
expect(receivedByServer).toHaveLength(1)

document.body.innerHTML = `<div></div>`

await new Promise(resolve => setTimeout(resolve, 100))

document.body.innerHTML = `
<turbo-cable-stream-source-delayed signed-stream-name='test-delayed' channel='TurboChannel'></turbo-cable-stream-source-delayed>
`

await new Promise(resolve => setTimeout(resolve, 100))

// No new commands were sent to the server
expect(receivedByServer).toHaveLength(1)

expect(
document.body
.querySelector('turbo-cable-stream-source-delayed')
?.getAttribute('connected')
).toEqual('')

expect(cable.hub.channels).toHaveLength(2)
channel = cable.hub.channels[1]

document.body.innerHTML = `<div></div>`

await new Promise((resolve, reject) => {
setTimeout(() => {
reject(Error('no close received'))
}, 1000)
channel.on('close', resolve)
})

await new Promise(resolve => setTimeout(resolve, 500))

expect(receivedByServer).toHaveLength(2)
expect((receivedByServer[1] as any).command).toEqual('unsubscribe')
})
})
4 changes: 4 additions & 0 deletions packages/turbo-stream/start.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,15 @@ export const DEFAULT_SOCKET_HEADER = 'X-Socket-ID'
export function start(cable, opts = {}) {
let tagName = opts.tagName || 'turbo-cable-stream-source'
let channelClass = opts.channelClass || TurboChannel
let delayedUnsubscribe = opts.delayedUnsubscribe || 0

if (delayedUnsubscribe === true) delayedUnsubscribe = 300

let C = class extends TurboStreamSourceElement {}

C.cable = cable
C.channelClass = channelClass
C.delayedUnsubscribe = delayedUnsubscribe

if (customElements.get(tagName) === undefined) {
customElements.define(tagName, C)
Expand Down
11 changes: 10 additions & 1 deletion packages/turbo-stream/stream_source_element.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { isPreview } from './turbo.js'
export class TurboStreamSourceElement extends HTMLElement {
static cable
static channelClass
static delayedUnsubscribe

async connectedCallback() {
connectStreamSource(this)
Expand Down Expand Up @@ -48,7 +49,15 @@ export class TurboStreamSourceElement extends HTMLElement {
listener()
}
this.listeners.length = 0
this.channel.disconnect()

let ch = this.channel
let delay = this.constructor.delayedUnsubscribe

if (delay) {
setTimeout(() => ch.disconnect(), delay)
} else {
ch.disconnect()
}

This comment has been minimized.

Copy link
@tleish

tleish Jun 5, 2024

@palkan - If the default value is delayedUnsubscribe =0, is there any need for a conditional here?

just use

setTimeout(() => this.channel.disconnect(), this.constructor.delayedUnsubscribe)

This comment has been minimized.

Copy link
@palkan

palkan Jun 5, 2024

Author Member

Yeah, I thought about it but decided to preserve the current behaviour (which were already captured by my tests 😁 maybe, someone else relied on the disconnect being called immediately here).

This comment has been minimized.

Copy link
@tleish

tleish Jun 6, 2024

It seems to be working great. It's such a simple solution. For some reason I thought it would be more complicated.

I'm curious... it calls disconnect after a delay, but it doesn't actually disconnect. And the MR doesn't add any additional logic to the Channel#disconnect method. It's not clear to me how adding the timeout prevents the disconnect.

This comment has been minimized.

Copy link
@palkan

palkan Jun 6, 2024

Author Member

For some reason I thought it would be more complicated.

That's because we already solve all the concurrency and race conditions issues at the @anycable/core level 🙂

It's not clear to me how adding the timeout prevents the disconnect.

tl;dr channel.disconnect() != subscription.unsubscribe().

Whenever a new stream source element is added, it creates a new channel for the same subscription; we keep track of active channels and only issue the unsubscribe request if there are none (and we synchronize subscribe and unsubscribe requests, so no race conditions here).

This comment has been minimized.

Copy link
@tleish

tleish Jun 6, 2024

Thanks for the explanation. One sign of well written is adding a feature of this type of complexity with only a few lines of code.

Thanks for such a quick turn-around on this feature.

}
}

Expand Down

0 comments on commit b15a426

Please sign in to comment.