diff --git a/src/main.rs b/src/main.rs index 3ee4a5e..0a16ffc 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,7 @@ mod config; mod messages; mod queries; +mod server; use clap::Parser; use config::read_config_file; @@ -8,6 +9,7 @@ use mysql::OptsBuilder; use mysql::{Pool, PoolConstraints, PoolOpts}; use file_guard::Lock; +use server::ServerStatus; use std::fs::OpenOptions; /// Readyset ProxySQL Scheduler @@ -71,7 +73,7 @@ fn main() { .unwrap(); let mut proxysql_conn = proxysql_pool.get_conn().unwrap(); - let readyset_pool = Pool::new( + let readyset_pool = match Pool::new( OptsBuilder::new() .ip_or_hostname(Some(config.readyset_host.as_str())) .tcp_port(config.readyset_port) @@ -83,10 +85,40 @@ fn main() { .with_reset_connection(false) .with_constraints(PoolConstraints::new(1, 1).unwrap()), ), - ) - .unwrap(); + ) { + Ok(conn) => conn, + Err(e) => { + messages::print_error(format!("Cannot connect to Readyset: {}.", e).as_str()); + let _ = + server::change_server_status(&mut proxysql_conn, &config, ServerStatus::Shunned); + std::process::exit(1); + } + }; let mut readyset_conn = readyset_pool.get_conn().unwrap(); + match server::check_readyset_is_ready(&mut readyset_conn) { + Ok(ready) => { + if ready { + let _ = + server::change_server_status(&mut proxysql_conn, &config, ServerStatus::Online); + } else { + messages::print_info("Readyset is still running Snapshot."); + let _ = server::change_server_status( + &mut proxysql_conn, + &config, + ServerStatus::Shunned, + ); + std::process::exit(0); + } + } + Err(e) => { + messages::print_error(format!("Cannot check Readyset status: {}.", e).as_str()); + let _ = + server::change_server_status(&mut proxysql_conn, &config, ServerStatus::Shunned); + std::process::exit(1); + } + }; + let mut queries_added_or_change = queries::adjust_mirror_rules(&mut proxysql_conn, &config).unwrap(); diff --git a/src/server.rs b/src/server.rs new file mode 100644 index 0000000..52c67d1 --- /dev/null +++ b/src/server.rs @@ -0,0 +1,72 @@ +use core::fmt; + +use crate::{config::Config, messages}; +use mysql::{prelude::Queryable, PooledConn}; + +#[allow(dead_code)] +pub enum ServerStatus { + //backend server is fully operational + Online, + //backend sever is temporarily taken out of use because of either too many connection errors in a time that was too short, or the replication lag exceeded the allowed threshold + Shunned, + //when a server is put into OFFLINE_SOFT mode, no new connections are created toward that server, while the existing connections are kept until they are returned to the connection pool or destructed. In other words, connections are kept in use until multiplexing is enabled again, for example when a transaction is completed. This makes it possible to gracefully detach a backend as long as multiplexing is efficient + OfflineSoft, + //when a server is put into OFFLINE_HARD mode, no new connections are created toward that server and the existing **free **connections are ** immediately dropped**, while backend connections currently associated with a client session are dropped as soon as the client tries to use them. This is equivalent to deleting the server from a hostgroup. Internally, setting a server in OFFLINE_HARD status is equivalent to deleting the server + OfflineHard, +} + +impl fmt::Display for ServerStatus { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + ServerStatus::Online => write!(f, "ONLINE"), + ServerStatus::Shunned => write!(f, "SHUNNED"), + ServerStatus::OfflineSoft => write!(f, "OFFLINE_SOFT"), + ServerStatus::OfflineHard => write!(f, "OFFLINE_HARD"), + } + } +} + +pub fn check_readyset_is_ready(rs_conn: &mut PooledConn) -> Result { + let rows: Vec<(String, String)> = rs_conn.query("SHOW READYSET STATUS").unwrap_or(vec![]); + for (field, value) in rows { + if field == "Snapshot Status" { + return Ok(value == "Completed"); + } + } + Ok(false) +} + +pub fn change_server_status( + ps_conn: &mut PooledConn, + config: &Config, + server_status: ServerStatus, +) -> Result { + let where_clause = format!( + "WHERE hostgroup_id = {} AND hostname = '{}' AND port = {}", + config.readyset_hostgroup, config.readyset_host, config.readyset_port + ); + let select_query = format!("SELECT status FROM runtime_mysql_servers {}", where_clause); + let status: Option = ps_conn.query_first(select_query)?; + if status.as_ref().unwrap() != &server_status.to_string() { + messages::print_info( + format!( + "Server HG: {}, Host: {}, Port: {} is currently {}. Changing to {}", + config.readyset_hostgroup, + config.readyset_host, + config.readyset_port, + status.unwrap(), + server_status.to_string() + ) + .as_str(), + ); + ps_conn.query_drop(format!( + "UPDATE mysql_servers SET status = '{}' {}", + server_status.to_string(), + where_clause + ))?; + ps_conn.query_drop("LOAD MYSQL SERVERS TO RUNTIME")?; + ps_conn.query_drop("SAVE MYSQL SERVERS TO DISK")?; + } + + Ok(true) +}