Skip to content

Commit

Permalink
Revert "adapter+server: Autoparameterize range queries"
Browse files Browse the repository at this point in the history
This reverts commit 3b0412a0e9d2fffbc15d7e8ea9f81a5983a86372.

Change-Id: I02e4a5af23a2f18dd2ff0c4eb1e2c7dabb1843be
Reviewed-on: https://gerrit.readyset.name/c/readyset/+/6997
Tested-by: Buildkite CI
Reviewed-by: Luke Osborne <[email protected]>
  • Loading branch information
ethowitz committed Feb 27, 2024
1 parent 301f78f commit 9c97688
Show file tree
Hide file tree
Showing 19 changed files with 143 additions and 800 deletions.
4 changes: 2 additions & 2 deletions benchmarks/src/bin/write_propagation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ impl Writer {
let auto_increments: Arc<RwLock<HashMap<Relation, AtomicUsize>>> = Arc::default();
let view_name_cache = SharedCache::new();
let view_cache = SharedCache::new();
let adapter_rewrite_params = ch.adapter_rewrite_params().await?;
let server_supports_pagination = ch.supports_pagination().await?;
let (dialect, nom_sql_dialect) = match DatabaseURL::from_str(&self.database_url)? {
DatabaseURL::MySQL(_) => (Dialect::DEFAULT_MYSQL, nom_sql::Dialect::MySQL),
DatabaseURL::PostgreSQL(_) => {
Expand All @@ -123,7 +123,7 @@ impl Writer {
dialect,
nom_sql_dialect,
vec![],
adapter_rewrite_params,
server_supports_pagination,
)
.await;

Expand Down
81 changes: 25 additions & 56 deletions logictests/ranges.test
Original file line number Diff line number Diff line change
Expand Up @@ -9,64 +9,33 @@ insert into t1 (x, y) values
(3, 3),
(4, 5);

# query I valuesort
# select x from t1 where y < ?;
# ? = 5
# ----
# 1
# 2
# 3
#
# query I valuesort
# select x from t1 where x >= ? and y >= ?;
# ? = 1
# ? = 2
# ----
# 1
# 1
# 3
# 4
#
# query I valuesort
# select x from t1 where x > ? and y > ?
# ? = 2
# ? = 4
# ----
# 4
#
# query I valuesort
# select x from t1 where x > ? and y > ?
# ? = 4
# ? = 4
# ----
#
# query I valuesort
# select x from t1 where x >= 2 and x <= 4
# ----
# 2
# 3
# 4
#
# query I valuesort
# select x from t1 where x >= 2 and x < 4
# ----
# 2
# 3
#
# query I valuesort
# select x from t1 where x > 2 and x <= 4
# ----
# 3
# 4
#
# query I valuesort
# select x from t1 where x > 2 and x < 4
# ----
# 3
query I valuesort
select x from t1 where y < ?;
? = 5
----
1
2
3

query I valuesort
select x from t1 where x > ? and y < ?
select x from t1 where x >= ? and y >= ?;
? = 1
? = 2
? = 4
----
1
1
3
4

query I valuesort
select x from t1 where x > ? and y > ?
? = 2
? = 4
----
4

query I valuesort
select x from t1 where x > ? and y > ?
? = 4
? = 4
----
11 changes: 7 additions & 4 deletions readyset-adapter/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1094,7 +1094,7 @@ where
stmt: &nom_sql::SelectStatement,
) -> ReadySetResult<(nom_sql::SelectStatement, bool)> {
let mut rewritten = stmt.clone();
adapter_rewrites::process_query(&mut rewritten, self.noria.rewrite_params())?;
adapter_rewrites::process_query(&mut rewritten, self.noria.server_supports_pagination())?;
// Attempt ReadySet unless the query is unsupported or dropped
let should_do_readyset = !matches!(
self.state
Expand Down Expand Up @@ -1814,7 +1814,7 @@ where
}
}
// Now migrate the new query
adapter_rewrites::process_query(&mut stmt, self.noria.rewrite_params())?;
adapter_rewrites::process_query(&mut stmt, self.noria.server_supports_pagination())?;
let migration_state = match self
.noria
.handle_create_cached_query(
Expand Down Expand Up @@ -2200,7 +2200,7 @@ where
let mut stmt = *stmt.clone();
adapter_rewrites::process_query(
&mut stmt,
self.noria.rewrite_params(),
self.noria.server_supports_pagination(),
)?;

ViewCreateRequest::new(
Expand Down Expand Up @@ -2579,7 +2579,10 @@ where
Option<QueryStatus>,
ReadySetResult<ProcessedQueryParams>,
) {
match adapter_rewrites::process_query(&mut q.statement, self.noria.rewrite_params()) {
match adapter_rewrites::process_query(
&mut q.statement,
self.noria.server_supports_pagination(),
) {
Ok(processed_query_params) => {
let s = self.state.query_status_cache.query_status(q);
let should_try = if self.state.proxy_state.should_proxy() {
Expand Down
29 changes: 14 additions & 15 deletions readyset-adapter/src/backend/noria_connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use readyset_errors::{
ReadySetResult,
};
use readyset_server::worker::readers::{CallResult, ReadRequestHandler};
use readyset_sql_passes::adapter_rewrites::{self, AdapterRewriteParams, ProcessedQueryParams};
use readyset_sql_passes::adapter_rewrites::{self, ProcessedQueryParams};
use readyset_util::redacted::Sensitive;
use readyset_util::shared_cache::{self, LocalCache};
use tokio::sync::RwLock;
Expand Down Expand Up @@ -62,7 +62,9 @@ pub struct NoriaBackendInner {
noria: ReadySetHandle,
tables: BTreeMap<Relation, Table>,
views: LocalCache<Relation, View>,
rewrite_params: AdapterRewriteParams,
/// The server can handle (non-parameterized) LIMITs and (parameterized) OFFSETs in the
/// dataflow graph
server_supports_pagination: bool,
}

macro_rules! noria_await {
Expand All @@ -78,13 +80,13 @@ impl NoriaBackendInner {
async fn new(
ch: ReadySetHandle,
views: LocalCache<Relation, View>,
rewrite_params: AdapterRewriteParams,
server_supports_pagination: bool,
) -> Self {
NoriaBackendInner {
tables: BTreeMap::new(),
views,
noria: ch,
rewrite_params,
server_supports_pagination,
}
}

Expand Down Expand Up @@ -363,7 +365,7 @@ impl NoriaConnector {
dialect: Dialect,
parse_dialect: nom_sql::Dialect,
schema_search_path: Vec<SqlIdentifier>,
rewrite_params: AdapterRewriteParams,
server_supports_pagination: bool,
) -> Self {
NoriaConnector::new_with_local_reads(
ch,
Expand All @@ -375,7 +377,7 @@ impl NoriaConnector {
dialect,
parse_dialect,
schema_search_path,
rewrite_params,
server_supports_pagination,
)
.await
}
Expand All @@ -391,9 +393,9 @@ impl NoriaConnector {
dialect: Dialect,
parse_dialect: nom_sql::Dialect,
schema_search_path: Vec<SqlIdentifier>,
rewrite_params: AdapterRewriteParams,
server_supports_pagination: bool,
) -> Self {
let backend = NoriaBackendInner::new(ch, view_cache, rewrite_params).await;
let backend = NoriaBackendInner::new(ch, view_cache, server_supports_pagination).await;

NoriaConnector {
inner: NoriaBackend {
Expand Down Expand Up @@ -552,15 +554,12 @@ impl NoriaConnector {
.collect())
}

pub(crate) fn rewrite_params(&self) -> AdapterRewriteParams {
pub(crate) fn server_supports_pagination(&self) -> bool {
self.inner
.inner
.as_ref()
.map(|v| v.rewrite_params)
.unwrap_or_else(|| AdapterRewriteParams {
server_supports_pagination: false,
server_supports_mixed_comparisons: false,
})
.map(|v| v.server_supports_pagination)
.unwrap_or(false)
}

// TODO(andrew): Allow client to map table names to NodeIndexes without having to query ReadySet
Expand Down Expand Up @@ -1416,7 +1415,7 @@ impl NoriaConnector {

trace!("select::collapse where-in clauses");
let processed_query_params =
adapter_rewrites::process_query(&mut statement, self.rewrite_params())?;
adapter_rewrites::process_query(&mut statement, self.server_supports_pagination())?;

// check if we already have this query prepared
trace!("select::access view");
Expand Down
14 changes: 3 additions & 11 deletions readyset-client-test-helpers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ pub struct TestBuilder {
storage_dir_path: Option<PathBuf>,
authority: Option<Arc<Authority>>,
replication_server_id: Option<ReplicationServerId>,
allow_mixed_comparisons: bool,
}

impl Default for TestBuilder {
Expand All @@ -114,7 +113,6 @@ impl TestBuilder {
storage_dir_path: None,
authority: None,
replication_server_id: None,
allow_mixed_comparisons: true,
}
}

Expand Down Expand Up @@ -187,11 +185,6 @@ impl TestBuilder {
self
}

pub fn allow_mixed_comparisons(mut self, allow_mixed_comparisons: bool) -> Self {
self.allow_mixed_comparisons = allow_mixed_comparisons;
self
}

pub async fn build<A>(self) -> (A::ConnectionOpts, Handle, ShutdownSender)
where
A: Adapter + 'static,
Expand Down Expand Up @@ -239,8 +232,7 @@ impl TestBuilder {
builder.set_persistence(persistence);
builder.set_allow_topk(true);
builder.set_allow_paginate(true);
builder.set_allow_mixed_comparisons(self.allow_mixed_comparisons);

builder.set_allow_mixed_comparisons(true);
if !self.partial {
builder.disable_partial();
}
Expand Down Expand Up @@ -315,7 +307,7 @@ impl TestBuilder {
};

let mut rh = ReadySetHandle::new(authority.clone()).await;
let adapter_rewrite_params = rh.adapter_rewrite_params().await.unwrap();
let server_supports_pagination = rh.supports_pagination().await.unwrap();
let noria = NoriaConnector::new(
rh.clone(),
auto_increments,
Expand All @@ -325,7 +317,7 @@ impl TestBuilder {
A::EXPR_DIALECT,
A::DIALECT,
schema_search_path,
adapter_rewrite_params,
server_supports_pagination,
)
.await;

Expand Down
5 changes: 2 additions & 3 deletions readyset-client/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use petgraph::graph::NodeIndex;
use readyset_errors::{
internal, internal_err, rpc_err, rpc_err_no_downcast, ReadySetError, ReadySetResult,
};
use readyset_sql_passes::adapter_rewrites::AdapterRewriteParams;
use replication_offset::ReplicationOffsets;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -926,8 +925,8 @@ impl ReadySetHandle {
);

simple_request!(
/// Returns the params to use when performing adapter rewrites.
adapter_rewrite_params() -> AdapterRewriteParams
/// Returns true if topk and pagination support are enabled on the server
supports_pagination() -> bool
);

simple_request!(
Expand Down
6 changes: 3 additions & 3 deletions readyset-client/src/recipe/changelist.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use nom_sql::{
};
use readyset_data::DfType;
use readyset_errors::{internal, unsupported, ReadySetError, ReadySetResult};
use readyset_sql_passes::adapter_rewrites::{self, AdapterRewriteParams};
use readyset_sql_passes::adapter_rewrites;
use serde::{Deserialize, Serialize};
use test_strategy::Arbitrary;
use tracing::error;
Expand Down Expand Up @@ -483,7 +483,7 @@ impl Change {
/// rewrites on the parsed query string before passing it to the server via `/extend_recipe`.
pub fn from_cache_ddl_request(
ddl_req: &CacheDDLRequest,
adapter_rewrite_params: AdapterRewriteParams,
server_supports_pagination: bool,
) -> ReadySetResult<Self> {
macro_rules! mk_error {
($str:expr) => {
Expand Down Expand Up @@ -531,7 +531,7 @@ impl Change {

adapter_rewrites::process_query(
&mut statement,
adapter_rewrite_params
server_supports_pagination,
)?;

Change::CreateCache(CreateCache {
Expand Down
4 changes: 2 additions & 2 deletions readyset-logictest/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,7 @@ impl TestScript {

let mut rh = ReadySetHandle::new(authority.clone()).await;

let adapter_rewrite_params = rh.adapter_rewrite_params().await.unwrap();
let server_supports_pagination = rh.supports_pagination().await.unwrap();
let adapter_start_time = SystemTime::now();

let task = tokio::spawn(async move {
Expand All @@ -572,7 +572,7 @@ impl TestScript {
DatabaseType::PostgreSQL => nom_sql::Dialect::PostgreSQL,
},
Default::default(),
adapter_rewrite_params,
server_supports_pagination,
)
.await;
let query_status_cache: &'static _ = Box::leak(Box::new(QueryStatusCache::new()));
Expand Down
7 changes: 1 addition & 6 deletions readyset-psql/tests/fallback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2598,12 +2598,7 @@ async fn drop_and_recreate_demo_cache() {
// There was an error returned at one point in doing this, so this test is an attempt to block
// a breaking change to the demo script at CI time.
readyset_tracing::init_test_logging();
let (opts, _handle, shutdown_tx) = TestBuilder::default()
.fallback(true)
.allow_mixed_comparisons(false)
.build::<PostgreSQLAdapter>()
.await;

let (opts, _handle, shutdown_tx) = setup().await;
let conn = connect(opts).await;

let queries = [
Expand Down
Loading

0 comments on commit 9c97688

Please sign in to comment.