Skip to content

Commit

Permalink
Allow failover for replicas
Browse files Browse the repository at this point in the history
  • Loading branch information
magec committed Nov 12, 2024
1 parent 0ee59c0 commit f24a572
Show file tree
Hide file tree
Showing 6 changed files with 165 additions and 79 deletions.
9 changes: 9 additions & 0 deletions CONFIG.md
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,15 @@ If the client doesn't specify, PgCat routes traffic to this role by default.
`replica` round-robin between replicas only without touching the primary,
`primary` all queries go to the primary unless otherwise specified.

### replica_to_primary_failover_enabled
```
path: pools.<pool_name>.replica_to_primary_failover_enabled
default: "false"
```

If set to true, when the specified role is `replica` (either by setting `default_role` or manually)
and all replicas are banned, queries will be sent to the primary (until a replica is back online).

### prepared_statements_cache_size
```
path: general.prepared_statements_cache_size
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ The setting will persist until it's changed again or the client disconnects.
By default, all queries are routed to the first available server; `default_role` setting controls this behavior.

### Failover
All servers are checked with a `;` (very fast) query before being given to a client. Additionally, the server health is monitored with every client query that it processes. If the server is not reachable, it will be banned and cannot serve any more transactions for the duration of the ban. The queries are routed to the remaining servers. If all servers become banned, the ban list is cleared: this is a safety precaution against false positives. The primary can never be banned.
All servers are checked with a `;` (very fast) query before being given to a client. Additionally, the server health is monitored with every client query that it processes. If the server is not reachable, it will be banned and cannot serve any more transactions for the duration of the ban. The queries are routed to the remaining servers. If `replica_to_primary_failover_enabled` is set to true and all replicas become banned, the query will be routed to the primary. If `replica_to_primary_failover_enabled` is false and all servers (replicas) become banned, the ban list is cleared: this is a safety precaution against false positives. The primary can never be banned.

The ban time can be changed with `ban_time`. The default is 60 seconds.

Expand Down
4 changes: 4 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,9 @@ pub struct Pool {
#[serde(default = "Pool::default_default_role")]
pub default_role: String,

#[serde(default)] // False
pub replica_to_primary_failover_enabled: bool,

#[serde(default)] // False
pub query_parser_enabled: bool,

Expand Down Expand Up @@ -734,6 +737,7 @@ impl Default for Pool {
pool_mode: Self::default_pool_mode(),
load_balancing_mode: Self::default_load_balancing_mode(),
default_role: String::from("any"),
replica_to_primary_failover_enabled: false,
query_parser_enabled: false,
query_parser_max_length: None,
query_parser_read_write_splitting: false,
Expand Down
51 changes: 37 additions & 14 deletions src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,9 @@ pub struct PoolSettings {
// Default server role to connect to.
pub default_role: Option<Role>,

// Whether or not we should use primary when replicas are unavailable
pub replica_to_primary_failover_enabled: bool,

// Enable/disable query parser.
pub query_parser_enabled: bool,

Expand Down Expand Up @@ -219,6 +222,7 @@ impl Default for PoolSettings {
user: User::default(),
db: String::default(),
default_role: None,
replica_to_primary_failover_enabled: false,
query_parser_enabled: false,
query_parser_max_length: None,
query_parser_read_write_splitting: false,
Expand Down Expand Up @@ -531,6 +535,8 @@ impl ConnectionPool {
"primary" => Some(Role::Primary),
_ => unreachable!(),
},
replica_to_primary_failover_enabled: pool_config
.replica_to_primary_failover_enabled,
query_parser_enabled: pool_config.query_parser_enabled,
query_parser_max_length: pool_config.query_parser_max_length,
query_parser_read_write_splitting: pool_config
Expand Down Expand Up @@ -731,6 +737,19 @@ impl ConnectionPool {
});
}

// If the role is replica and we allow sending traffic to primary when replicas are unavailble,
// we add primary address at the end of the list of candidates, this way it will be tried when
// replicas are all unavailable.
if role == Role::Replica && self.settings.replica_to_primary_failover_enabled {
let mut primaries = self
.addresses
.iter()
.flatten()
.filter(|address| address.role == Role::Primary)
.collect::<Vec<&Address>>();
candidates.insert(0, primaries.pop().unwrap());
}

// Indicate we're waiting on a server connection from a pool.
let now = Instant::now();
client_stats.waiting();
Expand Down Expand Up @@ -935,24 +954,28 @@ impl ConnectionPool {
return true;
}

// Check if all replicas are banned, in that case unban all of them
let replicas_available = self.addresses[address.shard]
.iter()
.filter(|addr| addr.role == Role::Replica)
.count();
// If we have replica to primary failover we should not unban replicas
// as we still have the primary to server traffic.
if !self.settings.replica_to_primary_failover_enabled {
// Check if all replicas are banned, in that case unban all of them
let replicas_available = self.addresses[address.shard]
.iter()
.filter(|addr| addr.role == Role::Replica)
.count();

debug!("Available targets: {}", replicas_available);
debug!("Available targets: {}", replicas_available);

let read_guard = self.banlist.read();
let all_replicas_banned = read_guard[address.shard].len() == replicas_available;
drop(read_guard);
let read_guard = self.banlist.read();
let all_replicas_banned = read_guard[address.shard].len() == replicas_available;
drop(read_guard);

if all_replicas_banned {
let mut write_guard = self.banlist.write();
warn!("Unbanning all replicas.");
write_guard[address.shard].clear();
if all_replicas_banned {
let mut write_guard = self.banlist.write();
warn!("Unbanning all replicas.");
write_guard[address.shard].clear();

return true;
return true;
}
}

// Check if ban time is expired
Expand Down
2 changes: 2 additions & 0 deletions src/query_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1459,6 +1459,7 @@ mod test {
load_balancing_mode: crate::config::LoadBalancingMode::Random,
shards: 2,
user: crate::config::User::default(),
replica_to_primary_failover_enabled: false,
default_role: Some(Role::Replica),
query_parser_enabled: true,
query_parser_max_length: None,
Expand Down Expand Up @@ -1538,6 +1539,7 @@ mod test {
shards: 5,
user: crate::config::User::default(),
default_role: Some(Role::Replica),
replica_to_primary_failover_enabled: false,
query_parser_enabled: true,
query_parser_max_length: None,
query_parser_read_write_splitting: true,
Expand Down
Loading

0 comments on commit f24a572

Please sign in to comment.