diff --git a/go/vt/proto/replicationdata/replicationdata.pb.go b/go/vt/proto/replicationdata/replicationdata.pb.go index d5462e1ea2b..bb973577d55 100644 --- a/go/vt/proto/replicationdata/replicationdata.pb.go +++ b/go/vt/proto/replicationdata/replicationdata.pb.go @@ -509,6 +509,7 @@ type FullStatus struct { SemiSyncWaitForReplicaCount uint32 `protobuf:"varint,20,opt,name=semi_sync_wait_for_replica_count,json=semiSyncWaitForReplicaCount,proto3" json:"semi_sync_wait_for_replica_count,omitempty"` SuperReadOnly bool `protobuf:"varint,21,opt,name=super_read_only,json=superReadOnly,proto3" json:"super_read_only,omitempty"` ReplicationConfiguration *Configuration `protobuf:"bytes,22,opt,name=replication_configuration,json=replicationConfiguration,proto3" json:"replication_configuration,omitempty"` + DiskStalled bool `protobuf:"varint,23,opt,name=disk_stalled,json=diskStalled,proto3" json:"disk_stalled,omitempty"` } func (x *FullStatus) Reset() { @@ -695,6 +696,13 @@ func (x *FullStatus) GetReplicationConfiguration() *Configuration { return nil } +func (x *FullStatus) GetDiskStalled() bool { + if x != nil { + return x.DiskStalled + } + return false +} + var File_replicationdata_proto protoreflect.FileDescriptor var file_replicationdata_proto_rawDesc = []byte{ @@ -782,7 +790,7 @@ var file_replicationdata_proto_rawDesc = []byte{ 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x66, 0x69, 0x6c, 0x65, 0x50, 0x6f, 0x73, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x1f, 0x0a, 0x0b, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x5f, 0x75, 0x75, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, - 0x55, 0x75, 0x69, 0x64, 0x22, 0xc8, 0x08, 0x0a, 0x0a, 0x46, 0x75, 0x6c, 0x6c, 0x53, 0x74, 0x61, + 0x55, 0x75, 0x69, 0x64, 0x22, 0xeb, 0x08, 0x0a, 0x0a, 0x46, 0x75, 0x6c, 0x6c, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x1b, 0x0a, 0x09, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x08, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x49, 0x64, 0x12, 0x1f, 0x0a, 0x0b, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x5f, 0x75, 0x75, 0x69, 0x64, 0x18, @@ -850,15 +858,17 @@ var file_replicationdata_proto_rawDesc = []byte{ 0x16, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1e, 0x2e, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x18, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, - 0x6f, 0x6e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2a, - 0x3b, 0x0a, 0x13, 0x53, 0x74, 0x6f, 0x70, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, - 0x6f, 0x6e, 0x4d, 0x6f, 0x64, 0x65, 0x12, 0x12, 0x0a, 0x0e, 0x49, 0x4f, 0x41, 0x4e, 0x44, 0x53, - 0x51, 0x4c, 0x54, 0x48, 0x52, 0x45, 0x41, 0x44, 0x10, 0x00, 0x12, 0x10, 0x0a, 0x0c, 0x49, 0x4f, - 0x54, 0x48, 0x52, 0x45, 0x41, 0x44, 0x4f, 0x4e, 0x4c, 0x59, 0x10, 0x01, 0x42, 0x2e, 0x5a, 0x2c, - 0x76, 0x69, 0x74, 0x65, 0x73, 0x73, 0x2e, 0x69, 0x6f, 0x2f, 0x76, 0x69, 0x74, 0x65, 0x73, 0x73, - 0x2f, 0x67, 0x6f, 0x2f, 0x76, 0x74, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x72, 0x65, 0x70, - 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x06, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x33, + 0x6f, 0x6e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, + 0x21, 0x0a, 0x0c, 0x64, 0x69, 0x73, 0x6b, 0x5f, 0x73, 0x74, 0x61, 0x6c, 0x6c, 0x65, 0x64, 0x18, + 0x17, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0b, 0x64, 0x69, 0x73, 0x6b, 0x53, 0x74, 0x61, 0x6c, 0x6c, + 0x65, 0x64, 0x2a, 0x3b, 0x0a, 0x13, 0x53, 0x74, 0x6f, 0x70, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, + 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x6f, 0x64, 0x65, 0x12, 0x12, 0x0a, 0x0e, 0x49, 0x4f, 0x41, + 0x4e, 0x44, 0x53, 0x51, 0x4c, 0x54, 0x48, 0x52, 0x45, 0x41, 0x44, 0x10, 0x00, 0x12, 0x10, 0x0a, + 0x0c, 0x49, 0x4f, 0x54, 0x48, 0x52, 0x45, 0x41, 0x44, 0x4f, 0x4e, 0x4c, 0x59, 0x10, 0x01, 0x42, + 0x2e, 0x5a, 0x2c, 0x76, 0x69, 0x74, 0x65, 0x73, 0x73, 0x2e, 0x69, 0x6f, 0x2f, 0x76, 0x69, 0x74, + 0x65, 0x73, 0x73, 0x2f, 0x67, 0x6f, 0x2f, 0x76, 0x74, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, + 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x64, 0x61, 0x74, 0x61, 0x62, + 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/go/vt/proto/replicationdata/replicationdata_vtproto.pb.go b/go/vt/proto/replicationdata/replicationdata_vtproto.pb.go index b3d638a1327..a515397c065 100644 --- a/go/vt/proto/replicationdata/replicationdata_vtproto.pb.go +++ b/go/vt/proto/replicationdata/replicationdata_vtproto.pb.go @@ -142,6 +142,7 @@ func (m *FullStatus) CloneVT() *FullStatus { r.SemiSyncWaitForReplicaCount = m.SemiSyncWaitForReplicaCount r.SuperReadOnly = m.SuperReadOnly r.ReplicationConfiguration = m.ReplicationConfiguration.CloneVT() + r.DiskStalled = m.DiskStalled if len(m.unknownFields) > 0 { r.unknownFields = make([]byte, len(m.unknownFields)) copy(r.unknownFields, m.unknownFields) @@ -552,6 +553,18 @@ func (m *FullStatus) MarshalToSizedBufferVT(dAtA []byte) (int, error) { i -= len(m.unknownFields) copy(dAtA[i:], m.unknownFields) } + if m.DiskStalled { + i-- + if m.DiskStalled { + dAtA[i] = 1 + } else { + dAtA[i] = 0 + } + i-- + dAtA[i] = 0x1 + i-- + dAtA[i] = 0xb8 + } if m.ReplicationConfiguration != nil { size, err := m.ReplicationConfiguration.MarshalToSizedBufferVT(dAtA[:i]) if err != nil { @@ -975,6 +988,9 @@ func (m *FullStatus) SizeVT() (n int) { l = m.ReplicationConfiguration.SizeVT() n += 2 + l + protohelpers.SizeOfVarint(uint64(l)) } + if m.DiskStalled { + n += 3 + } n += len(m.unknownFields) return n } @@ -2551,6 +2567,26 @@ func (m *FullStatus) UnmarshalVT(dAtA []byte) error { return err } iNdEx = postIndex + case 23: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field DiskStalled", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.DiskStalled = bool(v != 0) default: iNdEx = preIndex skippy, err := protohelpers.Skip(dAtA[iNdEx:]) diff --git a/go/vt/vtorc/inst/instance_dao.go b/go/vt/vtorc/inst/instance_dao.go index f92a15079dd..9e35e6e3e0b 100644 --- a/go/vt/vtorc/inst/instance_dao.go +++ b/go/vt/vtorc/inst/instance_dao.go @@ -206,9 +206,10 @@ func ReadTopologyInstanceBufferable(tabletAlias string, latency *stopwatch.Named fs, err = fullStatus(tabletAlias) if err != nil { - if config.GetStalledDiskPrimaryRecovery() && strings.Contains(err.Error(), "stalled disk") { - stalledDisk = true - } + goto Cleanup + } + if config.GetStalledDiskPrimaryRecovery() && fs.DiskStalled { + stalledDisk = true goto Cleanup } partialSuccess = true // We at least managed to read something from the server. diff --git a/go/vt/vttablet/tabletmanager/rpc_replication.go b/go/vt/vttablet/tabletmanager/rpc_replication.go index b27b25d87c6..dec94ee6f16 100644 --- a/go/vt/vttablet/tabletmanager/rpc_replication.go +++ b/go/vt/vttablet/tabletmanager/rpc_replication.go @@ -18,7 +18,6 @@ package tabletmanager import ( "context" - "errors" "fmt" "runtime" "strings" @@ -62,10 +61,13 @@ func (tm *TabletManager) FullStatus(ctx context.Context) (*replicationdatapb.Ful return nil, err } - // Return error if the disk is stalled or rejecting writes. - // Noop by default, must be enabled with the flag "disk-write-dir". - if tm.dhMonitor.IsDiskStalled() { - return nil, errors.New("stalled disk") + // Return if the disk is stalled or rejecting writes. + // If the disk is stalled, we can't be sure if reads will go through + // or not, so we should not run any reads either. + if tm.QueryServiceControl.IsDiskStalled() { + return &replicationdatapb.FullStatus{ + DiskStalled: true, + }, nil } // Server ID - "select @@global.server_id" diff --git a/go/vt/vttablet/tabletmanager/tm_init.go b/go/vt/vttablet/tabletmanager/tm_init.go index c22ea0a6e51..fbef04de357 100644 --- a/go/vt/vttablet/tabletmanager/tm_init.go +++ b/go/vt/vttablet/tabletmanager/tm_init.go @@ -95,11 +95,8 @@ var ( skipBuildInfoTags = "/.*/" initTags flagutil.StringMapValue - initTimeout = 1 * time.Minute - mysqlShutdownTimeout = mysqlctl.DefaultShutdownTimeout - stalledDiskWriteDir = "" - stalledDiskWriteTimeout = 30 * time.Second - stalledDiskWriteInterval = 5 * time.Second + initTimeout = 1 * time.Minute + mysqlShutdownTimeout = mysqlctl.DefaultShutdownTimeout ) func registerInitFlags(fs *pflag.FlagSet) { @@ -112,9 +109,6 @@ func registerInitFlags(fs *pflag.FlagSet) { fs.Var(&initTags, "init_tags", "(init parameter) comma separated list of key:value pairs used to tag the tablet") fs.DurationVar(&initTimeout, "init_timeout", initTimeout, "(init parameter) timeout to use for the init phase.") fs.DurationVar(&mysqlShutdownTimeout, "mysql-shutdown-timeout", mysqlShutdownTimeout, "timeout to use when MySQL is being shut down.") - fs.StringVar(&stalledDiskWriteDir, "disk-write-dir", stalledDiskWriteDir, "if provided, tablet will attempt to write a file to this directory to check if the disk is stalled") - fs.DurationVar(&stalledDiskWriteTimeout, "disk-write-timeout", stalledDiskWriteTimeout, "if writes exceed this duration, the disk is considered stalled") - fs.DurationVar(&stalledDiskWriteInterval, "disk-write-interval", stalledDiskWriteInterval, "how often to write to the disk to check whether it is stalled") } var ( @@ -170,7 +164,6 @@ type TabletManager struct { VREngine *vreplication.Engine VDiffEngine *vdiff.Engine Env *vtenv.Environment - dhMonitor DiskHealthMonitor // tmc is used to run an RPC against other vttablets. tmc tmclient.TabletManagerClient @@ -379,7 +372,6 @@ func (tm *TabletManager) Start(tablet *topodatapb.Tablet, config *tabletenv.Tabl tm.tmc = tmclient.NewTabletManagerClient() tm.tmState = newTMState(tm, tablet) tm.actionSema = semaphore.NewWeighted(1) - tm.dhMonitor = newDiskHealthMonitor(tm.BatchCtx) tm._waitForGrantsComplete = make(chan struct{}) tm.baseTabletType = tablet.Type diff --git a/go/vt/vttablet/tabletserver/controller.go b/go/vt/vttablet/tabletserver/controller.go index c4a4bef99fc..ab2875ae27b 100644 --- a/go/vt/vttablet/tabletserver/controller.go +++ b/go/vt/vttablet/tabletserver/controller.go @@ -122,6 +122,9 @@ type Controller interface { // SetDemotePrimaryStalled marks that demote primary is stalled in the state manager. SetDemotePrimaryStalled() + + // IsDiskStalled returns if the disk is stalled. + IsDiskStalled() bool } // Ensure TabletServer satisfies Controller interface. diff --git a/go/vt/vttablet/tabletmanager/disk_health_monitor.go b/go/vt/vttablet/tabletserver/disk_health_monitor.go similarity index 67% rename from go/vt/vttablet/tabletmanager/disk_health_monitor.go rename to go/vt/vttablet/tabletserver/disk_health_monitor.go index e35bc662a12..f477f7fd30c 100644 --- a/go/vt/vttablet/tabletmanager/disk_health_monitor.go +++ b/go/vt/vttablet/tabletserver/disk_health_monitor.go @@ -1,4 +1,20 @@ -package tabletmanager +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package tabletserver import ( "context" @@ -7,8 +23,29 @@ import ( "strconv" "sync" "time" + + "github.com/spf13/pflag" + + "vitess.io/vitess/go/vt/servenv" ) +var ( + stalledDiskWriteDir = "" + stalledDiskWriteTimeout = 30 * time.Second + stalledDiskWriteInterval = 5 * time.Second +) + +func init() { + servenv.OnParseFor("vtcombo", registerInitFlags) + servenv.OnParseFor("vttablet", registerInitFlags) +} + +func registerInitFlags(fs *pflag.FlagSet) { + fs.StringVar(&stalledDiskWriteDir, "disk-write-dir", stalledDiskWriteDir, "if provided, tablet will attempt to write a file to this directory to check if the disk is stalled") + fs.DurationVar(&stalledDiskWriteTimeout, "disk-write-timeout", stalledDiskWriteTimeout, "if writes exceed this duration, the disk is considered stalled") + fs.DurationVar(&stalledDiskWriteInterval, "disk-write-interval", stalledDiskWriteInterval, "how often to write to the disk to check whether it is stalled") +} + type DiskHealthMonitor interface { // IsDiskStalled returns true if the disk is stalled or rejecting writes. IsDiskStalled() bool diff --git a/go/vt/vttablet/tabletmanager/disk_health_monitor_test.go b/go/vt/vttablet/tabletserver/disk_health_monitor_test.go similarity index 85% rename from go/vt/vttablet/tabletmanager/disk_health_monitor_test.go rename to go/vt/vttablet/tabletserver/disk_health_monitor_test.go index 68930f3061d..8b47e40ee79 100644 --- a/go/vt/vttablet/tabletmanager/disk_health_monitor_test.go +++ b/go/vt/vttablet/tabletserver/disk_health_monitor_test.go @@ -1,4 +1,20 @@ -package tabletmanager +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package tabletserver import ( "context" diff --git a/go/vt/vttablet/tabletserver/state_manager.go b/go/vt/vttablet/tabletserver/state_manager.go index 4512b26f177..0ccd0e42735 100644 --- a/go/vt/vttablet/tabletserver/state_manager.go +++ b/go/vt/vttablet/tabletserver/state_manager.go @@ -97,6 +97,7 @@ type stateManager struct { replHealthy bool demotePrimaryStalled bool lameduck bool + diskHealthMonitor DiskHealthMonitor alsoAllow []topodatapb.TabletType reason string transitionErr error @@ -777,7 +778,7 @@ func (sm *stateManager) IsServing() bool { } func (sm *stateManager) isServingLocked() bool { - return sm.state == StateServing && sm.wantState == StateServing && sm.replHealthy && !sm.demotePrimaryStalled && !sm.lameduck + return sm.state == StateServing && sm.wantState == StateServing && sm.replHealthy && !sm.demotePrimaryStalled && !sm.lameduck && !sm.diskHealthMonitor.IsDiskStalled() } func (sm *stateManager) AppendDetails(details []*kv) []*kv { diff --git a/go/vt/vttablet/tabletserver/state_manager_test.go b/go/vt/vttablet/tabletserver/state_manager_test.go index f8059d6edea..99a3e7e681d 100644 --- a/go/vt/vttablet/tabletserver/state_manager_test.go +++ b/go/vt/vttablet/tabletserver/state_manager_test.go @@ -41,7 +41,9 @@ import ( var testNow = time.Now() func TestStateManagerStateByName(t *testing.T) { - sm := &stateManager{} + sm := &stateManager{ + diskHealthMonitor: newNoopDiskHealthMonitor(), + } sm.replHealthy = true sm.wantState = StateServing @@ -147,6 +149,29 @@ func TestStateManagerUnservePrimary(t *testing.T) { assert.Equal(t, StateNotServing, sm.state) } +type testDiskMonitor struct { + isDiskStalled bool +} + +func (t *testDiskMonitor) IsDiskStalled() bool { + return t.isDiskStalled +} + +// TestIsServingLocked tests isServingLocked() functionality. +func TestIsServingLocked(t *testing.T) { + sm := newTestStateManager() + defer sm.StopService() + tdm := &testDiskMonitor{isDiskStalled: false} + sm.diskHealthMonitor = tdm + + err := sm.SetServingType(topodatapb.TabletType_REPLICA, testNow, StateServing, "") + require.NoError(t, err) + require.True(t, sm.isServingLocked()) + + tdm.isDiskStalled = true + require.False(t, sm.isServingLocked()) +} + func TestStateManagerUnserveNonPrimary(t *testing.T) { sm := newTestStateManager() defer sm.StopService() @@ -778,23 +803,24 @@ func newTestStateManager() *stateManager { parser := sqlparser.NewTestParser() env := tabletenv.NewEnv(vtenv.NewTestEnv(), cfg, "StateManagerTest") sm := &stateManager{ - statelessql: NewQueryList("stateless", parser), - statefulql: NewQueryList("stateful", parser), - olapql: NewQueryList("olap", parser), - hs: newHealthStreamer(env, &topodatapb.TabletAlias{}, schema.NewEngine(env)), - se: &testSchemaEngine{}, - rt: &testReplTracker{lag: 1 * time.Second}, - vstreamer: &testSubcomponent{}, - tracker: &testSubcomponent{}, - watcher: &testSubcomponent{}, - qe: &testQueryEngine{}, - txThrottler: &testTxThrottler{}, - te: &testTxEngine{}, - messager: &testSubcomponent{}, - ddle: &testOnlineDDLExecutor{}, - throttler: &testLagThrottler{}, - tableGC: &testTableGC{}, - rw: newRequestsWaiter(), + statelessql: NewQueryList("stateless", parser), + statefulql: NewQueryList("stateful", parser), + olapql: NewQueryList("olap", parser), + hs: newHealthStreamer(env, &topodatapb.TabletAlias{}, schema.NewEngine(env)), + se: &testSchemaEngine{}, + rt: &testReplTracker{lag: 1 * time.Second}, + vstreamer: &testSubcomponent{}, + tracker: &testSubcomponent{}, + watcher: &testSubcomponent{}, + qe: &testQueryEngine{}, + txThrottler: &testTxThrottler{}, + te: &testTxEngine{}, + messager: &testSubcomponent{}, + ddle: &testOnlineDDLExecutor{}, + diskHealthMonitor: newNoopDiskHealthMonitor(), + throttler: &testLagThrottler{}, + tableGC: &testTableGC{}, + rw: newRequestsWaiter(), } sm.Init(env, &querypb.Target{}) sm.hs.InitDBConfig(&querypb.Target{}) diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index 30f73d2d818..b65b3949354 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -190,23 +190,24 @@ func NewTabletServer(ctx context.Context, env *vtenv.Environment, name string, c tsv.onlineDDLExecutor = onlineddl.NewExecutor(tsv, alias, topoServer, tsv.lagThrottler, tabletTypeFunc, tsv.onlineDDLExecutorToggleTableBuffer, tsv.tableGC.RequestChecks, tsv.te.preparedPool.IsEmptyForTable) tsv.sm = &stateManager{ - statelessql: tsv.statelessql, - statefulql: tsv.statefulql, - olapql: tsv.olapql, - hs: tsv.hs, - se: tsv.se, - rt: tsv.rt, - vstreamer: tsv.vstreamer, - tracker: tsv.tracker, - watcher: tsv.watcher, - qe: tsv.qe, - txThrottler: tsv.txThrottler, - te: tsv.te, - messager: tsv.messager, - ddle: tsv.onlineDDLExecutor, - throttler: tsv.lagThrottler, - tableGC: tsv.tableGC, - rw: newRequestsWaiter(), + statelessql: tsv.statelessql, + statefulql: tsv.statefulql, + olapql: tsv.olapql, + hs: tsv.hs, + se: tsv.se, + rt: tsv.rt, + vstreamer: tsv.vstreamer, + tracker: tsv.tracker, + watcher: tsv.watcher, + qe: tsv.qe, + txThrottler: tsv.txThrottler, + te: tsv.te, + messager: tsv.messager, + ddle: tsv.onlineDDLExecutor, + throttler: tsv.lagThrottler, + tableGC: tsv.tableGC, + rw: newRequestsWaiter(), + diskHealthMonitor: newDiskHealthMonitor(ctx), } tsv.exporter.NewGaugeFunc("TabletState", "Tablet server state", func() int64 { return int64(tsv.sm.State()) }) @@ -767,6 +768,11 @@ func (tsv *TabletServer) SetDemotePrimaryStalled() { tsv.BroadcastHealth() } +// IsDiskStalled returns if the disk is stalled or not. +func (tsv *TabletServer) IsDiskStalled() bool { + return tsv.sm.diskHealthMonitor.IsDiskStalled() +} + // CreateTransaction creates the metadata for a 2PC transaction. func (tsv *TabletServer) CreateTransaction(ctx context.Context, target *querypb.Target, dtid string, participants []*querypb.Target) (err error) { return tsv.execRequest( diff --git a/go/vt/vttablet/tabletservermock/controller.go b/go/vt/vttablet/tabletservermock/controller.go index a5242751454..21b38755302 100644 --- a/go/vt/vttablet/tabletservermock/controller.go +++ b/go/vt/vttablet/tabletservermock/controller.go @@ -279,6 +279,12 @@ func (tqsc *Controller) SetDemotePrimaryStalled() { tqsc.MethodCalled["SetDemotePrimaryStalled"] = true } +// IsDiskStalled is part of the tabletserver.Controller interface +func (tqsc *Controller) IsDiskStalled() bool { + tqsc.MethodCalled["IsDiskStalled"] = true + return false +} + // EnterLameduck implements tabletserver.Controller. func (tqsc *Controller) EnterLameduck() { tqsc.mu.Lock() diff --git a/proto/replicationdata.proto b/proto/replicationdata.proto index e788fc64bc3..eba4d323ee6 100644 --- a/proto/replicationdata.proto +++ b/proto/replicationdata.proto @@ -105,4 +105,5 @@ message FullStatus { uint32 semi_sync_wait_for_replica_count = 20; bool super_read_only = 21; replicationdata.Configuration replication_configuration = 22; + bool disk_stalled = 23; } diff --git a/web/vtadmin/src/proto/vtadmin.d.ts b/web/vtadmin/src/proto/vtadmin.d.ts index 4411c436083..5a8859b90e7 100644 --- a/web/vtadmin/src/proto/vtadmin.d.ts +++ b/web/vtadmin/src/proto/vtadmin.d.ts @@ -48595,6 +48595,9 @@ export namespace replicationdata { /** FullStatus replication_configuration */ replication_configuration?: (replicationdata.IConfiguration|null); + + /** FullStatus disk_stalled */ + disk_stalled?: (boolean|null); } /** Represents a FullStatus. */ @@ -48672,6 +48675,9 @@ export namespace replicationdata { /** FullStatus replication_configuration. */ public replication_configuration?: (replicationdata.IConfiguration|null); + /** FullStatus disk_stalled. */ + public disk_stalled: boolean; + /** * Creates a new FullStatus instance using the specified properties. * @param [properties] Properties to set diff --git a/web/vtadmin/src/proto/vtadmin.js b/web/vtadmin/src/proto/vtadmin.js index 457ae6e4214..cafe51c0058 100644 --- a/web/vtadmin/src/proto/vtadmin.js +++ b/web/vtadmin/src/proto/vtadmin.js @@ -118108,6 +118108,7 @@ export const replicationdata = $root.replicationdata = (() => { * @property {number|null} [semi_sync_wait_for_replica_count] FullStatus semi_sync_wait_for_replica_count * @property {boolean|null} [super_read_only] FullStatus super_read_only * @property {replicationdata.IConfiguration|null} [replication_configuration] FullStatus replication_configuration + * @property {boolean|null} [disk_stalled] FullStatus disk_stalled */ /** @@ -118301,6 +118302,14 @@ export const replicationdata = $root.replicationdata = (() => { */ FullStatus.prototype.replication_configuration = null; + /** + * FullStatus disk_stalled. + * @member {boolean} disk_stalled + * @memberof replicationdata.FullStatus + * @instance + */ + FullStatus.prototype.disk_stalled = false; + /** * Creates a new FullStatus instance using the specified properties. * @function create @@ -118369,6 +118378,8 @@ export const replicationdata = $root.replicationdata = (() => { writer.uint32(/* id 21, wireType 0 =*/168).bool(message.super_read_only); if (message.replication_configuration != null && Object.hasOwnProperty.call(message, "replication_configuration")) $root.replicationdata.Configuration.encode(message.replication_configuration, writer.uint32(/* id 22, wireType 2 =*/178).fork()).ldelim(); + if (message.disk_stalled != null && Object.hasOwnProperty.call(message, "disk_stalled")) + writer.uint32(/* id 23, wireType 0 =*/184).bool(message.disk_stalled); return writer; }; @@ -118491,6 +118502,10 @@ export const replicationdata = $root.replicationdata = (() => { message.replication_configuration = $root.replicationdata.Configuration.decode(reader, reader.uint32()); break; } + case 23: { + message.disk_stalled = reader.bool(); + break; + } default: reader.skipType(tag & 7); break; @@ -118598,6 +118613,9 @@ export const replicationdata = $root.replicationdata = (() => { if (error) return "replication_configuration." + error; } + if (message.disk_stalled != null && message.hasOwnProperty("disk_stalled")) + if (typeof message.disk_stalled !== "boolean") + return "disk_stalled: boolean expected"; return null; }; @@ -118673,6 +118691,8 @@ export const replicationdata = $root.replicationdata = (() => { throw TypeError(".replicationdata.FullStatus.replication_configuration: object expected"); message.replication_configuration = $root.replicationdata.Configuration.fromObject(object.replication_configuration); } + if (object.disk_stalled != null) + message.disk_stalled = Boolean(object.disk_stalled); return message; }; @@ -118716,6 +118736,7 @@ export const replicationdata = $root.replicationdata = (() => { object.semi_sync_wait_for_replica_count = 0; object.super_read_only = false; object.replication_configuration = null; + object.disk_stalled = false; } if (message.server_id != null && message.hasOwnProperty("server_id")) object.server_id = message.server_id; @@ -118764,6 +118785,8 @@ export const replicationdata = $root.replicationdata = (() => { object.super_read_only = message.super_read_only; if (message.replication_configuration != null && message.hasOwnProperty("replication_configuration")) object.replication_configuration = $root.replicationdata.Configuration.toObject(message.replication_configuration, options); + if (message.disk_stalled != null && message.hasOwnProperty("disk_stalled")) + object.disk_stalled = message.disk_stalled; return object; };