diff --git a/CONFIG.md b/CONFIG.md index 4e984ede..98d3a55f 100644 --- a/CONFIG.md +++ b/CONFIG.md @@ -309,6 +309,45 @@ If the client doesn't specify, PgCat routes traffic to this role by default. `replica` round-robin between replicas only without touching the primary, `primary` all queries go to the primary unless otherwise specified. +### db_activity_based_routing +``` +path: pools..db_activity_based_routing +default: false +``` + +If enabled, PgCat will route queries to the primary if the queried table was recently written to. +Only relevant when `query_parser_enabled` is enabled. + +##### Considerations: +- *This feature is experimental and may not work as expected.* +- This feature only works when the same PgCat instance is used for both reads and writes to the database. +- This feature is not relevant when the primary is not part of the pool of databases used for load balancing of read queries. +- If more than one PgCat instance is used for HA purposes, this feature will not work as expected. A way to still make it work is by using sticky sessions. + +### db_activity_based_init_delay +``` +path: pools..db_activity_based_init_delay +default: 100 +``` + +The delay in milliseconds before the first activity-based routing check is performed. + +### db_activity_ttl +``` +path: pools..db_activity_ttl +default: 900 +``` + +The time in seconds after which a DB is considered inactive when no queries/updates are performed to it. + +### table_mutation_cache_ttl +``` +path: pools..table_mutation_cache_ttl +default: 50 +``` + +The time in milliseconds after a write to a table that all queries to that table will be routed to the primary. + ### prepared_statements_cache_size ``` path: general.prepared_statements_cache_size diff --git a/Cargo.lock b/Cargo.lock index fb58c7b2..3b39286e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -132,7 +132,7 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" dependencies = [ "proc-macro2", "quote", - "syn 2.0.26", + "syn 2.0.87", ] [[package]] @@ -143,7 +143,7 @@ checksum = "a564d521dd56509c4c47480d00b80ee55f7e385ae48db5744c67ad50c92d2ebf" dependencies = [ "proc-macro2", "quote", - "syn 2.0.26", + "syn 2.0.87", ] [[package]] @@ -229,6 +229,12 @@ version = "3.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a3e2c3daef883ecc1b5d58c15adae93470a91d425f3532ba1695849656af3fc1" +[[package]] +name = "bytecount" +version = "0.6.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ce89b21cab1437276d2650d57e971f9d548a2d9037cc231abdc0562b97498ce" + [[package]] name = "byteorder" version = "1.4.3" @@ -241,6 +247,37 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "89b2fd2a0dcf38d7971e2194b6b6eebab45ae01067456a7fd93d5547a61b70be" +[[package]] +name = "camino" +version = "1.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b96ec4966b5813e2c0507c1f86115c8c5abaadc3980879c3424042a02fd1ad3" +dependencies = [ + "serde", +] + +[[package]] +name = "cargo-platform" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24b1f0365a6c6bb4020cd05806fd0d33c44d38046b8bd7f0e40814b9763cabfc" +dependencies = [ + "serde", +] + +[[package]] +name = "cargo_metadata" +version = "0.14.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4acbb09d9ee8e23699b9634375c72795d095bf268439da88562cf9b501f181fa" +dependencies = [ + "camino", + "cargo-platform", + "semver", + "serde", + "serde_json", +] + [[package]] name = "cc" version = "1.0.79" @@ -300,7 +337,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.26", + "syn 2.0.87", ] [[package]] @@ -330,6 +367,21 @@ dependencies = [ "libc", ] +[[package]] +name = "crossbeam-channel" +version = "0.5.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33480d6946193aa8033910124896ca395333cae7e2d1113d1fef6c3272217df2" +dependencies = [ + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22ec99545bb0ed0ea7bb9b8e1e9122ea386ff8a48c0922e43f36d45ab09e0e80" + [[package]] name = "crypto-common" version = "0.1.6" @@ -340,6 +392,19 @@ dependencies = [ "typenum", ] +[[package]] +name = "dashmap" +version = "5.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" +dependencies = [ + "cfg-if", + "hashbrown", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "data-encoding" version = "2.4.0" @@ -402,6 +467,15 @@ dependencies = [ "libc", ] +[[package]] +name = "error-chain" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d2f06b9cac1506ece98fe3231e3cc9c4410ec3d5b1f24ae1c8946f0742cdefc" +dependencies = [ + "version_check", +] + [[package]] name = "exitcode" version = "1.1.2" @@ -414,6 +488,12 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7" +[[package]] +name = "fastrand" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8c02a5121d4ea3eb16a80748c74f5549a5665e4c21333c6098f283870fbdea6" + [[package]] name = "fnv" version = "1.0.7" @@ -485,7 +565,7 @@ checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" dependencies = [ "proc-macro2", "quote", - "syn 2.0.26", + "syn 2.0.87", ] [[package]] @@ -545,6 +625,12 @@ version = "0.27.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6c80984affa11d98d1b88b66ac8853f143217b399d3c74116778ff8fdb4ed2e" +[[package]] +name = "glob" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" + [[package]] name = "h2" version = "0.4.6" @@ -918,6 +1004,21 @@ dependencies = [ "autocfg", ] +[[package]] +name = "mini-moka" +version = "0.10.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c325dfab65f261f386debee8b0969da215b3fa0037e74c8a1234db7ba986d803" +dependencies = [ + "crossbeam-channel", + "crossbeam-utils", + "dashmap", + "skeptic", + "smallvec", + "tagptr", + "triomphe", +] + [[package]] name = "miniz_oxide" version = "0.7.1" @@ -992,9 +1093,9 @@ dependencies = [ [[package]] name = "once_cell" -version = "1.18.0" +version = "1.20.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d" +checksum = "1261fe7e33c73b354eab43b1273a57c8f967d0391e80353e51f764ac02cf6775" [[package]] name = "overload" @@ -1033,7 +1134,7 @@ checksum = "9b2a4787296e9989611394c33f193f676704af1686e70b8f8033ab5ba9a35a94" [[package]] name = "pgcat" -version = "1.2.0" +version = "1.3.0" dependencies = [ "arc-swap", "async-trait", @@ -1055,6 +1156,7 @@ dependencies = [ "log", "lru", "md-5", + "mini-moka", "nix", "num_cpus", "once_cell", @@ -1069,6 +1171,7 @@ dependencies = [ "serde", "serde_derive", "serde_json", + "serial_test", "sha-1", "sha2", "socket2 0.4.9", @@ -1114,7 +1217,7 @@ dependencies = [ "phf_shared", "proc-macro2", "quote", - "syn 2.0.26", + "syn 2.0.87", ] [[package]] @@ -1143,7 +1246,7 @@ checksum = "ec2e072ecce94ec471b13398d5402c188e76ac03cf74dd1a975161b23a3f6d9c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.26", + "syn 2.0.87", ] [[package]] @@ -1184,13 +1287,24 @@ checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" [[package]] name = "proc-macro2" -version = "1.0.66" +version = "1.0.89" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "18fb31db3f9bddb2ea821cde30a9f70117e3f119938b5ee630b7403aa6e2ead9" +checksum = "f139b0662de085916d1fb67d2b4169d1addddda1919e696f3252b740b629986e" dependencies = [ "unicode-ident", ] +[[package]] +name = "pulldown-cmark" +version = "0.9.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57206b407293d2bcd3af849ce869d52068623f19e1b5ff8e8778e3309439682b" +dependencies = [ + "bitflags 2.3.3", + "memchr", + "unicase", +] + [[package]] name = "quick-error" version = "1.2.3" @@ -1199,9 +1313,9 @@ checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0" [[package]] name = "quote" -version = "1.0.31" +version = "1.0.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5fe8a65d69dd0808184ebb5f836ab526bb259db23c657efa38711b1072ee47f0" +checksum = "b5b9d34b8991d19d98081b46eacdd8eb58c6f2b201139f7c5f643cc155a633af" dependencies = [ "proc-macro2", ] @@ -1380,6 +1494,24 @@ version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1ad4cc8da4ef723ed60bced201181d83791ad433213d8c24efffda1eec85d741" +[[package]] +name = "same-file" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" +dependencies = [ + "winapi-util", +] + +[[package]] +name = "scc" +version = "2.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8d25269dd3a12467afe2e510f69fb0b46b698e5afb296b59f2145259deaf8e8" +dependencies = [ + "sdd", +] + [[package]] name = "scopeguard" version = "1.2.0" @@ -1396,24 +1528,39 @@ dependencies = [ "untrusted", ] +[[package]] +name = "sdd" +version = "3.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49c1eeaf4b6a87c7479688c6d52b9f1153cedd3c489300564f932b065c6eab95" + +[[package]] +name = "semver" +version = "1.0.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61697e0a1c7e512e84a621326239844a24d8207b4669b41bc18b32ea5cbf988b" +dependencies = [ + "serde", +] + [[package]] name = "serde" -version = "1.0.171" +version = "1.0.214" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30e27d1e4fd7659406c492fd6cfaf2066ba8773de45ca75e855590f856dc34a9" +checksum = "f55c3193aca71c12ad7890f1785d2b73e1b9f63a0bbc353c08ef26fe03fc56b5" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.171" +version = "1.0.214" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "389894603bd18c46fa56231694f8d827779c0951a667087194cf9de94ed24682" +checksum = "de523f781f095e28fa605cdce0f8307e451cc0fd14e2eb4cd2e98a355b147766" dependencies = [ "proc-macro2", "quote", - "syn 2.0.26", + "syn 2.0.87", ] [[package]] @@ -1436,6 +1583,31 @@ dependencies = [ "serde", ] +[[package]] +name = "serial_test" +version = "3.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b4b487fe2acf240a021cf57c6b2b4903b1e78ca0ecd862a71b71d2a51fed77d" +dependencies = [ + "futures", + "log", + "once_cell", + "parking_lot", + "scc", + "serial_test_derive", +] + +[[package]] +name = "serial_test_derive" +version = "3.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "82fe9db325bcef1fbcde82e078a5cc4efdf787e96b3b9cf45b50b529f2083d67" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.87", +] + [[package]] name = "sha-1" version = "0.10.1" @@ -1482,6 +1654,21 @@ version = "0.3.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7bd3e3206899af3f8b12af284fafc038cc1dc2b41d1b89dd17297221c5d225de" +[[package]] +name = "skeptic" +version = "0.13.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16d23b015676c90a0f01c197bfdc786c20342c73a0afdda9025adb0bc42940a8" +dependencies = [ + "bytecount", + "cargo_metadata", + "error-chain", + "glob", + "pulldown-cmark", + "tempfile", + "walkdir", +] + [[package]] name = "slab" version = "0.4.8" @@ -1541,7 +1728,7 @@ checksum = "01b2e185515564f15375f593fb966b5718bc624ba77fe49fa4616ad619690554" dependencies = [ "proc-macro2", "quote", - "syn 2.0.26", + "syn 2.0.87", ] [[package]] @@ -1585,15 +1772,34 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.26" +version = "2.0.87" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "45c3457aacde3c65315de5031ec191ce46604304d2446e803d71ade03308d970" +checksum = "25aa4ce346d03a6dcd68dd8b4010bcb74e54e62c90c573f394c46eae99aba32d" dependencies = [ "proc-macro2", "quote", "unicode-ident", ] +[[package]] +name = "tagptr" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417" + +[[package]] +name = "tempfile" +version = "3.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb94d2f3cc536af71caac6b6fcebf65860b347e7ce0cc9ebe8f70d3e521054ef" +dependencies = [ + "cfg-if", + "fastrand", + "redox_syscall", + "rustix", + "windows-sys", +] + [[package]] name = "thiserror" version = "1.0.43" @@ -1611,7 +1817,7 @@ checksum = "463fe12d7993d3b327787537ce8dd4dfa058de32fc2b195ef3cde03dc4771e8f" dependencies = [ "proc-macro2", "quote", - "syn 2.0.26", + "syn 2.0.87", ] [[package]] @@ -1678,7 +1884,7 @@ checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.26", + "syn 2.0.87", ] [[package]] @@ -1783,7 +1989,7 @@ checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab" dependencies = [ "proc-macro2", "quote", - "syn 2.0.26", + "syn 2.0.87", ] [[package]] @@ -1838,6 +2044,12 @@ dependencies = [ "tracing-serde", ] +[[package]] +name = "triomphe" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "859eb650cfee7434994602c3a68b25d77ad9e68c8a6cd491616ef86661382eb3" + [[package]] name = "trust-dns-proto" version = "0.22.0" @@ -1895,6 +2107,12 @@ version = "1.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "497961ef93d974e23eb6f433eb5fe1b7930b659f06d12dec6fc44a8f554c0bba" +[[package]] +name = "unicase" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e51b68083f157f853b6379db119d1c1be0e6e4dec98101079dec41f6f5cf6df" + [[package]] name = "unicode-bidi" version = "0.3.13" @@ -1951,6 +2169,16 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" +[[package]] +name = "walkdir" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29790946404f91d9c5d06f9874efddea1dc06c5efe94541a7d6863108e3a5e4b" +dependencies = [ + "same-file", + "winapi-util", +] + [[package]] name = "want" version = "0.3.1" @@ -1993,7 +2221,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.26", + "syn 2.0.87", "wasm-bindgen-shared", ] @@ -2015,7 +2243,7 @@ checksum = "54681b18a46765f095758388f2d0cf16eb8d4169b639ab575a8f5693af210c7b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.26", + "syn 2.0.87", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -2067,6 +2295,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" +[[package]] +name = "winapi-util" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" +dependencies = [ + "windows-sys", +] + [[package]] name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" diff --git a/Cargo.toml b/Cargo.toml index 21cfb0c3..38c7a27d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pgcat" -version = "1.2.0" +version = "1.3.0" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html @@ -55,6 +55,10 @@ tracing-subscriber = { version = "0.3.17", features = [ "std", ] } lru = "0.12.0" +mini-moka = "0.10.3" [target.'cfg(not(target_env = "msvc"))'.dependencies] jemallocator = "0.5.0" + +[dev-dependencies] +serial_test = "*" diff --git a/charts/pgcat/Chart.yaml b/charts/pgcat/Chart.yaml index 430c839b..01294788 100644 --- a/charts/pgcat/Chart.yaml +++ b/charts/pgcat/Chart.yaml @@ -4,5 +4,5 @@ description: A Helm chart for PgCat a PostgreSQL pooler and proxy (like PgBounce maintainers: - name: PostgresML email: team@postgresml.org -appVersion: "1.2.0" +appVersion: "1.3.0" version: 0.2.5 diff --git a/charts/pgcat/templates/secret.yaml b/charts/pgcat/templates/secret.yaml index 358f4e97..c9bbd3cb 100644 --- a/charts/pgcat/templates/secret.yaml +++ b/charts/pgcat/templates/secret.yaml @@ -51,6 +51,10 @@ stringData: query_parser_enabled = {{ default true $pool.query_parser_enabled }} query_parser_read_write_splitting = {{ default true $pool.query_parser_read_write_splitting }} primary_reads_enabled = {{ default true $pool.primary_reads_enabled }} + db_activity_based_routing = {{ default false $pool.db_activity_based_routing }} + db_activity_based_init_delay = {{ default 100 $pool.db_activity_based_init_delay }} + db_activity_ttl = {{ default 900 $pool.db_activity_ttl }} + table_mutation_cache_ttl = {{ default 50 $pool.table_mutation_cache_ttl }} sharding_function = {{ default "pg_bigint_hash" $pool.sharding_function | quote }} {{- range $index, $user := $pool.users }} diff --git a/charts/pgcat/values.yaml b/charts/pgcat/values.yaml index 20a4e27b..2f7e16c7 100644 --- a/charts/pgcat/values.yaml +++ b/charts/pgcat/values.yaml @@ -298,6 +298,22 @@ configuration: # ## @param configuration.poolsPostgres.query_parser_read_write_splitting # query_parser_read_write_splitting: true + # ## Db activity based routing. If enabled, we'll route queries to the primary if the table was recently mutated. + # ## @param configuration.poolsPostgres.db_activity_based_routing + # db_activity_based_routing: false + + # ## DB activity based init delay. How long to wait before starting to route queries to the primary after a table mutation. + # ## @param configuration.poolsPostgres.db_activity_based_init_delay + # db_activity_based_init_delay: 100 + + # ## DB activity TTL. How long before marking the DB as inactive after no mutations or queries. + # ## @param configuration.poolsPostgres.db_activity_ttl + # db_activity_ttl: 900 + + # ## Table mutation cache TTL. How long to keep track of table mutations. + # ## @param configuration.poolsPostgres.table_mutation_cache_ttl + # table_mutation_cache_ttl: 50 + # ## If the query parser is enabled and this setting is enabled, the primary will be part of the pool of databases used for # ## load balancing of read queries. Otherwise, the primary will only be used for write # ## queries. The primary can always be explicitly selected with our custom protocol. diff --git a/src/config.rs b/src/config.rs index b0d98fb5..7c915c94 100644 --- a/src/config.rs +++ b/src/config.rs @@ -589,6 +589,19 @@ pub struct Pool { #[serde(default = "Pool::default_prepared_statements_cache_size")] pub prepared_statements_cache_size: usize, + // Support for query routing based on database activity + #[serde(default = "Pool::default_db_activity_based_routing")] + pub db_activity_based_routing: bool, + + #[serde(default = "Pool::default_db_activity_init_delay")] + pub db_activity_init_delay: u64, + + #[serde(default = "Pool::default_db_activity_ttl")] + pub db_activity_ttl: u64, + + #[serde(default = "Pool::default_table_mutation_cache_ttl")] + pub table_mutation_cache_ttl: u64, + pub plugins: Option, pub shards: BTreeMap, pub users: BTreeMap, @@ -642,6 +655,25 @@ impl Pool { 0 } + pub fn default_db_activity_based_routing() -> bool { + false + } + + pub fn default_db_activity_init_delay() -> u64 { + // 100 milliseconds + 100 + } + + pub fn default_db_activity_ttl() -> u64 { + // 15 minutes + 15 * 60 + } + + pub fn default_table_mutation_cache_ttl() -> u64 { + // 50 milliseconds + 50 + } + pub fn validate(&mut self) -> Result<(), Error> { match self.default_role.as_ref() { "any" => (), @@ -724,6 +756,23 @@ impl Pool { user.validate()?; } + if self.db_activity_based_routing { + if self.db_activity_init_delay == 0 { + error!("db_activity_init_delay must be greater than 0"); + return Err(Error::BadConfig); + } + + if self.table_mutation_cache_ttl == 0 { + error!("table_mutation_cache_ttl must be greater than 0"); + return Err(Error::BadConfig); + } + + if self.db_activity_ttl == 0 { + error!("db_activity_ttl must be greater than 0"); + return Err(Error::BadConfig); + } + } + Ok(()) } } @@ -753,6 +802,10 @@ impl Default for Pool { cleanup_server_connections: true, log_client_parameter_status_changes: false, prepared_statements_cache_size: Self::default_prepared_statements_cache_size(), + db_activity_based_routing: Self::default_db_activity_based_routing(), + db_activity_init_delay: Self::default_db_activity_init_delay(), + db_activity_ttl: Self::default_db_activity_ttl(), + table_mutation_cache_ttl: Self::default_table_mutation_cache_ttl(), plugins: None, shards: BTreeMap::from([(String::from("1"), Shard::default())]), users: BTreeMap::default(), @@ -1289,6 +1342,22 @@ impl Config { "[pool: {}] Cleanup server connections: {}", pool_name, pool_config.cleanup_server_connections ); + info!( + "[pool: {}] DB activity based routing: {}", + pool_name, pool_config.db_activity_based_routing + ); + info!( + "[pool: {}] DB activity init delay: {}", + pool_name, pool_config.db_activity_init_delay + ); + info!( + "[pool: {}] DB activity TTL: {}", + pool_name, pool_config.db_activity_ttl + ); + info!( + "[pool: {}] Table mutation cache TTL: {}", + pool_name, pool_config.table_mutation_cache_ttl + ); info!( "[pool: {}] Log client parameter status changes: {}", pool_name, pool_config.log_client_parameter_status_changes diff --git a/src/pool.rs b/src/pool.rs index 7915a0a4..979cdaa0 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -174,6 +174,18 @@ pub struct PoolSettings { // Read from the primary as well or not. pub primary_reads_enabled: bool, + // Automatic primary/replica selection based on recent activity. + pub db_activity_based_routing: bool, + + // DB activity init delay + pub db_activity_init_delay: u64, + + // DB activity TTL + pub db_activity_ttl: u64, + + // Table mutation cache TTL + pub table_mutation_cache_ttl: u64, + // Sharding function. pub sharding_function: ShardingFunction, @@ -223,6 +235,10 @@ impl Default for PoolSettings { query_parser_max_length: None, query_parser_read_write_splitting: false, primary_reads_enabled: true, + db_activity_based_routing: false, + db_activity_init_delay: 100, + db_activity_ttl: 15 * 60, + table_mutation_cache_ttl: 50, sharding_function: ShardingFunction::PgBigintHash, automatic_sharding_key: None, healthcheck_delay: General::default_healthcheck_delay(), @@ -537,6 +553,10 @@ impl ConnectionPool { .query_parser_read_write_splitting, primary_reads_enabled: pool_config.primary_reads_enabled, sharding_function: pool_config.sharding_function, + db_activity_based_routing: pool_config.db_activity_based_routing, + db_activity_init_delay: pool_config.db_activity_init_delay, + db_activity_ttl: pool_config.db_activity_ttl, + table_mutation_cache_ttl: pool_config.table_mutation_cache_ttl, automatic_sharding_key: pool_config.automatic_sharding_key.clone(), healthcheck_delay: config.general.healthcheck_delay, healthcheck_timeout: config.general.healthcheck_timeout, diff --git a/src/query_router.rs b/src/query_router.rs index 2ed6b755..439adff1 100644 --- a/src/query_router.rs +++ b/src/query_router.rs @@ -2,6 +2,7 @@ /// or implied query characteristics. use bytes::{Buf, BytesMut}; use log::{debug, error}; +use mini_moka::sync::Cache; use once_cell::sync::OnceCell; use regex::{Regex, RegexSet}; use sqlparser::ast::Statement::{Delete, Insert, Query, StartTransaction, Update}; @@ -11,6 +12,7 @@ use sqlparser::ast::{ }; use sqlparser::dialect::PostgreSqlDialect; use sqlparser::parser::Parser; +use std::sync::OnceLock; use crate::config::Role; use crate::errors::Error; @@ -21,6 +23,7 @@ use crate::sharding::Sharder; use std::collections::BTreeSet; use std::io::Cursor; +use std::time::Duration; use std::{cmp, mem}; /// Regexes used to parse custom commands. @@ -66,6 +69,18 @@ static CUSTOM_SQL_REGEX_SET: OnceCell = OnceCell::new(); // Get the value inside the custom command. static CUSTOM_SQL_REGEX_LIST: OnceCell> = OnceCell::new(); +#[derive(Debug, Clone, PartialEq)] +enum DatabaseActivityState { + Active, + Initializing, +} + +// A moka cache for the databases +// the key is the database name and the value is the database activity state +static DATABASE_ACTIVITY_CACHE: OnceLock> = OnceLock::new(); +// A moka cache for the tables, the key is the db_table. +static TABLE_MUTATIONS_CACHE: OnceLock> = OnceLock::new(); + /// The query router. pub struct QueryRouter { /// Which shard we should be talking to right now. @@ -398,6 +413,41 @@ impl QueryRouter { } } + fn database_activity_cache(&self) -> Cache { + DATABASE_ACTIVITY_CACHE + .get_or_init(|| { + Cache::builder() + .time_to_idle(Duration::from_secs(self.pool_settings.db_activity_ttl)) + .build() + }) + .clone() + } + + /// Check database activity state and reset it if necessary + fn database_activity_state(&self, db: &String) -> DatabaseActivityState { + let cache = self.database_activity_cache(); + + // Exists in cache + if cache.contains_key(db) { + return cache.get(db).unwrap(); + } + + // Not in cache + debug!("Adding database to cache: {}", db); + + cache.insert(db.to_string(), DatabaseActivityState::Initializing); + + // Set a timer to update the cache + let db = db.clone(); + let db_activity_init_delay = self.pool_settings.db_activity_init_delay; + tokio::spawn(async move { + tokio::time::sleep(Duration::from_millis(db_activity_init_delay)).await; + cache.insert(db, DatabaseActivityState::Active); + }); + + DatabaseActivityState::Initializing + } + /// Try to infer which server to connect to based on the contents of the query. pub fn infer(&mut self, ast: &Vec) -> Result<(), Error> { if !self.pool_settings.query_parser_read_write_splitting { @@ -412,9 +462,23 @@ impl QueryRouter { return Err(Error::QueryRouterParserError("empty query".into())); } + let mut primary_set_based_on_activity = false; let mut visited_write_statement = false; let mut prev_inferred_shard = None; + if self.pool_settings.db_activity_based_routing { + let db = self.pool_settings.db.clone(); + let state = self.database_activity_state(&db); + debug!("Database activity state: {:?}", state); + + if let DatabaseActivityState::Initializing = state { + debug!("Database is initializing, going to primary"); + + self.active_role = Some(Role::Primary); + primary_set_based_on_activity = true; + } + } + for q in ast { match q { // All transactions go to the primary, probably a write. @@ -425,6 +489,22 @@ impl QueryRouter { // Likely a read-only query Query(query) => { + if primary_set_based_on_activity { + // If we already set the role based on activity, we don't need to do it again + continue; + } + + if self.pool_settings.db_activity_based_routing { + // Check if the tables in the query have been written to recently + if self.query_handles_tables_in_mutation_cache(query) { + debug!("Query handles tables in mutation cache, going to primary"); + + self.active_role = Some(Role::Primary); + primary_set_based_on_activity = true; + continue; + } + } + match &self.pool_settings.automatic_sharding_key { Some(_) => { // TODO: if we have multiple queries in the same message, @@ -455,6 +535,13 @@ impl QueryRouter { // Likely a write _ => { + debug!("Write statement found, going to primary"); + + if self.pool_settings.db_activity_based_routing { + // add all of the query tables to the mutation cache + self.update_mutation_cache_on_write(q); + } + match &self.pool_settings.automatic_sharding_key { Some(_) => { // TODO: similar to the above, if we have multiple queries in the @@ -497,11 +584,40 @@ impl QueryRouter { Ok(()) } - fn infer_shard_on_write(&mut self, q: &Statement) -> Result, Error> { - let mut exprs = Vec::new(); + fn table_mutations_cache(&self) -> Cache { + TABLE_MUTATIONS_CACHE + .get_or_init(|| { + Cache::builder() + .time_to_live(Duration::from_millis( + self.pool_settings.table_mutation_cache_ttl, + )) + .build() + }) + .clone() + } - // Collect all table names from the query. + fn query_handles_tables_in_mutation_cache(&self, query: &sqlparser::ast::Query) -> bool { + let table_mutations_cache = self.table_mutations_cache(); + debug!("Checking if query handles tables in mutation cache"); + debug!("Table mutations cache: {:?}", table_mutations_cache); + + for tables in self.table_names(query) { + for table in tables { + if table_mutations_cache.contains_key(&self.table_mutation_cache_key(table)) { + return true; + } + } + } + + false + } + fn extract_exprs_and_table_names<'a>( + &'a self, + q: &'a Statement, + ) -> Option<(Vec, Vec>, Option<&'a Vec>)> { + let mut exprs = Vec::new(); let mut table_names = Vec::new(); + let mut assignments_opt = None; match q { Insert { @@ -541,7 +657,7 @@ impl QueryRouter { exprs.push(expr.clone()); } - // Multi tables delete are not supported in postgres. + // Multi-table deletes are not supported in postgres. assert!(tables.is_empty()); Self::process_tables_with_join(from, &mut exprs, &mut table_names); @@ -566,14 +682,45 @@ impl QueryRouter { Self::process_table_with_join(from_tbl, &mut exprs, &mut table_names); } Self::process_selection(selection, &mut exprs); + + assignments_opt = Some(assignments); + } + _ => return None, + }; + + Some((exprs, table_names, assignments_opt)) + } + + fn infer_shard_on_write(&mut self, q: &Statement) -> Result, Error> { + if let Some((exprs, table_names, assignments_opt)) = self.extract_exprs_and_table_names(q) { + if let Some(assignments) = assignments_opt { self.assignment_parser(assignments)?; } - _ => { - return Ok(None); + + Ok(self.infer_shard_from_exprs(exprs, table_names)) + } else { + Ok(None) + } + } + + fn update_mutation_cache_on_write(&self, q: &Statement) { + if let Some((_exprs, table_names, _)) = self.extract_exprs_and_table_names(q) { + debug!("Updating mutation cache on write"); + debug!("Table names in mutation query: {:?}", table_names); + let table_mutations_cache = self.table_mutations_cache(); + for tables in table_names { + for table in tables { + table_mutations_cache.insert(self.table_mutation_cache_key(table), true); + } } - }; + } + } - Ok(self.infer_shard_from_exprs(exprs, table_names)) + // combines the database name and table name into a single string + // to be used as the key in the table mutation cache + // e.g. "mydb.mytable" + fn table_mutation_cache_key(&self, table: Ident) -> String { + format!("{}.{}", self.pool_settings.db, table.value) } fn process_query( @@ -955,6 +1102,18 @@ impl QueryRouter { self.infer_shard_from_exprs(exprs, table_names) } + /// get table names from query + fn table_names(&self, query: &sqlparser::ast::Query) -> Vec> { + let mut exprs = Vec::new(); + + let mut table_names = Vec::new(); + Self::process_query(query, &mut exprs, &mut table_names, &None); + + debug!("Table names in query: {:?}", table_names); + + table_names + } + fn infer_shard_from_exprs( &mut self, exprs: Vec, @@ -1122,6 +1281,7 @@ mod test { use crate::messages::simple_query; use crate::sharding::ShardingFunction; use bytes::BufMut; + use serial_test::serial; #[test] fn test_defaults() { @@ -1477,6 +1637,10 @@ mod test { auth_query_password: None, auth_query_user: None, db: "test".to_string(), + db_activity_based_routing: PoolSettings::default().db_activity_based_routing, + db_activity_init_delay: PoolSettings::default().db_activity_init_delay, + db_activity_ttl: PoolSettings::default().db_activity_ttl, + table_mutation_cache_ttl: PoolSettings::default().table_mutation_cache_ttl, plugins: None, }; let mut qr = QueryRouter::new(); @@ -1555,6 +1719,10 @@ mod test { auth_query_password: None, auth_query_user: None, db: "test".to_string(), + db_activity_based_routing: PoolSettings::default().db_activity_based_routing, + db_activity_init_delay: PoolSettings::default().db_activity_init_delay, + db_activity_ttl: PoolSettings::default().db_activity_ttl, + table_mutation_cache_ttl: PoolSettings::default().table_mutation_cache_ttl, plugins: None, }; @@ -1970,4 +2138,150 @@ mod test { assert_eq!(res, Ok(PluginOutput::Allow)); } + + #[tokio::test] + #[serial] + async fn test_db_activity_based_routing_initializing_state() { + QueryRouter::setup(); + let mut qr = QueryRouter::new(); + qr.pool_settings.db_activity_based_routing = true; + qr.pool_settings.query_parser_read_write_splitting = true; + qr.pool_settings.query_parser_enabled = true; + qr.pool_settings.db = "test_table_mutation_cache".to_string(); + + qr.database_activity_cache() + .invalidate(&qr.pool_settings.db.clone()); + + let query = simple_query("SELECT * FROM some_table"); + let ast = qr.parse(&query).unwrap(); + + // Initially, the database activity should be in the "Initializing" state + let state = qr.database_activity_state(&qr.pool_settings.db.clone()); + assert_eq!(state, DatabaseActivityState::Initializing); + + // Check that the router chooses the primary role due to "Initializing" state + assert!(qr.infer(&ast).is_ok()); + assert_eq!(qr.role(), Some(Role::Primary)); + } + + #[tokio::test] + #[serial] + async fn test_db_activity_based_routing_active_state() { + QueryRouter::setup(); + let mut qr = QueryRouter::new(); + qr.pool_settings.db_activity_based_routing = true; + qr.pool_settings.query_parser_read_write_splitting = true; + qr.pool_settings.query_parser_enabled = true; + qr.pool_settings.db = "test_table_mutation_cache".to_string(); + + let db_name = qr.pool_settings.db.clone(); + let cache = qr.database_activity_cache(); + cache.insert(db_name.clone(), DatabaseActivityState::Active); + + let query = simple_query("SELECT * FROM some_table"); + let ast = qr.parse(&query).unwrap(); + + // Check that the router can choose a replica role when in "Active" state + assert!(qr.infer(&ast).is_ok()); + assert_eq!(qr.role(), None); // Default should allow replica due to active state + } + + #[tokio::test] + #[serial] + async fn test_table_mutation_cache_on_write() { + QueryRouter::setup(); + let mut qr = QueryRouter::new(); + qr.pool_settings.db_activity_based_routing = true; + qr.pool_settings.table_mutation_cache_ttl = 20_000; // 20 seconds in milliseconds + qr.pool_settings.query_parser_enabled = true; + qr.pool_settings.query_parser_read_write_splitting = true; + qr.pool_settings.db = "test_table_mutation_cache".to_string(); + + qr.database_activity_cache() + .invalidate(&qr.pool_settings.db.clone()); + + let query = simple_query("UPDATE some_table SET col1 = 'value' WHERE col2 = 1"); + let ast = qr.parse(&query).unwrap(); + + // Simulate the mutation query which should populate the mutation cache + assert!(qr.infer(&ast).is_ok()); + assert_eq!(qr.role(), Some(Role::Primary)); + + let table_cache_key = qr.table_mutation_cache_key(Ident::new("some_table")); + let cache = qr.table_mutations_cache(); + + // Ensure the table mutation cache contains the table with recent write + assert!(cache.contains_key(&table_cache_key)); + } + + #[tokio::test] + #[serial] + async fn test_db_activity_based_routing_multi_query() { + use super::*; + use crate::messages::simple_query; + use tokio::time::Duration; + + QueryRouter::setup(); + let mut qr = QueryRouter::new(); + + // Configure the pool settings for db_activity_based_routing + qr.pool_settings.query_parser_read_write_splitting = true; + qr.pool_settings.query_parser_enabled = true; + qr.pool_settings.db_activity_based_routing = true; + qr.pool_settings.db = "test_db_activity_routing".to_string(); + + qr.database_activity_cache() + .invalidate(&qr.pool_settings.db.clone()); + + // First query when database is initializing + let query = simple_query("SELECT * FROM test_table"); + let ast = qr.parse(&query).unwrap(); + assert!(qr.infer(&ast).is_ok()); + // Should route to primary because database is initializing + assert_eq!(qr.role(), Some(Role::Primary)); + + // Wait for the initialization delay to pass + tokio::time::sleep(Duration::from_millis( + qr.pool_settings.db_activity_init_delay * 2, + )) + .await; + + // Next query after database is active + let query = simple_query("SELECT * FROM test_table"); + let ast = qr.parse(&query).unwrap(); + qr.active_role = None; // Reset the active_role + assert!(qr.infer(&ast).is_ok()); + // Should route to replica because database is active and no recent mutations + assert_eq!(qr.role(), None); + + // Simulate a write query to update the mutation cache + let query = simple_query("INSERT INTO test_table (id, name) VALUES (1, 'test')"); + let ast = qr.parse(&query).unwrap(); + qr.active_role = None; // Reset the active_role + assert!(qr.infer(&ast).is_ok()); + // Should route to primary because it's a write operation + assert_eq!(qr.role(), Some(Role::Primary)); + + // Immediately run a read query on the same table + let query = simple_query("SELECT * FROM test_table WHERE id = 1"); + let ast = qr.parse(&query).unwrap(); + qr.active_role = None; // Reset the active_role + assert!(qr.infer(&ast).is_ok()); + // Should route to primary because the table was recently mutated + assert_eq!(qr.role(), Some(Role::Primary)); + + // Wait for the mutation cache TTL to expire + tokio::time::sleep(Duration::from_millis( + qr.pool_settings.table_mutation_cache_ttl * 2, + )) + .await; + + // Run the read query again after cache expiration + let query = simple_query("SELECT * FROM test_table WHERE id = 1"); + let ast = qr.parse(&query).unwrap(); + qr.active_role = None; // Reset the active_role + assert!(qr.infer(&ast).is_ok()); + // Should route to replica because mutation cache has expired + assert_eq!(qr.role(), None); + } }