Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixing interruption behaviour #3183

Merged
merged 13 commits into from
May 12, 2023

Conversation

Angel-O
Copy link
Contributor

@Angel-O Angel-O commented Mar 25, 2023

Summary

Changes

increasing supply by Int.MaxValue + chunkSize on upstream finalization to prevent the supply semaphore from blocking when upstream is interrupted while downstream is waiting for the timeout to expire or to have enough elements

Notes

  • the stress test: all elements are processed test is nothing but a copy of the benchmark with and integrity check at the end. While it may seem redundant (i.e. should never lose any elements should cover it), in reality it is possible to write an implementation that passes that test but fails this one. (on a side note, that explains the big performance gain of the implementation linked earlier, so having this test in place could prevent being misled when looking at benchmark results, while working on a new implementation 🙏🏾 )

* adding error & interruption propagation and integrity test
@armanbilge
Copy link
Member

How does this PR relate to #3186? Should this be reviewed first?

@Angel-O
Copy link
Contributor Author

Angel-O commented Mar 31, 2023

I ended up making all the changes in #3186, since I wanted to verify the correctness of that implementation, but I'm happy for this one to be considered first. I'll sync up the branch, run sbt prePR and push again if needed.


val downstream = source.groupWithin(100, 2.seconds)

downstream.intercept[SevenNotAllowed.type]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we make an assertion here about what the downstream has / has not received before the error?

Comment on lines 881 to 883
.timeout(downstreamTimeout)
.flatTap(_ => IO.monotonic.flatMap(ref.set))
.flatMap(emit => ref.get.map(timeLapsed => (timeLapsed, emit)))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
.timeout(downstreamTimeout)
.flatTap(_ => IO.monotonic.flatMap(ref.set))
.flatMap(emit => ref.get.map(timeLapsed => (timeLapsed, emit)))
.timed

@@ -874,6 +874,7 @@ class StreamCombinatorsSuite extends Fs2Suite {
source.groupWithin(Int.MaxValue, 1.day)

downstream.compile.lastOrError
.timeout(downstreamTimeout)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this timeout necessary? Since its an executeEmbed test.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's because if the test fails we get a slightly better error message

java.util.concurrent.TimeoutException: 7500 milliseconds which can be easily associated to downstreamTimeout

otherwise we get this value on the diff which looks a bit random

_1 = 86405500000000 nanoseconds,

but I'm happy to remove it

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it, that is a nicer error :) thanks!

Comment on lines 1473 to 1476
def endSupply(result: Either[Throwable, Unit]): F2[Unit] =
buffer.update(_.copy(endOfSupply = Some(result))) *> supply.releaseN(Int.MaxValue)
buffer.update(_.copy(endOfSupply = Some(result))) *> supply.releaseN(
Int.MaxValue + outputLong
)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, dumb question: why is Int.MaxValue a "magic number" in this context? I would have thought it's effectively maxing out the semaphore, but if it needs + outputLong to work then I feel like it must have more significance?

Copy link
Contributor Author

@Angel-O Angel-O Apr 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Legit question to be fair. Had to think about it again.

Interruption of the upstream fiber (i.e. Outcome.Cancelled) is handled downstream by doing nothing (permits are never released)
So by increasing the supply to Int.MaxValue we are just evening out the negative balance (Int.MaxValue is to account for the worst case scenario: at most the chunkSize parameter will be equal to Int.MaxValue)

    val waitSupply = supply.acquireN(outputLong).guaranteeCase {
        case Outcome.Succeeded(_) => supply.releaseN(outputLong)
        case _                    => F.unit
     }

Now after getting past the "checkpoint" above we are acquiring outputLong permits again

    acq <- F.race(F.sleep(timeout), waitSupply).flatMap {
      case Left(_)  => onTimeout
      case Right(_) => supply.acquireN(outputLong).as(outputLong)
    }

So in order to get past this point we need to release an additional outputLong permits and that allows the stream to be unblocked

EDIT

Interruption of the upstream fiber (i.e. Outcome.Cancelled)

