diff --git a/CHANGELOG.md b/CHANGELOG.md
index c285b00..a73b23f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -10,7 +10,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 ### Added
 - query/bind: support `Option` in `query.bind(arg)` ([#119], [#120]).
 - client: `Client::with_header()` to provide custom headers ([#98], [#108]).
+- query: added `Query::with_option()` similar to `Client::with_option()` ([#123]).
+- insert: added `Insert::with_option()` similar to `Client::with_option()` ([#123]).
+- inserter: added `Inserter::with_option()` similar to `Client::with_option()` ([#123]).
+
+### Changed
+- insert: the outgoing request is now created after the first `Insert::write` call instead of `Insert::new` ([#123]).
 
+[#123]: https://github.com/ClickHouse/clickhouse-rs/pull/123
 [#120]: https://github.com/ClickHouse/clickhouse-rs/pull/120
 [#119]: https://github.com/ClickHouse/clickhouse-rs/issues/119
 [#108]: https://github.com/ClickHouse/clickhouse-rs/pull/108
diff --git a/Cargo.toml b/Cargo.toml
index c868e71..099c870 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -80,6 +80,7 @@ uuid = { version = "1", optional = true }
 time = { version = "0.3", optional = true }
 bstr = { version = "1.2", default-features = false }
 quanta = { version = "0.12", optional = true }
+replace_with = { version = "0.1.7" }
 
 [dev-dependencies]
 criterion = "0.5.0"
diff --git a/examples/clickhouse_settings.rs b/examples/clickhouse_settings.rs
new file mode 100644
index 0000000..dd53d0a
--- /dev/null
+++ b/examples/clickhouse_settings.rs
@@ -0,0 +1,23 @@
+use clickhouse::{error::Result, Client};
+
+/// Besides [`Client::query`], it works similarly with [`Client::insert`] and [`Client::inserter`].
+#[tokio::main]
+async fn main() -> Result<()> {
+    let client = Client::default()
+        .with_url("http://localhost:8123")
+        // This setting is global and will be applied to all queries.
+        .with_option("limit", "100");
+
+    let numbers = client
+        .query("SELECT number FROM system.numbers")
+        // This setting will be applied to this particular query only;
+        // it will override the global client setting.
+        .with_option("limit", "3")
+        .fetch_all::<u64>()
+        .await?;
+
+    // note that it prints the first 3 numbers only (because of the setting override)
+    println!("{numbers:?}");
+
+    Ok(())
+}
diff --git a/src/insert.rs b/src/insert.rs
index 0ea351b..4775cef 100644
--- a/src/insert.rs
+++ b/src/insert.rs
@@ -2,6 +2,7 @@ use std::{future::Future, marker::PhantomData, mem, panic, pin::Pin, time::Duration};
 
 use bytes::{Bytes, BytesMut};
 use hyper::{self, Request};
+use replace_with::replace_with_or_abort;
 use serde::Serialize;
 use tokio::{
     task::JoinHandle,
@@ -28,8 +29,8 @@ const MIN_CHUNK_SIZE: usize = BUFFER_SIZE - 1024; // slightly less to avoid extra reallocations
 /// Rows are being sent progressively to spread network load.
 #[must_use]
 pub struct Insert<T> {
+    state: InsertState,
     buffer: BytesMut,
-    sender: Option<ChunkSender>,
     #[cfg(feature = "lz4")]
     compression: Compression,
     send_timeout: Option<Duration>,
@@ -37,10 +38,62 @@ pub struct Insert<T> {
     // Use boxed `Sleep` to reuse a timer entry, it improves performance.
     // Also, `tokio::time::timeout()` significantly increases a future's size.
     sleep: Pin<Box<Sleep>>,
-    handle: JoinHandle<Result<()>>,
     _marker: PhantomData<fn() -> T>, // TODO: test contravariance.
 }
 
+enum InsertState {
+    NotStarted {
+        client: Box<Client>,
+        sql: String,
+    },
+    Active {
+        sender: ChunkSender,
+        handle: JoinHandle<Result<()>>,
+    },
+    Terminated {
+        handle: JoinHandle<Result<()>>,
+    },
+    Completed,
+}
+
+impl InsertState {
+    fn sender(&mut self) -> Option<&mut ChunkSender> {
+        match self {
+            InsertState::Active { sender, .. } => Some(sender),
+            _ => None,
+        }
+    }
+    fn handle(&mut self) -> Option<&mut JoinHandle<Result<()>>> {
+        match self {
+            InsertState::Active { handle, .. } | InsertState::Terminated { handle } => Some(handle),
+            _ => None,
+        }
+    }
+    fn client_with_sql(&self) -> Option<(&Client, &str)> {
+        match self {
+            InsertState::NotStarted { client, sql } => Some((client, sql)),
+            _ => None,
+        }
+    }
+    fn terminated(&mut self) {
+        debug_assert!(matches!(self, InsertState::Active { .. }));
+        replace_with_or_abort(self, |_self| match _self {
+            InsertState::Active { handle, .. } => InsertState::Terminated { handle },
+            _ => unreachable!(),
+        });
+    }
+    fn with_option(&mut self, name: impl Into<String>, value: impl Into<String>) {
+        assert!(matches!(self, InsertState::NotStarted { .. }));
+        replace_with_or_abort(self, |_self| match _self {
+            InsertState::NotStarted { mut client, sql } => {
+                client.add_option(name, value);
+                InsertState::NotStarted { client, sql }
+            }
+            _ => unreachable!(),
+        });
+    }
+}
+
 // It should be a regular function, but it decreases performance.
 macro_rules! timeout {
     ($self:expr, $timeout:ident, $fut:expr) => {{
@@ -56,70 +109,29 @@
 }
 
 impl<T> Insert<T> {
+    // TODO: remove Result
     pub(crate) fn new(client: &Client, table: &str) -> Result<Self>
     where
         T: Row,
     {
-        let mut url = Url::parse(&client.url).map_err(|err| Error::InvalidParams(err.into()))?;
-        let mut pairs = url.query_pairs_mut();
-        pairs.clear();
-
-        if let Some(database) = &client.database {
-            pairs.append_pair("database", database);
-        }
-
         let fields = row::join_column_names::<T>()
             .expect("the row type must be a struct or a wrapper around it");
 
         // TODO: what about escaping a table name?
-        // https://clickhouse.yandex/docs/en/query_language/syntax/#syntax-identifiers
-        let query = format!("INSERT INTO {table}({fields}) FORMAT RowBinary");
-        pairs.append_pair("query", &query);
-
-        if client.compression.is_lz4() {
-            pairs.append_pair("decompress", "1");
-        }
-
-        for (name, value) in &client.options {
-            pairs.append_pair(name, value);
-        }
-
-        drop(pairs);
-
-        let mut builder = Request::post(url.as_str());
-
-        for (name, value) in &client.headers {
-            builder = builder.header(name, value);
-        }
-
-        if let Some(user) = &client.user {
-            builder = builder.header("X-ClickHouse-User", user);
-        }
-
-        if let Some(password) = &client.password {
-            builder = builder.header("X-ClickHouse-Key", password);
-        }
-
-        let (sender, body) = RequestBody::chunked();
-
-        let request = builder
-            .body(body)
-            .map_err(|err| Error::InvalidParams(Box::new(err)))?;
-
-        let future = client.http.request(request);
-        // TODO: introduce `Executor` to allow bookkeeping of spawned tasks.
-        let handle =
-            tokio::spawn(async move { Response::new(future, Compression::None).finish().await });
+        // https://clickhouse.com/docs/en/sql-reference/syntax#identifiers
+        let sql = format!("INSERT INTO {}({}) FORMAT RowBinary", table, fields);
 
         Ok(Self {
+            state: InsertState::NotStarted {
+                client: Box::new(client.clone()),
+                sql,
+            },
             buffer: BytesMut::with_capacity(BUFFER_SIZE),
-            sender: Some(sender),
             #[cfg(feature = "lz4")]
             compression: client.compression,
             send_timeout: None,
             end_timeout: None,
             sleep: Box::pin(tokio::time::sleep(Duration::new(0, 0))),
-            handle,
            _marker: PhantomData,
        })
    }
@@ -128,13 +140,13 @@ impl<T> Insert<T> {
     ///
     /// `send_timeout` restricts time on sending a data chunk to a socket.
     /// `None` disables the timeout, it's a default.
-    /// It's roughly equivalent to `tokio::time::timeout(insert.write(..))`.
+    /// It's roughly equivalent to `tokio::time::timeout(insert.write(...))`.
     ///
     /// `end_timeout` restricts time on waiting for a response from the CH
     /// server. Thus, it includes all work needed to handle `INSERT` by the
     /// CH server, e.g. handling all materialized views and so on.
     /// `None` disables the timeout, it's a default.
-    /// It's roughly equivalent to `tokio::time::timeout(insert.end(..))`.
+    /// It's roughly equivalent to `tokio::time::timeout(insert.end(...))`.
     ///
     /// These timeouts are much more performant (~x10) than wrapping `write()`
     /// and `end()` calls into `tokio::time::timeout()`.
@@ -147,6 +159,15 @@ impl<T> Insert<T> {
         self
     }
 
+    /// Similar to [`Client::with_option`], but for this particular INSERT statement only.
+    /// # Panics
+    /// If called after the insert request is started, e.g., after [`Insert::write`].
+    #[track_caller]
+    pub fn with_option(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
+        self.state.with_option(name, value);
+        self
+    }
+
     pub(crate) fn set_timeouts(
         &mut self,
         send_timeout: Option<Duration>,
         end_timeout: Option<Duration>,
@@ -172,7 +193,7 @@ impl<T> Insert<T> {
     /// used anymore.
     ///
     /// # Panics
-    /// If called after previous call returned an error.
+    /// If called after a previous call returned an error.
     pub fn write<'a>(&'a mut self, row: &T) -> impl Future<Output = Result<()>> + 'a + Send
     where
         T: Serialize,
     {
@@ -193,7 +214,11 @@
     where
         T: Serialize,
     {
-        assert!(self.sender.is_some(), "write() after error");
+        match self.state {
+            InsertState::NotStarted { .. } => self.init_request(),
+            InsertState::Active { .. } => Ok(()),
+            _ => panic!("write() after error"),
+        }?;
 
         let old_buf_size = self.buffer.len();
         let result = rowbinary::serialize_into(&mut self.buffer, row);
@@ -216,22 +241,19 @@
         if !self.buffer.is_empty() {
             self.send_chunk().await?;
         }
-
-        self.sender = None; // terminate the sender successfully
+        self.state.terminated();
         self.wait_handle().await
     }
 
     async fn send_chunk(&mut self) -> Result<()> {
-        if self.sender.is_none() {
-            return Ok(());
-        }
+        debug_assert!(matches!(self.state, InsertState::Active { .. }));
 
         // Hyper uses non-trivial and inefficient schema of buffering chunks.
         // It's difficult to determine when allocations occur.
         // So, instead we control it manually here and rely on the system allocator.
         let chunk = self.take_and_prepare_chunk()?;
 
-        let sender = self.sender.as_mut().unwrap(); // checked above
+        let sender = self.state.sender().unwrap(); // checked above
 
         let is_timed_out = match timeout!(self, send_timeout, sender.send(chunk)) {
             Some(true) => return Ok(()),
@@ -255,15 +277,22 @@
     }
 
     async fn wait_handle(&mut self) -> Result<()> {
-        match timeout!(self, end_timeout, &mut self.handle) {
-            Some(Ok(res)) => res,
-            Some(Err(err)) if err.is_panic() => panic::resume_unwind(err.into_panic()),
-            Some(Err(err)) => Err(Error::Custom(format!("unexpected error: {err}"))),
-            None => {
-                // We can do nothing useful here, so just shut down the background task.
-                self.handle.abort();
-                Err(Error::TimedOut)
+        match self.state.handle() {
+            Some(handle) => {
+                let result = match timeout!(self, end_timeout, &mut *handle) {
+                    Some(Ok(res)) => res,
+                    Some(Err(err)) if err.is_panic() => panic::resume_unwind(err.into_panic()),
+                    Some(Err(err)) => Err(Error::Custom(format!("unexpected error: {err}"))),
+                    None => {
+                        // We can do nothing useful here, so just shut down the background task.
+                        handle.abort();
+                        Err(Error::TimedOut)
+                    }
+                };
+                self.state = InsertState::Completed;
+                result
             }
+            _ => Ok(()),
         }
     }
 
@@ -283,8 +312,64 @@
         Ok(mem::replace(&mut self.buffer, BytesMut::with_capacity(BUFFER_SIZE)).freeze())
     }
 
+    #[cold]
+    #[track_caller]
+    #[inline(never)]
+    fn init_request(&mut self) -> Result<()> {
+        debug_assert!(matches!(self.state, InsertState::NotStarted { .. }));
+        let (client, sql) = self.state.client_with_sql().unwrap(); // checked above
+
+        let mut url = Url::parse(&client.url).map_err(|err| Error::InvalidParams(err.into()))?;
+        let mut pairs = url.query_pairs_mut();
+        pairs.clear();
+
+        if let Some(database) = &client.database {
+            pairs.append_pair("database", database);
+        }
+
+        pairs.append_pair("query", sql);
+
+        if client.compression.is_lz4() {
+            pairs.append_pair("decompress", "1");
+        }
+
+        for (name, value) in &client.options {
+            pairs.append_pair(name, value);
+        }
+
+        drop(pairs);
+
+        let mut builder = Request::post(url.as_str());
+
+        for (name, value) in &client.headers {
+            builder = builder.header(name, value);
+        }
+
+        if let Some(user) = &client.user {
+            builder = builder.header("X-ClickHouse-User", user);
+        }
+
+        if let Some(password) = &client.password {
+            builder = builder.header("X-ClickHouse-Key", password);
+        }
+
+        let (sender, body) = RequestBody::chunked();
+
+        let request = builder
+            .body(body)
+            .map_err(|err| Error::InvalidParams(Box::new(err)))?;
+
+        let future = client.http.request(request);
+        // TODO: introduce `Executor` to allow bookkeeping of spawned tasks.
+        let handle =
+            tokio::spawn(async move { Response::new(future, Compression::None).finish().await });
+
+        self.state = InsertState::Active { handle, sender };
+        Ok(())
+    }
+
     fn abort(&mut self) {
-        if let Some(sender) = self.sender.take() {
+        if let Some(sender) = self.state.sender() {
             sender.abort();
         }
     }
diff --git a/src/inserter.rs b/src/inserter.rs
index 8c631c5..3e42901 100644
--- a/src/inserter.rs
+++ b/src/inserter.rs
@@ -141,6 +141,12 @@ where
         self
     }
 
+    /// Similar to [`Client::with_option`], but for the INSERT statements generated by this [`Inserter`] only.
+    pub fn with_option(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
+        self.client.add_option(name, value);
+        self
+    }
+
     /// See [`Inserter::with_timeouts()`].
     pub fn set_timeouts(&mut self, send_timeout: Option<Duration>, end_timeout: Option<Duration>) {
         self.send_timeout = send_timeout;
@@ -186,10 +192,10 @@
 
     /// Serializes the provided row into an internal buffer.
     ///
-    /// To check limits and sent to ClickHouse, call [`Inserter::commit()`].
+    /// To check the limits and send the data to ClickHouse, call [`Inserter::commit()`].
     ///
     /// # Panics
-    /// If called after previous call returned an error.
+    /// If called after a previous call returned an error.
     #[inline]
     pub fn write(&mut self, row: &T) -> Result<()>
     where
diff --git a/src/lib.rs b/src/lib.rs
index 55692e0..518d728 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -221,4 +221,9 @@ impl Client {
     pub fn watch(&self, query: &str) -> watch::Watch {
         watch::Watch::new(self, query)
     }
+
+    /// Used internally to modify the options map of an _already cloned_ [`Client`] instance.
+    pub(crate) fn add_option(&mut self, name: impl Into<String>, value: impl Into<String>) {
+        self.options.insert(name.into(), value.into());
+    }
 }
diff --git a/src/query.rs b/src/query.rs
index 28253f9..551e7a0 100644
--- a/src/query.rs
+++ b/src/query.rs
@@ -185,6 +185,12 @@ impl Query {
         let future = self.client.http.request(request);
         Ok(Response::new(future, self.client.compression))
     }
+
+    /// Similar to [`Client::with_option`], but for this particular query only.
+    pub fn with_option(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
+        self.client.add_option(name, value);
+        self
+    }
 }
 
 /// A cursor that emits rows.
diff --git a/src/rowbinary/de.rs b/src/rowbinary/de.rs
index 0644fc3..aee8b49 100644
--- a/src/rowbinary/de.rs
+++ b/src/rowbinary/de.rs
@@ -19,7 +19,7 @@ pub(crate) fn deserialize_from<'de, T: Deserialize<'de>>(
 
 /// A deserializer for the RowBinary format.
 ///
-/// See https://clickhouse.yandex/docs/en/interfaces/formats/#rowbinary for details.
+/// See https://clickhouse.com/docs/en/interfaces/formats#rowbinary for details.
 struct RowBinaryDeserializer<'de, B> {
     input: B,
     temp_buf: &'de mut [u8],
diff --git a/src/rowbinary/ser.rs b/src/rowbinary/ser.rs
index 03646eb..bd1b7ff 100644
--- a/src/rowbinary/ser.rs
+++ b/src/rowbinary/ser.rs
@@ -15,7 +15,7 @@ pub(crate) fn serialize_into(buffer: impl BufMut, value: &impl Serialize) -> Result<()> {
 
 /// A serializer for the RowBinary format.
 ///
-/// See https://clickhouse.yandex/docs/en/interfaces/formats/#rowbinary for details.
+/// See https://clickhouse.com/docs/en/interfaces/formats#rowbinary for details.
 struct RowBinarySerializer<B> {
     buffer: B,
 }
diff --git a/tests/it/insert.rs b/tests/it/insert.rs
new file mode 100644
index 0000000..12de6cb
--- /dev/null
+++ b/tests/it/insert.rs
@@ -0,0 +1,101 @@
+use crate::{create_simple_table, fetch_simple_rows, flush_query_log, SimpleRow};
+
+#[tokio::test]
+async fn keeps_client_options() {
+    let table_name = "insert_keeps_client_options";
+    let query_id = uuid::Uuid::new_v4().to_string();
+    let (client_setting_name, client_setting_value) = ("max_block_size", "1000");
+    let (insert_setting_name, insert_setting_value) = ("async_insert", "1");
+
+    let client = prepare_database!().with_option(client_setting_name, client_setting_value);
+    create_simple_table(&client, table_name).await;
+
+    let row = SimpleRow::new(42, "foo");
+
+    let mut insert = client
+        .insert(table_name)
+        .unwrap()
+        .with_option(insert_setting_name, insert_setting_value)
+        .with_option("query_id", &query_id);
+
+    insert.write(&row).await.unwrap();
+    insert.end().await.unwrap();
+
+    flush_query_log(&client).await;
+
+    let (has_insert_setting, has_client_setting) = client
+        .query(&format!(
+            "
+            SELECT
+              Settings['{insert_setting_name}'] = '{insert_setting_value}',
+              Settings['{client_setting_name}'] = '{client_setting_value}'
+            FROM system.query_log
+            WHERE query_id = ?
+            AND type = 'QueryFinish'
+            AND query_kind = 'Insert'
+            "
+        ))
+        .bind(&query_id)
+        .fetch_one::<(bool, bool)>()
+        .await
+        .unwrap();
+
+    assert!(
+        has_insert_setting, "{}",
+        format!("should contain {insert_setting_name} = {insert_setting_value} (from the insert options)")
+    );
+    assert!(
+        has_client_setting, "{}",
+        format!("should contain {client_setting_name} = {client_setting_value} (from the client options)")
+    );
+
+    let rows = fetch_simple_rows(&client, table_name).await;
+    assert_eq!(rows, vec!(row))
+}
+
+#[tokio::test]
+async fn overrides_client_options() {
+    let table_name = "insert_overrides_client_options";
+    let query_id = uuid::Uuid::new_v4().to_string();
+    let (setting_name, setting_value, override_value) = ("async_insert", "0", "1");
+
+    let client = prepare_database!().with_option(setting_name, setting_value);
+    create_simple_table(&client, table_name).await;
+
+    let row = SimpleRow::new(42, "foo");
+
+    let mut insert = client
+        .insert(table_name)
+        .unwrap()
+        .with_option(setting_name, override_value)
+        .with_option("query_id", &query_id);
+
+    insert.write(&row).await.unwrap();
+    insert.end().await.unwrap();
+
+    flush_query_log(&client).await;
+
+    let has_setting_override = client
+        .query(&format!(
+            "
+            SELECT Settings['{setting_name}'] = '{override_value}'
+            FROM system.query_log
+            WHERE query_id = ?
+            AND type = 'QueryFinish'
+            AND query_kind = 'Insert'
+            "
+        ))
+        .bind(&query_id)
+        .fetch_one::<bool>()
+        .await
+        .unwrap();
+
+    assert!(
+        has_setting_override,
+        "{}",
+        format!("should contain {setting_name} = {override_value} (from the insert options)")
+    );
+
+    let rows = fetch_simple_rows(&client, table_name).await;
+    assert_eq!(rows, vec!(row))
+}
diff --git a/tests/it/inserter.rs b/tests/it/inserter.rs
index 0211a65..a7d5ba3 100644
--- a/tests/it/inserter.rs
+++ b/tests/it/inserter.rs
@@ -6,6 +6,8 @@ use serde::Serialize;
 
 use clickhouse::{inserter::Quantities, Client, Row};
 
+use crate::{create_simple_table, fetch_simple_rows, flush_query_log, SimpleRow};
+
 #[derive(Debug, Row, Serialize)]
 struct MyRow {
     data: String,
@@ -183,3 +185,105 @@ async fn limited_by_time() {
     assert_eq!(count, rows);
     assert_eq!(sum, (1..=rows).sum::<u64>());
 }
+
+/// Similar to [`crate::insert::keeps_client_options`] with minor differences.
+#[tokio::test]
+async fn keeps_client_options() {
+    let table_name = "inserter_keeps_client_options";
+    let query_id = uuid::Uuid::new_v4().to_string();
+    let (client_setting_name, client_setting_value) = ("max_block_size", "1000");
+    let (insert_setting_name, insert_setting_value) = ("async_insert", "1");
+
+    let client = prepare_database!().with_option(client_setting_name, client_setting_value);
+    create_simple_table(&client, table_name).await;
+
+    let row = SimpleRow::new(42, "foo");
+
+    let mut inserter = client
+        .inserter(table_name)
+        .unwrap()
+        .with_option("async_insert", "1")
+        .with_option("query_id", &query_id);
+
+    inserter.write(&row).unwrap();
+    inserter.end().await.unwrap();
+
+    flush_query_log(&client).await;
+
+    let (has_insert_setting, has_client_setting) = client
+        .query(&format!(
+            "
+            SELECT
+              Settings['{insert_setting_name}'] = '{insert_setting_value}',
+              Settings['{client_setting_name}'] = '{client_setting_value}'
+            FROM system.query_log
+            WHERE query_id = ?
+            AND type = 'QueryFinish'
+            AND query_kind = 'Insert'
+            "
+        ))
+        .bind(&query_id)
+        .fetch_one::<(bool, bool)>()
+        .await
+        .unwrap();
+
+    assert!(
+        has_insert_setting, "{}",
+        format!("should contain {insert_setting_name} = {insert_setting_value} (from the insert options)")
+    );
+    assert!(
+        has_client_setting, "{}",
+        format!("should contain {client_setting_name} = {client_setting_value} (from the client options)")
+    );
+
+    let rows = fetch_simple_rows(&client, table_name).await;
+    assert_eq!(rows, vec!(row))
+}
+
+/// Similar to [`crate::insert::overrides_client_options`] with minor differences.
+#[tokio::test]
+async fn overrides_client_options() {
+    let table_name = "inserter_overrides_client_options";
+    let query_id = uuid::Uuid::new_v4().to_string();
+    let (setting_name, setting_value, override_value) = ("async_insert", "0", "1");
+
+    let client = prepare_database!().with_option(setting_name, setting_value);
+    create_simple_table(&client, table_name).await;
+
+    let row = SimpleRow::new(42, "foo");
+
+    let mut inserter = client
+        .inserter(table_name)
+        .unwrap()
+        .with_option("async_insert", override_value)
+        .with_option("query_id", &query_id);
+
+    inserter.write(&row).unwrap();
+    inserter.end().await.unwrap();
+
+    flush_query_log(&client).await;
+
+    let has_setting_override = client
+        .query(&format!(
+            "
+            SELECT Settings['{setting_name}'] = '{override_value}'
+            FROM system.query_log
+            WHERE query_id = ?
+            AND type = 'QueryFinish'
+            AND query_kind = 'Insert'
+            "
+        ))
+        .bind(&query_id)
+        .fetch_one::<bool>()
+        .await
+        .unwrap();
+
+    assert!(
+        has_setting_override,
+        "{}",
+        format!("should contain {setting_name} = {override_value} (from the inserter options)")
+    );
+
+    let rows = fetch_simple_rows(&client, table_name).await;
+    assert_eq!(rows, vec!(row))
+}
diff --git a/tests/it/main.rs b/tests/it/main.rs
index 5e5b952..9a8c11d 100644
--- a/tests/it/main.rs
+++ b/tests/it/main.rs
@@ -1,4 +1,7 @@
+use clickhouse::sql::Identifier;
 use clickhouse::{sql, Client};
+use clickhouse_derive::Row;
+use serde::{Deserialize, Serialize};
 
 macro_rules! prepare_database {
     () => {
@@ -13,8 +16,46 @@ macro_rules! prepare_database {
     };
 }
 
+#[derive(Debug, Row, Serialize, Deserialize, PartialEq)]
+struct SimpleRow {
+    id: u64,
+    data: String,
+}
+
+impl SimpleRow {
+    fn new(id: u64, data: impl ToString) -> Self {
+        Self {
+            id,
+            data: data.to_string(),
+        }
+    }
+}
+
+async fn create_simple_table(client: &Client, table_name: &str) {
+    client
+        .query("CREATE TABLE ?(id UInt64, data String) ENGINE = MergeTree ORDER BY id")
+        .bind(Identifier(table_name))
+        .execute()
+        .await
+        .unwrap();
+}
+
+async fn fetch_simple_rows(client: &Client, table_name: &str) -> Vec<SimpleRow> {
+    client
+        .query("SELECT ?fields FROM ?")
+        .bind(Identifier(table_name))
+        .fetch_all::<SimpleRow>()
+        .await
+        .unwrap()
+}
+
+async fn flush_query_log(client: &Client) {
+    client.query("SYSTEM FLUSH LOGS").execute().await.unwrap();
+}
+
 mod compression;
 mod cursor_error;
+mod insert;
 mod inserter;
 mod ip;
 mod nested;
diff --git a/tests/it/query.rs b/tests/it/query.rs
index 1ed81d7..aacab47 100644
--- a/tests/it/query.rs
+++ b/tests/it/query.rs
@@ -176,3 +176,41 @@ async fn all_floats() {
 
     assert_eq!(vec, &[42.5, 43.5]);
 }
+
+#[tokio::test]
+async fn keeps_client_options() {
+    let (client_setting_name, client_setting_value) = ("max_block_size", "1000");
+    let (query_setting_name, query_setting_value) = ("date_time_input_format", "basic");
+
+    let client = prepare_database!().with_option(client_setting_name, client_setting_value);
+
+    let value = client
+        .query("SELECT value FROM system.settings WHERE name = ? OR name = ? ORDER BY name")
+        .bind(query_setting_name)
+        .bind(client_setting_name)
+        .with_option(query_setting_name, query_setting_value)
+        .fetch_all::<String>()
+        .await
+        .unwrap();
+
+    // should keep the client options
+    assert_eq!(value, vec!(query_setting_value, client_setting_value));
+}
+
+#[tokio::test]
+async fn overrides_client_options() {
+    let (setting_name, setting_value, override_value) = ("max_block_size", "1000", "2000");
+
+    let client = prepare_database!().with_option(setting_name, setting_value);
+
+    let value = client
+        .query("SELECT value FROM system.settings WHERE name = ?")
+        .bind(setting_name)
+        .with_option(setting_name, override_value)
+        .fetch_one::<String>()
+        .await
+        .unwrap();
+
+    // should override the client options
+    assert_eq!(value, override_value);
+}
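
Reviewer note (not part of the patch): a minimal usage sketch tying the pieces above together — client-level options combined with the per-INSERT `Insert::with_option` added in this PR. The `events` table, the `Event` row type, and the chosen setting values are hypothetical and purely illustrative; the important detail is that per-insert options must be set before the first `write`, since the outgoing request is now built lazily on that call.

```rust
use clickhouse::{error::Result, Client, Row};
use serde::Serialize;

// Hypothetical row type; any struct deriving `Row` + `Serialize` works the same way.
#[derive(Row, Serialize)]
struct Event {
    id: u64,
    message: String,
}

#[tokio::main]
async fn main() -> Result<()> {
    let client = Client::default()
        .with_url("http://localhost:8123")
        // Applied to every query and insert issued through this client.
        .with_option("date_time_input_format", "best_effort");

    // Per-INSERT settings: call `with_option` before the first `write`,
    // because the request is created only on that first call.
    let mut insert = client
        .insert("events")? // assumes the `events` table already exists
        .with_option("async_insert", "1")
        .with_option("wait_for_async_insert", "0");

    insert
        .write(&Event {
            id: 1,
            message: "hello".into(),
        })
        .await?;
    insert.end().await?;

    Ok(())
}
```

`Inserter::with_option` and `Query::with_option` follow the same pattern, except that inserter-level options apply to every INSERT statement the inserter generates, while query-level options apply to that single query only.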