Skip to content

Commit

Permalink
Merge branch 'master' into 4.23.04.x
Browse files Browse the repository at this point in the history
  • Loading branch information
ffffwh committed May 8, 2023
2 parents b840573 + 0e55402 commit 25ab378
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 2 deletions.
4 changes: 3 additions & 1 deletion driver/mysql/applier_incr.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,9 @@ func (a *ApplierIncr) handleEntry(entryCtx *common.EntryContext) (err error) {

if isBig {
if binlogEntry.Index == 0 {
a.mtsManager.lastEnqueue = binlogEntry.Coordinates.GetSequenceNumber()
if !a.mtsManager.WaitForExecution(binlogEntry) {
return nil // shutdown
}
}
a.logger.Info("bigtx ApplyBinlogEvent", "gno", txGno, "index", binlogEntry.Index)
err = a.ApplyBinlogEvent(0, entryCtx)
Expand Down
28 changes: 28 additions & 0 deletions driver/mysql/applier_incr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,31 @@ func TestMtsManager(t *testing.T) {
t.Fatal("might be stuck")
}
}

func TestMtsManager2(t *testing.T) {
logger := hclog.Default()
shutdownCh := make(chan struct{})
defer close(shutdownCh)

mm := NewMtsManager(shutdownCh, logger)
go mm.LcUpdater()

mm.WaitForExecution0(1, 0)
logger.Info("wait 1 0")
mm.Executed0(1)
mm.WaitForExecution0(2, 1)
logger.Info("wait 2 1")
go func() {
// execute later
time.Sleep(100 * time.Millisecond)
mm.Executed0(2)
}()

// entry resent
mm.WaitForExecution0(1, 0)
mm.WaitForAllCommitted(logger)
if mm.lastCommitted < 2 {
t.Fatal("WaitForAllCommitted should not return before 2-executed")
}
}

4 changes: 3 additions & 1 deletion driver/mysql/applier_mts.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,9 @@ func (mm *MtsManager) WaitForExecution(binlogEntry *common.DataEntry) bool {
}

func (mm *MtsManager) WaitForExecution0(seq int64, lc int64) bool {
mm.lastEnqueue = seq
if mm.lastEnqueue < seq {
mm.lastEnqueue = seq
}

if mm.forceMts {
return true
Expand Down

0 comments on commit 25ab378

Please sign in to comment.