From d4eff3f19fefa0aa27dd62a6198766d3399f42d4 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Wed, 22 Jan 2025 08:29:12 +0200 Subject: [PATCH 1/7] parseComBinlogDumpGTID: GTID payload is always 5.6 flavor Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/mysql/binlog_dump.go | 3 ++- go/mysql/replication_test.go | 44 +++++++++++++++++++++++++++++++++--- 2 files changed, 43 insertions(+), 4 deletions(-) diff --git a/go/mysql/binlog_dump.go b/go/mysql/binlog_dump.go index de91484fc64..cfa37cc0878 100644 --- a/go/mysql/binlog_dump.go +++ b/go/mysql/binlog_dump.go @@ -77,7 +77,8 @@ func (c *Conn) parseComBinlogDumpGTID(data []byte) (logFile string, logPos uint6 return logFile, logPos, position, readPacketErr } if gtid := string(data[pos : pos+int(dataSize)]); gtid != "" { - position, err = replication.DecodePosition(gtid) + // ComBinlogDumpGTID is a MySQL specific protocol. The GTID flavor is necessarily MySQL 56 + position, _, err = replication.DecodePositionMySQL56(gtid) if err != nil { return logFile, logPos, position, err } diff --git a/go/mysql/replication_test.go b/go/mysql/replication_test.go index ded22d838f6..ca69ec05f2a 100644 --- a/go/mysql/replication_test.go +++ b/go/mysql/replication_test.go @@ -24,6 +24,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "vitess.io/vitess/go/mysql/replication" "vitess.io/vitess/go/test/utils" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" @@ -88,14 +89,46 @@ func TestComBinlogDumpGTID(t *testing.T) { cConn.Close() }() + t.Run("WriteComBinlogDumpGTIDEmptyGTID", func(t *testing.T) { + // Write ComBinlogDumpGTID packet, read it, compare. + var flags uint16 = 0x0d0e + err := cConn.WriteComBinlogDumpGTID(0x01020304, "moofarm", 0x05060708090a0b0c, flags, []byte{}) + assert.NoError(t, err) + data, err := sConn.ReadPacket() + require.NoError(t, err, "sConn.ReadPacket - ComBinlogDumpGTID failed: %v", err) + require.NotEmpty(t, data) + require.EqualValues(t, data[0], ComBinlogDumpGTID) + + expectedData := []byte{ + ComBinlogDumpGTID, + 0x0e, 0x0d, // flags + 0x04, 0x03, 0x02, 0x01, // server-id + 0x07, 0x00, 0x00, 0x00, // binlog-filename-len + 'm', 'o', 'o', 'f', 'a', 'r', 'm', // bilog-filename + 0x0c, 0x0b, 0x0a, 0x09, 0x08, 0x07, 0x06, 0x05, // binlog-pos + 0x00, 0x00, 0x00, 0x00, // data-size is zero, no GTID payload + } + assert.Equal(t, expectedData, data) + logFile, logPos, pos, err := sConn.parseComBinlogDumpGTID(data) + require.NoError(t, err, "parseComBinlogDumpGTID failed: %v", err) + assert.Equal(t, "moofarm", logFile) + assert.Equal(t, uint64(0x05060708090a0b0c), logPos) + assert.True(t, pos.IsZero()) + }) + t.Run("WriteComBinlogDumpGTID", func(t *testing.T) { // Write ComBinlogDumpGTID packet, read it, compare. var flags uint16 = 0x0d0e assert.Equal(t, flags, flags|BinlogThroughGTID) - err := cConn.WriteComBinlogDumpGTID(0x01020304, "moofarm", 0x05060708090a0b0c, flags, []byte{0xfa, 0xfb}) + gtidSet, err := replication.ParseMysql56GTIDSet("16b1039f-22b6-11ed-b765-0a43f95f28a3:1-243") + require.NoError(t, err) + // dataSize := uint32(len(gtidSet.String())) + err = cConn.WriteComBinlogDumpGTID(0x01020304, "moofarm", 0x05060708090a0b0c, flags, []byte(gtidSet.String())) assert.NoError(t, err) data, err := sConn.ReadPacket() require.NoError(t, err, "sConn.ReadPacket - ComBinlogDumpGTID failed: %v", err) + require.NotEmpty(t, data) + require.EqualValues(t, data[0], ComBinlogDumpGTID) expectedData := []byte{ ComBinlogDumpGTID, @@ -104,10 +137,15 @@ func TestComBinlogDumpGTID(t *testing.T) { 0x07, 0x00, 0x00, 0x00, // binlog-filename-len 'm', 'o', 'o', 'f', 'a', 'r', 'm', // bilog-filename 0x0c, 0x0b, 0x0a, 0x09, 0x08, 0x07, 0x06, 0x05, // binlog-pos - 0x02, 0x00, 0x00, 0x00, // data-size - 0xfa, 0xfb, // data + 0x02a, 0x00, 0x00, 0x00, // data-size + 0x31, 0x36, 0x62, 0x31, 0x30, 0x33, 0x39, 0x66, 0x2d, 0x32, 0x32, 0x62, 0x36, 0x2d, 0x31, 0x31, 0x65, 0x64, 0x2d, 0x62, 0x37, 0x36, 0x35, 0x2d, 0x30, 0x61, 0x34, 0x33, 0x66, 0x39, 0x35, 0x66, 0x32, 0x38, 0x61, 0x33, 0x3a, 0x31, 0x2d, 0x32, 0x34, 0x33, // data } assert.Equal(t, expectedData, data) + logFile, logPos, pos, err := sConn.parseComBinlogDumpGTID(data) + require.NoError(t, err, "parseComBinlogDumpGTID failed: %v", err) + assert.Equal(t, "moofarm", logFile) + assert.Equal(t, uint64(0x05060708090a0b0c), logPos) + assert.Equal(t, gtidSet, pos.GTIDSet) }) sConn.sequence = 0 From efed7e5664e8ee63cf5ddd0930c059b36a4f3d9d Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Wed, 22 Jan 2025 15:29:06 +0200 Subject: [PATCH 2/7] parse sidBlock Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/mysql/binlog_dump.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/go/mysql/binlog_dump.go b/go/mysql/binlog_dump.go index cfa37cc0878..cfacf742856 100644 --- a/go/mysql/binlog_dump.go +++ b/go/mysql/binlog_dump.go @@ -76,12 +76,13 @@ func (c *Conn) parseComBinlogDumpGTID(data []byte) (logFile string, logPos uint6 if !ok { return logFile, logPos, position, readPacketErr } - if gtid := string(data[pos : pos+int(dataSize)]); gtid != "" { - // ComBinlogDumpGTID is a MySQL specific protocol. The GTID flavor is necessarily MySQL 56 - position, _, err = replication.DecodePositionMySQL56(gtid) + if gtidBytes := data[pos : pos+int(dataSize)]; len(gtidBytes) != 0 { + gtid, err := replication.NewMysql56GTIDSetFromSIDBlock(gtidBytes) if err != nil { - return logFile, logPos, position, err + return logFile, logPos, position, vterrors.Wrapf(err, "error parsing GTID from BinlogDumpGTID packet") } + // ComBinlogDumpGTID is a MySQL specific protocol. The GTID flavor is necessarily MySQL 56 + position = replication.Position{GTIDSet: gtid} } if flags2&BinlogDumpNonBlock != 0 { return logFile, logPos, position, io.EOF From a61fc1424f041b59b9b5487f2e57d9d06a20299b Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Wed, 22 Jan 2025 15:30:51 +0200 Subject: [PATCH 3/7] check gtidSet for nil Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/mysql/flavor_mysql.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/go/mysql/flavor_mysql.go b/go/mysql/flavor_mysql.go index c5424259973..a5993ac9aa0 100644 --- a/go/mysql/flavor_mysql.go +++ b/go/mysql/flavor_mysql.go @@ -218,7 +218,10 @@ func (mysqlFlavor) sendBinlogDumpCommand(c *Conn, serverID uint32, binlogFilenam } // Build the command. - sidBlock := gtidSet.SIDBlock() + var sidBlock []byte + if gtidSet != nil { + sidBlock = gtidSet.SIDBlock() + } var flags2 uint16 if binlogFilename != "" { flags2 |= BinlogThroughPosition From c1fc1cd09b1a76262eab875e2056da1b977ce4f4 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Wed, 22 Jan 2025 15:31:53 +0200 Subject: [PATCH 4/7] write sidBlock instead of gtid bytes representation Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/mysql/replication_test.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/go/mysql/replication_test.go b/go/mysql/replication_test.go index ca69ec05f2a..347179e9ca7 100644 --- a/go/mysql/replication_test.go +++ b/go/mysql/replication_test.go @@ -122,8 +122,10 @@ func TestComBinlogDumpGTID(t *testing.T) { assert.Equal(t, flags, flags|BinlogThroughGTID) gtidSet, err := replication.ParseMysql56GTIDSet("16b1039f-22b6-11ed-b765-0a43f95f28a3:1-243") require.NoError(t, err) - // dataSize := uint32(len(gtidSet.String())) - err = cConn.WriteComBinlogDumpGTID(0x01020304, "moofarm", 0x05060708090a0b0c, flags, []byte(gtidSet.String())) + sidBlock := gtidSet.SIDBlock() + assert.Len(t, sidBlock, 0x30) // 48 bytes + + err = cConn.WriteComBinlogDumpGTID(0x01020304, "moofarm", 0x05060708090a0b0c, flags, sidBlock) assert.NoError(t, err) data, err := sConn.ReadPacket() require.NoError(t, err, "sConn.ReadPacket - ComBinlogDumpGTID failed: %v", err) @@ -137,9 +139,9 @@ func TestComBinlogDumpGTID(t *testing.T) { 0x07, 0x00, 0x00, 0x00, // binlog-filename-len 'm', 'o', 'o', 'f', 'a', 'r', 'm', // bilog-filename 0x0c, 0x0b, 0x0a, 0x09, 0x08, 0x07, 0x06, 0x05, // binlog-pos - 0x02a, 0x00, 0x00, 0x00, // data-size - 0x31, 0x36, 0x62, 0x31, 0x30, 0x33, 0x39, 0x66, 0x2d, 0x32, 0x32, 0x62, 0x36, 0x2d, 0x31, 0x31, 0x65, 0x64, 0x2d, 0x62, 0x37, 0x36, 0x35, 0x2d, 0x30, 0x61, 0x34, 0x33, 0x66, 0x39, 0x35, 0x66, 0x32, 0x38, 0x61, 0x33, 0x3a, 0x31, 0x2d, 0x32, 0x34, 0x33, // data + 0x30, 0x00, 0x00, 0x00, // data-size } + expectedData = append(expectedData, sidBlock...) // data assert.Equal(t, expectedData, data) logFile, logPos, pos, err := sConn.parseComBinlogDumpGTID(data) require.NoError(t, err, "parseComBinlogDumpGTID failed: %v", err) From 51587780410374934c7cc94bb6f92faed2f4a987 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Wed, 22 Jan 2025 15:34:29 +0200 Subject: [PATCH 5/7] clear sequence Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/mysql/replication_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/go/mysql/replication_test.go b/go/mysql/replication_test.go index 347179e9ca7..667582b8dc0 100644 --- a/go/mysql/replication_test.go +++ b/go/mysql/replication_test.go @@ -116,6 +116,8 @@ func TestComBinlogDumpGTID(t *testing.T) { assert.True(t, pos.IsZero()) }) + sConn.sequence = 0 + t.Run("WriteComBinlogDumpGTID", func(t *testing.T) { // Write ComBinlogDumpGTID packet, read it, compare. var flags uint16 = 0x0d0e From ab8757598b33058a1b30f3d8b28d1d0334ff9477 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Wed, 22 Jan 2025 16:30:14 +0200 Subject: [PATCH 6/7] rename variable to sidBlock Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/mysql/replication.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/go/mysql/replication.go b/go/mysql/replication.go index 4c5a0c9523e..3dd369cd6fc 100644 --- a/go/mysql/replication.go +++ b/go/mysql/replication.go @@ -81,7 +81,8 @@ func (c *Conn) AnalyzeSemiSyncAckRequest(buf []byte) (strippedBuf []byte, ackReq // WriteComBinlogDumpGTID writes a ComBinlogDumpGTID command. // Only works with MySQL 5.6+ (and not MariaDB). // See http://dev.mysql.com/doc/internals/en/com-binlog-dump-gtid.html for syntax. -func (c *Conn) WriteComBinlogDumpGTID(serverID uint32, binlogFilename string, binlogPos uint64, flags uint16, gtidSet []byte) error { +// sidBlock must be the result of a gtidSet.SIDBlock() function. +func (c *Conn) WriteComBinlogDumpGTID(serverID uint32, binlogFilename string, binlogPos uint64, flags uint16, sidBlock []byte) error { c.sequence = 0 length := 1 + // ComBinlogDumpGTID 2 + // flags @@ -90,7 +91,7 @@ func (c *Conn) WriteComBinlogDumpGTID(serverID uint32, binlogFilename string, bi len(binlogFilename) + // binlog-filename 8 + // binlog-pos 4 + // data-size - len(gtidSet) // data + len(sidBlock) // data data, pos := c.startEphemeralPacketWithHeader(length) pos = writeByte(data, pos, ComBinlogDumpGTID) // nolint pos = writeUint16(data, pos, flags) // nolint @@ -98,8 +99,8 @@ func (c *Conn) WriteComBinlogDumpGTID(serverID uint32, binlogFilename string, bi pos = writeUint32(data, pos, uint32(len(binlogFilename))) // nolint pos = writeEOFString(data, pos, binlogFilename) // nolint pos = writeUint64(data, pos, binlogPos) // nolint - pos = writeUint32(data, pos, uint32(len(gtidSet))) // nolint - pos += copy(data[pos:], gtidSet) // nolint + pos = writeUint32(data, pos, uint32(len(sidBlock))) // nolint + pos += copy(data[pos:], sidBlock) // nolint if err := c.writeEphemeralPacket(); err != nil { return sqlerror.NewSQLErrorf(sqlerror.CRServerGone, sqlerror.SSUnknownSQLState, "%v", err) } From d574d77543deae0795f5ad6fde0745311189c684 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Wed, 22 Jan 2025 16:30:30 +0200 Subject: [PATCH 7/7] using decimal value in test Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/mysql/replication_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/mysql/replication_test.go b/go/mysql/replication_test.go index 667582b8dc0..3f423624b68 100644 --- a/go/mysql/replication_test.go +++ b/go/mysql/replication_test.go @@ -125,7 +125,7 @@ func TestComBinlogDumpGTID(t *testing.T) { gtidSet, err := replication.ParseMysql56GTIDSet("16b1039f-22b6-11ed-b765-0a43f95f28a3:1-243") require.NoError(t, err) sidBlock := gtidSet.SIDBlock() - assert.Len(t, sidBlock, 0x30) // 48 bytes + assert.Len(t, sidBlock, 48) err = cConn.WriteComBinlogDumpGTID(0x01020304, "moofarm", 0x05060708090a0b0c, flags, sidBlock) assert.NoError(t, err)