diff --git a/internal/partitions/partitions.go b/internal/partitions/partitions.go index 022a54e..acb02c3 100644 --- a/internal/partitions/partitions.go +++ b/internal/partitions/partitions.go @@ -621,7 +621,7 @@ func getMidIDBounds( // Append the copied bound to the other mid _id bounds. midIDBounds = append(midIDBounds, bound) - ri.NoteSuccess() + ri.NoteSuccess("received an ID partition") } return cursor.Err() diff --git a/internal/retry/retry.go b/internal/retry/retry.go index 6630bbe..396e29a 100644 --- a/internal/retry/retry.go +++ b/internal/retry/retry.go @@ -69,11 +69,14 @@ func (r *Retryer) runRetryLoop( li := &LoopInfo{ durationLimit: r.retryLimit, } - funcinfos := lo.RepeatBy( - len(r.callbacks), - func(_ int) *FuncInfo { + funcinfos := lo.Map( + r.callbacks, + func(cb retryCallbackInfo, _ int) *FuncInfo { return &FuncInfo{ - lastResetTime: msync.NewTypedAtomic(startTime), + lastReset: msync.NewTypedAtomic(lastResetInfo{ + time: startTime, + }), + description: cb.description, loopDescription: r.description, loopInfo: li, } @@ -113,17 +116,25 @@ func (r *Retryer) runRetryLoop( defer ticker.Stop() for { - lastSuccessTime := funcinfos[i].lastResetTime.Load() + lastReset := funcinfos[i].lastReset.Load() select { case <-cbDoneChan: return case <-ticker.C: - if funcinfos[i].lastResetTime.Load() == lastSuccessTime { - logger.Warn(). + if funcinfos[i].lastReset.Load() == lastReset { + event := logger.Warn(). Str("callbackDescription", curCbInfo.description). - Time("lastSuccessAt", lastSuccessTime). - Str("elapsedTime", reportutils.DurationToHMS(time.Since(lastSuccessTime))). + Time("noSuccessSince", lastReset.time). + Uint64("successesSoFar", lastReset.resetsSoFar) + + if successDesc, hasDesc := lastReset.description.Get(); hasDesc { + event. + Str("lastSuccessDescription", successDesc) + } + + event. + Str("elapsedTime", reportutils.DurationToHMS(time.Since(lastReset.time))). Msg("Operation has not reported success for a while.") } } @@ -164,9 +175,11 @@ func (r *Retryer) runRetryLoop( } failedFuncInfo := funcinfos[groupErr.funcNum] + descriptions := failedFuncInfo.GetDescriptions() + cbErr := groupErr.errFromCallback // Not a transient error? Fail immediately. - if !r.shouldRetryWithSleep(logger, sleepTime, *failedFuncInfo, groupErr.errFromCallback) { + if !r.shouldRetryWithSleep(logger, sleepTime, descriptions, cbErr) { return groupErr.errFromCallback } @@ -201,7 +214,7 @@ func (r *Retryer) runRetryLoop( // Set all of the funcs that did *not* fail as having just succeeded. for i, curInfo := range funcinfos { if i != groupErr.funcNum { - curInfo.lastResetTime.Store(now) + curInfo.lastReset.Store(lastResetInfo{time: now}) } } } @@ -235,7 +248,7 @@ func (r *Retryer) addDescriptionToEvent(event *zerolog.Event) *zerolog.Event { func (r *Retryer) shouldRetryWithSleep( logger *logger.Logger, sleepTime time.Duration, - funcinfo FuncInfo, + descriptions []string, err error, ) bool { if err == nil { @@ -250,26 +263,35 @@ func (r *Retryer) shouldRetryWithSleep( ) event := logger.WithLevel( - lo.Ternary(isTransient, zerolog.InfoLevel, zerolog.WarnLevel), + lo.Ternary( + // If it’s transient, surface it as info. + isTransient, + zerolog.InfoLevel, + + lo.Ternary( + // Context cancellation is unimportant, so debug. + errors.Is(err, context.Canceled), + zerolog.DebugLevel, + + // Other non-retryables are serious, so warn. 
+ zerolog.WarnLevel, + ), + ), ) - if loopDesc, hasLoopDesc := r.description.Get(); hasLoopDesc { - event.Str("operationDescription", loopDesc) - } - - event.Str("callbackDescription", funcinfo.description). + event.Strs("description", descriptions). Int("error code", util.GetErrorCode(err)). Err(err) if isTransient { event. Stringer("delay", sleepTime). - Msg("Pausing before retrying after transient error.") + Msg("Got retryable error. Pausing, then will retry.") return true } - event.Msg("Non-transient error occurred.") + event.Msg("Non-retryable error occurred.") return false } diff --git a/internal/retry/retry_info.go b/internal/retry/retry_info.go index ea325ac..8be4fce 100644 --- a/internal/retry/retry_info.go +++ b/internal/retry/retry_info.go @@ -1,9 +1,11 @@ package retry import ( + "slices" "time" "github.com/10gen/migration-verifier/internal/reportutils" + "github.com/10gen/migration-verifier/mslices" "github.com/10gen/migration-verifier/msync" "github.com/10gen/migration-verifier/option" "github.com/rs/zerolog" @@ -19,11 +21,20 @@ type LoopInfo struct { durationLimit time.Duration } +type lastResetInfo struct { + time time.Time + + // These go into logs to facilitate debugging. + description option.Option[string] + resetsSoFar uint64 +} + type FuncInfo struct { loopInfo *LoopInfo description string loopDescription option.Option[string] - lastResetTime *msync.TypedAtomic[time.Time] + + lastReset *msync.TypedAtomic[lastResetInfo] } // Log will log a debug-level message for the current Info values and the provided strings. @@ -69,7 +80,7 @@ func (fi *FuncInfo) GetAttemptNumber() int { // GetDurationSoFar returns the Info's current duration so far. This duration // applies to the duration of retrying for transient errors only. func (fi *FuncInfo) GetDurationSoFar() time.Duration { - return time.Since(fi.lastResetTime.Load()) + return time.Since(fi.lastReset.Load().time) } // NoteSuccess is used to tell the retry util to reset its measurement @@ -78,6 +89,21 @@ func (fi *FuncInfo) GetDurationSoFar() time.Duration { // // Call this after every successful command in a multi-command callback. // (It’s useless--but harmless--in a single-command callback.) -func (i *FuncInfo) NoteSuccess() { - i.lastResetTime.Store(time.Now()) +func (i *FuncInfo) NoteSuccess(description string) { + totalResets := i.lastReset.Load().resetsSoFar + + i.lastReset.Store(lastResetInfo{ + description: option.Some(description), + time: time.Now(), + resetsSoFar: 1 + totalResets, + }) +} + +func (i *FuncInfo) GetDescriptions() []string { + descriptions := mslices.Of(i.description) + if loopDesc, hasDesc := i.loopDescription.Get(); hasDesc { + descriptions = slices.Insert(descriptions, 0, loopDesc) + } + + return descriptions } diff --git a/internal/retry/retryer_test.go b/internal/retry/retryer_test.go index 8b43040..38a76fc 100644 --- a/internal/retry/retryer_test.go +++ b/internal/retry/retryer_test.go @@ -8,6 +8,7 @@ import ( "time" "github.com/10gen/migration-verifier/internal/util" + "github.com/10gen/migration-verifier/option" "go.mongodb.org/mongo-driver/mongo" ) @@ -99,8 +100,11 @@ func (suite *UnitTestSuite) TestRetryerDurationReset() { noSuccessIterations := 0 f1 := func(_ context.Context, ri *FuncInfo) error { // Artificially advance how much time was taken. 
- ri.lastResetTime.Store( - ri.lastResetTime.Load().Add(-2 * ri.loopInfo.durationLimit), + ri.lastReset.Store( + lastResetInfo{ + time: ri.lastReset.Load().time.Add(-2 * ri.loopInfo.durationLimit), + description: option.Some("artificially rewinding time"), + }, ) noSuccessIterations++ @@ -124,12 +128,13 @@ func (suite *UnitTestSuite) TestRetryerDurationReset() { successIterations := 0 f2 := func(_ context.Context, ri *FuncInfo) error { // Artificially advance how much time was taken. - ri.lastResetTime.Store( - ri.lastResetTime.Load().Add(-2 * ri.loopInfo.durationLimit), + ri.lastReset.Store( + lastResetInfo{ + time: ri.lastReset.Load().time.Add(-2 * ri.loopInfo.durationLimit), + description: option.Some("artificially rewinding time"), + }, ) - ri.NoteSuccess() - successIterations++ if successIterations == 1 { return someNetworkError @@ -307,7 +312,7 @@ func (suite *UnitTestSuite) TestMulti_LongRunningSuccess() { err := retryer.WithCallback( func(ctx context.Context, fi *FuncInfo) error { - fi.NoteSuccess() + fi.NoteSuccess("success right away") if time.Now().Before(succeedPastTime) { time.Sleep(1 * time.Second) diff --git a/internal/verifier/change_stream.go b/internal/verifier/change_stream.go index a4dd0c0..2a7ba26 100644 --- a/internal/verifier/change_stream.go +++ b/internal/verifier/change_stream.go @@ -9,6 +9,8 @@ import ( "github.com/10gen/migration-verifier/internal/logger" "github.com/10gen/migration-verifier/internal/retry" "github.com/10gen/migration-verifier/internal/util" + "github.com/10gen/migration-verifier/msync" + "github.com/10gen/migration-verifier/option" "github.com/pkg/errors" "github.com/rs/zerolog" "github.com/samber/mo" @@ -71,6 +73,8 @@ type ChangeStreamReader struct { doneChan chan struct{} startAtTs *primitive.Timestamp + + lag *msync.TypedAtomic[option.Option[time.Duration]] } func (verifier *Verifier) initializeChangeStreamReaders() { @@ -86,6 +90,7 @@ func (verifier *Verifier) initializeChangeStreamReaders() { writesOffTs: util.NewEventual[primitive.Timestamp](), error: util.NewEventual[error](), doneChan: make(chan struct{}), + lag: msync.NewTypedAtomic(option.None[time.Duration]()), } verifier.dstChangeStreamReader = &ChangeStreamReader{ readerType: dst, @@ -99,6 +104,7 @@ func (verifier *Verifier) initializeChangeStreamReaders() { writesOffTs: util.NewEventual[primitive.Timestamp](), error: util.NewEventual[error](), doneChan: make(chan struct{}), + lag: msync.NewTypedAtomic(option.None[time.Duration]()), } } @@ -256,6 +262,7 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch( ctx context.Context, ri *retry.FuncInfo, cs *mongo.ChangeStream, + sess mongo.Session, ) error { eventsRead := 0 var changeEventBatch []ParsedEvent @@ -291,12 +298,23 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch( eventsRead++ } - ri.NoteSuccess() + ri.NoteSuccess("received a batch of change events") if eventsRead == 0 { return nil } + var curTs primitive.Timestamp + curTs, err := extractTimestampFromResumeToken(cs.ResumeToken()) + if err == nil { + lagSecs := curTs.T - sess.OperationTime().T + csr.lag.Store(option.Some(time.Second * time.Duration(lagSecs))) + } else { + csr.logger.Warn(). + Err(err). 
+ Msgf("Failed to extract timestamp from %s's resume token to compute change stream lag.", csr) + } + csr.changeEventBatchChan <- changeEventBatch return nil } @@ -305,6 +323,7 @@ func (csr *ChangeStreamReader) iterateChangeStream( ctx context.Context, ri *retry.FuncInfo, cs *mongo.ChangeStream, + sess mongo.Session, ) error { var lastPersistedTime time.Time @@ -364,7 +383,7 @@ func (csr *ChangeStreamReader) iterateChangeStream( break } - err = csr.readAndHandleOneChangeEventBatch(ctx, ri, cs) + err = csr.readAndHandleOneChangeEventBatch(ctx, ri, cs, sess) if err != nil { return err @@ -372,7 +391,7 @@ func (csr *ChangeStreamReader) iterateChangeStream( } default: - err = csr.readAndHandleOneChangeEventBatch(ctx, ri, cs) + err = csr.readAndHandleOneChangeEventBatch(ctx, ri, cs, sess) if err == nil { err = persistResumeTokenIfNeeded() @@ -409,7 +428,7 @@ func (csr *ChangeStreamReader) iterateChangeStream( func (csr *ChangeStreamReader) createChangeStream( ctx context.Context, -) (*mongo.ChangeStream, primitive.Timestamp, error) { +) (*mongo.ChangeStream, mongo.Session, primitive.Timestamp, error) { pipeline := csr.GetChangeStreamFilter() opts := options.ChangeStream(). SetMaxAwaitTime(1 * time.Second). @@ -421,7 +440,7 @@ func (csr *ChangeStreamReader) createChangeStream( savedResumeToken, err := csr.loadChangeStreamResumeToken(ctx) if err != nil { - return nil, primitive.Timestamp{}, errors.Wrap(err, "failed to load persisted change stream resume token") + return nil, nil, primitive.Timestamp{}, errors.Wrap(err, "failed to load persisted change stream resume token") } csStartLogEvent := csr.logger.Info() @@ -448,22 +467,22 @@ func (csr *ChangeStreamReader) createChangeStream( sess, err := csr.watcherClient.StartSession() if err != nil { - return nil, primitive.Timestamp{}, errors.Wrap(err, "failed to start session") + return nil, nil, primitive.Timestamp{}, errors.Wrap(err, "failed to start session") } sctx := mongo.NewSessionContext(ctx, sess) changeStream, err := csr.watcherClient.Watch(sctx, pipeline, opts) if err != nil { - return nil, primitive.Timestamp{}, errors.Wrap(err, "failed to open change stream") + return nil, nil, primitive.Timestamp{}, errors.Wrap(err, "failed to open change stream") } err = csr.persistChangeStreamResumeToken(ctx, changeStream) if err != nil { - return nil, primitive.Timestamp{}, err + return nil, nil, primitive.Timestamp{}, err } startTs, err := extractTimestampFromResumeToken(changeStream.ResumeToken()) if err != nil { - return nil, primitive.Timestamp{}, errors.Wrap(err, "failed to extract timestamp from change stream's resume token") + return nil, nil, primitive.Timestamp{}, errors.Wrap(err, "failed to extract timestamp from change stream's resume token") } // With sharded clusters the resume token might lead the cluster time @@ -471,14 +490,14 @@ func (csr *ChangeStreamReader) createChangeStream( // otherwise we will get errors. clusterTime, err := getClusterTimeFromSession(sess) if err != nil { - return nil, primitive.Timestamp{}, errors.Wrap(err, "failed to read cluster time from session") + return nil, nil, primitive.Timestamp{}, errors.Wrap(err, "failed to read cluster time from session") } if startTs.After(clusterTime) { startTs = clusterTime } - return changeStream, startTs, nil + return changeStream, sess, startTs, nil } // StartChangeStream starts the change stream. 
@@ -500,7 +519,7 @@ func (csr *ChangeStreamReader) StartChangeStream(ctx context.Context) error { err := retryer.WithCallback( func(ctx context.Context, ri *retry.FuncInfo) error { - changeStream, startTs, err := csr.createChangeStream(ctx) + changeStream, sess, startTs, err := csr.createChangeStream(ctx) if err != nil { if parentThreadWaiting { initialCreateResultChan <- mo.Err[primitive.Timestamp](err) @@ -518,7 +537,7 @@ func (csr *ChangeStreamReader) StartChangeStream(ctx context.Context) error { parentThreadWaiting = false } - return csr.iterateChangeStream(ctx, ri, changeStream) + return csr.iterateChangeStream(ctx, ri, changeStream, sess) }, "running %s", csr, ).Run(ctx, csr.logger) @@ -542,6 +561,10 @@ func (csr *ChangeStreamReader) StartChangeStream(ctx context.Context) error { return nil } +func (csr *ChangeStreamReader) GetLag() option.Option[time.Duration] { + return csr.lag.Load() +} + func addTimestampToLogEvent(ts primitive.Timestamp, event *zerolog.Event) *zerolog.Event { return event. Interface("timestamp", ts). diff --git a/internal/verifier/change_stream_test.go b/internal/verifier/change_stream_test.go index 6910acb..944a440 100644 --- a/internal/verifier/change_stream_test.go +++ b/internal/verifier/change_stream_test.go @@ -172,7 +172,6 @@ func (suite *IntegrationTestSuite) TestChangeStreamFilter_WithNamespaces() { events = append(events, newEvent) } - suite.T().Logf("Change stream op time (got event? %v): %v", gotEvent, csOpTime) if csOpTime.After(*changeStreamStopTime) { break } diff --git a/internal/verifier/compare.go b/internal/verifier/compare.go index 3fdcb12..d6f8cca 100644 --- a/internal/verifier/compare.go +++ b/internal/verifier/compare.go @@ -324,7 +324,7 @@ func (verifier *Verifier) getFetcherChannelsAndCallbacks( ) if err == nil { - state.NoteSuccess() + state.NoteSuccess("opened src find cursor") err = errors.Wrap( iterateCursorToChannel(ctx, state, cursor, srcChannel), @@ -350,7 +350,7 @@ func (verifier *Verifier) getFetcherChannelsAndCallbacks( ) if err == nil { - state.NoteSuccess() + state.NoteSuccess("opened dst find cursor") err = errors.Wrap( iterateCursorToChannel(ctx, state, cursor, dstChannel), @@ -376,7 +376,7 @@ func iterateCursorToChannel( writer chan<- bson.Raw, ) error { for cursor.Next(ctx) { - state.NoteSuccess() + state.NoteSuccess("received a document") writer <- slices.Clone(cursor.Current) } diff --git a/internal/verifier/migration_verifier.go b/internal/verifier/migration_verifier.go index 254017c..b4228f1 100644 --- a/internal/verifier/migration_verifier.go +++ b/internal/verifier/migration_verifier.go @@ -1240,6 +1240,11 @@ func (verifier *Verifier) GetVerificationStatus(ctx context.Context) (*Verificat taskCollection := verifier.verificationTaskCollection() generation, _ := verifier.getGeneration() + // XXX REMOVE ME + verifier.logger.Debug(). + Int("generation", generation). 
+ Msg("Running GetVerificationStatus().") + var results []bson.Raw err := retry.New().WithCallback( diff --git a/internal/verifier/summary.go b/internal/verifier/summary.go index 3d71f5e..65084c6 100644 --- a/internal/verifier/summary.go +++ b/internal/verifier/summary.go @@ -399,43 +399,59 @@ func (verifier *Verifier) printChangeEventStatistics(builder *strings.Builder) { builder.WriteString(fmt.Sprintf("\nChange events this generation: %s\n", eventsDescr)) - if totalEvents == 0 { - return - } + if totalEvents > 0 { - reverseSortedNamespaces := maps.Keys(nsTotals) - sort.Slice( - reverseSortedNamespaces, - func(i, j int) bool { - return nsTotals[reverseSortedNamespaces[i]] > nsTotals[reverseSortedNamespaces[j]] - }, - ) + reverseSortedNamespaces := maps.Keys(nsTotals) + sort.Slice( + reverseSortedNamespaces, + func(i, j int) bool { + return nsTotals[reverseSortedNamespaces[i]] > nsTotals[reverseSortedNamespaces[j]] + }, + ) - // Only report the busiest namespaces. - if len(reverseSortedNamespaces) > changeEventsTableMaxSize { - reverseSortedNamespaces = reverseSortedNamespaces[:changeEventsTableMaxSize] - } + // Only report the busiest namespaces. + if len(reverseSortedNamespaces) > changeEventsTableMaxSize { + reverseSortedNamespaces = reverseSortedNamespaces[:changeEventsTableMaxSize] + } - table := tablewriter.NewWriter(builder) - table.SetHeader([]string{"Namespace", "Insert", "Update", "Replace", "Delete", "Total"}) + table := tablewriter.NewWriter(builder) + table.SetHeader([]string{"Namespace", "Insert", "Update", "Replace", "Delete", "Total"}) + + for _, ns := range reverseSortedNamespaces { + curNsStats := nsStats[ns] + + table.Append( + append( + []string{ns}, + strconv.Itoa(curNsStats.Insert), + strconv.Itoa(curNsStats.Update), + strconv.Itoa(curNsStats.Replace), + strconv.Itoa(curNsStats.Delete), + strconv.Itoa(curNsStats.Total()), + ), + ) + } - for _, ns := range reverseSortedNamespaces { - curNsStats := nsStats[ns] + builder.WriteString("\nMost frequently-changing namespaces:\n") + table.Render() + } - table.Append( - append( - []string{ns}, - strconv.Itoa(curNsStats.Insert), - strconv.Itoa(curNsStats.Update), - strconv.Itoa(curNsStats.Replace), - strconv.Itoa(curNsStats.Delete), - strconv.Itoa(curNsStats.Total()), - ), + srcLag, hasSrcLag := verifier.srcChangeStreamReader.GetLag().Get() + if hasSrcLag { + builder.WriteString( + fmt.Sprintf("\nSource change stream lag: %s\n", reportutils.DurationToHMS(srcLag)), ) } - builder.WriteString("\nMost frequently-changing namespaces:\n") - table.Render() + dstLag, hasDstLag := verifier.dstChangeStreamReader.GetLag().Get() + if hasDstLag { + if !hasSrcLag { + builder.WriteString("\n") + } + builder.WriteString( + fmt.Sprintf("Destination change stream lag: %s\n", reportutils.DurationToHMS(dstLag)), + ) + } } func (verifier *Verifier) printWorkerStatus(builder *strings.Builder) {