uhm well actually I've just tested it, it is not handled with Outcome.Cancelled...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for that explanation!

Int.MaxValue is to account for the worst case scenario: at most the chunkSize parameter will be equal to Int.MaxValue

So could we just use chunkSize here, instead of Int.MaxValue ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@armanbilge apologies I was wrong, that's not what's happening here. I'm just doing some tests to figure out why we need the additional outputLong

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Btw, if these implementations details are no longer relevant after your rewrite in the other PR, then let's not get too hung up on this one :)

Copy link
Contributor Author

@Angel-O Angel-O Apr 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok I think I've figured it out (might be useful for the other implementation actually)

basically the problem is that we need enough supply to cover 2 iterations of the race loop. So if we only increase it by Int.MaxValue the following will happen

  • (current iteration): supply is unblocked
  • (next iteration): supply gets stuck (not enough supply because upstream was interrupted)

if instead we increase it by Int.MaxValue + outputLong

  • (current iteration): supply is unblocked
  • (next iteration): supply is not blocked thanks to the additional outputLong

So since the chunkSize can be as high as Int.MaxValue then the minimum supply to unblock the semaphore should be Int.MaxValue + outputLong

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So since the chunkSize can be as high as Int.MaxValue then the minimum supply to unblock the semaphore should be Int.MaxValue + outputLong

Key word being "can". Wouldn't chunkSize + outputLong be sufficient?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah that should work. The test still passes, I'll change it to outputLong * 2 since chunkSize == outputLong

@He-Pin
Copy link
Contributor

He-Pin commented Apr 2, 2023

Have you stress the latency of many fs streams with groupWithin?

@Angel-O
Copy link
Contributor Author

Angel-O commented Apr 2, 2023

Have you stress the latency of many fs streams with groupWithin?

Hey @He-Pin , may I ask ? what do you mean exactly? Running many streams concurrently and using groupWithin on the resulting stream ? i.e. after calling .parJoinUnbounded ?

@He-Pin
Copy link
Contributor

He-Pin commented Apr 2, 2023

Have you stress the latency of many fs streams with groupWithin?

Hey @He-Pin , may I ask ? what do you mean exactly? Running many streams concurrently and using groupWithin on the resulting stream ? i.e. after calling .parJoinUnbounded ?

Eg, using groupwithin to build a lock-step game server where the latency matters

@Angel-O
Copy link
Contributor Author

Angel-O commented Apr 2, 2023

Eg, using groupwithin to build a lock-step game server where the latency matters

Thanks for the clarification. I must admit that unfortunately I've never heard of a lock-step server, so I haven't done any work (or rather wrote any test) with this particular use case in mind. Also keep in mind that I'm fairly new to open source contributions so maybe this is more of a question for regular maintainers.

Nonetheless, I'd be interested to find out more: can you point me towards an example or a beginner friendly resource? Thank you 🙏🏾

@armanbilge
Copy link
Member

@He-Pin you are welcome to contribute some benchmarks for your usecase, that will probably be the best way to answer your question :)

@He-Pin
Copy link
Contributor

He-Pin commented Apr 2, 2023

Eg, using groupwithin to build a lock-step game server where the latency matters

Thanks for the clarification. I must admit that unfortunately I've never heard of a lock-step server, so I haven't done any work (or rather wrote any test) with this particular use case in mind. Also keep in mind that I'm fairly new to open source contributions so maybe this is more of a question for regular maintainers.

Nonetheless, I'd be interested to find out more: can you point me towards an example or a beginner friendly resource? Thank you 🙏🏾

keep performance in mind, this is the way,mandalorian.

@He-Pin
Copy link
Contributor

He-Pin commented Apr 2, 2023

@He-Pin you are welcome to contribute some benchmarks for your usecase, that will probably be the best way to answer your question :)

that's true, for my usecase i need to use a Hashed timer instead.

I'm learning more CE code too, will pr later.

Copy link
Member

@armanbilge armanbilge left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for chasing this one down! The new tests specifying the expected behavior are great 👍

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

groupWithin: inconsistent behaviour on source termination
3 participants