diff --git a/benchmarks/src/benchmark.rs b/benchmarks/src/benchmark.rs
index 8497a10f45..3990491ba4 100644
--- a/benchmarks/src/benchmark.rs
+++ b/benchmarks/src/benchmark.rs
@@ -23,7 +23,6 @@ use serde::{Deserialize, Serialize};
 
 use crate::cache_hit_benchmark::CacheHitBenchmark;
 use crate::eviction_benchmark::EvictionBenchmark;
-use crate::fallback_benchmark::FallbackBenchmark;
 use crate::graph::ArgOverride;
 use crate::migration_benchmark::MigrationBenchmark;
 use crate::query_benchmark::QueryBenchmark;
@@ -54,7 +53,6 @@ pub enum Benchmark {
     MigrationBenchmark,
     EvictionBenchmark,
     ReadWriteBenchmark,
-    FallbackBenchmark,
     SingleQueryBenchmark,
     WorkloadEmulator,
 }
@@ -72,7 +70,6 @@ impl Benchmark {
             Self::MigrationBenchmark(_) => "migration_benchmark",
             Self::EvictionBenchmark(_) => "eviction",
             Self::ReadWriteBenchmark(_) => "read_write_benchmark",
-            Self::FallbackBenchmark(_) => "fallback_benchmark",
             Self::SingleQueryBenchmark(_) => "single_query_benchmark",
             Self::WorkloadEmulator(_) => "workload_emulator",
         }
@@ -91,7 +88,6 @@ impl Benchmark {
             Benchmark::MigrationBenchmark(x) => x.update_from(itr),
             Benchmark::EvictionBenchmark(x) => x.update_from(itr),
             Benchmark::ReadWriteBenchmark(x) => x.update_from(itr),
-            Benchmark::FallbackBenchmark(x) => x.update_from(itr),
             Benchmark::SingleQueryBenchmark(x) => x.update_from(itr),
             Benchmark::WorkloadEmulator(x) => x.update_from(itr),
         },
@@ -126,9 +122,6 @@ pub struct DeploymentParameters {
     #[clap(long, env = "SETUP_CONN_STR", default_value = "")]
     pub setup_conn_str: String,
 
-    #[clap(long)]
-    pub enable_fallback_cache: bool,
-
     #[clap(long)]
     pub database_type: DatabaseType,
 
diff --git a/benchmarks/src/fallback_benchmark.rs b/benchmarks/src/fallback_benchmark.rs
deleted file mode 100644
index 362d8ca0b0..0000000000
--- a/benchmarks/src/fallback_benchmark.rs
+++ /dev/null
@@ -1,254 +0,0 @@
-//! This benchmark compares ReadySet fallback behavior to executing queries
-//! directly against the upstream database. This is accomplished via wrapping
-//! queries in a transaction, as all transactions are sent to fallback directly.
-use std::collections::HashMap;
-use std::convert::TryFrom;
-use std::str::FromStr;
-use std::time::{Duration, Instant};
-
-use anyhow::Result;
-use async_trait::async_trait;
-use clap::Parser;
-use database_utils::{DatabaseURL, QueryableConnection};
-use metrics::Unit;
-use serde::{Deserialize, Serialize};
-use tokio::sync::mpsc::UnboundedSender;
-use tracing::{debug, error};
-
-use crate::benchmark::{BenchmarkControl, BenchmarkResults, DeploymentParameters, MetricGoal};
-use crate::utils::generate::DataGenerator;
-use crate::utils::multi_thread::{self, MultithreadBenchmark};
-use crate::utils::prometheus::ForwardPrometheusMetrics;
-use crate::utils::query::ArbitraryQueryParameters;
-use crate::utils::us_to_ms;
-
-const REPORT_RESULTS_INTERVAL: Duration = Duration::from_secs(2);
-
-#[derive(Parser, Clone, Default, Serialize, Deserialize)]
-pub struct FallbackBenchmark {
-    /// Parameters to handle generating parameters for arbitrary queries.
-    #[clap(flatten)]
-    query: ArbitraryQueryParameters,
-
-    /// The target rate to issue queries at if attainable on this
-    /// machine with up to `threads`.
-    #[clap(long)]
-    target_qps: Option<u64>,
-
-    /// The number of threads to execute the fallback benchmark across.
-    #[clap(long, default_value = "1")]
-    threads: u64,
-
-    /// Install and generate from an arbitrary schema.
-    #[clap(flatten)]
-    data_generator: DataGenerator,
-
-    /// The duration, specified as the number of seconds that the benchmark
-    /// should be running. If `None` is provided, the benchmark will run
-    /// until it is interrupted.
-    #[clap(long, value_parser = crate::utils::seconds_as_str_to_duration, default_value="30")]
-    run_for: Duration,
-
-    /// Whether fallback cache should be enabled for this benchmark
-    #[clap(long)]
-    enable_fallback_cache: bool,
-}
-
-#[derive(Clone)]
-pub struct FallbackBenchmarkThreadParams {
-    query: ArbitraryQueryParameters,
-    target_qps: Option<u64>,
-    threads: u64,
-    upstream_conn_str: String,
-}
-
-#[async_trait]
-impl BenchmarkControl for FallbackBenchmark {
-    async fn setup(&self, deployment: &DeploymentParameters) -> Result<()> {
-        self.data_generator
-            .install(&deployment.setup_conn_str)
-            .await?;
-        self.data_generator
-            .generate(&deployment.setup_conn_str)
-            .await?;
-
-        // Explicitly migrate the query before benchmarking.
-        let mut conn = DatabaseURL::from_str(&deployment.target_conn_str)?
-            .connect(None)
-            .await?;
-        // For now drop the result of migrate as CREATE CACHE does not support
-        // non-select queries.
-        let _ = self.query.migrate(&mut conn).await;
-
-        Ok(())
-    }
-
-    async fn reset(&self, deployment: &DeploymentParameters) -> Result<()> {
-        // Explicitly migrate the query before benchmarking.
-        let mut conn = DatabaseURL::from_str(&deployment.target_conn_str)?
-            .connect(None)
-            .await?;
-        // For now drop the result of migrate as CREATE CACHE does not support
-        // non-select queries.
-        let _ = self.query.unmigrate(&mut conn).await;
-        Ok(())
-    }
-
-    async fn benchmark(&self, deployment: &DeploymentParameters) -> Result<BenchmarkResults> {
-        // Explicitly migrate the query before benchmarking.
-        let mut conn = DatabaseURL::from_str(&deployment.target_conn_str)?
-            .connect(None)
-            .await?;
-        // For now drop the result of migrate as CREATE CACHE does not support
-        // non-select queries.
-        let _ = self.query.migrate(&mut conn).await;
-
-        debug!("Benchmarking against fallback through the adapter");
-        let fallback_results = multi_thread::run_multithread_benchmark::<Self>(
-            self.threads,
-            FallbackBenchmarkThreadParams {
-                query: self.query.clone(),
-                target_qps: self.target_qps,
-                threads: self.threads,
-                upstream_conn_str: deployment.target_conn_str.clone(),
-            },
-            Some(self.run_for),
-        )
-        .await?
-        .prefix("fallback");
-        // TODO(justin): Prometheus metrics of the results.
-
-        // Run these with the upstream databases connection string directly.
-        debug!("Benchmarking against fallback through the adapter");
-        let direct_results = multi_thread::run_multithread_benchmark::<Self>(
-            self.threads,
-            FallbackBenchmarkThreadParams {
-                query: self.query.clone(),
-                target_qps: self.target_qps,
-                threads: self.threads,
-                upstream_conn_str: deployment.setup_conn_str.clone(),
-            },
-            Some(self.run_for),
-        )
-        .await?
-        .prefix("upstream");
-
-        Ok(BenchmarkResults::merge(vec![
-            fallback_results,
-            direct_results,
-        ]))
-    }
-
-    fn labels(&self) -> HashMap<String, String> {
-        let mut labels = HashMap::new();
-        labels.extend(self.query.labels());
-        labels.extend(self.data_generator.labels());
-        labels
-    }
-
-    fn forward_metrics(&self, _: &DeploymentParameters) -> Vec<ForwardPrometheusMetrics> {
-        vec![]
-    }
-
-    fn name(&self) -> &'static str {
-        "fallback_benchmark"
-    }
-
-    fn data_generator(&mut self) -> Option<&mut DataGenerator> {
-        Some(&mut self.data_generator)
-    }
-}
-
-#[derive(Debug, Clone)]
-/// A batched set of results sent on an interval by the fallback benchmark thread.
-pub(crate) struct FallbackBenchmarkResultBatch {
-    /// Query end-to-end latency in ms.
-    queries: Vec<u128>,
-}
-
-impl FallbackBenchmarkResultBatch {
-    fn new() -> Self {
-        Self {
-            queries: Vec::new(),
-        }
-    }
-}
-
-#[async_trait]
-impl MultithreadBenchmark for FallbackBenchmark {
-    type BenchmarkResult = FallbackBenchmarkResultBatch;
-    type Parameters = FallbackBenchmarkThreadParams;
-
-    async fn handle_benchmark_results(
-        results: Vec<Self::BenchmarkResult>,
-        interval: Duration,
-        benchmark_results: &mut BenchmarkResults,
-    ) -> Result<()> {
-        let results_data =
-            benchmark_results.entry("duration", Unit::Microseconds, MetricGoal::Decreasing);
-        let mut hist = hdrhistogram::Histogram::<u64>::new(3).unwrap();
-        for u in results {
-            for l in u.queries {
-                results_data.push(l as f64);
-                hist.record(u64::try_from(l).unwrap()).unwrap();
-            }
-        }
-        let qps = hist.len() as f64 / interval.as_secs() as f64;
-        debug!(
-            "qps: {:.0}\tp50: {:.1} ms\tp90: {:.1} ms\tp99: {:.1} ms\tp99.99: {:.1} ms",
-            qps,
-            us_to_ms(hist.value_at_quantile(0.5)),
-            us_to_ms(hist.value_at_quantile(0.9)),
-            us_to_ms(hist.value_at_quantile(0.99)),
-            us_to_ms(hist.value_at_quantile(0.9999))
-        );
-
-        Ok(())
-    }
-
-    async fn benchmark_thread(
-        params: Self::Parameters,
-        sender: UnboundedSender<Self::BenchmarkResult>,
-    ) -> Result<()> {
-        // Prepare the query to retrieve the query schema.
-        let mut conn = DatabaseURL::from_str(&params.upstream_conn_str)?
-            .connect(None)
-            .await?;
-
-        let mut prepared_statement = params.query.prepared_statement(&mut conn).await?;
-
-        let mut throttle_interval =
-            multi_thread::throttle_interval(params.target_qps, params.threads);
-        let mut last_report = Instant::now();
-        let mut result_batch = FallbackBenchmarkResultBatch::new();
-        loop {
-            // Report results every REPORT_RESULTS_INTERVAL.
-            if last_report.elapsed() > REPORT_RESULTS_INTERVAL {
-                let mut new_results = FallbackBenchmarkResultBatch::new();
-                std::mem::swap(&mut new_results, &mut result_batch);
-                sender.send(new_results)?;
-                last_report = Instant::now();
-            }
-
-            if let Some(interval) = &mut throttle_interval {
-                interval.tick().await;
-            }
-
-            let (query, params) = prepared_statement.generate_query();
-            let start = Instant::now();
-
-            // Wrap the query in a transaction to force fallback.
-            let mut transaction = conn.transaction().await?;
-            {
-                let res = transaction.execute(query, params).await;
-                if let Err(e) = res {
-                    error!(err = %e, "Error on exec");
-                    return Err(e.into());
-                }
-            }
-            transaction.commit().await?;
-
-            result_batch.queries.push(start.elapsed().as_micros());
-        }
-    }
-}
diff --git a/benchmarks/src/lib.rs b/benchmarks/src/lib.rs
index c11350f8c7..a2c22966d7 100644
--- a/benchmarks/src/lib.rs
+++ b/benchmarks/src/lib.rs
@@ -98,7 +98,6 @@ pub mod utils;
 // Benchmarks
 mod cache_hit_benchmark;
 mod eviction_benchmark;
-mod fallback_benchmark;
 mod migration_benchmark;
 mod query_benchmark;
 mod read_write_benchmark;
diff --git a/benchmarks/src/utils/backend.rs b/benchmarks/src/utils/backend.rs
index bdfe57e3b3..7ced098ab5 100644
--- a/benchmarks/src/utils/backend.rs
+++ b/benchmarks/src/utils/backend.rs
@@ -26,7 +26,7 @@ impl Backend {
         match DatabaseURL::from_str(url)? {
             DatabaseURL::MySQL(_) => {
-                let upstream = MySqlUpstream::connect(UpstreamConfig::from_url(url), None).await?;
+                let upstream = MySqlUpstream::connect(UpstreamConfig::from_url(url)).await?;
 
                 Ok(Self::MySql(
                     BackendBuilder::new()
                 ))
             }
             DatabaseURL::PostgreSQL(_) => {
-                let upstream =
-                    PostgreSqlUpstream::connect(UpstreamConfig::from_url(url), None).await?;
+                let upstream = PostgreSqlUpstream::connect(UpstreamConfig::from_url(url)).await?;
 
                 Ok(Self::PostgreSql(
                     BackendBuilder::new()
diff --git a/readyset-adapter/Cargo.toml b/readyset-adapter/Cargo.toml
index 1c426750a1..84ebe4ad5f 100644
--- a/readyset-adapter/Cargo.toml
+++ b/readyset-adapter/Cargo.toml
@@ -74,5 +74,4 @@ harness = false
 
 [features]
 ryw = []
-fallback_cache = ["readyset-client-metrics/fallback_cache"]
-failure_injection = ["fail/failpoints"]
+failure_injection = ["fail/failpoints"]
\ No newline at end of file
diff --git a/readyset-adapter/src/fallback_cache.rs b/readyset-adapter/src/fallback_cache.rs
deleted file mode 100644
index 4ca27a2419..0000000000
--- a/readyset-adapter/src/fallback_cache.rs
+++ /dev/null
@@ -1,339 +0,0 @@
-//! The fallback cache provides a thread-safe backup cache for queries that we can't parse, or
-//! otherwise support in readyset-server.
-//!
-//! For now this is just a POC, and isn't intended for use by customers.
-use std::sync::atomic::AtomicU64;
-use std::sync::atomic::Ordering::Relaxed;
-use std::sync::Arc;
-use std::time::{Duration, Instant};
-
-use async_trait::async_trait;
-use dashmap::DashMap;
-
-// TODO: Also model SSD speeds as that may be more likely used.
-/// This is naively based on averages for spinning disk found on Google. Generally standard HDD
-/// read/write rates seem to be listed as 80-160 MB/s, so this is the median of that range (120
-/// MB/s). Since this is for rough approximation benchmarks, this is probably fine.
-const HDD_BYTES_PER_SEC: f64 = 125_829_120.0;
-
-/// A cache of all queries that we can't currently parse.
-#[derive(Debug, Clone)]
-pub struct SimpleFallbackCache<R> {
-    /// A thread-safe hash map that holds a cache of unparsed and unsupported queries to their
-    /// respective QueryResult.
-    queries: DashMap<String, QueryResult<R>>,
-    /// The configured ttl for all queries cached in the FallbackCache.
-    ttl: Duration,
-}
-
-#[derive(Debug, Clone)]
-pub struct QueryResult<R> {
-    /// The query results that were last cached from the upstream database.
-    result: R,
-    /// The time this query was last cached in the FallbackCache. Used in tandem with the ttl to
-    /// determine when to refresh the queries result set.
-    last_cached: Instant,
-}
-
-#[async_trait]
-pub trait FallbackCacheApi<R> {
-    /// Inserts a query along with it's upstream query result into the cache.
-    async fn insert(&mut self, q: String, result: R);
-
-    /// Clear all cached queries.
-    async fn clear(&self);
-
-    /// Retrieves the results for a query based on a given query string.
-    async fn get(&self, query: &str) -> Option<R>;
-
-    /// Revokes a query from the cache.
-    async fn revoke(&self, query: &str);
-}
-
-#[derive(Debug, Clone)]
-pub enum FallbackCache<R> {
-    Simple(SimpleFallbackCache<R>),
-    Disk(DiskModeledCache<R>),
-    Eviction(EvictionModeledCache<R>),
-}
-
-#[async_trait]
-impl<R> FallbackCacheApi<R> for FallbackCache<R>
-where
-    R: Clone + Sized + Send + Sync,
-{
-    async fn insert(&mut self, q: String, result: R) {
-        match self {
-            FallbackCache::Simple(s) => s.insert(q, result).await,
-            FallbackCache::Disk(d) => d.insert(q, result).await,
-            FallbackCache::Eviction(e) => e.insert(q, result).await,
-        }
-    }
-
-    async fn clear(&self) {
-        match self {
-            FallbackCache::Simple(s) => s.clear().await,
-            FallbackCache::Disk(d) => d.clear().await,
-            FallbackCache::Eviction(e) => e.clear().await,
-        }
-    }
-
-    async fn get(&self, query: &str) -> Option<R> {
-        match self {
-            FallbackCache::Simple(s) => s.get(query).await,
-            FallbackCache::Disk(d) => d.get(query).await,
-            FallbackCache::Eviction(e) => e.get(query).await,
-        }
-    }
-
-    async fn revoke(&self, query: &str) {
-        match self {
-            FallbackCache::Simple(s) => s.revoke(query).await,
-            FallbackCache::Disk(d) => d.revoke(query).await,
-            FallbackCache::Eviction(e) => e.revoke(query).await,
-        }
-    }
-}
-
-impl<R> From<SimpleFallbackCache<R>> for FallbackCache<R> {
-    fn from(simple: SimpleFallbackCache<R>) -> Self {
-        FallbackCache::Simple(simple)
-    }
-}
-
-impl<R> From<DiskModeledCache<R>> for FallbackCache<R> {
-    fn from(disk: DiskModeledCache<R>) -> Self {
-        FallbackCache::Disk(disk)
-    }
-}
-
-impl<R> From<EvictionModeledCache<R>> for FallbackCache<R> {
-    fn from(eviction: EvictionModeledCache<R>) -> Self {
-        FallbackCache::Eviction(eviction)
-    }
-}
-
-impl<R> SimpleFallbackCache<R>
-where
-    R: Clone + Sized + Send + Sync,
-{
-    /// Constructs a new FallbackCache.
-    pub fn new(ttl: Duration) -> SimpleFallbackCache<R> {
-        SimpleFallbackCache {
-            queries: DashMap::new(),
-            ttl,
-        }
-    }
-
-    /// Converts the inner dashmap into an std::collections::HashMap.
-    fn current_size(&self) -> usize {
-        self.queries
-            .iter()
-            .map(|r| {
-                std::mem::size_of_val::<[u8]>(r.key().clone().as_bytes())
-                    + std::mem::size_of_val::<QueryResult<R>>(r.value())
-            })
-            .sum::<usize>()
-    }
-}
-
-#[async_trait]
-impl<R> FallbackCacheApi<R> for SimpleFallbackCache<R>
-where
-    R: Clone + Sized + Send + Sync,
-{
-    /// Inserts a query along with it's upstream query result into the cache.
-    async fn insert(&mut self, q: String, result: R) {
-        self.queries.insert(
-            q,
-            QueryResult {
-                result,
-                last_cached: Instant::now(),
-            },
-        );
-    }
-
-    /// Clear all cached queries.
-    async fn clear(&self) {
-        self.queries.clear()
-    }
-
-    /// Retrieves the results for a query based on a given query string.
-    async fn get(&self, query: &str) -> Option<R> {
-        self.queries
-            .get(query)
-            .filter(|r| r.last_cached.elapsed() < self.ttl)
-            .map(|r| r.result.clone())
-    }
-
-    async fn revoke(&self, query: &str) {
-        self.queries.remove(query);
-    }
-}
-
-#[derive(Clone, Debug)]
-pub struct DiskModeledCache<R> {
-    cache: SimpleFallbackCache<R>,
-    /// Current size of the fallback cache in bytes as of the last write to it.
-    current_size: usize,
-}
-
-impl<R> DiskModeledCache<R>
-where
-    R: Clone + Sized + Send + Sync,
-{
-    /// Constructs a new DiskModeledCacheWrapper.
-    pub fn new(ttl: Duration) -> DiskModeledCache<R> {
-        DiskModeledCache {
-            cache: SimpleFallbackCache::new(ttl),
-            current_size: 0,
-        }
-    }
-
-    /// Simulates an hdd by adding an async delay calculated based on the current size of the in
-    /// memory cache, and average read/write rates for spinning disk.
-    ///
-    /// Takes in the current already elapsed time, which is used in combination with spinning disk
-    /// rates to achieve the correct offset delay.
-    async fn simulate_hdd_delay(&self, elapsed: Duration) {
-        let delay_time = Duration::from_secs_f64(self.current_size as f64 / HDD_BYTES_PER_SEC);
-        if elapsed >= delay_time {
-            return;
-        }
-        tokio::time::sleep(delay_time - elapsed).await;
-    }
-}
-
-#[async_trait]
-impl<R> FallbackCacheApi<R> for DiskModeledCache<R>
-where
-    R: Clone + Sized + Send + Sync,
-{
-    /// Inserts a query along with it's upstream query result into the cache.
-    /// On each write, we update the cache current size to a field on the DiskModeledCacheWrapper
-    /// struct.
-    ///
-    /// Simulates writing to disk each time.
-    async fn insert(&mut self, q: String, result: R) {
-        let start = Instant::now();
-        self.cache.insert(q, result).await;
-        self.current_size = self.cache.current_size();
-        self.simulate_hdd_delay(start.elapsed()).await;
-    }
-
-    /// Clear all cached queries.
-    async fn clear(&self) {
-        let start = Instant::now();
-        self.cache.clear().await;
-        self.simulate_hdd_delay(start.elapsed()).await;
-    }
-
-    /// Retrieves the results for a query based on a given query string.
-    ///
-    /// Simulates reading off disk each time.
-    async fn get(&self, query: &str) -> Option<R> {
-        let start = Instant::now();
-        let res = self.cache.get(query).await;
-        self.simulate_hdd_delay(start.elapsed()).await;
-        res
-    }
-
-    async fn revoke(&self, query: &str) {
-        let start = Instant::now();
-        self.cache.revoke(query).await;
-        self.simulate_hdd_delay(start.elapsed()).await;
-    }
-}
-
-#[derive(Clone, Debug)]
-pub struct EvictionModeledCache<R> {
-    cache: SimpleFallbackCache<R>,
-    /// The rate that we randomly evict cached queries.
-    eviction_rate: f64,
-    /// A counter for the number of times we've looked up a query. Used in conjunction with the
-    /// eviction rate.
-    lookup_counter: Arc<AtomicU64>,
-}
-
-impl<R> EvictionModeledCache<R>
-where
-    R: Clone + Sized + Send + Sync,
-{
-    /// Constructs a new EvictionModeledCache.
-    pub fn new(ttl: Duration, eviction_rate: f64) -> EvictionModeledCache<R> {
-        EvictionModeledCache {
-            cache: SimpleFallbackCache::new(ttl),
-            eviction_rate,
-            lookup_counter: Arc::new(AtomicU64::new(0)),
-        }
-    }
-
-    async fn maybe_evict(&self, query: &str) {
-        let rate_size = (1.0 / self.eviction_rate).round() as u64;
-        if self.lookup_counter.load(Relaxed) % rate_size == 0 {
-            self.revoke(query).await;
-        }
-    }
-}
-
-#[async_trait]
-impl<R> FallbackCacheApi<R> for EvictionModeledCache<R>
-where
-    R: Clone + Sized + Send + Sync,
-{
-    /// Inserts a query along with it's upstream query result into the cache.
-    /// On each write, we update the cache current size to a field on the DiskModeledCacheWrapper
-    /// struct.
-    async fn insert(&mut self, q: String, result: R) {
-        self.cache.insert(q, result).await;
-    }
-
-    /// Clear all cached queries.
-    async fn clear(&self) {
-        self.cache.clear().await;
-    }
-
-    /// Retrieves the results for a query based on a given query string.
-    async fn get(&self, query: &str) -> Option<R> {
-        self.maybe_evict(query).await;
-        let res = self.cache.get(query).await;
-        self.lookup_counter.fetch_add(1, Relaxed);
-        res
-    }
-
-    /// Revokes a query from the underlying cache.
-    async fn revoke(&self, query: &str) {
-        self.cache.revoke(query).await;
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-
-    #[tokio::test]
-    async fn get_returns_none_past_ttl() {
-        // Set a TTL of 0 seconds so we pass it immediately.
-        let mut fallback_cache = SimpleFallbackCache::new(Duration::new(0, 0));
-        let query = "SELECT * FROM t1".to_string();
-        fallback_cache.insert(query.clone(), (0, 1)).await;
-        assert!(fallback_cache.get(&query).await.is_none())
-    }
-
-    #[tokio::test]
-    async fn get_returns_some_before_ttl() {
-        let mut fallback_cache = SimpleFallbackCache::new(Duration::new(10_000, 0));
-        let query = "SELECT * FROM t1".to_string();
-        fallback_cache.insert(query.clone(), (0, 1)).await;
-        assert!(fallback_cache.get(&query).await.is_some())
-    }
-
-    #[tokio::test]
-    async fn multiple_insert_updates_results() {
-        let mut fallback_cache = SimpleFallbackCache::new(Duration::new(10_000, 0));
-        let query = "SELECT * FROM t1".to_string();
-        fallback_cache.insert(query.clone(), (0, 1)).await;
-        fallback_cache.insert(query.clone(), (1, 2)).await;
-        assert_eq!(fallback_cache.get(&query).await, Some((1, 2)))
-    }
-}
diff --git a/readyset-adapter/src/lib.rs b/readyset-adapter/src/lib.rs
index dd6ace6de9..57143ce1b8 100644
--- a/readyset-adapter/src/lib.rs
+++ b/readyset-adapter/src/lib.rs
@@ -9,7 +9,6 @@
 #![deny(unreachable_pub)]
 
 pub mod backend;
-pub mod fallback_cache;
 pub mod http_router;
 pub mod migration_handler;
 pub mod proxied_queries_reporter;
diff --git a/readyset-adapter/src/upstream_database.rs b/readyset-adapter/src/upstream_database.rs
index 6e1d489a17..277291609a 100644
--- a/readyset-adapter/src/upstream_database.rs
+++ b/readyset-adapter/src/upstream_database.rs
@@ -8,8 +8,6 @@ use readyset_client_metrics::QueryDestination;
 use readyset_data::DfValue;
 use readyset_errors::ReadySetError;
 
-use crate::fallback_cache::FallbackCache;
-
 /// Information about a statement that has been prepared in an [`UpstreamDatabase`]
 pub struct UpstreamPrepare<DB: UpstreamDatabase> {
     pub statement_id: u32,
@@ -64,9 +62,6 @@ pub trait UpstreamDatabase: Sized + Send {
     where
         Self: 'a;
 
-    /// A read result that has been cached in the underlying FallbackCache.
-    type CachedReadResult: Debug + Send + Sync + Clone;
-
    /// A type representing metadata about a prepared statement.
     ///
     /// This type is used as a field of [`UpstreamPrepare`], returned from
@@ -92,10 +87,7 @@ pub trait UpstreamDatabase: Sized + Send {
     /// Create a new connection to this upstream database
     ///
     /// Connect will return an error if the upstream database is running an unsupported version.
-    async fn connect(
-        upstream_config: UpstreamConfig,
-        fallback_cache: Option<FallbackCache<Self::CachedReadResult>>,
-    ) -> Result<Self, Self::Error>;
+    async fn connect(upstream_config: UpstreamConfig) -> Result<Self, Self::Error>;
 
     /// Resets the connection with the upstream database
     async fn reset(&mut self) -> Result<(), Self::Error>;
diff --git a/readyset-client-metrics/Cargo.toml b/readyset-client-metrics/Cargo.toml
index 72c8b09e87..85345d6a6b 100644
--- a/readyset-client-metrics/Cargo.toml
+++ b/readyset-client-metrics/Cargo.toml
@@ -18,5 +18,3 @@ nom-sql = { path = "../nom-sql" }
 [lib]
 path = "src/lib.rs"
-
-[features]
-fallback_cache = []
diff --git a/readyset-client-metrics/src/lib.rs b/readyset-client-metrics/src/lib.rs
index bf4228ed60..44868deae0 100644
--- a/readyset-client-metrics/src/lib.rs
+++ b/readyset-client-metrics/src/lib.rs
@@ -53,8 +53,6 @@ pub enum QueryDestination {
     ReadysetThenUpstream,
     Upstream,
     Both,
-    #[cfg(feature = "fallback_cache")]
-    FallbackCache,
 }
 
 impl TryFrom<&str> for QueryDestination {
@@ -65,8 +63,6 @@ impl TryFrom<&str> for QueryDestination {
             "readyset_then_upstream" => Ok(QueryDestination::ReadysetThenUpstream),
             "upstream" => Ok(QueryDestination::Upstream),
             "both" => Ok(QueryDestination::Both),
-            #[cfg(feature = "fallback_cache")]
-            "fallback_cache" => Ok(QueryDestination::FallbackCache),
             _ => Err(ReadySetError::Internal(
                 "Invalid query destination".to_string(),
             )),
@@ -81,8 +77,6 @@ impl fmt::Display for QueryDestination {
             QueryDestination::ReadysetThenUpstream => "readyset_then_upstream",
             QueryDestination::Upstream => "upstream",
             QueryDestination::Both => "both",
-            #[cfg(feature = "fallback_cache")]
-            QueryDestination::FallbackCache => "fallback_cache",
         };
         write!(f, "{}", s)
     }
diff --git a/readyset-client-test-helpers/src/lib.rs b/readyset-client-test-helpers/src/lib.rs
index 1f42741a3d..43eeb248a7 100644
--- a/readyset-client-test-helpers/src/lib.rs
+++ b/readyset-client-test-helpers/src/lib.rs
@@ -47,7 +47,7 @@ pub trait Adapter: Send {
     fn upstream_url(_db_name: &str) -> String;
 
     async fn make_upstream(addr: String) -> Self::Upstream {
-        Self::Upstream::connect(UpstreamConfig::from_url(addr), None)
+        Self::Upstream::connect(UpstreamConfig::from_url(addr))
             .await
             .unwrap()
     }
diff --git a/readyset-logictest/src/runner.rs b/readyset-logictest/src/runner.rs
index aaf4876891..869ca4ba32 100644
--- a/readyset-logictest/src/runner.rs
+++ b/readyset-logictest/src/runner.rs
@@ -576,12 +576,9 @@ impl TestScript {
                 #[allow(clippy::manual_map)]
                 let upstream = match &replication_url {
                     Some(url) => Some(
-                        <$upstream as UpstreamDatabase>::connect(
-                            UpstreamConfig::from_url(url),
-                            None,
-                        )
-                        .await
-                        .unwrap(),
+                        <$upstream as UpstreamDatabase>::connect(UpstreamConfig::from_url(url))
+                            .await
+                            .unwrap(),
                     ),
                     None => None,
                 };
diff --git a/readyset-mysql/Cargo.toml b/readyset-mysql/Cargo.toml
index 8b2d6873cf..c4c97e9de7 100644
--- a/readyset-mysql/Cargo.toml
+++ b/readyset-mysql/Cargo.toml
@@ -57,4 +57,3 @@ vertical_tests = []
 # Redact the display of strings marked sensitive from logs and error messages
 redact_sensitive = ["readyset-util/redact_sensitive"]
 failure_injection = ["fail/failpoints", "readyset-client/failure_injection", "readyset-server/failure_injection"]
-fallback_cache = ["readyset-adapter/fallback_cache", "readyset-client-metrics/fallback_cache"]
diff --git a/readyset-mysql/src/upstream.rs b/readyset-mysql/src/upstream.rs
index c7880678d1..1a3272c521 100644
--- a/readyset-mysql/src/upstream.rs
+++ b/readyset-mysql/src/upstream.rs
@@ -1,15 +1,9 @@
-#[cfg(feature = "fallback_cache")]
-use std::collections::hash_map::DefaultHasher;
 use std::collections::HashMap;
 use std::convert::TryInto;
-#[cfg(feature = "fallback_cache")]
-use std::hash::{Hash, Hasher};
 use std::sync::Arc;
 
 use async_trait::async_trait;
 use futures_util::Stream;
-#[cfg(feature = "fallback_cache")]
-use futures_util::StreamExt;
 use mysql_async::consts::{CapabilityFlags, StatusFlags};
 use mysql_async::prelude::Queryable;
 use mysql_async::{
@@ -17,9 +11,6 @@ use mysql_async::{
 };
 use nom_sql::{SqlIdentifier, StartTransactionStatement};
 use pin_project::pin_project;
-use readyset_adapter::fallback_cache::FallbackCache;
-#[cfg(feature = "fallback_cache")]
-use readyset_adapter::fallback_cache::FallbackCacheApi;
 use readyset_adapter::upstream_database::UpstreamDestination;
 use readyset_adapter::{UpstreamConfig, UpstreamDatabase, UpstreamPrepare};
 use readyset_client_metrics::QueryDestination;
@@ -80,16 +71,6 @@ pub enum QueryResult<'a> {
 }
 
 impl<'a> UpstreamDestination for QueryResult<'a> {
-    #[cfg(feature = "fallback_cache")]
-    fn destination(&self) -> QueryDestination {
-        if matches!(self, QueryResult::CachedReadResult(..)) {
-            QueryDestination::FallbackCache
-        } else {
-            QueryDestination::Upstream
-        }
-    }
-
-    #[cfg(not(feature = "fallback_cache"))]
     fn destination(&self) -> QueryDestination {
         QueryDestination::Upstream
     }
@@ -108,49 +89,11 @@ impl<'a> From<CachedReadResult> for QueryResult<'a> {
     }
 }
 
-#[cfg(feature = "fallback_cache")]
-impl<'a> QueryResult<'a> {
-    /// Can convert a stream ReadResult into a CachedReadResult.
-    async fn async_try_into(self) -> Result<CachedReadResult, Error> {
-        match self {
-            QueryResult::ReadResult {
-                mut stream,
-                columns,
-            } => {
-                let mut rows = vec![];
-                while let Some(row) = stream.next().await {
-                    match row {
-                        Ok(row) => rows.push(row),
-                        Err(e) => {
-                            // TODO: Update to be more sophisticated than this hack.
-                            return Err(Error::ReadySet(internal_err!(
-                                "Found error from MySQL rather than row {}",
-                                e
-                            )));
-                        }
-                    }
-                }
-                let status_flags = stream.status_flags();
-                Ok(CachedReadResult {
-                    data: rows,
-                    columns: columns.clone(),
-                    status_flags,
-                })
-            }
-            _ => Err(Error::ReadySet(internal_err!(
-                "Temp error: Can't convert because not a read result"
-            ))),
-        }
-    }
-}
-
 /// A connector to an underlying mysql store. This is really just a wrapper for the mysql crate.
 pub struct MySqlUpstream {
     conn: Conn,
     prepared_statements: HashMap<u32, Statement>,
     upstream_config: UpstreamConfig,
-    #[cfg(feature = "fallback_cache")]
-    fallback_cache: Option<FallbackCache<CachedReadResult>>,
 }
 
 #[derive(Debug, Clone)]
@@ -289,32 +232,12 @@ impl MySqlUpstream {
 
 #[async_trait]
 impl UpstreamDatabase for MySqlUpstream {
     type QueryResult<'a> = QueryResult<'a>;
-    type CachedReadResult = CachedReadResult;
     type StatementMeta = StatementMeta;
     type PrepareData<'a> = ();
     type Error = Error;
 
     const DEFAULT_DB_VERSION: &'static str = "8.0.26-readyset\0";
 
-    #[cfg(feature = "fallback_cache")]
-    async fn connect(
-        upstream_config: UpstreamConfig,
-        fallback_cache: Option<FallbackCache<CachedReadResult>>,
-    ) -> Result<Self, Error> {
-        let (conn, prepared_statements, upstream_config) =
-            Self::connect_inner(upstream_config).await?;
-        Ok(Self {
-            conn,
-            prepared_statements,
-            upstream_config,
-            fallback_cache,
-        })
-    }
-
-    #[cfg(not(feature = "fallback_cache"))]
-    async fn connect(
-        upstream_config: UpstreamConfig,
-        _: Option<FallbackCache<CachedReadResult>>,
-    ) -> Result<Self, Error> {
+    async fn connect(upstream_config: UpstreamConfig) -> Result<Self, Error> {
         let (conn, prepared_statements, upstream_config) =
             Self::connect_inner(upstream_config).await?;
         Ok(Self {
@@ -345,32 +268,6 @@ impl UpstreamDatabase for MySqlUpstream {
         format!("{major}.{minor}.{patch}-readyset\0")
     }
 
-    #[cfg(feature = "fallback_cache")]
-    async fn reset(&mut self) -> Result<(), Error> {
-        let opts = self.conn.opts().clone();
-        let conn = Conn::new(opts).await?;
-        let prepared_statements = HashMap::new();
-        let upstream_config = self.upstream_config.clone();
-        let fallback_cache = if let Some(ref cache) = self.fallback_cache {
-            cache.clear().await;
-            Some(cache.clone())
-        } else {
-            None
-        };
-        let old_self = std::mem::replace(
-            self,
-            Self {
-                conn,
-                prepared_statements,
-                upstream_config,
-                fallback_cache,
-            },
-        );
-        let _ = old_self.conn.disconnect().await as Result<(), _>;
-        Ok(())
-    }
-
-    #[cfg(not(feature = "fallback_cache"))]
     async fn reset(&mut self) -> Result<(), Error> {
         let opts = self.conn.opts().clone();
         let conn = Conn::new(opts).await?;
@@ -410,54 +307,6 @@ impl UpstreamDatabase for MySqlUpstream {
         })
     }
 
-    #[cfg(feature = "fallback_cache")]
-    async fn execute<'a>(
-        &'a mut self,
-        id: u32,
-        params: &[DfValue],
-    ) -> Result<Self::QueryResult<'a>, Error> {
-        if let Some(ref mut cache) = self.fallback_cache {
-            let mut s = DefaultHasher::new();
-            (id, params).hash(&mut s);
-            let hash = format!("{:x}", s.finish());
-            if let Some(query_r) = cache.get(&hash).await {
-                return Ok(query_r.into());
-            }
-            let params = dt_to_value_params(params)?;
-            let result = self
-                .conn
-                .exec_iter(
-                    self.prepared_statements.get(&id).ok_or(Error::ReadySet(
-                        ReadySetError::PreparedStatementMissing { statement_id: id },
-                    ))?,
-                    params,
-                )
-                .await?;
-            let r = handle_query_result!(result);
-            match r {
-                Ok(query_result @ QueryResult::ReadResult { .. }) => {
-                    let cached_result: CachedReadResult = query_result.async_try_into().await?;
-                    cache.insert(hash, cached_result.clone()).await;
-                    Ok(cached_result.into())
-                }
-                _ => r,
-            }
-        } else {
-            let params = dt_to_value_params(params)?;
-            let result = self
-                .conn
-                .exec_iter(
-                    self.prepared_statements.get(&id).ok_or(Error::ReadySet(
-                        ReadySetError::PreparedStatementMissing { statement_id: id },
-                    ))?,
-                    params,
-                )
-                .await?;
-            handle_query_result!(result)
-        }
-    }
-
-    #[cfg(not(feature = "fallback_cache"))]
     async fn execute<'a>(
         &'a mut self,
         id: u32,
@@ -476,30 +325,6 @@ impl UpstreamDatabase for MySqlUpstream {
         params: &[DfValue],
     ) -> Result<Self::QueryResult<'a>, Error> {
         let params = dt_to_value_params(params)?;
         let result = self
             .conn
             .exec_iter(
                 self.prepared_statements.get(&id).ok_or(Error::ReadySet(
                     ReadySetError::PreparedStatementMissing { statement_id: id },
                 ))?,
                 params,
             )
             .await?;
         handle_query_result!(result)
     }
 
-    #[cfg(feature = "fallback_cache")]
-    async fn query<'a>(&'a mut self, query: &'a str) -> Result<Self::QueryResult<'a>, Error> {
-        if let Some(ref mut cache) = self.fallback_cache {
-            if let Some(query_r) = cache.get(query.as_ref()).await {
-                return Ok(query_r.into());
-            }
-            let query_str = query.as_ref().to_owned();
-            let result = self.conn.query_iter(query).await?;
-            let r = handle_query_result!(result);
-            match r {
-                Ok(query_result @ QueryResult::ReadResult { .. }) => {
-                    let cached_result: CachedReadResult = query_result.async_try_into().await?;
-                    cache.insert(query_str, cached_result.clone()).await;
-                    Ok(cached_result.into())
-                }
-                _ => r,
-            }
-        } else {
-            let result = self.conn.query_iter(query).await?;
-            handle_query_result!(result)
-        }
-    }
-
-    #[cfg(not(feature = "fallback_cache"))]
     async fn query<'a>(&'a mut self, query: &'a str) -> Result<Self::QueryResult<'a>, Error> {
         let result = self.conn.query_iter(query).await?;
         handle_query_result!(result)
diff --git a/readyset-psql/Cargo.toml b/readyset-psql/Cargo.toml
index 22a48271d3..4ccdb8db51 100644
--- a/readyset-psql/Cargo.toml
+++ b/readyset-psql/Cargo.toml
@@ -59,4 +59,3 @@ harness = false
 # Redact the display of strings marked sensitive from logs and error messages
 redact_sensitive = ["readyset-util/redact_sensitive"]
 failure_injection = ["fail/failpoints", "readyset-client/failure_injection", "readyset-server/failure_injection"]
-fallback_cache = ["readyset-adapter/fallback_cache"]
diff --git a/readyset-psql/src/upstream.rs b/readyset-psql/src/upstream.rs
index d5298a72d8..2448882aad 100644
--- a/readyset-psql/src/upstream.rs
+++ b/readyset-psql/src/upstream.rs
@@ -14,7 +14,6 @@ use pgsql::types::Type;
 use pgsql::{GenericResult, ResultStream, Row, SimpleQueryMessage};
 use postgres_types::Kind;
 use psql_srv::Column;
-use readyset_adapter::fallback_cache::FallbackCache;
 use readyset_adapter::upstream_database::UpstreamDestination;
 use readyset_adapter::{UpstreamConfig, UpstreamDatabase, UpstreamPrepare};
 use readyset_data::DfValue;
@@ -156,16 +155,11 @@ fn convert_params_for_upstream(params: &[DfValue], types: &[Type]) -> ReadySetRe
 impl UpstreamDatabase for PostgreSqlUpstream {
     type StatementMeta = StatementMeta;
     type QueryResult<'a> = QueryResult;
-    // TODO: Actually fill this in.
-    type CachedReadResult = ();
     type PrepareData<'a> = &'a [Type];
     type Error = Error;
 
     const DEFAULT_DB_VERSION: &'static str = "13.4 (ReadySet)";
 
-    async fn connect(
-        upstream_config: UpstreamConfig,
-        _: Option<FallbackCache<Self::CachedReadResult>>,
-    ) -> Result<Self, Error> {
+    async fn connect(upstream_config: UpstreamConfig) -> Result<Self, Error> {
         let url = upstream_config
             .upstream_db_url
             .as_ref()
@@ -231,10 +225,7 @@ impl UpstreamDatabase for PostgreSqlUpstream {
     }
 
     async fn reset(&mut self) -> Result<(), Error> {
-        let old_self = std::mem::replace(
-            self,
-            Self::connect(self.upstream_config.clone(), None).await?,
-        );
+        let old_self = std::mem::replace(self, Self::connect(self.upstream_config.clone()).await?);
         drop(old_self);
         Ok(())
     }
diff --git a/readyset/Cargo.toml b/readyset/Cargo.toml
index c982842f4d..b44f070343 100644
--- a/readyset/Cargo.toml
+++ b/readyset/Cargo.toml
@@ -49,4 +49,3 @@ replicators = { path = "../replicators" }
 
 [features]
 failure_injection = ["fail/failpoints", "readyset-client/failure_injection", "readyset-server/failure_injection"]
-fallback_cache = ["readyset-adapter/fallback_cache", "readyset-client-metrics/fallback_cache"]
diff --git a/readyset/src/lib.rs b/readyset/src/lib.rs
index 6d2e252f27..3029d52d3f 100644
--- a/readyset/src/lib.rs
+++ b/readyset/src/lib.rs
@@ -29,9 +29,6 @@ use metrics_exporter_prometheus::PrometheusBuilder;
 use nom_sql::Relation;
 use readyset_adapter::backend::noria_connector::{NoriaConnector, ReadBehavior};
 use readyset_adapter::backend::MigrationMode;
-use readyset_adapter::fallback_cache::{
-    DiskModeledCache, EvictionModeledCache, FallbackCache, SimpleFallbackCache,
-};
 use readyset_adapter::http_router::NoriaAdapterHttpRouter;
 use readyset_adapter::migration_handler::MigrationHandler;
 use readyset_adapter::proxied_queries_reporter::ProxiedQueriesReporter;
@@ -380,11 +377,6 @@ pub struct Options {
     #[clap(long, hide = true)]
     wait_for_failpoint: bool,
 
-    // TODO: This feature in general needs to be fleshed out significantly more. Off by default for
-    // now.
-    #[clap(flatten)]
-    fallback_cache_options: FallbackCacheOptions,
-
     /// Whether to allow ReadySet to automatically create inlined caches when we receive a CREATE
     /// CACHE command for a query with unsupported placeholders.
     ///
@@ -438,58 +430,6 @@ impl Options {
     }
 }
 
-// Command-line options for running the experimental fallback_cache.
-//
-// This option struct is intended to be embedded inside of a larger option struct using
-// `#[clap(flatten)]`.
-#[allow(missing_docs)] // Allows us to exclude docs (from doc comments) from --help text
-#[derive(Parser, Debug)]
-pub struct FallbackCacheOptions {
-    /// Used to enable the fallback cache, which can handle serving all queries that we can't parse
-    /// or support from an in-memory cache that lives in the adapter.
-    #[clap(long, hide = true)]
-    enable_fallback_cache: bool,
-
-    /// Specifies a ttl in seconds for queries cached using the fallback cache.
-    #[clap(long, hide = true, default_value = "120")]
-    ttl_seconds: u64,
-
-    /// If enabled, will model running the fallback cache off spinning disk.
-    #[clap(long, hide = true)]
-    model_disk: bool,
-
-    #[clap(flatten)]
-    eviction_options: FallbackCacheEvictionOptions,
-}
-
-// TODO:
-// Change this to an enum that allows for a probabilistic strategy
-//
-// enum FallbackCacheEvictionStrategy {
-//     /// Don't model eviction
-//     None,
-//     /// This Cl's strategy
-//     Rate(f64),
-//     /// probabilistic strategy
-//     Random(f64)
-// }
-//
-// Command-line options for running the experimental fallback_cache with eviction modeling.
-//
-// This option struct is intended to be embedded inside of a larger option struct using
-// `#[clap(flatten)]`.
-#[allow(missing_docs)] // Allows us to exclude docs (from doc comments) from --help text
-#[derive(Parser, Debug)]
-pub struct FallbackCacheEvictionOptions {
-    /// If enabled, will model running the fallback cache with eviction.
-    #[clap(long, hide = true)]
-    model_eviction: bool,
-
-    /// Provides a rate at which we will randomly evict queries.
-    #[clap(long, hide = true, default_value = "0.01")]
-    eviction_rate: f64,
-}
-
 impl<H> NoriaAdapter<H>
 where
     H: ConnectionHandler + Clone + Send + Sync + 'static,
@@ -828,41 +768,6 @@ where
         rt.block_on(fut);
     }
 
-    let fallback_cache: Option<
-        FallbackCache<
-            <<H as ConnectionHandler>::UpstreamDatabase as UpstreamDatabase>::CachedReadResult,
-        >,
-    > = if cfg!(feature = "fallback_cache")
-        && options.fallback_cache_options.enable_fallback_cache
-    {
-        let cache = if options.fallback_cache_options.model_disk {
-            DiskModeledCache::new(Duration::new(options.fallback_cache_options.ttl_seconds, 0))
-                .into()
-        } else if options
-            .fallback_cache_options
-            .eviction_options
-            .model_eviction
-        {
-            EvictionModeledCache::new(
-                Duration::new(options.fallback_cache_options.ttl_seconds, 0),
-                options
-                    .fallback_cache_options
-                    .eviction_options
-                    .eviction_rate,
-            )
-            .into()
-        } else {
-            SimpleFallbackCache::new(Duration::new(
-                options.fallback_cache_options.ttl_seconds,
-                0,
-            ))
-            .into()
-        };
-        Some(cache)
-    } else {
-        None
-    };
-
     if let MigrationMode::OutOfBand = migration_mode {
         set_failpoint!("adapter-out-of-band");
         let rh = rh.clone();
@@ -874,7 +779,6 @@ where
         let upstream_config = options.server_worker_options.replicator_config.clone();
         let expr_dialect = self.expr_dialect;
         let parse_dialect = self.parse_dialect;
-        let fallback_cache = fallback_cache.clone();
 
         rs_connect.in_scope(|| info!("Spawning migration handler task"));
         let fut = async move {
@@ -886,26 +790,23 @@ where
                 Default::default()
             } else {
                 loop {
-                    let mut upstream = match H::UpstreamDatabase::connect(
-                        upstream_config.clone(),
-                        fallback_cache.clone(),
-                    )
-                    .instrument(
-                        connection
-                            .in_scope(|| span!(Level::INFO, "Connecting to upstream database")),
-                    )
-                    .await
-                    {
-                        Ok(upstream) => upstream,
-                        Err(error) => {
-                            error!(
-                                %error,
-                                "Error connecting to upstream database, retrying after 1s"
-                            );
-                            tokio::time::sleep(UPSTREAM_CONNECTION_RETRY_INTERVAL).await;
-                            continue;
-                        }
-                    };
+                    let mut upstream =
+                        match H::UpstreamDatabase::connect(upstream_config.clone())
+                            .instrument(connection.in_scope(|| {
+                                span!(Level::INFO, "Connecting to upstream database")
+                            }))
+                            .await
+                        {
+                            Ok(upstream) => upstream,
+                            Err(error) => {
+                                error!(
+                                    %error,
+                                    "Error connecting to upstream database, retrying after 1s"
+                                );
+                                tokio::time::sleep(UPSTREAM_CONNECTION_RETRY_INTERVAL).await;
+                                continue;
+                            }
+                        };
 
                     match upstream.schema_search_path().await {
                         Ok(ssp) => break ssp,
@@ -1072,14 +973,13 @@ where
             let query_status_cache = query_status_cache;
             let upstream_config = upstream_config.clone();
-            let fallback_cache = fallback_cache.clone();
             let fut = async move {
                 let upstream_res = if upstream_config.upstream_db_url.is_some()
                     && !no_upstream_connections
                 {
                     set_failpoint!(failpoints::UPSTREAM);
                     timeout(
                         UPSTREAM_CONNECTION_TIMEOUT,
-                        H::UpstreamDatabase::connect(upstream_config, fallback_cache),
+                        H::UpstreamDatabase::connect(upstream_config),
                     )
                     .instrument(debug_span!("Connecting to upstream database"))
                     .await
diff --git a/system-benchmarks/Cargo.toml b/system-benchmarks/Cargo.toml
index eb3e60ea79..d1452e3af6 100644
--- a/system-benchmarks/Cargo.toml
+++ b/system-benchmarks/Cargo.toml
@@ -51,7 +51,3 @@ bench = false
 [[bench]]
 name = "workload"
 harness = false
-
-[features]
-fallback_cache = ["readyset/fallback_cache", "readyset-mysql/fallback_cache"]
-disk_modeled = ["fallback_cache", "readyset/fallback_cache", "readyset-mysql/fallback_cache"]
diff --git a/system-benchmarks/benches/workload.rs b/system-benchmarks/benches/workload.rs
index aacdd11402..6eaf5f228b 100644
--- a/system-benchmarks/benches/workload.rs
+++ b/system-benchmarks/benches/workload.rs
@@ -570,62 +570,13 @@ async fn prepare_db<P: AsRef<Path>>(path: P, args: &SystemBenchArgs) -> anyhow
     Ok(())
 }
 
-/// Start the ReadySet adapter in standalone mode without fallback cache enabled.
-#[cfg(not(feature = "fallback_cache"))]
-fn start_adapter(upstream_url: &str) -> anyhow::Result<()> {
-    start_adapter_with_options(
-        FallbackCacheOptions {
-            enable: false,
-            disk_modeled: false,
-        },
-        upstream_url,
-    )
-}
-
-/// Start the ReadySet adapter in standalone mode with fallback cache enabled and disk
-/// modeling disabled.
-#[cfg(all(feature = "fallback_cache", not(feature = "disk_modeled")))]
-fn start_adapter(upstream_url: &str) -> anyhow::Result<()> {
-    start_adapter_with_options(
-        FallbackCacheOptions {
-            enable: true,
-            disk_modeled: false,
-        },
-        upstream_url,
-    )
-}
-
-/// Start the ReadySet adapter in standalone mode with fallback cache enabled and disk
-/// modeling enabled.
-#[cfg(all(feature = "fallback_cache", feature = "disk_modeled"))]
-fn start_adapter(upstream_url: &str) -> anyhow::Result<()> {
-    start_adapter_with_options(
-        FallbackCacheOptions {
-            enable: true,
-            disk_modeled: true,
-        },
-        upstream_url,
-    )
-}
-
-/// Various options for enabling and configuring the fallback cache.
-struct FallbackCacheOptions {
-    /// Whether the fallback cache is enabled or not.
-    enable: bool,
-    /// Whether the fallback cache should model running off spinning disk.
-    disk_modeled: bool,
-}
-
 /// Start the ReadySet adapter in standalone mode with options.
-fn start_adapter_with_options(
-    fallback_cache_options: FallbackCacheOptions,
-    upstream_url: &str,
-) -> anyhow::Result<()> {
+fn start_adapter(upstream_url: &str) -> anyhow::Result<()> {
     let database_type = DatabaseURL::from_str(upstream_url)?.database_type();
     let database_type_flag = format!("--database-type={}", database_type);
     let temp_dir = temp_dir::TempDir::new().unwrap();
     let log_level = env::var("LOG_LEVEL").unwrap_or_else(|_| "error".into());
-    let mut options = vec![
+    let options = vec![
         "bench", // This is equivalent to the program name in argv, ignored
         "--deployment",
         DB_NAME,
@@ -648,14 +599,6 @@ fn start_adapter(upstream_url: &str) -> anyhow::Result<()> {
         &database_type_flag,
     ];
 
-    if fallback_cache_options.enable {
-        options.push("--enable-fallback-cache");
-    }
-
-    if fallback_cache_options.disk_modeled {
-        options.push("--model-disk");
-    }
-
     let adapter_options = Options::parse_from(options);
 
     match database_type {
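
Note for reviewers: with this change, `UpstreamDatabase::connect` takes only an
`UpstreamConfig`; the optional `FallbackCache` handle is gone from the trait and
from every call site touched above. A minimal sketch of the call-site migration
(the `connect_upstream` helper is illustrative only, not part of this patch; it
assumes `UpstreamConfig::from_url` accepts the same URL forms used at the call
sites above):

    use readyset_adapter::{UpstreamConfig, UpstreamDatabase};

    // Hypothetical generic helper showing the new call shape.
    // Before this patch: U::connect(UpstreamConfig::from_url(url), None).await
    // After this patch:  the second (fallback-cache) argument is dropped.
    async fn connect_upstream<U: UpstreamDatabase>(url: &str) -> Result<U, U::Error> {
        U::connect(UpstreamConfig::from_url(url)).await
    }

Callers that previously passed `Some(cache)` (only reachable behind the removed
`fallback_cache` feature) have no replacement; queries ReadySet cannot serve now
always go to the upstream database.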