Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add query/insert level settings override #123

Merged
merged 12 commits into from
Aug 7, 2024
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- query/bind: support `Option` in `query.bind(arg)` ([#119], [#120]).
- client: `Client::with_header()` to provide custom headers ([#98], [#108]).
- query: added `Query::with_option()` similar to `Client::with_option()` ([#123]).
- insert: added `Insert::with_option()` similar to `Client::with_option()` ([#123]).
- inserter: added `Inserter::with_option()` similar to `Client::with_option()` ([#123]).

### Changed
- insert: the outgoing request is now created after the first `Insert::write` call instead of `Insert::new` ([#123]).

[#123]: https://github.com/ClickHouse/clickhouse-rs/pull/123
[#120]: https://github.com/ClickHouse/clickhouse-rs/pull/120
[#119]: https://github.com/ClickHouse/clickhouse-rs/issues/119
[#108]: https://github.com/ClickHouse/clickhouse-rs/pull/108
Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ uuid = { version = "1", optional = true }
time = { version = "0.3", optional = true }
bstr = { version = "1.2", default-features = false }
quanta = { version = "0.12", optional = true }
replace_with = { version = "0.1.7" }
loyd marked this conversation as resolved.
Show resolved Hide resolved

[dev-dependencies]
criterion = "0.5.0"
Expand Down
23 changes: 23 additions & 0 deletions examples/clickhouse_settings.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
use clickhouse::{error::Result, Client};

/// Demonstrates overriding a client-wide ClickHouse setting for a single query.
///
/// The same per-statement override is available on [`Client::insert`] and
/// [`Client::inserter`], not only on [`Client::query`].
#[tokio::main]
async fn main() -> Result<()> {
    // Client-wide setting: applied to every statement issued through `client`.
    let base = Client::default().with_url("http://localhost:8123");
    let client = base.with_option("limit", "100");

    // Statement-level setting: affects this query only and takes precedence
    // over the client-wide value configured above.
    let limited = client
        .query("SELECT number FROM system.numbers")
        .with_option("limit", "3");
    let numbers = limited.fetch_all::<u64>().await?;

    // Only the first 3 numbers are printed, because the override wins.
    println!("{numbers:?}");

    Ok(())
}
223 changes: 154 additions & 69 deletions src/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::{future::Future, marker::PhantomData, mem, panic, pin::Pin, time::Durat

use bytes::{Bytes, BytesMut};
use hyper::{self, Request};
use replace_with::replace_with_or_abort;
use serde::Serialize;
use tokio::{
task::JoinHandle,
Expand All @@ -28,19 +29,71 @@ const MIN_CHUNK_SIZE: usize = BUFFER_SIZE - 1024; // slightly less to avoid extr
/// Rows are being sent progressively to spread network load.
#[must_use]
pub struct Insert<T> {
state: InsertState,
buffer: BytesMut,
sender: Option<ChunkSender>,
#[cfg(feature = "lz4")]
compression: Compression,
send_timeout: Option<Duration>,
end_timeout: Option<Duration>,
// Use boxed `Sleep` to reuse a timer entry, it improves performance.
// Also, `tokio::time::timeout()` significantly increases a future's size.
sleep: Pin<Box<Sleep>>,
handle: JoinHandle<Result<()>>,
_marker: PhantomData<fn() -> T>, // TODO: test contravariance.
}

/// Lifecycle of the outgoing INSERT request owned by an `Insert`.
///
/// The request is created lazily, so an `Insert` starts in `NotStarted`
/// and only moves to `Active` once `init_request` has spawned the HTTP task.
enum InsertState {
// The request has not been created yet: holds the cloned client
// (which also carries per-insert option overrides) and the prepared SQL.
NotStarted {
client: Box<Client>,
sql: String,
},
// The request is in flight: `sender` feeds body chunks to it and
// `handle` is the spawned task that awaits the server response.
Active {
sender: ChunkSender,
handle: JoinHandle<Result<()>>,
},
// The sender is gone (no more chunks can be sent), but the response
// task still has to be awaited.
Terminated {
handle: JoinHandle<Result<()>>,
},
// The response task has been awaited (or timed out); nothing is left to do.
Completed,
}

impl InsertState {
    /// Returns the chunk sender while the request is in flight (`Active`),
    /// `None` in every other state.
    fn sender(&mut self) -> Option<&mut ChunkSender> {
        match self {
            InsertState::Active { sender, .. } => Some(sender),
            _ => None,
        }
    }

    /// Returns the handle of the spawned response task, if one exists
    /// (`Active` or `Terminated`).
    fn handle(&mut self) -> Option<&mut JoinHandle<Result<()>>> {
        match self {
            InsertState::Active { handle, .. } | InsertState::Terminated { handle } => Some(handle),
            _ => None,
        }
    }

    /// Returns the client and the SQL text while the request has not been
    /// created yet (`NotStarted`), `None` afterwards.
    fn client_with_sql(&self) -> Option<(&Client, &str)> {
        match self {
            InsertState::NotStarted { client, sql } => Some((client, sql)),
            _ => None,
        }
    }

    /// Marks the end of the data stream.
    ///
    /// * `Active` becomes `Terminated`: the sender is dropped, which finishes
    ///   the request body, while the response handle is kept for awaiting.
    /// * `NotStarted` becomes `Completed`: nothing was ever written, so no
    ///   request exists and there is nothing to wait for (an empty insert).
    /// * Any other state is left untouched.
    ///
    /// The closure passed to `replace_with_or_abort` must never panic, because
    /// a panic there aborts the whole process; hence no `unreachable!()` arm.
    fn terminated(&mut self) {
        debug_assert!(matches!(
            self,
            InsertState::NotStarted { .. } | InsertState::Active { .. }
        ));
        replace_with_or_abort(self, |state| match state {
            InsertState::NotStarted { .. } => InsertState::Completed,
            InsertState::Active { handle, .. } => InsertState::Terminated { handle },
            other => other,
        });
    }

    /// Adds a setting to the stored client so that it is sent with this
    /// INSERT only.
    ///
    /// # Panics
    /// If the request has already been started (any state but `NotStarted`).
    #[track_caller]
    fn with_option(&mut self, name: impl Into<String>, value: impl Into<String>) {
        // Mutate in place: there is no need to move the state out and back in.
        match self {
            InsertState::NotStarted { client, .. } => {
                client.add_option(name, value);
            }
            _ => panic!("with_option() must be called before the insert request is started"),
        }
    }
}

// It should be a regular function, but it decreases performance.
macro_rules! timeout {
($self:expr, $timeout:ident, $fut:expr) => {{
Expand All @@ -56,70 +109,29 @@ macro_rules! timeout {
}

impl<T> Insert<T> {
// TODO: remove Result
pub(crate) fn new(client: &Client, table: &str) -> Result<Self>
where
loyd marked this conversation as resolved.
Show resolved Hide resolved
T: Row,
{
let mut url = Url::parse(&client.url).map_err(|err| Error::InvalidParams(err.into()))?;
let mut pairs = url.query_pairs_mut();
pairs.clear();

if let Some(database) = &client.database {
pairs.append_pair("database", database);
}

let fields = row::join_column_names::<T>()
.expect("the row type must be a struct or a wrapper around it");

// TODO: what about escaping a table name?
// https://clickhouse.yandex/docs/en/query_language/syntax/#syntax-identifiers
let query = format!("INSERT INTO {table}({fields}) FORMAT RowBinary");
pairs.append_pair("query", &query);

if client.compression.is_lz4() {
pairs.append_pair("decompress", "1");
}

for (name, value) in &client.options {
pairs.append_pair(name, value);
}

drop(pairs);

let mut builder = Request::post(url.as_str());

for (name, value) in &client.headers {
builder = builder.header(name, value);
}

if let Some(user) = &client.user {
builder = builder.header("X-ClickHouse-User", user);
}

if let Some(password) = &client.password {
builder = builder.header("X-ClickHouse-Key", password);
}

let (sender, body) = RequestBody::chunked();

let request = builder
.body(body)
.map_err(|err| Error::InvalidParams(Box::new(err)))?;

let future = client.http.request(request);
// TODO: introduce `Executor` to allow bookkeeping of spawned tasks.
let handle =
tokio::spawn(async move { Response::new(future, Compression::None).finish().await });
// https://clickhouse.com/docs/en/sql-reference/syntax#identifiers
let sql = format!("INSERT INTO {}({}) FORMAT RowBinary", table, fields);

Ok(Self {
state: InsertState::NotStarted {
client: Box::new(client.clone()),
sql,
},
buffer: BytesMut::with_capacity(BUFFER_SIZE),
sender: Some(sender),
#[cfg(feature = "lz4")]
compression: client.compression,
send_timeout: None,
end_timeout: None,
sleep: Box::pin(tokio::time::sleep(Duration::new(0, 0))),
handle,
_marker: PhantomData,
})
}
Expand All @@ -128,13 +140,13 @@ impl<T> Insert<T> {
///
/// `send_timeout` restricts time on sending a data chunk to a socket.
/// `None` disables the timeout, it's a default.
/// It's roughly equivalent to `tokio::time::timeout(insert.write(..))`.
/// It's roughly equivalent to `tokio::time::timeout(insert.write(...))`.
///
/// `end_timeout` restricts time on waiting for a response from the CH
/// server. Thus, it includes all work needed to handle `INSERT` by the
/// CH server, e.g. handling all materialized views and so on.
/// `None` disables the timeout, it's a default.
/// It's roughly equivalent to `tokio::time::timeout(insert.end(..))`.
/// It's roughly equivalent to `tokio::time::timeout(insert.end(...))`.
///
/// These timeouts are much more performant (~x10) than wrapping `write()`
/// and `end()` calls into `tokio::time::timeout()`.
Expand All @@ -147,6 +159,15 @@ impl<T> Insert<T> {
self
}

/// Similar to [`Client::with_option`], but for this particular INSERT statement only.
/// # Panics
/// If called after the insert request is started, e.g., after [`Insert::write`].
#[track_caller]
pub fn with_option(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
loyd marked this conversation as resolved.
Show resolved Hide resolved
self.state.with_option(name, value);
self
}

pub(crate) fn set_timeouts(
&mut self,
send_timeout: Option<Duration>,
Expand All @@ -172,7 +193,7 @@ impl<T> Insert<T> {
/// used anymore.
///
/// # Panics
/// If called after previous call returned an error.
/// If called after the previous call that returned an error.
pub fn write<'a>(&'a mut self, row: &T) -> impl Future<Output = Result<()>> + 'a + Send
where
T: Serialize,
Expand All @@ -193,7 +214,11 @@ impl<T> Insert<T> {
where
T: Serialize,
{
assert!(self.sender.is_some(), "write() after error");
loyd marked this conversation as resolved.
Show resolved Hide resolved
match self.state {
InsertState::NotStarted { .. } => self.init_request(),
InsertState::Active { .. } => Ok(()),
_ => panic!("write() after error"),
}?;

let old_buf_size = self.buffer.len();
let result = rowbinary::serialize_into(&mut self.buffer, row);
Expand All @@ -216,22 +241,19 @@ impl<T> Insert<T> {
if !self.buffer.is_empty() {
self.send_chunk().await?;
}

self.sender = None; // terminate the sender successfully
self.state.terminated();
self.wait_handle().await
}

async fn send_chunk(&mut self) -> Result<()> {
if self.sender.is_none() {
return Ok(());
}
debug_assert!(matches!(self.state, InsertState::Active { .. }));

// Hyper uses non-trivial and inefficient schema of buffering chunks.
// It's difficult to determine when allocations occur.
// So, instead we control it manually here and rely on the system allocator.
let chunk = self.take_and_prepare_chunk()?;

let sender = self.sender.as_mut().unwrap(); // checked above
let sender = self.state.sender().unwrap(); // checked above

let is_timed_out = match timeout!(self, send_timeout, sender.send(chunk)) {
Some(true) => return Ok(()),
Expand All @@ -255,15 +277,22 @@ impl<T> Insert<T> {
}

async fn wait_handle(&mut self) -> Result<()> {
match timeout!(self, end_timeout, &mut self.handle) {
Some(Ok(res)) => res,
Some(Err(err)) if err.is_panic() => panic::resume_unwind(err.into_panic()),
Some(Err(err)) => Err(Error::Custom(format!("unexpected error: {err}"))),
None => {
// We can do nothing useful here, so just shut down the background task.
self.handle.abort();
Err(Error::TimedOut)
match self.state.handle() {
Some(handle) => {
let result = match timeout!(self, end_timeout, &mut *handle) {
Some(Ok(res)) => res,
Some(Err(err)) if err.is_panic() => panic::resume_unwind(err.into_panic()),
Some(Err(err)) => Err(Error::Custom(format!("unexpected error: {err}"))),
None => {
// We can do nothing useful here, so just shut down the background task.
handle.abort();
Err(Error::TimedOut)
}
};
self.state = InsertState::Completed;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If a panic happens, this isn't executed. In general, though, that should never happen.

result
}
_ => Ok(()),
}
}

Expand All @@ -283,8 +312,64 @@ impl<T> Insert<T> {
Ok(mem::replace(&mut self.buffer, BytesMut::with_capacity(BUFFER_SIZE)).freeze())
}

/// Builds the HTTP request for this INSERT and spawns the task that drives it.
///
/// Must be called while the state is `NotStarted`; on success the state
/// becomes `Active`, holding the chunk sender and the task handle.
///
/// # Errors
/// Returns `Error::InvalidParams` if the client URL cannot be parsed or the
/// request cannot be built.
#[cold]
#[track_caller]
#[inline(never)]
fn init_request(&mut self) -> Result<()> {
loyd marked this conversation as resolved.
Show resolved Hide resolved
debug_assert!(matches!(self.state, InsertState::NotStarted { .. }));
let (client, sql) = self.state.client_with_sql().unwrap(); // checked above

// Start from the configured URL but drop any query string it carries;
// all parameters are appended explicitly below.
let mut url = Url::parse(&client.url).map_err(|err| Error::InvalidParams(err.into()))?;
let mut pairs = url.query_pairs_mut();
pairs.clear();

if let Some(database) = &client.database {
pairs.append_pair("database", database);
}

pairs.append_pair("query", sql);

// With LZ4 compression enabled, pass `decompress=1` as a URL parameter.
if client.compression.is_lz4() {
pairs.append_pair("decompress", "1");
}

// The client's options are sent as URL parameters.
for (name, value) in &client.options {
pairs.append_pair(name, value);
}

// Release the mutable borrow of `url` before reading it.
drop(pairs);

let mut builder = Request::post(url.as_str());

for (name, value) in &client.headers {
builder = builder.header(name, value);
}

// Credentials are passed via dedicated headers.
if let Some(user) = &client.user {
builder = builder.header("X-ClickHouse-User", user);
}

if let Some(password) = &client.password {
builder = builder.header("X-ClickHouse-Key", password);
}

// The body is chunked: `sender` is kept in the state, `body` goes into the request.
let (sender, body) = RequestBody::chunked();

let request = builder
.body(body)
.map_err(|err| Error::InvalidParams(Box::new(err)))?;

let future = client.http.request(request);
// TODO: introduce `Executor` to allow bookkeeping of spawned tasks.
// The response is handled with `Compression::None` regardless of the client's setting.
let handle =
tokio::spawn(async move { Response::new(future, Compression::None).finish().await });

self.state = InsertState::Active { handle, sender };
Ok(())
}

fn abort(&mut self) {
if let Some(sender) = self.sender.take() {
if let Some(sender) = self.state.sender() {
sender.abort();
}
}
Expand Down
Loading