Skip to content

Commit

Permalink
Merge branch 'REP-5358-add-retry-description' into REP-5358-retry-and…
Browse files Browse the repository at this point in the history
…-channels
  • Loading branch information
FGasper committed Dec 7, 2024
2 parents ab10df3 + 668e194 commit 50708b7
Show file tree
Hide file tree
Showing 9 changed files with 174 additions and 78 deletions.
2 changes: 1 addition & 1 deletion internal/partitions/partitions.go
Original file line number Diff line number Diff line change
Expand Up @@ -621,7 +621,7 @@ func getMidIDBounds(

// Append the copied bound to the other mid _id bounds.
midIDBounds = append(midIDBounds, bound)
ri.NoteSuccess()
ri.NoteSuccess("received an ID partition")
}

return cursor.Err()
Expand Down
62 changes: 42 additions & 20 deletions internal/retry/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,14 @@ func (r *Retryer) runRetryLoop(
li := &LoopInfo{
durationLimit: r.retryLimit,
}
funcinfos := lo.RepeatBy(
len(r.callbacks),
func(_ int) *FuncInfo {
funcinfos := lo.Map(
r.callbacks,
func(cb retryCallbackInfo, _ int) *FuncInfo {
return &FuncInfo{
lastResetTime: msync.NewTypedAtomic(startTime),
lastReset: msync.NewTypedAtomic(lastResetInfo{
time: startTime,
}),
description: cb.description,
loopDescription: r.description,
loopInfo: li,
}
Expand Down Expand Up @@ -113,17 +116,25 @@ func (r *Retryer) runRetryLoop(
defer ticker.Stop()

for {
lastSuccessTime := funcinfos[i].lastResetTime.Load()
lastReset := funcinfos[i].lastReset.Load()

select {
case <-cbDoneChan:
return
case <-ticker.C:
if funcinfos[i].lastResetTime.Load() == lastSuccessTime {
logger.Warn().
if funcinfos[i].lastReset.Load() == lastReset {
event := logger.Warn().
Str("callbackDescription", curCbInfo.description).
Time("lastSuccessAt", lastSuccessTime).
Str("elapsedTime", reportutils.DurationToHMS(time.Since(lastSuccessTime))).
Time("noSuccessSince", lastReset.time).
Uint64("successesSoFar", lastReset.resetsSoFar)

if successDesc, hasDesc := lastReset.description.Get(); hasDesc {
event.
Str("lastSuccessDescription", successDesc)
}

event.
Str("elapsedTime", reportutils.DurationToHMS(time.Since(lastReset.time))).
Msg("Operation has not reported success for a while.")
}
}
Expand Down Expand Up @@ -164,9 +175,11 @@ func (r *Retryer) runRetryLoop(
}

failedFuncInfo := funcinfos[groupErr.funcNum]
descriptions := failedFuncInfo.GetDescriptions()
cbErr := groupErr.errFromCallback

// Not a transient error? Fail immediately.
if !r.shouldRetryWithSleep(logger, sleepTime, *failedFuncInfo, groupErr.errFromCallback) {
if !r.shouldRetryWithSleep(logger, sleepTime, descriptions, cbErr) {
return groupErr.errFromCallback
}

Expand Down Expand Up @@ -201,7 +214,7 @@ func (r *Retryer) runRetryLoop(
// Set all of the funcs that did *not* fail as having just succeeded.
for i, curInfo := range funcinfos {
if i != groupErr.funcNum {
curInfo.lastResetTime.Store(now)
curInfo.lastReset.Store(lastResetInfo{time: now})
}
}
}
Expand Down Expand Up @@ -235,7 +248,7 @@ func (r *Retryer) addDescriptionToEvent(event *zerolog.Event) *zerolog.Event {
func (r *Retryer) shouldRetryWithSleep(
logger *logger.Logger,
sleepTime time.Duration,
funcinfo FuncInfo,
descriptions []string,
err error,
) bool {
if err == nil {
Expand All @@ -250,26 +263,35 @@ func (r *Retryer) shouldRetryWithSleep(
)

event := logger.WithLevel(
lo.Ternary(isTransient, zerolog.InfoLevel, zerolog.WarnLevel),
lo.Ternary(
// If it’s transient, surface it as info.
isTransient,
zerolog.InfoLevel,

lo.Ternary(
// Context cancellation is unimportant, so debug.
errors.Is(err, context.Canceled),
zerolog.DebugLevel,

// Other non-retryables are serious, so warn.
zerolog.WarnLevel,
),
),
)

if loopDesc, hasLoopDesc := r.description.Get(); hasLoopDesc {
event.Str("operationDescription", loopDesc)
}

event.Str("callbackDescription", funcinfo.description).
event.Strs("description", descriptions).
Int("error code", util.GetErrorCode(err)).
Err(err)

if isTransient {
event.
Stringer("delay", sleepTime).
Msg("Pausing before retrying after transient error.")
Msg("Got retryable error. Pausing, then will retry.")

return true
}

event.Msg("Non-transient error occurred.")
event.Msg("Non-retryable error occurred.")

return false
}
34 changes: 30 additions & 4 deletions internal/retry/retry_info.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package retry

import (
"slices"
"time"

"github.com/10gen/migration-verifier/internal/reportutils"
"github.com/10gen/migration-verifier/mslices"
"github.com/10gen/migration-verifier/msync"
"github.com/10gen/migration-verifier/option"
"github.com/rs/zerolog"
Expand All @@ -19,11 +21,20 @@ type LoopInfo struct {
durationLimit time.Duration
}

// lastResetInfo records the most recent reset of a callback's retry timer.
// The retry loop compares successive values with == to detect a lack of
// progress, so every field must remain comparable.
type lastResetInfo struct {
// time is when the reset happened.
time time.Time

// These go into logs to facilitate debugging.

// description is the text passed to NoteSuccess. Resets stored without a
// description leave it empty.
description option.Option[string]

// resetsSoFar is incremented by each NoteSuccess call.
// NOTE(review): stores that build a fresh literal without this field (as the
// retry loop does for callbacks that did not fail) restart the count at
// zero. Confirm that is intended.
resetsSoFar uint64
}

type FuncInfo struct {
loopInfo *LoopInfo
description string
loopDescription option.Option[string]
lastResetTime *msync.TypedAtomic[time.Time]

lastReset *msync.TypedAtomic[lastResetInfo]
}

// Log will log a debug-level message for the current Info values and the provided strings.
Expand Down Expand Up @@ -69,7 +80,7 @@ func (fi *FuncInfo) GetAttemptNumber() int {
// GetDurationSoFar returns the time elapsed since the last reset. This duration
// covers only the time spent retrying transient errors.
func (fi *FuncInfo) GetDurationSoFar() time.Duration {
return time.Since(fi.lastResetTime.Load())
return time.Since(fi.lastReset.Load().time)
}

// NoteSuccess is used to tell the retry util to reset its measurement
Expand All @@ -78,6 +89,21 @@ func (fi *FuncInfo) GetDurationSoFar() time.Duration {
//
// Call this after every successful command in a multi-command callback.
// (It’s useless--but harmless--in a single-command callback.)
func (i *FuncInfo) NoteSuccess() {
i.lastResetTime.Store(time.Now())
// NoteSuccess records a reset at the current time. The given description is
// stored with the reset so that it can appear in logs, and the running count
// of resets is advanced by one.
//
// NOTE(review): the Load and the Store below are two separate operations, so
// a reset stored by another goroutine in between would be overwritten.
// Confirm whether that can happen and whether it matters.
func (i *FuncInfo) NoteSuccess(description string) {
	prior := i.lastReset.Load()

	i.lastReset.Store(lastResetInfo{
		time:        time.Now(),
		description: option.Some(description),
		resetsSoFar: prior.resetsSoFar + 1,
	})
}

// GetDescriptions returns the descriptions that identify this callback. The
// callback's own description is always present. If the enclosing loop has a
// description, it comes first.
func (i *FuncInfo) GetDescriptions() []string {
	descriptions := mslices.Of(i.description)

	loopDesc, hasDesc := i.loopDescription.Get()
	if !hasDesc {
		return descriptions
	}

	// Put the loop's description ahead of the callback's.
	return slices.Insert(descriptions, 0, loopDesc)
}
19 changes: 12 additions & 7 deletions internal/retry/retryer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/10gen/migration-verifier/internal/util"
"github.com/10gen/migration-verifier/option"
"go.mongodb.org/mongo-driver/mongo"
)

Expand Down Expand Up @@ -99,8 +100,11 @@ func (suite *UnitTestSuite) TestRetryerDurationReset() {
noSuccessIterations := 0
f1 := func(_ context.Context, ri *FuncInfo) error {
// Artificially advance how much time was taken.
ri.lastResetTime.Store(
ri.lastResetTime.Load().Add(-2 * ri.loopInfo.durationLimit),
ri.lastReset.Store(
lastResetInfo{
time: ri.lastReset.Load().time.Add(-2 * ri.loopInfo.durationLimit),
description: option.Some("artificially rewinding time"),
},
)

noSuccessIterations++
Expand All @@ -124,12 +128,13 @@ func (suite *UnitTestSuite) TestRetryerDurationReset() {
successIterations := 0
f2 := func(_ context.Context, ri *FuncInfo) error {
// Artificially advance how much time was taken.
ri.lastResetTime.Store(
ri.lastResetTime.Load().Add(-2 * ri.loopInfo.durationLimit),
ri.lastReset.Store(
lastResetInfo{
time: ri.lastReset.Load().time.Add(-2 * ri.loopInfo.durationLimit),
description: option.Some("artificially rewinding time"),
},
)

ri.NoteSuccess()

successIterations++
if successIterations == 1 {
return someNetworkError
Expand Down Expand Up @@ -307,7 +312,7 @@ func (suite *UnitTestSuite) TestMulti_LongRunningSuccess() {

err := retryer.WithCallback(
func(ctx context.Context, fi *FuncInfo) error {
fi.NoteSuccess()
fi.NoteSuccess("success right away")

if time.Now().Before(succeedPastTime) {
time.Sleep(1 * time.Second)
Expand Down
Loading

0 comments on commit 50708b7

Please sign in to comment.