diff --git a/go/mysql/binlog_dump.go b/go/mysql/binlog_dump.go index de91484fc64..cfacf742856 100644 --- a/go/mysql/binlog_dump.go +++ b/go/mysql/binlog_dump.go @@ -76,11 +76,13 @@ func (c *Conn) parseComBinlogDumpGTID(data []byte) (logFile string, logPos uint6 if !ok { return logFile, logPos, position, readPacketErr } - if gtid := string(data[pos : pos+int(dataSize)]); gtid != "" { - position, err = replication.DecodePosition(gtid) + if gtidBytes := data[pos : pos+int(dataSize)]; len(gtidBytes) != 0 { + gtid, err := replication.NewMysql56GTIDSetFromSIDBlock(gtidBytes) if err != nil { - return logFile, logPos, position, err + return logFile, logPos, position, vterrors.Wrapf(err, "error parsing GTID from BinlogDumpGTID packet") } + // ComBinlogDumpGTID is a MySQL specific protocol. The GTID flavor is necessarily MySQL 56 + position = replication.Position{GTIDSet: gtid} } if flags2&BinlogDumpNonBlock != 0 { return logFile, logPos, position, io.EOF diff --git a/go/mysql/flavor_mysql.go b/go/mysql/flavor_mysql.go index c5424259973..a5993ac9aa0 100644 --- a/go/mysql/flavor_mysql.go +++ b/go/mysql/flavor_mysql.go @@ -218,7 +218,10 @@ func (mysqlFlavor) sendBinlogDumpCommand(c *Conn, serverID uint32, binlogFilenam } // Build the command. - sidBlock := gtidSet.SIDBlock() + var sidBlock []byte + if gtidSet != nil { + sidBlock = gtidSet.SIDBlock() + } var flags2 uint16 if binlogFilename != "" { flags2 |= BinlogThroughPosition diff --git a/go/mysql/replication.go b/go/mysql/replication.go index 4c5a0c9523e..3dd369cd6fc 100644 --- a/go/mysql/replication.go +++ b/go/mysql/replication.go @@ -81,7 +81,8 @@ func (c *Conn) AnalyzeSemiSyncAckRequest(buf []byte) (strippedBuf []byte, ackReq // WriteComBinlogDumpGTID writes a ComBinlogDumpGTID command. // Only works with MySQL 5.6+ (and not MariaDB). // See http://dev.mysql.com/doc/internals/en/com-binlog-dump-gtid.html for syntax. -func (c *Conn) WriteComBinlogDumpGTID(serverID uint32, binlogFilename string, binlogPos uint64, flags uint16, gtidSet []byte) error { +// sidBlock must be the result of a gtidSet.SIDBlock() function. +func (c *Conn) WriteComBinlogDumpGTID(serverID uint32, binlogFilename string, binlogPos uint64, flags uint16, sidBlock []byte) error { c.sequence = 0 length := 1 + // ComBinlogDumpGTID 2 + // flags @@ -90,7 +91,7 @@ func (c *Conn) WriteComBinlogDumpGTID(serverID uint32, binlogFilename string, bi len(binlogFilename) + // binlog-filename 8 + // binlog-pos 4 + // data-size - len(gtidSet) // data + len(sidBlock) // data data, pos := c.startEphemeralPacketWithHeader(length) pos = writeByte(data, pos, ComBinlogDumpGTID) // nolint pos = writeUint16(data, pos, flags) // nolint @@ -98,8 +99,8 @@ func (c *Conn) WriteComBinlogDumpGTID(serverID uint32, binlogFilename string, bi pos = writeUint32(data, pos, uint32(len(binlogFilename))) // nolint pos = writeEOFString(data, pos, binlogFilename) // nolint pos = writeUint64(data, pos, binlogPos) // nolint - pos = writeUint32(data, pos, uint32(len(gtidSet))) // nolint - pos += copy(data[pos:], gtidSet) // nolint + pos = writeUint32(data, pos, uint32(len(sidBlock))) // nolint + pos += copy(data[pos:], sidBlock) // nolint if err := c.writeEphemeralPacket(); err != nil { return sqlerror.NewSQLErrorf(sqlerror.CRServerGone, sqlerror.SSUnknownSQLState, "%v", err) } diff --git a/go/mysql/replication_test.go b/go/mysql/replication_test.go index ded22d838f6..3f423624b68 100644 --- a/go/mysql/replication_test.go +++ b/go/mysql/replication_test.go @@ -24,6 +24,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "vitess.io/vitess/go/mysql/replication" "vitess.io/vitess/go/test/utils" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" @@ -88,14 +89,50 @@ func TestComBinlogDumpGTID(t *testing.T) { cConn.Close() }() + t.Run("WriteComBinlogDumpGTIDEmptyGTID", func(t *testing.T) { + // Write ComBinlogDumpGTID packet, read it, compare. + var flags uint16 = 0x0d0e + err := cConn.WriteComBinlogDumpGTID(0x01020304, "moofarm", 0x05060708090a0b0c, flags, []byte{}) + assert.NoError(t, err) + data, err := sConn.ReadPacket() + require.NoError(t, err, "sConn.ReadPacket - ComBinlogDumpGTID failed: %v", err) + require.NotEmpty(t, data) + require.EqualValues(t, data[0], ComBinlogDumpGTID) + + expectedData := []byte{ + ComBinlogDumpGTID, + 0x0e, 0x0d, // flags + 0x04, 0x03, 0x02, 0x01, // server-id + 0x07, 0x00, 0x00, 0x00, // binlog-filename-len + 'm', 'o', 'o', 'f', 'a', 'r', 'm', // bilog-filename + 0x0c, 0x0b, 0x0a, 0x09, 0x08, 0x07, 0x06, 0x05, // binlog-pos + 0x00, 0x00, 0x00, 0x00, // data-size is zero, no GTID payload + } + assert.Equal(t, expectedData, data) + logFile, logPos, pos, err := sConn.parseComBinlogDumpGTID(data) + require.NoError(t, err, "parseComBinlogDumpGTID failed: %v", err) + assert.Equal(t, "moofarm", logFile) + assert.Equal(t, uint64(0x05060708090a0b0c), logPos) + assert.True(t, pos.IsZero()) + }) + + sConn.sequence = 0 + t.Run("WriteComBinlogDumpGTID", func(t *testing.T) { // Write ComBinlogDumpGTID packet, read it, compare. var flags uint16 = 0x0d0e assert.Equal(t, flags, flags|BinlogThroughGTID) - err := cConn.WriteComBinlogDumpGTID(0x01020304, "moofarm", 0x05060708090a0b0c, flags, []byte{0xfa, 0xfb}) + gtidSet, err := replication.ParseMysql56GTIDSet("16b1039f-22b6-11ed-b765-0a43f95f28a3:1-243") + require.NoError(t, err) + sidBlock := gtidSet.SIDBlock() + assert.Len(t, sidBlock, 48) + + err = cConn.WriteComBinlogDumpGTID(0x01020304, "moofarm", 0x05060708090a0b0c, flags, sidBlock) assert.NoError(t, err) data, err := sConn.ReadPacket() require.NoError(t, err, "sConn.ReadPacket - ComBinlogDumpGTID failed: %v", err) + require.NotEmpty(t, data) + require.EqualValues(t, data[0], ComBinlogDumpGTID) expectedData := []byte{ ComBinlogDumpGTID, @@ -104,10 +141,15 @@ func TestComBinlogDumpGTID(t *testing.T) { 0x07, 0x00, 0x00, 0x00, // binlog-filename-len 'm', 'o', 'o', 'f', 'a', 'r', 'm', // bilog-filename 0x0c, 0x0b, 0x0a, 0x09, 0x08, 0x07, 0x06, 0x05, // binlog-pos - 0x02, 0x00, 0x00, 0x00, // data-size - 0xfa, 0xfb, // data + 0x30, 0x00, 0x00, 0x00, // data-size } + expectedData = append(expectedData, sidBlock...) // data assert.Equal(t, expectedData, data) + logFile, logPos, pos, err := sConn.parseComBinlogDumpGTID(data) + require.NoError(t, err, "parseComBinlogDumpGTID failed: %v", err) + assert.Equal(t, "moofarm", logFile) + assert.Equal(t, uint64(0x05060708090a0b0c), logPos) + assert.Equal(t, gtidSet, pos.GTIDSet) }) sConn.sequence = 0