diff --git a/internal/verifier/change_stream.go b/internal/verifier/change_stream.go
index 4451dad..4c5bc1d 100644
--- a/internal/verifier/change_stream.go
+++ b/internal/verifier/change_stream.go
@@ -9,6 +9,8 @@ import (
 	"github.com/10gen/migration-verifier/internal/logger"
 	"github.com/10gen/migration-verifier/internal/retry"
 	"github.com/10gen/migration-verifier/internal/util"
+	"github.com/10gen/migration-verifier/msync"
+	"github.com/10gen/migration-verifier/option"
 	"github.com/pkg/errors"
 	"github.com/rs/zerolog"
 	"github.com/samber/mo"
@@ -71,6 +73,8 @@ type ChangeStreamReader struct {
 	doneChan             chan struct{}
 
 	startAtTs *primitive.Timestamp
+
+	lag *msync.TypedAtomic[option.Option[time.Duration]]
 }
 
 func (verifier *Verifier) initializeChangeStreamReaders() {
@@ -86,6 +90,7 @@ func (verifier *Verifier) initializeChangeStreamReaders() {
 		writesOffTsChan:      make(chan primitive.Timestamp),
 		errChan:              make(chan error),
 		doneChan:             make(chan struct{}),
+		lag:                  msync.NewTypedAtomic(option.None[time.Duration]()),
 	}
 	verifier.dstChangeStreamReader = &ChangeStreamReader{
 		readerType:           dst,
@@ -99,6 +104,7 @@ func (verifier *Verifier) initializeChangeStreamReaders() {
 		writesOffTsChan:      make(chan primitive.Timestamp),
 		errChan:              make(chan error),
 		doneChan:             make(chan struct{}),
+		lag:                  msync.NewTypedAtomic(option.None[time.Duration]()),
 	}
 }
 
@@ -257,6 +263,7 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch(
 	ctx context.Context,
 	ri *retry.FuncInfo,
 	cs *mongo.ChangeStream,
+	sess mongo.Session,
 ) error {
 	eventsRead := 0
 	var changeEventBatch []ParsedEvent
@@ -298,6 +305,17 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch(
 		return nil
 	}
+	var curTs primitive.Timestamp
+	curTs, err := extractTimestampFromResumeToken(cs.ResumeToken())
+	if err == nil {
+		lagSecs := curTs.T - sess.OperationTime().T
+		csr.lag.Store(option.Some(time.Second * time.Duration(lagSecs)))
+	} else {
+		csr.logger.Warn().
+			Err(err).
+			Msgf("Failed to extract timestamp from %s's resume token to compute change stream lag.", csr)
+	}
+
 	csr.changeEventBatchChan <- changeEventBatch
 
 	return nil
 }
@@ -306,6 +324,7 @@ func (csr *ChangeStreamReader) iterateChangeStream(
 	ctx context.Context,
 	ri *retry.FuncInfo,
 	cs *mongo.ChangeStream,
+	sess mongo.Session,
 ) error {
 	var lastPersistedTime time.Time
 
@@ -363,7 +382,7 @@ func (csr *ChangeStreamReader) iterateChangeStream(
 				break
 			}
 
-			err = csr.readAndHandleOneChangeEventBatch(ctx, ri, cs)
+			err = csr.readAndHandleOneChangeEventBatch(ctx, ri, cs, sess)
 
 			if err != nil {
 				return err
@@ -371,7 +390,7 @@ func (csr *ChangeStreamReader) iterateChangeStream(
 			}
 
 		default:
-			err = csr.readAndHandleOneChangeEventBatch(ctx, ri, cs)
+			err = csr.readAndHandleOneChangeEventBatch(ctx, ri, cs, sess)
 
 			if err == nil {
 				err = persistResumeTokenIfNeeded()
@@ -408,7 +427,7 @@ func (csr *ChangeStreamReader) iterateChangeStream(
 func (csr *ChangeStreamReader) createChangeStream(
 	ctx context.Context,
-) (*mongo.ChangeStream, primitive.Timestamp, error) {
+) (*mongo.ChangeStream, mongo.Session, primitive.Timestamp, error) {
 	pipeline := csr.GetChangeStreamFilter()
 
 	opts := options.ChangeStream().
 		SetMaxAwaitTime(1 * time.Second).
@@ -420,7 +439,7 @@ func (csr *ChangeStreamReader) createChangeStream(
 
 	savedResumeToken, err := csr.loadChangeStreamResumeToken(ctx)
 	if err != nil {
-		return nil, primitive.Timestamp{}, errors.Wrap(err, "failed to load persisted change stream resume token")
+		return nil, nil, primitive.Timestamp{}, errors.Wrap(err, "failed to load persisted change stream resume token")
 	}
 
 	csStartLogEvent := csr.logger.Info()
@@ -447,22 +466,22 @@ func (csr *ChangeStreamReader) createChangeStream(
 	sess, err := csr.watcherClient.StartSession()
 	if err != nil {
-		return nil, primitive.Timestamp{}, errors.Wrap(err, "failed to start session")
+		return nil, nil, primitive.Timestamp{}, errors.Wrap(err, "failed to start session")
 	}
 
 	sctx := mongo.NewSessionContext(ctx, sess)
 	changeStream, err := csr.watcherClient.Watch(sctx, pipeline, opts)
 	if err != nil {
-		return nil, primitive.Timestamp{}, errors.Wrap(err, "failed to open change stream")
+		return nil, nil, primitive.Timestamp{}, errors.Wrap(err, "failed to open change stream")
 	}
 
 	err = csr.persistChangeStreamResumeToken(ctx, changeStream)
 	if err != nil {
-		return nil, primitive.Timestamp{}, err
+		return nil, nil, primitive.Timestamp{}, err
 	}
 
 	startTs, err := extractTimestampFromResumeToken(changeStream.ResumeToken())
 	if err != nil {
-		return nil, primitive.Timestamp{}, errors.Wrap(err, "failed to extract timestamp from change stream's resume token")
+		return nil, nil, primitive.Timestamp{}, errors.Wrap(err, "failed to extract timestamp from change stream's resume token")
 	}
 
 	// With sharded clusters the resume token might lead the cluster time
@@ -470,14 +489,14 @@ func (csr *ChangeStreamReader) createChangeStream(
 	// otherwise we will get errors.
 	clusterTime, err := getClusterTimeFromSession(sess)
 	if err != nil {
-		return nil, primitive.Timestamp{}, errors.Wrap(err, "failed to read cluster time from session")
+		return nil, nil, primitive.Timestamp{}, errors.Wrap(err, "failed to read cluster time from session")
 	}
 
 	if startTs.After(clusterTime) {
 		startTs = clusterTime
 	}
 
-	return changeStream, startTs, nil
+	return changeStream, sess, startTs, nil
 }
 
 // StartChangeStream starts the change stream.
@@ -499,7 +518,7 @@ func (csr *ChangeStreamReader) StartChangeStream(ctx context.Context) error {
 
 	err := retryer.WithCallback(
 		func(ctx context.Context, ri *retry.FuncInfo) error {
-			changeStream, startTs, err := csr.createChangeStream(ctx)
+			changeStream, sess, startTs, err := csr.createChangeStream(ctx)
 			if err != nil {
 				if parentThreadWaiting {
 					initialCreateResultChan <- mo.Err[primitive.Timestamp](err)
@@ -517,7 +536,7 @@ func (csr *ChangeStreamReader) StartChangeStream(ctx context.Context) error {
 				parentThreadWaiting = false
 			}
 
-			return csr.iterateChangeStream(ctx, ri, changeStream)
+			return csr.iterateChangeStream(ctx, ri, changeStream, sess)
 		},
 		"running %s", csr,
 	).Run(ctx, csr.logger)
@@ -544,6 +563,10 @@ func (csr *ChangeStreamReader) StartChangeStream(ctx context.Context) error {
 	return nil
 }
 
+func (csr *ChangeStreamReader) GetLag() option.Option[time.Duration] {
+	return csr.lag.Load()
+}
+
 func addTimestampToLogEvent(ts primitive.Timestamp, event *zerolog.Event) *zerolog.Event {
 	return event.
 		Interface("timestamp", ts).
diff --git a/internal/verifier/summary.go b/internal/verifier/summary.go
index 3d71f5e..65084c6 100644
--- a/internal/verifier/summary.go
+++ b/internal/verifier/summary.go
@@ -399,43 +399,59 @@ func (verifier *Verifier) printChangeEventStatistics(builder *strings.Builder) {
 
 	builder.WriteString(fmt.Sprintf("\nChange events this generation: %s\n", eventsDescr))
 
-	if totalEvents == 0 {
-		return
-	}
+	if totalEvents > 0 {
 
-	reverseSortedNamespaces := maps.Keys(nsTotals)
-	sort.Slice(
-		reverseSortedNamespaces,
-		func(i, j int) bool {
-			return nsTotals[reverseSortedNamespaces[i]] > nsTotals[reverseSortedNamespaces[j]]
-		},
-	)
+		reverseSortedNamespaces := maps.Keys(nsTotals)
+		sort.Slice(
+			reverseSortedNamespaces,
+			func(i, j int) bool {
+				return nsTotals[reverseSortedNamespaces[i]] > nsTotals[reverseSortedNamespaces[j]]
+			},
+		)
 
-	// Only report the busiest namespaces.
-	if len(reverseSortedNamespaces) > changeEventsTableMaxSize {
-		reverseSortedNamespaces = reverseSortedNamespaces[:changeEventsTableMaxSize]
-	}
+		// Only report the busiest namespaces.
+		if len(reverseSortedNamespaces) > changeEventsTableMaxSize {
+			reverseSortedNamespaces = reverseSortedNamespaces[:changeEventsTableMaxSize]
+		}
 
-	table := tablewriter.NewWriter(builder)
-	table.SetHeader([]string{"Namespace", "Insert", "Update", "Replace", "Delete", "Total"})
+		table := tablewriter.NewWriter(builder)
+		table.SetHeader([]string{"Namespace", "Insert", "Update", "Replace", "Delete", "Total"})
+
+		for _, ns := range reverseSortedNamespaces {
+			curNsStats := nsStats[ns]
+
+			table.Append(
+				append(
+					[]string{ns},
+					strconv.Itoa(curNsStats.Insert),
+					strconv.Itoa(curNsStats.Update),
+					strconv.Itoa(curNsStats.Replace),
+					strconv.Itoa(curNsStats.Delete),
+					strconv.Itoa(curNsStats.Total()),
+				),
+			)
+		}
 
-	for _, ns := range reverseSortedNamespaces {
-		curNsStats := nsStats[ns]
+		builder.WriteString("\nMost frequently-changing namespaces:\n")
+		table.Render()
+	}
 
-		table.Append(
-			append(
-				[]string{ns},
-				strconv.Itoa(curNsStats.Insert),
-				strconv.Itoa(curNsStats.Update),
-				strconv.Itoa(curNsStats.Replace),
-				strconv.Itoa(curNsStats.Delete),
-				strconv.Itoa(curNsStats.Total()),
-			),
+	srcLag, hasSrcLag := verifier.srcChangeStreamReader.GetLag().Get()
+	if hasSrcLag {
+		builder.WriteString(
+			fmt.Sprintf("\nSource change stream lag: %s\n", reportutils.DurationToHMS(srcLag)),
 		)
 	}
 
-	builder.WriteString("\nMost frequently-changing namespaces:\n")
-	table.Render()
+	dstLag, hasDstLag := verifier.dstChangeStreamReader.GetLag().Get()
+	if hasDstLag {
+		if !hasSrcLag {
+			builder.WriteString("\n")
+		}
+		builder.WriteString(
+			fmt.Sprintf("Destination change stream lag: %s\n", reportutils.DurationToHMS(dstLag)),
+		)
+	}
 }
 
 func (verifier *Verifier) printWorkerStatus(builder *strings.Builder) {
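As a reading aid for the patch above, here is a minimal standalone sketch of the lag arithmetic that the new code in readAndHandleOneChangeEventBatch performs: the timestamp decoded from the change stream's post-batch resume token is diffed against the session's operation time, and the difference of the whole-second T fields of the two primitive.Timestamp values becomes the reported lag. The helper name changeStreamLag and the literal timestamps are illustrative only and are not part of the patch; in the patch itself the result is stored in the reader's lag atomic as an option.Option[time.Duration], which GetLag later exposes to the summary printer.

```go
package main

import (
	"fmt"
	"time"

	"go.mongodb.org/mongo-driver/bson/primitive"
)

// changeStreamLag mirrors the patch's arithmetic: it subtracts the session's
// operation time (seconds) from the resume-token timestamp (seconds), giving
// a one-second-granularity duration.
func changeStreamLag(resumeTokenTs, operationTime primitive.Timestamp) time.Duration {
	lagSecs := resumeTokenTs.T - operationTime.T
	return time.Second * time.Duration(lagSecs)
}

func main() {
	// Hypothetical values: the resume token is 5 seconds ahead of the
	// session's last-observed operation time.
	resumeTokenTs := primitive.Timestamp{T: 1700000100}
	operationTime := primitive.Timestamp{T: 1700000095}

	fmt.Println(changeStreamLag(resumeTokenTs, operationTime)) // prints "5s"
}
```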