diff --git a/CONFIG.md b/CONFIG.md
index 4e984ede..f95acb1e 100644
--- a/CONFIG.md
+++ b/CONFIG.md
@@ -309,6 +309,15 @@
 If the client doesn't specify, PgCat routes traffic to this role by default.
 `replica` round-robin between replicas only without touching the primary,
 `primary` all queries go to the primary unless otherwise specified.
+### replica_to_primary_failover_enabled
+```
+path: pools.<pool_name>.replica_to_primary_failover_enabled
+default: "false"
+```
+
+If set to true, when the specified role is `replica` (either via `default_role` or set manually)
+and all replicas are banned, queries will be sent to the primary (until a replica is back online).
+
 ### prepared_statements_cache_size
 ```
 path: general.prepared_statements_cache_size
diff --git a/README.md b/README.md
index 21e6da70..9b814d02 100644
--- a/README.md
+++ b/README.md
@@ -175,7 +175,7 @@ The setting will persist until it's changed again or the client disconnects.
 By default, all queries are routed to the first available server; `default_role` setting controls this behavior.

 ### Failover
-All servers are checked with a `;` (very fast) query before being given to a client. Additionally, the server health is monitored with every client query that it processes. If the server is not reachable, it will be banned and cannot serve any more transactions for the duration of the ban. The queries are routed to the remaining servers. If all servers become banned, the ban list is cleared: this is a safety precaution against false positives. The primary can never be banned.
+All servers are checked with a `;` (very fast) query before being given to a client. Additionally, the server health is monitored with every client query that it processes. If the server is not reachable, it will be banned and cannot serve any more transactions for the duration of the ban. The queries are routed to the remaining servers. If `replica_to_primary_failover_enabled` is set to true and all replicas become banned, queries will be routed to the primary. If it is set to false and all replicas become banned, the ban list is cleared: this is a safety precaution against false positives. The primary can never be banned.

 The ban time can be changed with `ban_time`. The default is 60 seconds.
diff --git a/src/config.rs b/src/config.rs
index b0d98fb5..2eae45e5 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -541,6 +541,9 @@ pub struct Pool {
     #[serde(default = "Pool::default_default_role")]
     pub default_role: String,

+    #[serde(default)] // False
+    pub replica_to_primary_failover_enabled: bool,
+
     #[serde(default)] // False
     pub query_parser_enabled: bool,

@@ -734,6 +737,7 @@ impl Default for Pool {
             pool_mode: Self::default_pool_mode(),
             load_balancing_mode: Self::default_load_balancing_mode(),
             default_role: String::from("any"),
+            replica_to_primary_failover_enabled: false,
             query_parser_enabled: false,
             query_parser_max_length: None,
             query_parser_read_write_splitting: false,
diff --git a/src/pool.rs b/src/pool.rs
index 7915a0a4..80d6b968 100644
--- a/src/pool.rs
+++ b/src/pool.rs
@@ -162,6 +162,9 @@ pub struct PoolSettings {
     // Default server role to connect to.
     pub default_role: Option<Role>,

+    // Whether or not we should use the primary when replicas are unavailable
+    pub replica_to_primary_failover_enabled: bool,
+
     // Enable/disable query parser.
     pub query_parser_enabled: bool,

@@ -219,6 +222,7 @@ impl Default for PoolSettings {
             user: User::default(),
             db: String::default(),
             default_role: None,
+            replica_to_primary_failover_enabled: false,
             query_parser_enabled: false,
             query_parser_max_length: None,
             query_parser_read_write_splitting: false,
@@ -531,6 +535,8 @@ impl ConnectionPool {
                     "primary" => Some(Role::Primary),
                     _ => unreachable!(),
                 },
+                replica_to_primary_failover_enabled: pool_config
+                    .replica_to_primary_failover_enabled,
                 query_parser_enabled: pool_config.query_parser_enabled,
                 query_parser_max_length: pool_config.query_parser_max_length,
                 query_parser_read_write_splitting: pool_config
@@ -731,6 +737,19 @@ impl ConnectionPool {
             });
         }

+        // If the role is replica and we allow sending traffic to the primary when replicas
+        // are unavailable, add the primary address to the front of the candidate list; since
+        // candidates are popped from the back, it is only tried once all replicas are unavailable.
+        if role == Role::Replica && self.settings.replica_to_primary_failover_enabled {
+            let mut primaries = self
+                .addresses
+                .iter()
+                .flatten()
+                .filter(|address| address.role == Role::Primary)
+                .collect::<Vec<&Address>>();
+            candidates.insert(0, primaries.pop().unwrap());
+        }
+
         // Indicate we're waiting on a server connection from a pool.
         let now = Instant::now();
         client_stats.waiting();
@@ -935,24 +954,28 @@
             return true;
         }

-        // Check if all replicas are banned, in that case unban all of them
-        let replicas_available = self.addresses[address.shard]
-            .iter()
-            .filter(|addr| addr.role == Role::Replica)
-            .count();
+        // If replica-to-primary failover is enabled, we should not unban replicas,
+        // as we still have the primary to serve traffic.
+        if !self.settings.replica_to_primary_failover_enabled {
+            // Check if all replicas are banned, in that case unban all of them
+            let replicas_available = self.addresses[address.shard]
+                .iter()
+                .filter(|addr| addr.role == Role::Replica)
+                .count();

-        debug!("Available targets: {}", replicas_available);
+            debug!("Available targets: {}", replicas_available);

-        let read_guard = self.banlist.read();
-        let all_replicas_banned = read_guard[address.shard].len() == replicas_available;
-        drop(read_guard);
+            let read_guard = self.banlist.read();
+            let all_replicas_banned = read_guard[address.shard].len() == replicas_available;
+            drop(read_guard);

-        if all_replicas_banned {
-            let mut write_guard = self.banlist.write();
-            warn!("Unbanning all replicas.");
-            write_guard[address.shard].clear();
+            if all_replicas_banned {
+                let mut write_guard = self.banlist.write();
+                warn!("Unbanning all replicas.");
+                write_guard[address.shard].clear();

-            return true;
+                return true;
+            }
         }

         // Check if ban time is expired
diff --git a/src/query_router.rs b/src/query_router.rs
index 2ed6b755..004b256e 100644
--- a/src/query_router.rs
+++ b/src/query_router.rs
@@ -1459,6 +1459,7 @@ mod test {
             load_balancing_mode: crate::config::LoadBalancingMode::Random,
             shards: 2,
             user: crate::config::User::default(),
+            replica_to_primary_failover_enabled: false,
             default_role: Some(Role::Replica),
             query_parser_enabled: true,
             query_parser_max_length: None,
@@ -1538,6 +1539,7 @@
             shards: 5,
             user: crate::config::User::default(),
             default_role: Some(Role::Replica),
+            replica_to_primary_failover_enabled: false,
             query_parser_enabled: true,
             query_parser_max_length: None,
             query_parser_read_write_splitting: true,
diff --git a/tests/ruby/load_balancing_spec.rb b/tests/ruby/load_balancing_spec.rb
index f00f8db1..085c5bab 100644
--- a/tests/ruby/load_balancing_spec.rb
+++ b/tests/ruby/load_balancing_spec.rb
@@ -1,5 +1,5 @@
 # frozen_string_literal: true
-require_relative 'spec_helper'
+require_relative "spec_helper"

 describe "Random Load Balancing" do
   let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 5) }
@@ -8,7 +8,7 @@
     processes.pgcat.shutdown
   end

-  context "under regular circumstances" do
+  context("under regular circumstances") do
     it "balances query volume between all instances" do
       conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))

@@ -22,14 +22,14 @@
         failed_count += 1
       end

-      expect(failed_count).to eq(0)
+      expect(failed_count).to(eq(0))

       processes.all_databases.map(&:count_select_1_plus_2).each do |instance_share|
-        expect(instance_share).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share)
+        expect(instance_share).to(be_within(expected_share * MARGIN_OF_ERROR).of(expected_share))
       end
     end
   end

-  context "when some replicas are down" do
+  context("when some replicas are down") do
     it "balances query volume between working instances" do
       conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
       expected_share = QUERY_COUNT / (processes.all_databases.count - 2)
@@ -49,48 +49,13 @@
       processes.all_databases.each do |instance|
         queries_routed = instance.count_select_1_plus_2
         if processes.replicas[0..1].include?(instance)
-          expect(queries_routed).to eq(0)
+          expect(queries_routed).to(eq(0))
         else
-          expect(queries_routed).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share)
+          expect(queries_routed).to(be_within(expected_share * MARGIN_OF_ERROR).of(expected_share))
         end
       end
     end
   end
-
-  context "when all replicas are down " do
-    let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 5, "transaction", "random", "debug", {"default_role" => "replica"}) }
-
-    it "unbans them automatically to prevent false positives in health checks that could make all replicas unavailable" do
-      conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
-      failed_count = 0
-      number_of_replicas = processes[:replicas].length
-
-      # Take down all replicas
-      processes[:replicas].each(&:take_down)
-
-      (number_of_replicas + 1).times do |n|
-        conn.async_exec("SELECT 1 + 2")
-      rescue
-        conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
-        failed_count += 1
-      end
-
-      expect(failed_count).to eq(number_of_replicas + 1)
-      failed_count = 0
-
-      # Ban_time is configured to 60 so this reset will only work
-      # if the replicas are unbanned automatically
-      processes[:replicas].each(&:reset)
-
-      number_of_replicas.times do
-        conn.async_exec("SELECT 1 + 2")
-      rescue
-        conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
-        failed_count += 1
-      end
-      expect(failed_count).to eq(0)
-    end
-  end
 end

 describe "Least Outstanding Queries Load Balancing" do
@@ -100,7 +65,7 @@
     processes.pgcat.shutdown
   end

-  context "under homogeneous load" do
+  context("under homogeneous load") do
     it "balances query volume between all instances" do
       conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))

@@ -114,15 +79,15 @@
       failed_count += 1
     end

-    expect(failed_count).to eq(0)
+    expect(failed_count).to(eq(0))

     processes.all_databases.map(&:count_select_1_plus_2).each do |instance_share|
-      expect(instance_share).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share)
+      expect(instance_share).to(be_within(expected_share * MARGIN_OF_ERROR).of(expected_share))
     end
   end
 end

-  context "under heterogeneous load" do
-    xit "balances query volume between all instances based on how busy they are" do
"balances query volume between all instances based on how busy they are" do + context("under heterogeneous load") do + xit("balances query volume between all instances based on how busy they are") do slow_query_count = 2 threads = Array.new(slow_query_count) do Thread.new do @@ -143,31 +108,32 @@ failed_count += 1 end - expect(failed_count).to eq(0) + expect(failed_count).to(eq(0)) # Under LOQ, we expect replicas running the slow pg_sleep # to get no selects expect( - processes. - all_databases. - map(&:count_select_1_plus_2). - count { |instance_share| instance_share == 0 } - ).to eq(slow_query_count) + processes + .all_databases + .map(&:count_select_1_plus_2) + .count { |instance_share| instance_share == 0 } + ) + .to(eq(slow_query_count)) # We also expect the quick queries to be spread across # the idle servers only - processes. - all_databases. - map(&:count_select_1_plus_2). - reject { |instance_share| instance_share == 0 }. - each do |instance_share| - expect(instance_share).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share) - end + processes + .all_databases + .map(&:count_select_1_plus_2) + .reject { |instance_share| instance_share == 0 } + .each do |instance_share| + expect(instance_share).to(be_within(expected_share * MARGIN_OF_ERROR).of(expected_share)) + end threads.map(&:join) end end - context "when some replicas are down" do + context("when some replicas are down") do it "balances query volume between working instances" do conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) expected_share = QUERY_COUNT / (processes.all_databases.count - 2) @@ -184,13 +150,104 @@ end end - expect(failed_count).to be <= 2 + expect(failed_count).to(be <= 2) processes.all_databases.each do |instance| queries_routed = instance.count_select_1_plus_2 if processes.replicas[0..1].include?(instance) - expect(queries_routed).to eq(0) + expect(queries_routed).to(eq(0)) else - expect(queries_routed).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share) + expect(queries_routed).to(be_within(expected_share * MARGIN_OF_ERROR).of(expected_share)) + end + end + end + end +end + +describe "Candidate filtering based on `default_pool`" do + let(:processes) { + Helpers::Pgcat.single_shard_setup("sharded_db", 5, "transaction", "random", "debug", pool_settings) + } + + after do + processes.all_databases.map(&:reset) + processes.pgcat.shutdown + end + + context("with default_pool set to replicas") do + context("when all replicas are down ") do + let(:pool_settings) do + { + "default_role" => "replica", + "replica_to_primary_failover_enabled" => replica_to_primary_failover_enabled + } + end + + context("with `replica_to_primary_failover_enabled` set to false`") do + let(:replica_to_primary_failover_enabled) { false } + + it( + "unbans them automatically to prevent false positives in health checks that could make all replicas unavailable" + ) do + conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) + failed_count = 0 + number_of_replicas = processes[:replicas].length + + # Take down all replicas + processes[:replicas].each(&:take_down) + + (number_of_replicas + 1).times do |n| + conn.async_exec("SELECT 1 + 2") + rescue + conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) + failed_count += 1 + end + + expect(failed_count).to(eq(number_of_replicas + 1)) + failed_count = 0 + + # Ban_time is configured to 60 so this reset will only work + # if the replicas are unbanned automatically + 
processes[:replicas].each(&:reset) + + number_of_replicas.times do + conn.async_exec("SELECT 1 + 2") + rescue + conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) + failed_count += 1 + end + + expect(failed_count).to(eq(0)) + end + end + + context("with `replica_to_primary_failover_enabled` set to true`") do + let(:replica_to_primary_failover_enabled) { true } + + it "does not unbans them automatically" do + conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) + failed_count = 0 + number_of_replicas = processes[:replicas].length + + # We need to allow pgcat to open connections to replicas + (number_of_replicas + 10).times do |n| + conn.async_exec("SELECT 1 + 2") + rescue + conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) + failed_count += 1 + end + expect(failed_count).to(eq(0)) + + # Take down all replicas + processes[:replicas].each(&:take_down) + + (number_of_replicas + 10).times do |n| + conn.async_exec("SELECT 1 + 2") + rescue + conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) + failed_count += 1 + end + + expect(failed_count).to(eq(number_of_replicas)) end end end