Skip to content

Commit

Permalink
Make Eventual Get() panic if the value is unset.
Browse files Browse the repository at this point in the history
  • Loading branch information
FGasper committed Dec 9, 2024
1 parent d11cf4a commit 42376f5
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 17 deletions.
13 changes: 9 additions & 4 deletions internal/util/eventual.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,18 @@ func (e *Eventual[T]) Ready() <-chan struct{} {
return e.ready
}

// Get returns an option that contains the Eventual’s value, or
// empty if the value isn’t ready yet.
func (e *Eventual[T]) Get() option.Option[T] {
// Get returns the Eventual’s value if it’s ready.
// It panics otherwise.
func (e *Eventual[T]) Get() T {
e.mux.RLock()
defer e.mux.RUnlock()

return e.val
val, has := e.val.Get()
if has {
return val
}

panic("Eventual's Get() called before value was ready.")
}

// Set sets the Eventual’s value. It may be called only once;
Expand Down
11 changes: 4 additions & 7 deletions internal/util/eventual_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,14 @@ package util

import (
"time"

"github.com/10gen/migration-verifier/option"
)

func (s *UnitTestSuite) TestEventual() {
eventual := NewEventual[int]()

s.Assert().Equal(
option.None[int](),
eventual.Get(),
"Get() should return empty",
s.Assert().Panics(
func() { eventual.Get() },
"Get() should panic before the value is set",
)

select {
Expand All @@ -30,7 +27,7 @@ func (s *UnitTestSuite) TestEventual() {
}

s.Assert().Equal(
option.Some(123),
123,
eventual.Get(),
"Get() should return the value",
)
Expand Down
2 changes: 1 addition & 1 deletion internal/verifier/change_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ func (csr *ChangeStreamReader) iterateChangeStream(
// This means we should exit rather than continue reading the change stream
// since there should be no more events.
case <-csr.writesOffTs.Ready():
writesOffTs := csr.writesOffTs.Get().MustGet()
writesOffTs := csr.writesOffTs.Get()

csr.logger.Debug().
Interface("writesOffTimestamp", writesOffTs).
Expand Down
6 changes: 3 additions & 3 deletions internal/verifier/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (verifier *Verifier) waitForChangeStream(ctx context.Context, csr *ChangeSt
case <-ctx.Done():
return ctx.Err()
case <-csr.error.Ready():
err := csr.error.Get().MustGet()
err := csr.error.Get()
verifier.logger.Warn().Err(err).
Msgf("Received error from %s.", csr)
return err
Expand Down Expand Up @@ -89,10 +89,10 @@ func (verifier *Verifier) CheckWorker(ctxIn context.Context) error {
eg.Go(func() error {
select {
case <-verifier.srcChangeStreamReader.error.Ready():
err := verifier.srcChangeStreamReader.error.Get().MustGet()
err := verifier.srcChangeStreamReader.error.Get()
return errors.Wrapf(err, "%s failed", verifier.srcChangeStreamReader)
case <-verifier.dstChangeStreamReader.error.Ready():
err := verifier.dstChangeStreamReader.error.Get().MustGet()
err := verifier.dstChangeStreamReader.error.Get()
return errors.Wrapf(err, "%s failed", verifier.dstChangeStreamReader)
case <-ctx.Done():
return nil
Expand Down
4 changes: 2 additions & 2 deletions internal/verifier/migration_verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,15 +282,15 @@ func (verifier *Verifier) WritesOff(ctx context.Context) error {
// under the lock.
select {
case <-verifier.srcChangeStreamReader.error.Ready():
err := verifier.srcChangeStreamReader.error.Get().MustGet()
err := verifier.srcChangeStreamReader.error.Get()
return errors.Wrapf(err, "tried to send writes-off timestamp to %s, but change stream already failed", verifier.srcChangeStreamReader)
default:
verifier.srcChangeStreamReader.writesOffTs.Set(srcFinalTs)
}

select {
case <-verifier.dstChangeStreamReader.error.Ready():
err := verifier.dstChangeStreamReader.error.Get().MustGet()
err := verifier.dstChangeStreamReader.error.Get()
return errors.Wrapf(err, "tried to send writes-off timestamp to %s, but change stream already failed", verifier.dstChangeStreamReader)
default:
verifier.dstChangeStreamReader.writesOffTs.Set(dstFinalTs)
Expand Down

0 comments on commit 42376f5

Please sign in to comment.