Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix issue where Dispose() called after source terminated potentially violates spec #11

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

bjnsn
Copy link

@bjnsn bjnsn commented Jan 28, 2021

Spec says: A callbag MUST NOT be delivered data after it has been terminated

As I understand it, for callbags to work in concert all callbags must either:

  1. not send 'terminate' to sources/sinks after said source/sink notifies it has terminated
  2. guard against case where callbag has notified sources/sinks of termination and is itself subsequently terminated

I expect 1) - though perhaps @staltz should weigh in...

If 2) then I believe this issue should be raised with @Andarist in callbag-remember since it doesn't guard against that case.

Example: https://stackblitz.com/edit/typescript-6w7v6u?file=index.ts

import pipe from "callbag-pipe";
import of from "callbag-of";
import subscribe from "callbag-subscribe";
import remember from "callbag-remember";

// once `of` arguments are exhausted, it sends dispose signal to its sink (remember)
const num$ = remember(of(1));

const cancelSubscription = pipe(
  num$,
  subscribe(value => console.log(value))
);

setTimeout(
  () =>
    // when subscription is canceled, it sends dispose signal to its source (remember)
    cancelSubscription(),
  100
);

// remember throws error after recieving dispose signal from both source and sink

@Andarist
Copy link
Contributor

The fix here looks OK - I believe that stuff like this should be mostly handed on the ends of the callbag chains. It's the subscribe that knows that it shouldn't send 2 upwards after the source is completed (or errored).

@bjnsn
Copy link
Author

bjnsn commented Feb 22, 2021

@Andarist It looks like handling this at the end of the chain won't fully solve the problem. In cases where remember is used with subscribe and a combination operator such as combine - the combination operator would need to terminate only those sources that haven't terminated, which isn't currently happening. Using the current patch there still are issues: https://stackblitz.com/edit/js-vqe3br

import fromPromise from "callbag-from-promise";
import interval from "callbag-interval";
import combine from "callbag-combine";
import pipe from "callbag-pipe";
import remember from "callbag-remember";

const str$ = remember(fromPromise(Promise.resolve("foo")));
const time$ = interval(1000);
const combine$ = combine(str$, time$);

const dispose = pipe(
  combine$,
  patchedSubscribe(val => {
    console.log(val);
  })
);

setTimeout(() => dispose(), 1500);

@Andarist
Copy link
Contributor

Just to clarify if this wasn't clear beforehand - by "at the end" I have meant "as close to the end as possible".

I can't say that remember is 100% correct and it definitely shouldn't change but I'd only like to touch it after we fix other issues and prove that it has to be changed. This is IMHO a good process as it allows us to recognize those unhandled scenarios in other operators.

As to this particular case - I'm pretty sure that combine should "track" the completion state of its sources here:
https://github.com/staltz/callbag-combine/blob/912a5ec8ec3d9e65d3beccdc7a53eabd624c1c8a/readme.js#L65
which could simply be done by nullifying the correct source talkback slot. More than that - errors from sources seem to be currently ignored by combine and that could be fixed as well.

@bjnsn
Copy link
Author

bjnsn commented Feb 22, 2021

Right - I believe the talkback from the sink (in this case callbag-subscribe) is handled here and that the list of talkbacks is accumulated but never culled.

@Andarist
Copy link
Contributor

Yep, I suspect that this should be enough to fix this particular problem:

diff --git a/readme.js b/readme.js
index 5de5d3f..d66c240 100644
--- a/readme.js
+++ b/readme.js
@@ -45,7 +45,7 @@ const combine = (...sources) => (start, sink) => {
   const sourceTalkbacks = new Array(n);
   const talkback = (t, d) => {
     if (t === 0) return;
-    for (let i = 0; i < n; i++) sourceTalkbacks[i](t, d);
+    for (let i = 0; i < n; i++) sourceTalkbacks[i] && sourceTalkbacks[i](t, d);
   };
   sources.forEach((source, i) => {
     vals[i] = EMPTY;
@@ -62,6 +62,7 @@ const combine = (...sources) => (start, sink) => {
           sink(1, arr);
         }
       } else if (t === 2) {
+        sourceTalkbacks[i] = null;
         if (--Ne === 0) sink(2);
       } else {
         sink(t, d);

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants