From eaaa2063ef870391bf17dbf3599978c4eba20868 Mon Sep 17 00:00:00 2001 From: Manan Gupta <35839558+GuptaManan100@users.noreply.github.com> Date: Fri, 17 Jan 2025 08:22:39 +0530 Subject: [PATCH] Add StalledDiskPrimary analysis and recovery to vtorc (#17470) Signed-off-by: Manan Gupta --- changelog/22.0/22.0.0/summary.md | 6 + go/flags/endtoend/vtcombo.txt | 3 + go/flags/endtoend/vtorc.txt | 1 + go/flags/endtoend/vttablet.txt | 3 + go/vt/vtorc/config/config.go | 16 +++ go/vt/vtorc/db/generate_base.go | 1 + go/vt/vtorc/inst/analysis.go | 2 + go/vt/vtorc/inst/analysis_dao.go | 10 +- go/vt/vtorc/inst/analysis_dao_test.go | 31 ++++- go/vt/vtorc/inst/instance.go | 1 + go/vt/vtorc/inst/instance_dao.go | 17 ++- go/vt/vtorc/inst/instance_dao_test.go | 42 +++--- go/vt/vtorc/logic/topology_recovery.go | 2 +- go/vt/vtorc/logic/topology_recovery_test.go | 15 ++ go/vt/vtorc/test/recovery_analysis.go | 2 + .../tabletmanager/disk_health_monitor.go | 131 ++++++++++++++++++ .../tabletmanager/disk_health_monitor_test.go | 103 ++++++++++++++ .../vttablet/tabletmanager/rpc_replication.go | 8 ++ go/vt/vttablet/tabletmanager/tm_init.go | 12 +- 19 files changed, 377 insertions(+), 29 deletions(-) create mode 100644 go/vt/vttablet/tabletmanager/disk_health_monitor.go create mode 100644 go/vt/vttablet/tabletmanager/disk_health_monitor_test.go diff --git a/changelog/22.0/22.0.0/summary.md b/changelog/22.0/22.0.0/summary.md index e63ffcc3547..2fb66ea8969 100644 --- a/changelog/22.0/22.0.0/summary.md +++ b/changelog/22.0/22.0.0/summary.md @@ -12,6 +12,7 @@ - **[Support for More Efficient JSON Replication](#efficient-json-replication)** - **[Support for LAST_INSERT_ID(x)](#last-insert-id)** - **[Support for Maximum Idle Connections in the Pool](#max-idle-connections)** + - **[Stalled Disk Recovery in VTOrc](#stall-disk-recovery)** - **[Minor Changes](#minor-changes)** - **[VTTablet Flags](#flags-vttablet)** - **[Topology read concurrency behaviour changes](#topo-read-concurrency-changes)** @@ 
-100,6 +101,11 @@ You can control idle connection retention for the query server’s query pool, s This feature ensures that, during traffic spikes, idle connections are available for faster responses, while minimizing overhead in low-traffic periods by limiting the number of idle connections retained. It helps strike a balance between performance, efficiency, and cost. +### <a id="stall-disk-recovery"/>Stalled Disk Recovery in VTOrc</a> +VTOrc can now identify and recover from stalled disk errors. VTTablets test whether the disk is writable and send this information in the full status output to VTOrc. If the disk is not writable on the primary tablet, VTOrc will attempt to recover the cluster by promoting a new primary. This is useful in scenarios where the disk is stalled and the primary vttablet is unable to accept writes because of it. + +To opt into this feature, the `--enable-primary-disk-stalled-recovery` flag has to be specified on VTOrc, and the `--disk-write-dir` flag has to be specified on the vttablets. The `--disk-write-interval` and `--disk-write-timeout` flags can be used to configure the polling interval and timeout respectively. + ## Minor Changes #### VTTablet Flags diff --git a/go/flags/endtoend/vtcombo.txt b/go/flags/endtoend/vtcombo.txt index 052c19ecaae..76c8e894347 100644 --- a/go/flags/endtoend/vtcombo.txt +++ b/go/flags/endtoend/vtcombo.txt @@ -102,6 +102,9 @@ Flags: --ddl_strategy string Set default strategy for DDL statements. Override with @@ddl_strategy session variable (default "direct") --default_tablet_type topodatapb.TabletType The default tablet type to set for queries, when one is not explicitly selected.
(default PRIMARY) --degraded_threshold duration replication lag after which a replica is considered degraded (default 30s) + --disk-write-dir string if provided, tablet will attempt to write a file to this directory to check if the disk is stalled + --disk-write-interval duration how often to write to the disk to check whether it is stalled (default 5s) + --disk-write-timeout duration if writes exceed this duration, the disk is considered stalled (default 30s) --emit_stats If set, emit stats to push-based monitoring and stats backends --enable-consolidator Synonym to -enable_consolidator (default true) --enable-consolidator-replicas Synonym to -enable_consolidator_replicas diff --git a/go/flags/endtoend/vtorc.txt b/go/flags/endtoend/vtorc.txt index c2799a72dc1..ca8083709e5 100644 --- a/go/flags/endtoend/vtorc.txt +++ b/go/flags/endtoend/vtorc.txt @@ -33,6 +33,7 @@ Flags: --config-type string Config file type (omit to infer config type from file extension). --consul_auth_static_file string JSON File to read the topos/tokens from. --emit_stats If set, emit stats to push-based monitoring and stats backends + --enable-primary-disk-stalled-recovery Whether VTOrc should detect a stalled disk on the primary and failover --grpc-dial-concurrency-limit int Maximum concurrency of grpc dial operations. This should be less than the golang max thread limit of 10000. (default 1024) --grpc_auth_static_client_creds string When using grpc_static_auth in the server, this file provides the credentials to use to authenticate with server. --grpc_compression string Which protocol to use for compressing gRPC. Default: nothing. 
Supported: snappy diff --git a/go/flags/endtoend/vttablet.txt b/go/flags/endtoend/vttablet.txt index e2b0c30db7f..955823f7322 100644 --- a/go/flags/endtoend/vttablet.txt +++ b/go/flags/endtoend/vttablet.txt @@ -133,6 +133,9 @@ Flags: --dba_idle_timeout duration Idle timeout for dba connections (default 1m0s) --dba_pool_size int Size of the connection pool for dba connections (default 20) --degraded_threshold duration replication lag after which a replica is considered degraded (default 30s) + --disk-write-dir string if provided, tablet will attempt to write a file to this directory to check if the disk is stalled + --disk-write-interval duration how often to write to the disk to check whether it is stalled (default 5s) + --disk-write-timeout duration if writes exceed this duration, the disk is considered stalled (default 30s) --emit_stats If set, emit stats to push-based monitoring and stats backends --enable-consolidator Synonym to -enable_consolidator (default true) --enable-consolidator-replicas Synonym to -enable_consolidator_replicas diff --git a/go/vt/vtorc/config/config.go b/go/vt/vtorc/config/config.go index cafff5acce8..db367673aeb 100644 --- a/go/vt/vtorc/config/config.go +++ b/go/vt/vtorc/config/config.go @@ -174,6 +174,15 @@ var ( Dynamic: true, }, ) + + enablePrimaryDiskStalledRecovery = viperutil.Configure( + "enable-primary-disk-stalled-recovery", + viperutil.Options[bool]{ + FlagName: "enable-primary-disk-stalled-recovery", + Default: false, + Dynamic: true, + }, + ) ) func init() { @@ -197,6 +206,7 @@ func registerFlags(fs *pflag.FlagSet) { fs.Duration("recovery-poll-duration", recoveryPollDuration.Default(), "Timer duration on which VTOrc polls its database to run a recovery") fs.Bool("allow-emergency-reparent", ersEnabled.Default(), "Whether VTOrc should be allowed to run emergency reparent operation when it detects a dead primary") fs.Bool("change-tablets-with-errant-gtid-to-drained", convertTabletsWithErrantGTIDs.Default(), "Whether VTOrc 
should be changing the type of tablets with errant GTIDs to DRAINED") + fs.Bool("enable-primary-disk-stalled-recovery", enablePrimaryDiskStalledRecovery.Default(), "Whether VTOrc should detect a stalled disk on the primary and failover") viperutil.BindFlags(fs, instancePollTime, @@ -214,6 +224,7 @@ func registerFlags(fs *pflag.FlagSet) { recoveryPollDuration, ersEnabled, convertTabletsWithErrantGTIDs, + enablePrimaryDiskStalledRecovery, ) } @@ -332,6 +343,11 @@ func SetConvertTabletWithErrantGTIDs(val bool) { convertTabletsWithErrantGTIDs.Set(val) } +// GetStalledDiskPrimaryRecovery reports whether VTOrc is allowed to check for and recover from stalled disk problems. +func GetStalledDiskPrimaryRecovery() bool { + return enablePrimaryDiskStalledRecovery.Get() +} + // MarkConfigurationLoaded is called once configuration has first been loaded. // Listeners on ConfigurationLoaded will get a notification func MarkConfigurationLoaded() { diff --git a/go/vt/vtorc/db/generate_base.go b/go/vt/vtorc/db/generate_base.go index 21375fb8eb3..8baa9a12476 100644 --- a/go/vt/vtorc/db/generate_base.go +++ b/go/vt/vtorc/db/generate_base.go @@ -105,6 +105,7 @@ CREATE TABLE database_instance ( semi_sync_primary_status TINYint NOT NULL DEFAULT 0, semi_sync_replica_status TINYint NOT NULL DEFAULT 0, semi_sync_primary_clients int NOT NULL DEFAULT 0, + is_disk_stalled TINYint NOT NULL DEFAULT 0, PRIMARY KEY (alias) )`, ` diff --git a/go/vt/vtorc/inst/analysis.go b/go/vt/vtorc/inst/analysis.go index fa2e1a4ec95..6a800e5ee0b 100644 --- a/go/vt/vtorc/inst/analysis.go +++ b/go/vt/vtorc/inst/analysis.go @@ -56,6 +56,7 @@ const ( LockedSemiSyncPrimaryHypothesis AnalysisCode = "LockedSemiSyncPrimaryHypothesis" LockedSemiSyncPrimary AnalysisCode = "LockedSemiSyncPrimary" ErrantGTIDDetected AnalysisCode = "ErrantGTIDDetected" + PrimaryDiskStalled AnalysisCode = "PrimaryDiskStalled" ) type StructureAnalysisCode string @@ -129,6 +130,7 @@ type ReplicationAnalysis struct { MaxReplicaGTIDMode string
MaxReplicaGTIDErrant string IsReadOnly bool + IsDiskStalled bool } func (replicationAnalysis *ReplicationAnalysis) MarshalJSON() ([]byte, error) { diff --git a/go/vt/vtorc/inst/analysis_dao.go b/go/vt/vtorc/inst/analysis_dao.go index 7837955c541..d487973b0f0 100644 --- a/go/vt/vtorc/inst/analysis_dao.go +++ b/go/vt/vtorc/inst/analysis_dao.go @@ -79,7 +79,7 @@ func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAna vitess_keyspace.durability_policy AS durability_policy, vitess_shard.primary_timestamp AS shard_primary_term_timestamp, primary_instance.read_only AS read_only, - MIN(primary_instance.gtid_errant) AS gtid_errant, + MIN(primary_instance.gtid_errant) AS gtid_errant, MIN(primary_instance.alias) IS NULL AS is_invalid, MIN(primary_instance.binary_log_file) AS binary_log_file, MIN(primary_instance.binary_log_pos) AS binary_log_pos, @@ -233,7 +233,8 @@ func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAna COUNT( DISTINCT case when replica_instance.log_bin AND replica_instance.log_replica_updates then replica_instance.major_version else NULL end - ) AS count_distinct_logging_major_versions + ) AS count_distinct_logging_major_versions, + primary_instance.is_disk_stalled != 0 AS is_disk_stalled FROM vitess_tablet JOIN vitess_keyspace ON ( @@ -354,6 +355,7 @@ func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAna a.HeartbeatInterval = m.GetFloat64("heartbeat_interval") a.IsReadOnly = m.GetUint("read_only") == 1 + a.IsDiskStalled = m.GetBool("is_disk_stalled") if !a.LastCheckValid { analysisMessage := fmt.Sprintf("analysis: Alias: %+v, Keyspace: %+v, Shard: %+v, IsPrimary: %+v, LastCheckValid: %+v, LastCheckPartialSuccess: %+v, CountReplicas: %+v, CountValidReplicas: %+v, CountValidReplicatingReplicas: %+v, CountLaggingReplicas: %+v, CountDelayedReplicas: %+v", @@ -401,6 +403,10 @@ func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAna } else if isInvalid { 
a.Analysis = InvalidReplica a.Description = "VTOrc hasn't been able to reach the replica even once since restart/shutdown" + } else if a.IsClusterPrimary && !a.LastCheckValid && a.IsDiskStalled { + a.Analysis = PrimaryDiskStalled + a.Description = "Primary has a stalled disk" + ca.hasClusterwideAction = true } else if a.IsClusterPrimary && !a.LastCheckValid && a.CountReplicas == 0 { a.Analysis = DeadPrimaryWithoutReplicas a.Description = "Primary cannot be reached by vtorc and has no replica" diff --git a/go/vt/vtorc/inst/analysis_dao_test.go b/go/vt/vtorc/inst/analysis_dao_test.go index ae4f7279403..baa1121b776 100644 --- a/go/vt/vtorc/inst/analysis_dao_test.go +++ b/go/vt/vtorc/inst/analysis_dao_test.go @@ -34,10 +34,10 @@ var ( // The initialSQL is a set of insert commands copied from a dump of an actual running VTOrc instances. The relevant insert commands are here. // This is a dump taken from a test running 4 tablets, zone1-101 is the primary, zone1-100 is a replica, zone1-112 is a rdonly and zone2-200 is a cross-cell replica. 
initialSQL = []string{ - `INSERT INTO database_instance VALUES('zone1-0000000112','localhost',6747,'2022-12-28 07:26:04','2022-12-28 07:26:04',213696377,'8.0.31','ROW',1,1,'vt-0000000112-bin.000001',15963,'localhost',6714,8,4.0,1,1,'vt-0000000101-bin.000001',15583,'vt-0000000101-bin.000001',15583,0,0,1,'','',1,'vt-0000000112-relay-bin.000002',15815,1,0,'zone1','',0,0,0,1,'729a4cc4-8680-11ed-a104-47706090afbd:1-54','729a5138-8680-11ed-9240-92a06c3be3c2','2022-12-28 07:26:04','',1,0,0,'Homebrew','8.0','FULL',10816929,0,0,'ON',1,'729a4cc4-8680-11ed-a104-47706090afbd','','729a4cc4-8680-11ed-a104-47706090afbd,729a5138-8680-11ed-9240-92a06c3be3c2',1,1,'',1000000000000000000,1,0,0,0);`, - `INSERT INTO database_instance VALUES('zone1-0000000100','localhost',6711,'2022-12-28 07:26:04','2022-12-28 07:26:04',1094500338,'8.0.31','ROW',1,1,'vt-0000000100-bin.000001',15963,'localhost',6714,8,4.0,1,1,'vt-0000000101-bin.000001',15583,'vt-0000000101-bin.000001',15583,0,0,1,'','',1,'vt-0000000100-relay-bin.000002',15815,1,0,'zone1','',0,0,0,1,'729a4cc4-8680-11ed-a104-47706090afbd:1-54','729a5138-8680-11ed-acf8-d6b0ef9f4eaa','2022-12-28 07:26:04','',1,0,0,'Homebrew','8.0','FULL',10103920,0,1,'ON',1,'729a4cc4-8680-11ed-a104-47706090afbd','','729a4cc4-8680-11ed-a104-47706090afbd,729a5138-8680-11ed-acf8-d6b0ef9f4eaa',1,1,'',1000000000000000000,1,0,1,0);`, - `INSERT INTO database_instance VALUES('zone1-0000000101','localhost',6714,'2022-12-28 07:26:04','2022-12-28 07:26:04',390954723,'8.0.31','ROW',1,1,'vt-0000000101-bin.000001',15583,'',0,0,0,0,0,'',0,'',0,NULL,NULL,0,'','',0,'',0,0,0,'zone1','',0,0,0,1,'729a4cc4-8680-11ed-a104-47706090afbd:1-54','729a4cc4-8680-11ed-a104-47706090afbd','2022-12-28 07:26:04','',0,0,0,'Homebrew','8.0','FULL',11366095,1,1,'ON',1,'','','729a4cc4-8680-11ed-a104-47706090afbd',-1,-1,'',1000000000000000000,1,1,0,2);`, - `INSERT INTO database_instance VALUES('zone2-0000000200','localhost',6756,'2022-12-28 07:26:05','2022-12-28 
07:26:05',444286571,'8.0.31','ROW',1,1,'vt-0000000200-bin.000001',15963,'localhost',6714,8,4.0,1,1,'vt-0000000101-bin.000001',15583,'vt-0000000101-bin.000001',15583,0,0,1,'','',1,'vt-0000000200-relay-bin.000002',15815,1,0,'zone2','',0,0,0,1,'729a4cc4-8680-11ed-a104-47706090afbd:1-54','729a497c-8680-11ed-8ad4-3f51d747db75','2022-12-28 07:26:05','',1,0,0,'Homebrew','8.0','FULL',10443112,0,1,'ON',1,'729a4cc4-8680-11ed-a104-47706090afbd','','729a4cc4-8680-11ed-a104-47706090afbd,729a497c-8680-11ed-8ad4-3f51d747db75',1,1,'',1000000000000000000,1,0,1,0);`, + `INSERT INTO database_instance VALUES('zone1-0000000112','localhost',6747,'2022-12-28 07:26:04','2022-12-28 07:26:04',213696377,'8.0.31','ROW',1,1,'vt-0000000112-bin.000001',15963,'localhost',6714,8,4.0,1,1,'vt-0000000101-bin.000001',15583,'vt-0000000101-bin.000001',15583,0,0,1,'','',1,'vt-0000000112-relay-bin.000002',15815,1,0,'zone1','',0,0,0,1,'729a4cc4-8680-11ed-a104-47706090afbd:1-54','729a5138-8680-11ed-9240-92a06c3be3c2','2022-12-28 07:26:04','',1,0,0,'Homebrew','8.0','FULL',10816929,0,0,'ON',1,'729a4cc4-8680-11ed-a104-47706090afbd','','729a4cc4-8680-11ed-a104-47706090afbd,729a5138-8680-11ed-9240-92a06c3be3c2',1,1,'',1000000000000000000,1,0,0,0,false);`, + `INSERT INTO database_instance VALUES('zone1-0000000100','localhost',6711,'2022-12-28 07:26:04','2022-12-28 07:26:04',1094500338,'8.0.31','ROW',1,1,'vt-0000000100-bin.000001',15963,'localhost',6714,8,4.0,1,1,'vt-0000000101-bin.000001',15583,'vt-0000000101-bin.000001',15583,0,0,1,'','',1,'vt-0000000100-relay-bin.000002',15815,1,0,'zone1','',0,0,0,1,'729a4cc4-8680-11ed-a104-47706090afbd:1-54','729a5138-8680-11ed-acf8-d6b0ef9f4eaa','2022-12-28 07:26:04','',1,0,0,'Homebrew','8.0','FULL',10103920,0,1,'ON',1,'729a4cc4-8680-11ed-a104-47706090afbd','','729a4cc4-8680-11ed-a104-47706090afbd,729a5138-8680-11ed-acf8-d6b0ef9f4eaa',1,1,'',1000000000000000000,1,0,1,0,false);`, + `INSERT INTO database_instance VALUES('zone1-0000000101','localhost',6714,'2022-12-28 
07:26:04','2022-12-28 07:26:04',390954723,'8.0.31','ROW',1,1,'vt-0000000101-bin.000001',15583,'',0,0,0,0,0,'',0,'',0,NULL,NULL,0,'','',0,'',0,0,0,'zone1','',0,0,0,1,'729a4cc4-8680-11ed-a104-47706090afbd:1-54','729a4cc4-8680-11ed-a104-47706090afbd','2022-12-28 07:26:04','',0,0,0,'Homebrew','8.0','FULL',11366095,1,1,'ON',1,'','','729a4cc4-8680-11ed-a104-47706090afbd',-1,-1,'',1000000000000000000,1,1,0,2,false);`, + `INSERT INTO database_instance VALUES('zone2-0000000200','localhost',6756,'2022-12-28 07:26:05','2022-12-28 07:26:05',444286571,'8.0.31','ROW',1,1,'vt-0000000200-bin.000001',15963,'localhost',6714,8,4.0,1,1,'vt-0000000101-bin.000001',15583,'vt-0000000101-bin.000001',15583,0,0,1,'','',1,'vt-0000000200-relay-bin.000002',15815,1,0,'zone2','',0,0,0,1,'729a4cc4-8680-11ed-a104-47706090afbd:1-54','729a497c-8680-11ed-8ad4-3f51d747db75','2022-12-28 07:26:05','',1,0,0,'Homebrew','8.0','FULL',10443112,0,1,'ON',1,'729a4cc4-8680-11ed-a104-47706090afbd','','729a4cc4-8680-11ed-a104-47706090afbd,729a497c-8680-11ed-8ad4-3f51d747db75',1,1,'',1000000000000000000,1,0,1,0,false);`, `INSERT INTO vitess_tablet VALUES('zone1-0000000100','localhost',6711,'ks','0','zone1',2,'0001-01-01 00:00:00+00:00',X'616c6961733a7b63656c6c3a227a6f6e653122207569643a3130307d20686f73746e616d653a226c6f63616c686f73742220706f72745f6d61703a7b6b65793a2267727063222076616c75653a363731307d20706f72745f6d61703a7b6b65793a227674222076616c75653a363730397d206b657973706163653a226b73222073686172643a22302220747970653a5245504c494341206d7973716c5f686f73746e616d653a226c6f63616c686f737422206d7973716c5f706f72743a363731312064625f7365727665725f76657273696f6e3a22382e302e3331222064656661756c745f636f6e6e5f636f6c6c6174696f6e3a3435');`, `INSERT INTO vitess_tablet VALUES('zone1-0000000101','localhost',6714,'ks','0','zone1',1,'2022-12-28 
07:23:25.129898+00:00',X'616c6961733a7b63656c6c3a227a6f6e653122207569643a3130317d20686f73746e616d653a226c6f63616c686f73742220706f72745f6d61703a7b6b65793a2267727063222076616c75653a363731337d20706f72745f6d61703a7b6b65793a227674222076616c75653a363731327d206b657973706163653a226b73222073686172643a22302220747970653a5052494d415259206d7973716c5f686f73746e616d653a226c6f63616c686f737422206d7973716c5f706f72743a36373134207072696d6172795f7465726d5f73746172745f74696d653a7b7365636f6e64733a31363732323132323035206e616e6f7365636f6e64733a3132393839383030307d2064625f7365727665725f76657273696f6e3a22382e302e3331222064656661756c745f636f6e6e5f636f6c6c6174696f6e3a3435');`, `INSERT INTO vitess_tablet VALUES('zone1-0000000112','localhost',6747,'ks','0','zone1',3,'0001-01-01 00:00:00+00:00',X'616c6961733a7b63656c6c3a227a6f6e653122207569643a3131327d20686f73746e616d653a226c6f63616c686f73742220706f72745f6d61703a7b6b65793a2267727063222076616c75653a363734367d20706f72745f6d61703a7b6b65793a227674222076616c75653a363734357d206b657973706163653a226b73222073686172643a22302220747970653a52444f4e4c59206d7973716c5f686f73746e616d653a226c6f63616c686f737422206d7973716c5f706f72743a363734372064625f7365727665725f76657273696f6e3a22382e302e3331222064656661756c745f636f6e6e5f636f6c6c6174696f6e3a3435');`, @@ -96,6 +96,29 @@ func TestGetReplicationAnalysisDecision(t *testing.T) { keyspaceWanted: "ks", shardWanted: "0", codeWanted: PrimaryTabletDeleted, + }, { + name: "StalledDiskPrimary", + info: []*test.InfoForRecoveryAnalysis{{ + TabletInfo: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{Cell: "zon1", Uid: 100}, + Hostname: "localhost", + Keyspace: "ks", + Shard: "0", + Type: topodatapb.TabletType_PRIMARY, + MysqlHostname: "localhost", + MysqlPort: 6709, + }, + DurabilityPolicy: "none", + LastCheckValid: 0, + CountReplicas: 4, + CountValidReplicas: 4, + CountValidReplicatingReplicas: 0, + IsPrimary: 1, + IsStalledDisk: 1, + }}, + keyspaceWanted: "ks", + shardWanted: "0", + codeWanted: PrimaryDiskStalled, }, { 
name: "DeadPrimary", info: []*test.InfoForRecoveryAnalysis{{ diff --git a/go/vt/vtorc/inst/instance.go b/go/vt/vtorc/inst/instance.go index fef1e90acce..b7b097bb14d 100644 --- a/go/vt/vtorc/inst/instance.go +++ b/go/vt/vtorc/inst/instance.go @@ -91,6 +91,7 @@ type Instance struct { IsUpToDate bool IsRecentlyChecked bool SecondsSinceLastSeen sql.NullInt64 + StalledDisk bool AllowTLS bool diff --git a/go/vt/vtorc/inst/instance_dao.go b/go/vt/vtorc/inst/instance_dao.go index 9198514d6ed..f92a15079dd 100644 --- a/go/vt/vtorc/inst/instance_dao.go +++ b/go/vt/vtorc/inst/instance_dao.go @@ -175,6 +175,7 @@ func ReadTopologyInstanceBufferable(tabletAlias string, latency *stopwatch.Named var tablet *topodatapb.Tablet var fs *replicationdatapb.FullStatus readingStartTime := time.Now() + stalledDisk := false instance := NewInstance() instanceFound := false partialSuccess := false @@ -205,6 +206,9 @@ func ReadTopologyInstanceBufferable(tabletAlias string, latency *stopwatch.Named fs, err = fullStatus(tabletAlias) if err != nil { + if config.GetStalledDiskPrimaryRecovery() && strings.Contains(err.Error(), "stalled disk") { + stalledDisk = true + } goto Cleanup } partialSuccess = true // We at least managed to read something from the server. @@ -381,9 +385,10 @@ Cleanup: // Something is wrong, could be network-wise. Record that we // tried to check the instance. last_attempted_check is also - // updated on success by writeInstance. + // updated on success by writeInstance. If the reason is a + // stalled disk, we can record that as well. 
latency.Start("backend") - _ = UpdateInstanceLastChecked(tabletAlias, partialSuccess) + _ = UpdateInstanceLastChecked(tabletAlias, partialSuccess, stalledDisk) latency.Stop("backend") return nil, err } @@ -874,6 +879,7 @@ func mkInsertForInstances(instances []*Instance, instanceWasActuallyFound bool, "semi_sync_primary_clients", "semi_sync_replica_status", "last_discovery_latency", + "is_disk_stalled", } values := make([]string, len(columns)) @@ -953,6 +959,7 @@ func mkInsertForInstances(instances []*Instance, instanceWasActuallyFound bool, args = append(args, instance.SemiSyncPrimaryClients) args = append(args, instance.SemiSyncReplicaStatus) args = append(args, instance.LastDiscoveryLatency.Nanoseconds()) + args = append(args, instance.StalledDisk) } sql, err := mkInsert("database_instance", columns, values, len(instances), insertIgnore) @@ -998,16 +1005,18 @@ func WriteInstance(instance *Instance, instanceWasActuallyFound bool, lastError // UpdateInstanceLastChecked updates the last_check timestamp in the vtorc backed database // for a given instance -func UpdateInstanceLastChecked(tabletAlias string, partialSuccess bool) error { +func UpdateInstanceLastChecked(tabletAlias string, partialSuccess bool, stalledDisk bool) error { writeFunc := func() error { _, err := db.ExecVTOrc(`UPDATE database_instance SET last_checked = DATETIME('now'), - last_check_partial_success = ? + last_check_partial_success = ?, + is_disk_stalled = ? WHERE alias = ? 
`, partialSuccess, + stalledDisk, tabletAlias, ) if err != nil { diff --git a/go/vt/vtorc/inst/instance_dao_test.go b/go/vt/vtorc/inst/instance_dao_test.go index 1a14041450c..c3b99455741 100644 --- a/go/vt/vtorc/inst/instance_dao_test.go +++ b/go/vt/vtorc/inst/instance_dao_test.go @@ -64,13 +64,13 @@ func TestMkInsertSingle(t *testing.T) { version, major_version, version_comment, binlog_server, read_only, binlog_format, binlog_row_image, log_bin, log_replica_updates, binary_log_file, binary_log_pos, source_host, source_port, replica_net_timeout, heartbeat_interval, replica_sql_running, replica_io_running, replication_sql_thread_state, replication_io_thread_state, has_replication_filters, supports_oracle_gtid, oracle_gtid, source_uuid, ancestry_uuid, executed_gtid_set, gtid_mode, gtid_purged, gtid_errant, - source_log_file, read_source_log_pos, relay_source_log_file, exec_source_log_pos, relay_log_file, relay_log_pos, last_sql_error, last_io_error, replication_lag_seconds, replica_lag_seconds, sql_delay, data_center, region, physical_environment, replication_depth, is_co_primary, has_replication_credentials, allow_tls, semi_sync_enforced, semi_sync_primary_enabled, semi_sync_primary_timeout, semi_sync_primary_wait_for_replica_count, semi_sync_replica_enabled, semi_sync_primary_status, semi_sync_primary_clients, semi_sync_replica_status, last_discovery_latency, last_seen) + source_log_file, read_source_log_pos, relay_source_log_file, exec_source_log_pos, relay_log_file, relay_log_pos, last_sql_error, last_io_error, replication_lag_seconds, replica_lag_seconds, sql_delay, data_center, region, physical_environment, replication_depth, is_co_primary, has_replication_credentials, allow_tls, semi_sync_enforced, semi_sync_primary_enabled, semi_sync_primary_timeout, semi_sync_primary_wait_for_replica_count, semi_sync_replica_enabled, semi_sync_primary_status, semi_sync_primary_clients, semi_sync_replica_status, last_discovery_latency, is_disk_stalled, last_seen) VALUES - (?, 
?, ?, DATETIME('now'), DATETIME('now'), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, DATETIME('now')) + (?, ?, ?, DATETIME('now'), DATETIME('now'), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, DATETIME('now')) ` a1 := `zone1-i710, i710, 3306, 710, , 5.6.7, 5.6, MySQL, false, false, STATEMENT, FULL, false, false, , 0, , 0, 0, 0, - false, false, 0, 0, false, false, false, , , , , , , , 0, mysql.000007, 10, , 0, , , {0 false}, {0 false}, 0, , , , 0, false, false, false, false, false, 0, 0, false, false, 0, false, 0,` + false, false, 0, 0, false, false, false, , , , , , , , 0, mysql.000007, 10, , 0, , , {0 false}, {0 false}, 0, , , , 0, false, false, false, false, false, 0, 0, false, false, 0, false, 0, false,` sql1, args1, err := mkInsertForInstances(instances[:1], false, true) require.NoError(t, err) @@ -87,16 +87,16 @@ func TestMkInsertThree(t *testing.T) { version, major_version, version_comment, binlog_server, read_only, binlog_format, binlog_row_image, log_bin, log_replica_updates, binary_log_file, binary_log_pos, source_host, source_port, replica_net_timeout, heartbeat_interval, replica_sql_running, replica_io_running, replication_sql_thread_state, replication_io_thread_state, has_replication_filters, supports_oracle_gtid, oracle_gtid, source_uuid, ancestry_uuid, executed_gtid_set, gtid_mode, gtid_purged, gtid_errant, - source_log_file, read_source_log_pos, relay_source_log_file, exec_source_log_pos, relay_log_file, relay_log_pos, last_sql_error, last_io_error, replication_lag_seconds, replica_lag_seconds, sql_delay, data_center, region, physical_environment, replication_depth, is_co_primary, has_replication_credentials, allow_tls, semi_sync_enforced, semi_sync_primary_enabled, semi_sync_primary_timeout, 
semi_sync_primary_wait_for_replica_count, semi_sync_replica_enabled, semi_sync_primary_status, semi_sync_primary_clients, semi_sync_replica_status, last_discovery_latency, last_seen) + source_log_file, read_source_log_pos, relay_source_log_file, exec_source_log_pos, relay_log_file, relay_log_pos, last_sql_error, last_io_error, replication_lag_seconds, replica_lag_seconds, sql_delay, data_center, region, physical_environment, replication_depth, is_co_primary, has_replication_credentials, allow_tls, semi_sync_enforced, semi_sync_primary_enabled, semi_sync_primary_timeout, semi_sync_primary_wait_for_replica_count, semi_sync_replica_enabled, semi_sync_primary_status, semi_sync_primary_clients, semi_sync_replica_status, last_discovery_latency, is_disk_stalled, last_seen) VALUES - (?, ?, ?, DATETIME('now'), DATETIME('now'), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, DATETIME('now')), - (?, ?, ?, DATETIME('now'), DATETIME('now'), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, DATETIME('now')), - (?, ?, ?, DATETIME('now'), DATETIME('now'), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, DATETIME('now')) + (?, ?, ?, DATETIME('now'), DATETIME('now'), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, DATETIME('now')), + (?, ?, ?, DATETIME('now'), DATETIME('now'), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, DATETIME('now')), + (?, ?, ?, 
DATETIME('now'), DATETIME('now'), 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, DATETIME('now')) ` a3 := ` - zone1-i710, i710, 3306, 710, , 5.6.7, 5.6, MySQL, false, false, STATEMENT, FULL, false, false, , 0, , 0, 0, 0, false, false, 0, 0, false, false, false, , , , , , , , 0, mysql.000007, 10, , 0, , , {0 false}, {0 false}, 0, , , , 0, false, false, false, false, false, 0, 0, false, false, 0, false, 0, - zone1-i720, i720, 3306, 720, , 5.6.7, 5.6, MySQL, false, false, STATEMENT, FULL, false, false, , 0, , 0, 0, 0, false, false, 0, 0, false, false, false, , , , , , , , 0, mysql.000007, 20, , 0, , , {0 false}, {0 false}, 0, , , , 0, false, false, false, false, false, 0, 0, false, false, 0, false, 0, - zone1-i730, i730, 3306, 730, , 5.6.7, 5.6, MySQL, false, false, STATEMENT, FULL, false, false, , 0, , 0, 0, 0, false, false, 0, 0, false, false, false, , , , , , , , 0, mysql.000007, 30, , 0, , , {0 false}, {0 false}, 0, , , , 0, false, false, false, false, false, 0, 0, false, false, 0, false, 0, + zone1-i710, i710, 3306, 710, , 5.6.7, 5.6, MySQL, false, false, STATEMENT, FULL, false, false, , 0, , 0, 0, 0, false, false, 0, 0, false, false, false, , , , , , , , 0, mysql.000007, 10, , 0, , , {0 false}, {0 false}, 0, , , , 0, false, false, false, false, false, 0, 0, false, false, 0, false, 0, false, + zone1-i720, i720, 3306, 720, , 5.6.7, 5.6, MySQL, false, false, STATEMENT, FULL, false, false, , 0, , 0, 0, 0, false, false, 0, 0, false, false, false, , , , , , , , 0, mysql.000007, 20, , 0, , , {0 false}, {0 false}, 0, , , , 0, false, false, false, false, false, 0, 0, false, false, 0, false, 0, false, + zone1-i730, i730, 3306, 730, , 5.6.7, 5.6, MySQL, false, false, STATEMENT, FULL, false, false, , 0, , 0, 0, 0, false, false, 0, 0, false, false, false, , , , , , , , 0, mysql.000007, 30, , 0, , , {0 false}, {0 false}, 0, , , , 0, false, false, 
false, false, false, 0, 0, false, false, 0, false, 0, false, ` sql3, args3, err := mkInsertForInstances(instances[:3], true, true) @@ -483,9 +483,9 @@ func TestReadOutdatedInstanceKeys(t *testing.T) { tabletAliases, err := ReadOutdatedInstanceKeys() - errInDataCollection := db.QueryVTOrcRowsMap(`select alias, -last_checked, -last_attempted_check, + errInDataCollection := db.QueryVTOrcRowsMap(`select alias, +last_checked, +last_attempted_check, ROUND((JULIANDAY(DATETIME('now')) - JULIANDAY(last_checked)) * 86400) AS difference, last_attempted_check <= last_checked as use1, last_checked < DATETIME('now', '-1500 second') as is_outdated1, @@ -507,22 +507,32 @@ func TestUpdateInstanceLastChecked(t *testing.T) { name string tabletAlias string partialSuccess bool + stalledDisk bool conditionToCheck string }{ { name: "Verify updated last checked", tabletAlias: "zone1-0000000100", partialSuccess: false, - conditionToCheck: "last_checked >= DATETIME('now', '-30 second') and last_check_partial_success = false", + stalledDisk: false, + conditionToCheck: "last_checked >= DATETIME('now', '-30 second') and last_check_partial_success = false and is_disk_stalled = false", }, { name: "Verify partial success", tabletAlias: "zone1-0000000100", partialSuccess: true, - conditionToCheck: "last_checked >= DATETIME('now', '-30 second') and last_check_partial_success = true", + stalledDisk: false, + conditionToCheck: "last_checked >= datetime('now', '-30 second') and last_check_partial_success = true and is_disk_stalled = false", + }, { + name: "Verify stalled disk", + tabletAlias: "zone1-0000000100", + partialSuccess: false, + stalledDisk: true, + conditionToCheck: "last_checked >= DATETIME('now', '-30 second') and last_check_partial_success = false and is_disk_stalled = true", }, { name: "Verify no error on unknown tablet", tabletAlias: "unknown tablet", partialSuccess: true, + stalledDisk: true, }, } @@ -537,7 +547,7 @@ func TestUpdateInstanceLastChecked(t *testing.T) { for _, tt := 
range tests { t.Run(tt.name, func(t *testing.T) { - err := UpdateInstanceLastChecked(tt.tabletAlias, tt.partialSuccess) + err := UpdateInstanceLastChecked(tt.tabletAlias, tt.partialSuccess, tt.stalledDisk) require.NoError(t, err) if tt.conditionToCheck != "" { diff --git a/go/vt/vtorc/logic/topology_recovery.go b/go/vt/vtorc/logic/topology_recovery.go index 0d0bbff5b53..ab41d1fa988 100644 --- a/go/vt/vtorc/logic/topology_recovery.go +++ b/go/vt/vtorc/logic/topology_recovery.go @@ -285,7 +285,7 @@ func checkAndRecoverGenericProblem(ctx context.Context, analysisEntry *inst.Repl func getCheckAndRecoverFunctionCode(analysisCode inst.AnalysisCode, tabletAlias string) recoveryFunction { switch analysisCode { // primary - case inst.DeadPrimary, inst.DeadPrimaryAndSomeReplicas: + case inst.DeadPrimary, inst.DeadPrimaryAndSomeReplicas, inst.PrimaryDiskStalled: // If ERS is disabled, we have no way of repairing the cluster. if !config.ERSEnabled() { log.Infof("VTOrc not configured to run ERS, skipping recovering %v", analysisCode) diff --git a/go/vt/vtorc/logic/topology_recovery_test.go b/go/vt/vtorc/logic/topology_recovery_test.go index f7658060b95..ca164d78836 100644 --- a/go/vt/vtorc/logic/topology_recovery_test.go +++ b/go/vt/vtorc/logic/topology_recovery_test.go @@ -42,6 +42,11 @@ func TestAnalysisEntriesHaveSameRecovery(t *testing.T) { prevAnalysisCode: inst.DeadPrimary, newAnalysisCode: inst.DeadPrimaryAndSomeReplicas, shouldBeEqual: true, + }, { + // DeadPrimary and StalledDiskPrimary have the same recovery + prevAnalysisCode: inst.DeadPrimary, + newAnalysisCode: inst.PrimaryDiskStalled, + shouldBeEqual: true, }, { // DeadPrimary and PrimaryTabletDeleted are different recoveries. 
prevAnalysisCode: inst.DeadPrimary, @@ -215,6 +220,16 @@ func TestGetCheckAndRecoverFunctionCode(t *testing.T) { ersEnabled: false, analysisCode: inst.DeadPrimary, wantRecoveryFunction: noRecoveryFunc, + }, { + name: "StalledDiskPrimary with ERS enabled", + ersEnabled: true, + analysisCode: inst.PrimaryDiskStalled, + wantRecoveryFunction: recoverDeadPrimaryFunc, + }, { + name: "StalledDiskPrimary with ERS disabled", + ersEnabled: false, + analysisCode: inst.PrimaryDiskStalled, + wantRecoveryFunction: noRecoveryFunc, }, { name: "PrimaryTabletDeleted with ERS enabled", ersEnabled: true, diff --git a/go/vt/vtorc/test/recovery_analysis.go b/go/vt/vtorc/test/recovery_analysis.go index 218a679bdb0..bb6e4132243 100644 --- a/go/vt/vtorc/test/recovery_analysis.go +++ b/go/vt/vtorc/test/recovery_analysis.go @@ -80,6 +80,7 @@ type InfoForRecoveryAnalysis struct { MaxReplicaGTIDMode string MaxReplicaGTIDErrant string ReadOnly uint + IsStalledDisk uint } func (info *InfoForRecoveryAnalysis) ConvertToRowMap() sqlutils.RowMap { @@ -145,6 +146,7 @@ func (info *InfoForRecoveryAnalysis) ConvertToRowMap() sqlutils.RowMap { rowMap["semi_sync_replica_enabled"] = sqlutils.CellData{String: fmt.Sprintf("%v", info.SemiSyncReplicaEnabled), Valid: true} res, _ := prototext.Marshal(info.TabletInfo) rowMap["tablet_info"] = sqlutils.CellData{String: string(res), Valid: true} + rowMap["is_disk_stalled"] = sqlutils.CellData{String: fmt.Sprintf("%v", info.IsStalledDisk), Valid: true} return rowMap } diff --git a/go/vt/vttablet/tabletmanager/disk_health_monitor.go b/go/vt/vttablet/tabletmanager/disk_health_monitor.go new file mode 100644 index 00000000000..e35bc662a12 --- /dev/null +++ b/go/vt/vttablet/tabletmanager/disk_health_monitor.go @@ -0,0 +1,131 @@ +package tabletmanager + +import ( + "context" + "os" + "path" + "strconv" + "sync" + "time" +) + +type DiskHealthMonitor interface { + // IsDiskStalled returns true if the disk is stalled or rejecting writes. 
+ IsDiskStalled() bool +} + +func newDiskHealthMonitor(ctx context.Context) DiskHealthMonitor { + if stalledDiskWriteDir == "" { + return newNoopDiskHealthMonitor() + } + + return newPollingDiskHealthMonitor(ctx, attemptFileWrite, stalledDiskWriteInterval, stalledDiskWriteTimeout) +} + +type writeFunction func() error + +func attemptFileWrite() error { + file, err := os.Create(path.Join(stalledDiskWriteDir, ".stalled_disk_check")) + if err != nil { + return err + } + _, err = file.WriteString(strconv.FormatInt(time.Now().UnixNano(), 10)) + if err != nil { + return err + } + err = file.Sync() + if err != nil { + return err + } + return file.Close() +} + +type pollingDiskHealthMonitor struct { + stalledMutex sync.RWMutex + stalled bool + writeInProgressMutex sync.RWMutex + writeInProgress bool + writeFunc writeFunction + pollingInterval time.Duration + writeTimeout time.Duration +} + +var _ DiskHealthMonitor = &pollingDiskHealthMonitor{} + +func newPollingDiskHealthMonitor(ctx context.Context, writeFunc writeFunction, pollingInterval, writeTimeout time.Duration) *pollingDiskHealthMonitor { + fs := &pollingDiskHealthMonitor{ + stalledMutex: sync.RWMutex{}, + stalled: false, + writeInProgressMutex: sync.RWMutex{}, + writeInProgress: false, + writeFunc: writeFunc, + pollingInterval: pollingInterval, + writeTimeout: writeTimeout, + } + go fs.poll(ctx) + return fs +} + +func (fs *pollingDiskHealthMonitor) poll(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case <-time.After(fs.pollingInterval): + if fs.isWriteInProgress() { + continue + } + + ch := make(chan error, 1) + go func() { + fs.setIsWriteInProgress(true) + err := fs.writeFunc() + fs.setIsWriteInProgress(false) + ch <- err + }() + + select { + case <-time.After(fs.writeTimeout): + fs.setIsDiskStalled(true) + case err := <-ch: + fs.setIsDiskStalled(err != nil) + } + } + } +} + +func (fs *pollingDiskHealthMonitor) IsDiskStalled() bool { + fs.stalledMutex.RLock() + defer 
fs.stalledMutex.RUnlock() + return fs.stalled +} + +func (fs *pollingDiskHealthMonitor) setIsDiskStalled(isStalled bool) { + fs.stalledMutex.Lock() + defer fs.stalledMutex.Unlock() + fs.stalled = isStalled +} + +func (fs *pollingDiskHealthMonitor) isWriteInProgress() bool { + fs.writeInProgressMutex.RLock() + defer fs.writeInProgressMutex.RUnlock() + return fs.writeInProgress +} + +func (fs *pollingDiskHealthMonitor) setIsWriteInProgress(isInProgress bool) { + fs.writeInProgressMutex.Lock() + defer fs.writeInProgressMutex.Unlock() + fs.writeInProgress = isInProgress +} + +type noopDiskHealthMonitor struct{} + +var _ DiskHealthMonitor = &noopDiskHealthMonitor{} + +func newNoopDiskHealthMonitor() DiskHealthMonitor { + return &noopDiskHealthMonitor{} +} + +func (fs *noopDiskHealthMonitor) IsDiskStalled() bool { + return false +} diff --git a/go/vt/vttablet/tabletmanager/disk_health_monitor_test.go b/go/vt/vttablet/tabletmanager/disk_health_monitor_test.go new file mode 100644 index 00000000000..68930f3061d --- /dev/null +++ b/go/vt/vttablet/tabletmanager/disk_health_monitor_test.go @@ -0,0 +1,103 @@ +package tabletmanager + +import ( + "context" + "errors" + "sync" + "testing" + "time" +) + +func TestDiskHealthMonitor_noStall(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + mockFileWriter := &sequencedMockWriter{} + diskHealthMonitor := newPollingDiskHealthMonitor(ctx, mockFileWriter.mockWriteFunction, 50*time.Millisecond, 25*time.Millisecond) + + time.Sleep(300 * time.Millisecond) + if totalCreateCalls := mockFileWriter.getTotalCreateCalls(); totalCreateCalls != 5 { + t.Fatalf("expected 5 calls to createFile, got %d", totalCreateCalls) + } + if isStalled := diskHealthMonitor.IsDiskStalled(); isStalled { + t.Fatalf("expected isStalled to be false") + } +} + +func TestDiskHealthMonitor_stallAndRecover(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + mockFileWriter := 
&sequencedMockWriter{sequencedWriteFunctions: []writeFunction{delayedWriteFunction(10*time.Millisecond, nil), delayedWriteFunction(300*time.Millisecond, nil)}} + diskHealthMonitor := newPollingDiskHealthMonitor(ctx, mockFileWriter.mockWriteFunction, 50*time.Millisecond, 25*time.Millisecond) + + time.Sleep(300 * time.Millisecond) + if totalCreateCalls := mockFileWriter.getTotalCreateCalls(); totalCreateCalls != 2 { + t.Fatalf("expected 2 calls to createFile, got %d", totalCreateCalls) + } + if isStalled := diskHealthMonitor.IsDiskStalled(); !isStalled { + t.Fatalf("expected isStalled to be true") + } + + time.Sleep(300 * time.Millisecond) + if totalCreateCalls := mockFileWriter.getTotalCreateCalls(); totalCreateCalls < 5 { + t.Fatalf("expected at least 5 calls to createFile, got %d", totalCreateCalls) + } + if isStalled := diskHealthMonitor.IsDiskStalled(); isStalled { + t.Fatalf("expected isStalled to be false") + } +} + +func TestDiskHealthMonitor_stallDetected(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + mockFileWriter := &sequencedMockWriter{defaultWriteFunction: delayedWriteFunction(10*time.Millisecond, errors.New("test error"))} + diskHealthMonitor := newPollingDiskHealthMonitor(ctx, mockFileWriter.mockWriteFunction, 50*time.Millisecond, 25*time.Millisecond) + + time.Sleep(300 * time.Millisecond) + if totalCreateCalls := mockFileWriter.getTotalCreateCalls(); totalCreateCalls != 5 { + t.Fatalf("expected 5 calls to createFile, got %d", totalCreateCalls) + } + if isStalled := diskHealthMonitor.IsDiskStalled(); !isStalled { + t.Fatalf("expected isStalled to be true") + } +} + +type sequencedMockWriter struct { + defaultWriteFunction writeFunction + sequencedWriteFunctions []writeFunction + + totalCreateCalls int + totalCreateCallsMutex sync.RWMutex +} + +func (smw *sequencedMockWriter) mockWriteFunction() error { + functionIndex := smw.getTotalCreateCalls() + smw.incrementTotalCreateCalls() + + if functionIndex >= 
len(smw.sequencedWriteFunctions) { + if smw.defaultWriteFunction != nil { + return smw.defaultWriteFunction() + } + return delayedWriteFunction(10*time.Millisecond, nil)() + } + + return smw.sequencedWriteFunctions[functionIndex]() +} + +func (smw *sequencedMockWriter) incrementTotalCreateCalls() { + smw.totalCreateCallsMutex.Lock() + defer smw.totalCreateCallsMutex.Unlock() + smw.totalCreateCalls += 1 +} + +func (smw *sequencedMockWriter) getTotalCreateCalls() int { + smw.totalCreateCallsMutex.RLock() + defer smw.totalCreateCallsMutex.RUnlock() + return smw.totalCreateCalls +} + +func delayedWriteFunction(delay time.Duration, err error) writeFunction { + return func() error { + time.Sleep(delay) + return err + } +} diff --git a/go/vt/vttablet/tabletmanager/rpc_replication.go b/go/vt/vttablet/tabletmanager/rpc_replication.go index 47794e92b9a..b27b25d87c6 100644 --- a/go/vt/vttablet/tabletmanager/rpc_replication.go +++ b/go/vt/vttablet/tabletmanager/rpc_replication.go @@ -18,6 +18,7 @@ package tabletmanager import ( "context" + "errors" "fmt" "runtime" "strings" @@ -60,6 +61,13 @@ func (tm *TabletManager) FullStatus(ctx context.Context) (*replicationdatapb.Ful if err := tm.waitForGrantsToHaveApplied(ctx); err != nil { return nil, err } + + // Return error if the disk is stalled or rejecting writes. + // Noop by default, must be enabled with the flag "disk-write-dir". 
+ if tm.dhMonitor.IsDiskStalled() { + return nil, errors.New("stalled disk") + } + // Server ID - "select @@global.server_id" serverID, err := tm.MysqlDaemon.GetServerID(ctx) if err != nil { diff --git a/go/vt/vttablet/tabletmanager/tm_init.go b/go/vt/vttablet/tabletmanager/tm_init.go index fbef04de357..c22ea0a6e51 100644 --- a/go/vt/vttablet/tabletmanager/tm_init.go +++ b/go/vt/vttablet/tabletmanager/tm_init.go @@ -95,8 +95,11 @@ var ( skipBuildInfoTags = "/.*/" initTags flagutil.StringMapValue - initTimeout = 1 * time.Minute - mysqlShutdownTimeout = mysqlctl.DefaultShutdownTimeout + initTimeout = 1 * time.Minute + mysqlShutdownTimeout = mysqlctl.DefaultShutdownTimeout + stalledDiskWriteDir = "" + stalledDiskWriteTimeout = 30 * time.Second + stalledDiskWriteInterval = 5 * time.Second ) func registerInitFlags(fs *pflag.FlagSet) { @@ -109,6 +112,9 @@ func registerInitFlags(fs *pflag.FlagSet) { fs.Var(&initTags, "init_tags", "(init parameter) comma separated list of key:value pairs used to tag the tablet") fs.DurationVar(&initTimeout, "init_timeout", initTimeout, "(init parameter) timeout to use for the init phase.") fs.DurationVar(&mysqlShutdownTimeout, "mysql-shutdown-timeout", mysqlShutdownTimeout, "timeout to use when MySQL is being shut down.") + fs.StringVar(&stalledDiskWriteDir, "disk-write-dir", stalledDiskWriteDir, "if provided, tablet will attempt to write a file to this directory to check if the disk is stalled") + fs.DurationVar(&stalledDiskWriteTimeout, "disk-write-timeout", stalledDiskWriteTimeout, "if writes exceed this duration, the disk is considered stalled") + fs.DurationVar(&stalledDiskWriteInterval, "disk-write-interval", stalledDiskWriteInterval, "how often to write to the disk to check whether it is stalled") } var ( @@ -164,6 +170,7 @@ type TabletManager struct { VREngine *vreplication.Engine VDiffEngine *vdiff.Engine Env *vtenv.Environment + dhMonitor DiskHealthMonitor // tmc is used to run an RPC against other vttablets. 
tmc tmclient.TabletManagerClient @@ -372,6 +379,7 @@ func (tm *TabletManager) Start(tablet *topodatapb.Tablet, config *tabletenv.Tabl tm.tmc = tmclient.NewTabletManagerClient() tm.tmState = newTMState(tm, tablet) tm.actionSema = semaphore.NewWeighted(1) + tm.dhMonitor = newDiskHealthMonitor(tm.BatchCtx) tm._waitForGrantsComplete = make(chan struct{}) tm.baseTabletType = tablet.Type