Skip to content

Commit

Permalink
add server status
Browse files Browse the repository at this point in the history
Add server status check to move the server from online to SHUNNED in case
snapshot is still ongoing or we cannot connect to Readyset

Closes #1
  • Loading branch information
altmannmarcelo committed Apr 3, 2024
1 parent 86e6019 commit ab075e2
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 3 deletions.
38 changes: 35 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
mod config;
mod messages;
mod queries;
mod server;

use clap::Parser;
use config::read_config_file;
use mysql::OptsBuilder;
use mysql::{Pool, PoolConstraints, PoolOpts};

use file_guard::Lock;
use server::ServerStatus;
use std::fs::OpenOptions;

/// Readyset ProxySQL Scheduler
Expand Down Expand Up @@ -71,7 +73,7 @@ fn main() {
.unwrap();
let mut proxysql_conn = proxysql_pool.get_conn().unwrap();

let readyset_pool = Pool::new(
let readyset_pool = match Pool::new(
OptsBuilder::new()
.ip_or_hostname(Some(config.readyset_host.as_str()))
.tcp_port(config.readyset_port)
Expand All @@ -83,10 +85,40 @@ fn main() {
.with_reset_connection(false)
.with_constraints(PoolConstraints::new(1, 1).unwrap()),
),
)
.unwrap();
) {
Ok(conn) => conn,
Err(e) => {
messages::print_error(format!("Cannot connect to Readyset: {}.", e).as_str());
let _ =
server::change_server_status(&mut proxysql_conn, &config, ServerStatus::Shunned);
std::process::exit(1);
}
};
let mut readyset_conn = readyset_pool.get_conn().unwrap();

match server::check_readyset_is_ready(&mut readyset_conn) {
Ok(ready) => {
if ready {
let _ =
server::change_server_status(&mut proxysql_conn, &config, ServerStatus::Online);
} else {
messages::print_info("Readyset is still running Snapshot.");
let _ = server::change_server_status(
&mut proxysql_conn,
&config,
ServerStatus::Shunned,
);
std::process::exit(0);
}
}
Err(e) => {
messages::print_error(format!("Cannot check Readyset status: {}.", e).as_str());
let _ =
server::change_server_status(&mut proxysql_conn, &config, ServerStatus::Shunned);
std::process::exit(1);
}
};

let mut queries_added_or_change =
queries::adjust_mirror_rules(&mut proxysql_conn, &config).unwrap();

Expand Down
72 changes: 72 additions & 0 deletions src/server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
use core::fmt;

use crate::{config::Config, messages};
use mysql::{prelude::Queryable, PooledConn};

#[allow(dead_code)]
pub enum ServerStatus {
//backend server is fully operational
Online,
//backend sever is temporarily taken out of use because of either too many connection errors in a time that was too short, or the replication lag exceeded the allowed threshold
Shunned,
//when a server is put into OFFLINE_SOFT mode, no new connections are created toward that server, while the existing connections are kept until they are returned to the connection pool or destructed. In other words, connections are kept in use until multiplexing is enabled again, for example when a transaction is completed. This makes it possible to gracefully detach a backend as long as multiplexing is efficient
OfflineSoft,
//when a server is put into OFFLINE_HARD mode, no new connections are created toward that server and the existing **free **connections are ** immediately dropped**, while backend connections currently associated with a client session are dropped as soon as the client tries to use them. This is equivalent to deleting the server from a hostgroup. Internally, setting a server in OFFLINE_HARD status is equivalent to deleting the server
OfflineHard,
}

impl fmt::Display for ServerStatus {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
ServerStatus::Online => write!(f, "ONLINE"),
ServerStatus::Shunned => write!(f, "SHUNNED"),
ServerStatus::OfflineSoft => write!(f, "OFFLINE_SOFT"),
ServerStatus::OfflineHard => write!(f, "OFFLINE_HARD"),
}
}
}

pub fn check_readyset_is_ready(rs_conn: &mut PooledConn) -> Result<bool, mysql::Error> {
let rows: Vec<(String, String)> = rs_conn.query("SHOW READYSET STATUS").unwrap_or(vec![]);
for (field, value) in rows {
if field == "Snapshot Status" {
return Ok(value == "Completed");
}
}
Ok(false)
}

pub fn change_server_status(
ps_conn: &mut PooledConn,
config: &Config,
server_status: ServerStatus,
) -> Result<bool, mysql::Error> {
let where_clause = format!(
"WHERE hostgroup_id = {} AND hostname = '{}' AND port = {}",
config.readyset_hostgroup, config.readyset_host, config.readyset_port
);
let select_query = format!("SELECT status FROM runtime_mysql_servers {}", where_clause);
let status: Option<String> = ps_conn.query_first(select_query)?;
if status.as_ref().unwrap() != &server_status.to_string() {
messages::print_info(
format!(
"Server HG: {}, Host: {}, Port: {} is currently {}. Changing to {}",
config.readyset_hostgroup,
config.readyset_host,
config.readyset_port,
status.unwrap(),
server_status.to_string()
)
.as_str(),
);
ps_conn.query_drop(format!(
"UPDATE mysql_servers SET status = '{}' {}",
server_status.to_string(),
where_clause
))?;
ps_conn.query_drop("LOAD MYSQL SERVERS TO RUNTIME")?;
ps_conn.query_drop("SAVE MYSQL SERVERS TO DISK")?;
}

Ok(true)
}

0 comments on commit ab075e2

Please sign in to comment.