Skip to content

Commit

Permalink
Refactor Disk Stall implementation and mark tablet not serving if dis…
Browse files Browse the repository at this point in the history
…k is stalled (#17624)

Signed-off-by: Manan Gupta <[email protected]>
  • Loading branch information
GuptaManan100 authored Jan 28, 2025
1 parent 8921bce commit c2f4dcf
Show file tree
Hide file tree
Showing 15 changed files with 232 additions and 66 deletions.
30 changes: 20 additions & 10 deletions go/vt/proto/replicationdata/replicationdata.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

36 changes: 36 additions & 0 deletions go/vt/proto/replicationdata/replicationdata_vtproto.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions go/vt/vtorc/inst/instance_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,9 +206,10 @@ func ReadTopologyInstanceBufferable(tabletAlias string, latency *stopwatch.Named

fs, err = fullStatus(tabletAlias)
if err != nil {
if config.GetStalledDiskPrimaryRecovery() && strings.Contains(err.Error(), "stalled disk") {
stalledDisk = true
}
goto Cleanup
}
if config.GetStalledDiskPrimaryRecovery() && fs.DiskStalled {
stalledDisk = true
goto Cleanup
}
partialSuccess = true // We at least managed to read something from the server.
Expand Down
12 changes: 7 additions & 5 deletions go/vt/vttablet/tabletmanager/rpc_replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package tabletmanager

import (
"context"
"errors"
"fmt"
"runtime"
"strings"
Expand Down Expand Up @@ -62,10 +61,13 @@ func (tm *TabletManager) FullStatus(ctx context.Context) (*replicationdatapb.Ful
return nil, err
}

// Return error if the disk is stalled or rejecting writes.
// Noop by default, must be enabled with the flag "disk-write-dir".
if tm.dhMonitor.IsDiskStalled() {
return nil, errors.New("stalled disk")
// Return if the disk is stalled or rejecting writes.
// If the disk is stalled, we can't be sure if reads will go through
// or not, so we should not run any reads either.
if tm.QueryServiceControl.IsDiskStalled() {
return &replicationdatapb.FullStatus{
DiskStalled: true,
}, nil
}

// Server ID - "select @@global.server_id"
Expand Down
12 changes: 2 additions & 10 deletions go/vt/vttablet/tabletmanager/tm_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,8 @@ var (
skipBuildInfoTags = "/.*/"
initTags flagutil.StringMapValue

initTimeout = 1 * time.Minute
mysqlShutdownTimeout = mysqlctl.DefaultShutdownTimeout
stalledDiskWriteDir = ""
stalledDiskWriteTimeout = 30 * time.Second
stalledDiskWriteInterval = 5 * time.Second
initTimeout = 1 * time.Minute
mysqlShutdownTimeout = mysqlctl.DefaultShutdownTimeout
)

func registerInitFlags(fs *pflag.FlagSet) {
Expand All @@ -112,9 +109,6 @@ func registerInitFlags(fs *pflag.FlagSet) {
fs.Var(&initTags, "init_tags", "(init parameter) comma separated list of key:value pairs used to tag the tablet")
fs.DurationVar(&initTimeout, "init_timeout", initTimeout, "(init parameter) timeout to use for the init phase.")
fs.DurationVar(&mysqlShutdownTimeout, "mysql-shutdown-timeout", mysqlShutdownTimeout, "timeout to use when MySQL is being shut down.")
fs.StringVar(&stalledDiskWriteDir, "disk-write-dir", stalledDiskWriteDir, "if provided, tablet will attempt to write a file to this directory to check if the disk is stalled")
fs.DurationVar(&stalledDiskWriteTimeout, "disk-write-timeout", stalledDiskWriteTimeout, "if writes exceed this duration, the disk is considered stalled")
fs.DurationVar(&stalledDiskWriteInterval, "disk-write-interval", stalledDiskWriteInterval, "how often to write to the disk to check whether it is stalled")
}

var (
Expand Down Expand Up @@ -170,7 +164,6 @@ type TabletManager struct {
VREngine *vreplication.Engine
VDiffEngine *vdiff.Engine
Env *vtenv.Environment
dhMonitor DiskHealthMonitor

// tmc is used to run an RPC against other vttablets.
tmc tmclient.TabletManagerClient
Expand Down Expand Up @@ -379,7 +372,6 @@ func (tm *TabletManager) Start(tablet *topodatapb.Tablet, config *tabletenv.Tabl
tm.tmc = tmclient.NewTabletManagerClient()
tm.tmState = newTMState(tm, tablet)
tm.actionSema = semaphore.NewWeighted(1)
tm.dhMonitor = newDiskHealthMonitor(tm.BatchCtx)
tm._waitForGrantsComplete = make(chan struct{})

tm.baseTabletType = tablet.Type
Expand Down
3 changes: 3 additions & 0 deletions go/vt/vttablet/tabletserver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ type Controller interface {

// SetDemotePrimaryStalled marks that demote primary is stalled in the state manager.
SetDemotePrimaryStalled()

// IsDiskStalled returns if the disk is stalled.
IsDiskStalled() bool
}

// Ensure TabletServer satisfies Controller interface.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,20 @@
package tabletmanager
/*
Copyright 2024 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package tabletserver

import (
"context"
Expand All @@ -7,8 +23,29 @@ import (
"strconv"
"sync"
"time"

"github.com/spf13/pflag"

"vitess.io/vitess/go/vt/servenv"
)

var (
stalledDiskWriteDir = ""
stalledDiskWriteTimeout = 30 * time.Second
stalledDiskWriteInterval = 5 * time.Second
)

func init() {
servenv.OnParseFor("vtcombo", registerInitFlags)
servenv.OnParseFor("vttablet", registerInitFlags)
}

func registerInitFlags(fs *pflag.FlagSet) {
fs.StringVar(&stalledDiskWriteDir, "disk-write-dir", stalledDiskWriteDir, "if provided, tablet will attempt to write a file to this directory to check if the disk is stalled")
fs.DurationVar(&stalledDiskWriteTimeout, "disk-write-timeout", stalledDiskWriteTimeout, "if writes exceed this duration, the disk is considered stalled")
fs.DurationVar(&stalledDiskWriteInterval, "disk-write-interval", stalledDiskWriteInterval, "how often to write to the disk to check whether it is stalled")
}

type DiskHealthMonitor interface {
// IsDiskStalled returns true if the disk is stalled or rejecting writes.
IsDiskStalled() bool
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,20 @@
package tabletmanager
/*
Copyright 2024 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package tabletserver

import (
"context"
Expand Down
3 changes: 2 additions & 1 deletion go/vt/vttablet/tabletserver/state_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ type stateManager struct {
replHealthy bool
demotePrimaryStalled bool
lameduck bool
diskHealthMonitor DiskHealthMonitor
alsoAllow []topodatapb.TabletType
reason string
transitionErr error
Expand Down Expand Up @@ -777,7 +778,7 @@ func (sm *stateManager) IsServing() bool {
}

func (sm *stateManager) isServingLocked() bool {
return sm.state == StateServing && sm.wantState == StateServing && sm.replHealthy && !sm.demotePrimaryStalled && !sm.lameduck
return sm.state == StateServing && sm.wantState == StateServing && sm.replHealthy && !sm.demotePrimaryStalled && !sm.lameduck && !sm.diskHealthMonitor.IsDiskStalled()
}

func (sm *stateManager) AppendDetails(details []*kv) []*kv {
Expand Down
62 changes: 44 additions & 18 deletions go/vt/vttablet/tabletserver/state_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ import (
var testNow = time.Now()

func TestStateManagerStateByName(t *testing.T) {
sm := &stateManager{}
sm := &stateManager{
diskHealthMonitor: newNoopDiskHealthMonitor(),
}

sm.replHealthy = true
sm.wantState = StateServing
Expand Down Expand Up @@ -147,6 +149,29 @@ func TestStateManagerUnservePrimary(t *testing.T) {
assert.Equal(t, StateNotServing, sm.state)
}

type testDiskMonitor struct {
isDiskStalled bool
}

func (t *testDiskMonitor) IsDiskStalled() bool {
return t.isDiskStalled
}

// TestIsServingLocked tests isServingLocked() functionality.
func TestIsServingLocked(t *testing.T) {
sm := newTestStateManager()
defer sm.StopService()
tdm := &testDiskMonitor{isDiskStalled: false}
sm.diskHealthMonitor = tdm

err := sm.SetServingType(topodatapb.TabletType_REPLICA, testNow, StateServing, "")
require.NoError(t, err)
require.True(t, sm.isServingLocked())

tdm.isDiskStalled = true
require.False(t, sm.isServingLocked())
}

func TestStateManagerUnserveNonPrimary(t *testing.T) {
sm := newTestStateManager()
defer sm.StopService()
Expand Down Expand Up @@ -778,23 +803,24 @@ func newTestStateManager() *stateManager {
parser := sqlparser.NewTestParser()
env := tabletenv.NewEnv(vtenv.NewTestEnv(), cfg, "StateManagerTest")
sm := &stateManager{
statelessql: NewQueryList("stateless", parser),
statefulql: NewQueryList("stateful", parser),
olapql: NewQueryList("olap", parser),
hs: newHealthStreamer(env, &topodatapb.TabletAlias{}, schema.NewEngine(env)),
se: &testSchemaEngine{},
rt: &testReplTracker{lag: 1 * time.Second},
vstreamer: &testSubcomponent{},
tracker: &testSubcomponent{},
watcher: &testSubcomponent{},
qe: &testQueryEngine{},
txThrottler: &testTxThrottler{},
te: &testTxEngine{},
messager: &testSubcomponent{},
ddle: &testOnlineDDLExecutor{},
throttler: &testLagThrottler{},
tableGC: &testTableGC{},
rw: newRequestsWaiter(),
statelessql: NewQueryList("stateless", parser),
statefulql: NewQueryList("stateful", parser),
olapql: NewQueryList("olap", parser),
hs: newHealthStreamer(env, &topodatapb.TabletAlias{}, schema.NewEngine(env)),
se: &testSchemaEngine{},
rt: &testReplTracker{lag: 1 * time.Second},
vstreamer: &testSubcomponent{},
tracker: &testSubcomponent{},
watcher: &testSubcomponent{},
qe: &testQueryEngine{},
txThrottler: &testTxThrottler{},
te: &testTxEngine{},
messager: &testSubcomponent{},
ddle: &testOnlineDDLExecutor{},
diskHealthMonitor: newNoopDiskHealthMonitor(),
throttler: &testLagThrottler{},
tableGC: &testTableGC{},
rw: newRequestsWaiter(),
}
sm.Init(env, &querypb.Target{})
sm.hs.InitDBConfig(&querypb.Target{})
Expand Down
Loading

0 comments on commit c2f4dcf

Please sign in to comment.