Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

parseComBinlogDumpGTID: GTID payload is always 5.6 flavor #17605

Merged
merged 7 commits into from
Jan 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions go/mysql/binlog_dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,13 @@ func (c *Conn) parseComBinlogDumpGTID(data []byte) (logFile string, logPos uint6
if !ok {
return logFile, logPos, position, readPacketErr
}
if gtid := string(data[pos : pos+int(dataSize)]); gtid != "" {
position, err = replication.DecodePosition(gtid)
if gtidBytes := data[pos : pos+int(dataSize)]; len(gtidBytes) != 0 {
gtid, err := replication.NewMysql56GTIDSetFromSIDBlock(gtidBytes)
if err != nil {
return logFile, logPos, position, err
return logFile, logPos, position, vterrors.Wrapf(err, "error parsing GTID from BinlogDumpGTID packet")
}
// ComBinlogDumpGTID is a MySQL specific protocol. The GTID flavor is necessarily MySQL 56
position = replication.Position{GTIDSet: gtid}
}
if flags2&BinlogDumpNonBlock != 0 {
return logFile, logPos, position, io.EOF
Expand Down
5 changes: 4 additions & 1 deletion go/mysql/flavor_mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,10 @@ func (mysqlFlavor) sendBinlogDumpCommand(c *Conn, serverID uint32, binlogFilenam
}

// Build the command.
sidBlock := gtidSet.SIDBlock()
var sidBlock []byte
if gtidSet != nil {
sidBlock = gtidSet.SIDBlock()
}
var flags2 uint16
if binlogFilename != "" {
flags2 |= BinlogThroughPosition
Expand Down
9 changes: 5 additions & 4 deletions go/mysql/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ func (c *Conn) AnalyzeSemiSyncAckRequest(buf []byte) (strippedBuf []byte, ackReq
// WriteComBinlogDumpGTID writes a ComBinlogDumpGTID command.
// Only works with MySQL 5.6+ (and not MariaDB).
// See http://dev.mysql.com/doc/internals/en/com-binlog-dump-gtid.html for syntax.
func (c *Conn) WriteComBinlogDumpGTID(serverID uint32, binlogFilename string, binlogPos uint64, flags uint16, gtidSet []byte) error {
// sidBlock must be the result of a gtidSet.SIDBlock() function.
func (c *Conn) WriteComBinlogDumpGTID(serverID uint32, binlogFilename string, binlogPos uint64, flags uint16, sidBlock []byte) error {
c.sequence = 0
length := 1 + // ComBinlogDumpGTID
2 + // flags
Expand All @@ -90,16 +91,16 @@ func (c *Conn) WriteComBinlogDumpGTID(serverID uint32, binlogFilename string, bi
len(binlogFilename) + // binlog-filename
8 + // binlog-pos
4 + // data-size
len(gtidSet) // data
len(sidBlock) // data
data, pos := c.startEphemeralPacketWithHeader(length)
pos = writeByte(data, pos, ComBinlogDumpGTID) // nolint
pos = writeUint16(data, pos, flags) // nolint
pos = writeUint32(data, pos, serverID) // nolint
pos = writeUint32(data, pos, uint32(len(binlogFilename))) // nolint
pos = writeEOFString(data, pos, binlogFilename) // nolint
pos = writeUint64(data, pos, binlogPos) // nolint
pos = writeUint32(data, pos, uint32(len(gtidSet))) // nolint
pos += copy(data[pos:], gtidSet) // nolint
pos = writeUint32(data, pos, uint32(len(sidBlock))) // nolint
pos += copy(data[pos:], sidBlock) // nolint
if err := c.writeEphemeralPacket(); err != nil {
return sqlerror.NewSQLErrorf(sqlerror.CRServerGone, sqlerror.SSUnknownSQLState, "%v", err)
}
Expand Down
48 changes: 45 additions & 3 deletions go/mysql/replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/test/utils"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
Expand Down Expand Up @@ -88,14 +89,50 @@ func TestComBinlogDumpGTID(t *testing.T) {
cConn.Close()
}()

t.Run("WriteComBinlogDumpGTIDEmptyGTID", func(t *testing.T) {
// Write ComBinlogDumpGTID packet, read it, compare.
var flags uint16 = 0x0d0e
err := cConn.WriteComBinlogDumpGTID(0x01020304, "moofarm", 0x05060708090a0b0c, flags, []byte{})
assert.NoError(t, err)
data, err := sConn.ReadPacket()
require.NoError(t, err, "sConn.ReadPacket - ComBinlogDumpGTID failed: %v", err)
require.NotEmpty(t, data)
require.EqualValues(t, data[0], ComBinlogDumpGTID)

expectedData := []byte{
ComBinlogDumpGTID,
0x0e, 0x0d, // flags
0x04, 0x03, 0x02, 0x01, // server-id
0x07, 0x00, 0x00, 0x00, // binlog-filename-len
'm', 'o', 'o', 'f', 'a', 'r', 'm', // bilog-filename
0x0c, 0x0b, 0x0a, 0x09, 0x08, 0x07, 0x06, 0x05, // binlog-pos
0x00, 0x00, 0x00, 0x00, // data-size is zero, no GTID payload
}
assert.Equal(t, expectedData, data)
logFile, logPos, pos, err := sConn.parseComBinlogDumpGTID(data)
require.NoError(t, err, "parseComBinlogDumpGTID failed: %v", err)
assert.Equal(t, "moofarm", logFile)
assert.Equal(t, uint64(0x05060708090a0b0c), logPos)
assert.True(t, pos.IsZero())
})

sConn.sequence = 0

t.Run("WriteComBinlogDumpGTID", func(t *testing.T) {
// Write ComBinlogDumpGTID packet, read it, compare.
var flags uint16 = 0x0d0e
assert.Equal(t, flags, flags|BinlogThroughGTID)
err := cConn.WriteComBinlogDumpGTID(0x01020304, "moofarm", 0x05060708090a0b0c, flags, []byte{0xfa, 0xfb})
gtidSet, err := replication.ParseMysql56GTIDSet("16b1039f-22b6-11ed-b765-0a43f95f28a3:1-243")
require.NoError(t, err)
sidBlock := gtidSet.SIDBlock()
assert.Len(t, sidBlock, 48)

err = cConn.WriteComBinlogDumpGTID(0x01020304, "moofarm", 0x05060708090a0b0c, flags, sidBlock)
assert.NoError(t, err)
data, err := sConn.ReadPacket()
require.NoError(t, err, "sConn.ReadPacket - ComBinlogDumpGTID failed: %v", err)
require.NotEmpty(t, data)
require.EqualValues(t, data[0], ComBinlogDumpGTID)

expectedData := []byte{
ComBinlogDumpGTID,
Expand All @@ -104,10 +141,15 @@ func TestComBinlogDumpGTID(t *testing.T) {
0x07, 0x00, 0x00, 0x00, // binlog-filename-len
'm', 'o', 'o', 'f', 'a', 'r', 'm', // bilog-filename
0x0c, 0x0b, 0x0a, 0x09, 0x08, 0x07, 0x06, 0x05, // binlog-pos
0x02, 0x00, 0x00, 0x00, // data-size
0xfa, 0xfb, // data
0x30, 0x00, 0x00, 0x00, // data-size
}
expectedData = append(expectedData, sidBlock...) // data
assert.Equal(t, expectedData, data)
logFile, logPos, pos, err := sConn.parseComBinlogDumpGTID(data)
require.NoError(t, err, "parseComBinlogDumpGTID failed: %v", err)
assert.Equal(t, "moofarm", logFile)
assert.Equal(t, uint64(0x05060708090a0b0c), logPos)
assert.Equal(t, gtidSet, pos.GTIDSet)
})

sConn.sequence = 0
Expand Down
Loading