Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into cluster-watch-range
Browse files Browse the repository at this point in the history
Signed-off-by: Manan Gupta <[email protected]>
  • Loading branch information
GuptaManan100 committed Jan 22, 2025
2 parents 22446ab + d1aa2f4 commit 8901696
Show file tree
Hide file tree
Showing 47 changed files with 564 additions and 521 deletions.
7 changes: 6 additions & 1 deletion changelog/22.0/22.0.0/summary.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
- **[Update default MySQL version to 8.0.40](#mysql-8-0-40)**
- **[Update lite images to Debian Bookworm](#debian-bookworm)**
- **[KeyRanges in `--clusters_to_watch` in VTOrc](#key-range-vtorc)**
- **[Support for Filtering Query logs on Error](#query-logs)**
- **[Minor Changes](#minor-changes)**
- **[VTTablet Flags](#flags-vttablet)**
- **[Topology read concurrency behaviour changes](#topo-read-concurrency-changes)**
Expand All @@ -26,7 +27,7 @@

These are the RPC changes made in this release -

1. `GetTransactionInfo` RPC has been added to both `VtctldServer`, and `TabletManagerClient` interface. These RPCs are used to fecilitate the users in reading the state of an unresolved distributed transaction. This can be useful in debugging what went wrong and how to fix the problem.
1. `GetTransactionInfo` RPC has been added to both `VtctldServer`, and `TabletManagerClient` interface. These RPCs are used to facilitate the users in reading the state of an unresolved distributed transaction. This can be useful in debugging what went wrong and how to fix the problem.

### <a id="deprecations-and-deletions"/>Deprecations and Deletions</a>

Expand Down Expand Up @@ -138,6 +139,10 @@ VTOrc now supports specifying KeyRanges in the `--clusters_to_watch` flag. This
For example, if a VTOrc is configured to watch `ks/-80`, then it would watch all the shards that fall under the KeyRange `-80`. If a reshard is run and, `-80` is split into new shards `-40`, and `40-80`, the VTOrc instance will automatically start watching the new shard without needing a restart.
The users can still continue to specify exact key ranges too, and the new feature is backward compatible.

### <a id="query-logs"/>Support for Filtering Query logs on Error</a>

The `querylog-mode` setting can be configured to `error` to log only queries that result in errors. This option is supported in both VTGate and VTTablet.

## <a id="minor-changes"/>Minor Changes</a>

#### <a id="flags-vttablet"/>VTTablet Flags</a>
Expand Down
1 change: 1 addition & 0 deletions go/flags/endtoend/vtcombo.txt
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ Flags:
--querylog-buffer-size int Maximum number of buffered query logs before throttling log output (default 10)
--querylog-filter-tag string string that must be present in the query for it to be logged; if using a value as the tag, you need to disable query normalization
--querylog-format string format for query logs ("text" or "json") (default "text")
--querylog-mode string Mode for logging queries. "error" will only log queries that return an error. Otherwise all queries will be logged. (default "all")
--querylog-row-threshold uint Number of rows a query has to return or affect before being logged; not useful for streaming queries. 0 means all queries will be logged.
--querylog-sample-rate float Sample rate for logging queries. Value must be between 0.0 (no logging) and 1.0 (all queries)
--queryserver-config-acl-exempt-acl string an acl that exempt from table acl checking (this acl is free to access any vitess tables).
Expand Down
1 change: 1 addition & 0 deletions go/flags/endtoend/vtgate.txt
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ Flags:
--querylog-buffer-size int Maximum number of buffered query logs before throttling log output (default 10)
--querylog-filter-tag string string that must be present in the query for it to be logged; if using a value as the tag, you need to disable query normalization
--querylog-format string format for query logs ("text" or "json") (default "text")
--querylog-mode string Mode for logging queries. "error" will only log queries that return an error. Otherwise all queries will be logged. (default "all")
--querylog-row-threshold uint Number of rows a query has to return or affect before being logged; not useful for streaming queries. 0 means all queries will be logged.
--querylog-sample-rate float Sample rate for logging queries. Value must be between 0.0 (no logging) and 1.0 (all queries)
--redact-debug-ui-queries redact full queries and bind variables from debug UI
Expand Down
1 change: 1 addition & 0 deletions go/flags/endtoend/vttablet.txt
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ Flags:
--query-log-stream-handler string URL handler for streaming queries log (default "/debug/querylog")
--querylog-filter-tag string string that must be present in the query for it to be logged; if using a value as the tag, you need to disable query normalization
--querylog-format string format for query logs ("text" or "json") (default "text")
--querylog-mode string Mode for logging queries. "error" will only log queries that return an error. Otherwise all queries will be logged. (default "all")
--querylog-row-threshold uint Number of rows a query has to return or affect before being logged; not useful for streaming queries. 0 means all queries will be logged.
--querylog-sample-rate float Sample rate for logging queries. Value must be between 0.0 (no logging) and 1.0 (all queries)
--queryserver-config-acl-exempt-acl string an acl that exempt from table acl checking (this acl is free to access any vitess tables).
Expand Down
22 changes: 10 additions & 12 deletions go/mysql/binlog_dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,21 +72,19 @@ func (c *Conn) parseComBinlogDumpGTID(data []byte) (logFile string, logPos uint6
return logFile, logPos, position, readPacketErr
}

if flags2&BinlogDumpNonBlock != 0 {
return logFile, logPos, position, io.EOF
dataSize, pos, ok := readUint32(data, pos)
if !ok {
return logFile, logPos, position, readPacketErr
}
if flags2&BinlogThroughGTID != 0 {
dataSize, pos, ok := readUint32(data, pos)
if !ok {
return logFile, logPos, position, readPacketErr
}
if gtid := string(data[pos : pos+int(dataSize)]); gtid != "" {
position, err = replication.DecodePosition(gtid)
if err != nil {
return logFile, logPos, position, err
}
if gtid := string(data[pos : pos+int(dataSize)]); gtid != "" {
position, err = replication.DecodePosition(gtid)
if err != nil {
return logFile, logPos, position, err
}
}
if flags2&BinlogDumpNonBlock != 0 {
return logFile, logPos, position, io.EOF
}

return logFile, logPos, position, nil
}
9 changes: 8 additions & 1 deletion go/mysql/flavor_mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,14 @@ func (mysqlFlavor) sendBinlogDumpCommand(c *Conn, serverID uint32, binlogFilenam

// Build the command.
sidBlock := gtidSet.SIDBlock()
return c.WriteComBinlogDumpGTID(serverID, binlogFilename, 4, 0, sidBlock)
var flags2 uint16
if binlogFilename != "" {
flags2 |= BinlogThroughPosition
}
if len(sidBlock) > 0 {
flags2 |= BinlogThroughGTID
}
return c.WriteComBinlogDumpGTID(serverID, binlogFilename, 4, flags2, sidBlock)
}

// setReplicationPositionCommands is part of the Flavor interface.
Expand Down
15 changes: 15 additions & 0 deletions go/mysql/replication/mysql56_gtid_set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -854,3 +854,18 @@ func TestMysql56GTIDSet_RemoveUUID(t *testing.T) {
})
}
}

func TestSIDs(t *testing.T) {
var set Mysql56GTIDSet // nil
sids := set.SIDs()
assert.NotNil(t, sids)
assert.Empty(t, sids)

gtid := "8bc65cca-3fe4-11ed-bbfb-091034d48b3e:1:4-24"
gtidSet, err := ParseMysql56GTIDSet(gtid)
require.NoError(t, err)
sids = gtidSet.SIDs()
assert.NotNil(t, sids)
require.Len(t, sids, 1)
assert.Equal(t, "8bc65cca-3fe4-11ed-bbfb-091034d48b3e", sids[0].String())
}
4 changes: 3 additions & 1 deletion go/mysql/replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,9 @@ func TestComBinlogDumpGTID(t *testing.T) {

t.Run("WriteComBinlogDumpGTID", func(t *testing.T) {
// Write ComBinlogDumpGTID packet, read it, compare.
err := cConn.WriteComBinlogDumpGTID(0x01020304, "moofarm", 0x05060708090a0b0c, 0x0d0e, []byte{0xfa, 0xfb})
var flags uint16 = 0x0d0e
assert.Equal(t, flags, flags|BinlogThroughGTID)
err := cConn.WriteComBinlogDumpGTID(0x01020304, "moofarm", 0x05060708090a0b0c, flags, []byte{0xfa, 0xfb})
assert.NoError(t, err)
data, err := sConn.ReadPacket()
require.NoError(t, err, "sConn.ReadPacket - ComBinlogDumpGTID failed: %v", err)
Expand Down
96 changes: 48 additions & 48 deletions go/streamlog/streamlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package streamlog
import (
"fmt"
"io"
rand "math/rand/v2"
"math/rand/v2"
"net/http"
"net/url"
"os"
Expand All @@ -47,40 +47,42 @@ var (
[]string{"Log", "Subscriber"})
)

var (
redactDebugUIQueries bool
queryLogFilterTag string
queryLogRowThreshold uint64
queryLogFormat = "text"
queryLogSampleRate float64
)
const (
// QueryLogFormatText is the format specifier for text querylog output
QueryLogFormatText = "text"

func GetRedactDebugUIQueries() bool {
return redactDebugUIQueries
}
// QueryLogFormatJSON is the format specifier for json querylog output
QueryLogFormatJSON = "json"

func SetRedactDebugUIQueries(newRedactDebugUIQueries bool) {
redactDebugUIQueries = newRedactDebugUIQueries
}
// QueryLogModeAll is the mode specifier for logging all queries
QueryLogModeAll = "all"

func SetQueryLogFilterTag(newQueryLogFilterTag string) {
queryLogFilterTag = newQueryLogFilterTag
}
// QueryLogModeError is the mode specifier for logging only queries that return an error
QueryLogModeError = "error"
)

func SetQueryLogRowThreshold(newQueryLogRowThreshold uint64) {
queryLogRowThreshold = newQueryLogRowThreshold
type QueryLogConfig struct {
RedactDebugUIQueries bool
FilterTag string
Format string
Mode string
RowThreshold uint64
sampleRate float64
}

func SetQueryLogSampleRate(sampleRate float64) {
queryLogSampleRate = sampleRate
var queryLogConfigInstance = QueryLogConfig{
Format: QueryLogFormatText,
Mode: QueryLogModeAll,
}

func GetQueryLogFormat() string {
return queryLogFormat
func GetQueryLogConfig() QueryLogConfig {
return queryLogConfigInstance
}

func SetQueryLogFormat(newQueryLogFormat string) {
queryLogFormat = newQueryLogFormat
func NewQueryLogConfigForTest() QueryLogConfig {
return QueryLogConfig{
Format: QueryLogFormatText,
}
}

func init() {
Expand All @@ -91,28 +93,23 @@ func init() {

func registerStreamLogFlags(fs *pflag.FlagSet) {
// RedactDebugUIQueries controls whether full queries and bind variables are suppressed from debug UIs.
fs.BoolVar(&redactDebugUIQueries, "redact-debug-ui-queries", redactDebugUIQueries, "redact full queries and bind variables from debug UI")
fs.BoolVar(&queryLogConfigInstance.RedactDebugUIQueries, "redact-debug-ui-queries", queryLogConfigInstance.RedactDebugUIQueries, "redact full queries and bind variables from debug UI")

// QueryLogFormat controls the format of the query log (either text or json)
fs.StringVar(&queryLogFormat, "querylog-format", queryLogFormat, "format for query logs (\"text\" or \"json\")")
fs.StringVar(&queryLogConfigInstance.Format, "querylog-format", queryLogConfigInstance.Format, "format for query logs (\"text\" or \"json\")")

// QueryLogFilterTag contains an optional string that must be present in the query for it to be logged
fs.StringVar(&queryLogFilterTag, "querylog-filter-tag", queryLogFilterTag, "string that must be present in the query for it to be logged; if using a value as the tag, you need to disable query normalization")
fs.StringVar(&queryLogConfigInstance.FilterTag, "querylog-filter-tag", queryLogConfigInstance.FilterTag, "string that must be present in the query for it to be logged; if using a value as the tag, you need to disable query normalization")

// QueryLogRowThreshold only log queries returning or affecting this many rows
fs.Uint64Var(&queryLogRowThreshold, "querylog-row-threshold", queryLogRowThreshold, "Number of rows a query has to return or affect before being logged; not useful for streaming queries. 0 means all queries will be logged.")
fs.Uint64Var(&queryLogConfigInstance.RowThreshold, "querylog-row-threshold", queryLogConfigInstance.RowThreshold, "Number of rows a query has to return or affect before being logged; not useful for streaming queries. 0 means all queries will be logged.")

// QueryLogSampleRate causes a sample of queries to be logged
fs.Float64Var(&queryLogSampleRate, "querylog-sample-rate", queryLogSampleRate, "Sample rate for logging queries. Value must be between 0.0 (no logging) and 1.0 (all queries)")
}

const (
// QueryLogFormatText is the format specifier for text querylog output
QueryLogFormatText = "text"
fs.Float64Var(&queryLogConfigInstance.sampleRate, "querylog-sample-rate", queryLogConfigInstance.sampleRate, "Sample rate for logging queries. Value must be between 0.0 (no logging) and 1.0 (all queries)")

// QueryLogFormatJSON is the format specifier for json querylog output
QueryLogFormatJSON = "json"
)
// QueryLogMode controls the mode for logging queries (all or error)
fs.StringVar(&queryLogConfigInstance.Mode, "querylog-mode", queryLogConfigInstance.Mode, `Mode for logging queries. "error" will only log queries that return an error. Otherwise all queries will be logged.`)
}

// StreamLogger is a non-blocking broadcaster of messages.
// Subscribers can use channels or HTTP.
Expand Down Expand Up @@ -257,27 +254,30 @@ func GetFormatter[T any](logger *StreamLogger[T]) LogFormatter {
}
}

// shouldSampleQuery returns true if a query should be sampled based on queryLogSampleRate
func shouldSampleQuery() bool {
if queryLogSampleRate <= 0 {
// shouldSampleQuery returns true if a query should be sampled based on sampleRate
func (qlConfig QueryLogConfig) shouldSampleQuery() bool {
if qlConfig.sampleRate <= 0 {
return false
} else if queryLogSampleRate >= 1 {
} else if qlConfig.sampleRate >= 1 {
return true
}
return rand.Float64() <= queryLogSampleRate
return rand.Float64() <= qlConfig.sampleRate
}

// ShouldEmitLog returns whether the log with the given SQL query
// should be emitted or filtered
func ShouldEmitLog(sql string, rowsAffected, rowsReturned uint64) bool {
if shouldSampleQuery() {
func (qlConfig QueryLogConfig) ShouldEmitLog(sql string, rowsAffected, rowsReturned uint64, hasError bool) bool {
if qlConfig.shouldSampleQuery() {
return true
}
if queryLogRowThreshold > max(rowsAffected, rowsReturned) && queryLogFilterTag == "" {
if qlConfig.RowThreshold > max(rowsAffected, rowsReturned) && qlConfig.FilterTag == "" {
return false
}
if queryLogFilterTag != "" {
return strings.Contains(sql, queryLogFilterTag)
if qlConfig.FilterTag != "" {
return strings.Contains(sql, qlConfig.FilterTag)
}
if qlConfig.Mode == QueryLogModeError {
return hasError
}
return true
}
79 changes: 29 additions & 50 deletions go/streamlog/streamlog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,40 +266,29 @@ func TestFile(t *testing.T) {
}

func TestShouldSampleQuery(t *testing.T) {
queryLogSampleRate = -1
assert.False(t, shouldSampleQuery())
qlConfig := QueryLogConfig{sampleRate: -1}
assert.False(t, qlConfig.shouldSampleQuery())

queryLogSampleRate = 0
assert.False(t, shouldSampleQuery())
qlConfig.sampleRate = 0
assert.False(t, qlConfig.shouldSampleQuery())

// for test coverage, can't test a random result
queryLogSampleRate = 0.5
shouldSampleQuery()
qlConfig.sampleRate = 1.0
assert.True(t, qlConfig.shouldSampleQuery())

queryLogSampleRate = 1.0
assert.True(t, shouldSampleQuery())

queryLogSampleRate = 100.0
assert.True(t, shouldSampleQuery())
qlConfig.sampleRate = 100.0
assert.True(t, qlConfig.shouldSampleQuery())
}

func TestShouldEmitLog(t *testing.T) {
origQueryLogFilterTag := queryLogFilterTag
origQueryLogRowThreshold := queryLogRowThreshold
origQueryLogSampleRate := queryLogSampleRate
defer func() {
SetQueryLogFilterTag(origQueryLogFilterTag)
SetQueryLogRowThreshold(origQueryLogRowThreshold)
SetQueryLogSampleRate(origQueryLogSampleRate)
}()

tests := []struct {
sql string
qLogFilterTag string
qLogRowThreshold uint64
qLogSampleRate float64
qLogMode string
rowsAffected uint64
rowsReturned uint64
errored bool
ok bool
}{
{
Expand Down Expand Up @@ -356,43 +345,33 @@ func TestShouldEmitLog(t *testing.T) {
rowsReturned: 17,
ok: true,
},
{
sql: "log only error - no error",
qLogMode: "error",
errored: false,
ok: false,
},
{
sql: "log only error - errored",
qLogMode: "error",
errored: true,
ok: true,
},
}

for _, tt := range tests {
t.Run(tt.sql, func(t *testing.T) {
SetQueryLogFilterTag(tt.qLogFilterTag)
SetQueryLogRowThreshold(tt.qLogRowThreshold)
SetQueryLogSampleRate(tt.qLogSampleRate)

require.Equal(t, tt.ok, ShouldEmitLog(tt.sql, tt.rowsAffected, tt.rowsReturned))
qlConfig := QueryLogConfig{
FilterTag: tt.qLogFilterTag,
RowThreshold: tt.qLogRowThreshold,
sampleRate: tt.qLogSampleRate,
Mode: tt.qLogMode,
}
require.Equal(t, tt.ok, qlConfig.ShouldEmitLog(tt.sql, tt.rowsAffected, tt.rowsReturned, tt.errored))
})
}
}

func BenchmarkShouldEmitLog(b *testing.B) {
b.Run("default", func(b *testing.B) {
SetQueryLogSampleRate(0.0)
for i := 0; i < b.N; i++ {
ShouldEmitLog("select * from test where user='someone'", 0, 123)
}
})
b.Run("filter_tag", func(b *testing.B) {
SetQueryLogSampleRate(0.0)
SetQueryLogFilterTag("LOG_QUERY")
defer SetQueryLogFilterTag("")
for i := 0; i < b.N; i++ {
ShouldEmitLog("select /* LOG_QUERY=1 */ * from test where user='someone'", 0, 123)
}
})
b.Run("50%_sample_rate", func(b *testing.B) {
SetQueryLogSampleRate(0.5)
defer SetQueryLogSampleRate(0.0)
for i := 0; i < b.N; i++ {
ShouldEmitLog("select * from test where user='someone'", 0, 123)
}
})
}

func TestGetFormatter(t *testing.T) {
tests := []struct {
name string
Expand Down
Loading

0 comments on commit 8901696

Please sign in to comment.