Skip to content

Commit

Permalink
feat(query,insert): add query-level settings override (#123)
Browse files Browse the repository at this point in the history
  • Loading branch information
slvrtrn authored Aug 7, 2024
1 parent 94bcfbb commit c47ca82
Show file tree
Hide file tree
Showing 13 changed files with 490 additions and 73 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- query/bind: support `Option` in `query.bind(arg)` ([#119], [#120]).
- client: `Client::with_header()` to provide custom headers ([#98], [#108]).
- query: added `Query::with_option()` similar to `Client::with_option()` ([#123]).
- insert: added `Insert::with_option()` similar to `Client::with_option()` ([#123]).
- inserter: added `Inserter::with_option()` similar to `Client::with_option()` ([#123]).

### Changed
- insert: the outgoing request is now created after the first `Insert::write` call instead of `Insert::new` ([#123]).

[#123]: https://github.com/ClickHouse/clickhouse-rs/pull/123
[#120]: https://github.com/ClickHouse/clickhouse-rs/pull/120
[#119]: https://github.com/ClickHouse/clickhouse-rs/issues/119
[#108]: https://github.com/ClickHouse/clickhouse-rs/pull/108
Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ uuid = { version = "1", optional = true }
time = { version = "0.3", optional = true }
bstr = { version = "1.2", default-features = false }
quanta = { version = "0.12", optional = true }
replace_with = { version = "0.1.7" }

[dev-dependencies]
criterion = "0.5.0"
Expand Down
23 changes: 23 additions & 0 deletions examples/clickhouse_settings.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
use clickhouse::{error::Result, Client};

/// Demonstrates overriding a client-wide ClickHouse setting for a single query.
///
/// Besides [`Client::query`], it works similarly with [`Client::insert`] and [`Client::inserter`].
#[tokio::main]
async fn main() -> Result<()> {
    // Options set on the client are global: they are applied to all queries.
    let client = Client::default()
        .with_url("http://localhost:8123")
        .with_option("limit", "100");

    // An option set on the query is applied to this particular query only;
    // it overrides the client-level value of the same setting.
    let query = client
        .query("SELECT number FROM system.numbers")
        .with_option("limit", "3");
    let numbers = query.fetch_all::<u64>().await?;

    // Only the first 3 numbers are printed because of the per-query override.
    println!("{numbers:?}");

    Ok(())
}
223 changes: 154 additions & 69 deletions src/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::{future::Future, marker::PhantomData, mem, panic, pin::Pin, time::Durat

use bytes::{Bytes, BytesMut};
use hyper::{self, Request};
use replace_with::replace_with_or_abort;
use serde::Serialize;
use tokio::{
task::JoinHandle,
Expand All @@ -28,19 +29,71 @@ const MIN_CHUNK_SIZE: usize = BUFFER_SIZE - 1024; // slightly less to avoid extr
/// Rows are being sent progressively to spread network load.
#[must_use]
pub struct Insert<T> {
state: InsertState,
buffer: BytesMut,
sender: Option<ChunkSender>,
#[cfg(feature = "lz4")]
compression: Compression,
send_timeout: Option<Duration>,
end_timeout: Option<Duration>,
// Use boxed `Sleep` to reuse a timer entry, it improves performance.
// Also, `tokio::time::timeout()` significantly increases a future's size.
sleep: Pin<Box<Sleep>>,
handle: JoinHandle<Result<()>>,
_marker: PhantomData<fn() -> T>, // TODO: test contravariance.
}

/// Lifecycle of the HTTP request that backs an `Insert`.
///
/// The request is created lazily: the state starts as `NotStarted`, becomes
/// `Active` once `init_request` has built and spawned the request, moves to
/// `Terminated` when the sender is dropped, and ends as `Completed` after the
/// response task has been waited for.
enum InsertState {
/// No request has been created yet. Holds the client copy and the SQL text
/// that `init_request` uses to build the request.
NotStarted {
client: Box<Client>,
sql: String,
},
/// The request has been created: `sender` feeds chunks into the request
/// body, `handle` is the spawned task that processes the response.
Active {
sender: ChunkSender,
handle: JoinHandle<Result<()>>,
},
/// The sender has been dropped; only the response task is left to wait for.
Terminated {
handle: JoinHandle<Result<()>>,
},
/// The response task has been waited for (successfully, with an error, or
/// with a timeout); there is nothing left to do.
Completed,
}

impl InsertState {
    /// Returns the chunk sender while the request is `Active`, `None` in any other state.
    fn sender(&mut self) -> Option<&mut ChunkSender> {
        match self {
            InsertState::Active { sender, .. } => Some(sender),
            _ => None,
        }
    }

    /// Returns the handle of the spawned response task, which exists in the
    /// `Active` and `Terminated` states only.
    fn handle(&mut self) -> Option<&mut JoinHandle<Result<()>>> {
        match self {
            InsertState::Active { handle, .. } | InsertState::Terminated { handle } => Some(handle),
            _ => None,
        }
    }

    /// Returns the client and the SQL text stored before the request is started,
    /// `None` once the state has left `NotStarted`.
    fn client_with_sql(&self) -> Option<(&Client, &str)> {
        match self {
            InsertState::NotStarted { client, sql } => Some((client, sql)),
            _ => None,
        }
    }

    /// Marks the request body as finished.
    ///
    /// * `Active` becomes `Terminated`: the sender is dropped, the handle is kept.
    /// * `NotStarted` becomes `Completed`: nothing was ever written, so there is
    ///   no request to finish and no response to wait for.
    /// * Any other state is left untouched.
    fn terminated(&mut self) {
        debug_assert!(matches!(
            self,
            InsertState::NotStarted { .. } | InsertState::Active { .. }
        ));
        // The closure must never panic: `replace_with_or_abort` aborts the whole
        // process if it does. Hence every state is mapped explicitly instead of
        // relying on `unreachable!()`.
        replace_with_or_abort(self, |state| match state {
            InsertState::Active { handle, .. } => InsertState::Terminated { handle },
            InsertState::NotStarted { .. } => InsertState::Completed,
            other => other,
        });
    }

    /// Adds an option to the stored client; it is sent with this insert only.
    ///
    /// # Panics
    /// If the request has already been started (the state is not `NotStarted`).
    #[track_caller]
    fn with_option(&mut self, name: impl Into<String>, value: impl Into<String>) {
        // Mutate the stored client in place; there is no need to move the state out.
        match self {
            InsertState::NotStarted { client, .. } => {
                client.add_option(name, value);
            }
            _ => panic!("with_option() must be called before the insert request is started"),
        }
    }
}

// It should be a regular function, but it decreases performance.
macro_rules! timeout {
($self:expr, $timeout:ident, $fut:expr) => {{
Expand All @@ -56,70 +109,29 @@ macro_rules! timeout {
}

impl<T> Insert<T> {
// TODO: remove Result
pub(crate) fn new(client: &Client, table: &str) -> Result<Self>
where
T: Row,
{
let mut url = Url::parse(&client.url).map_err(|err| Error::InvalidParams(err.into()))?;
let mut pairs = url.query_pairs_mut();
pairs.clear();

if let Some(database) = &client.database {
pairs.append_pair("database", database);
}

let fields = row::join_column_names::<T>()
.expect("the row type must be a struct or a wrapper around it");

// TODO: what about escaping a table name?
// https://clickhouse.yandex/docs/en/query_language/syntax/#syntax-identifiers
let query = format!("INSERT INTO {table}({fields}) FORMAT RowBinary");
pairs.append_pair("query", &query);

if client.compression.is_lz4() {
pairs.append_pair("decompress", "1");
}

for (name, value) in &client.options {
pairs.append_pair(name, value);
}

drop(pairs);

let mut builder = Request::post(url.as_str());

for (name, value) in &client.headers {
builder = builder.header(name, value);
}

if let Some(user) = &client.user {
builder = builder.header("X-ClickHouse-User", user);
}

if let Some(password) = &client.password {
builder = builder.header("X-ClickHouse-Key", password);
}

let (sender, body) = RequestBody::chunked();

let request = builder
.body(body)
.map_err(|err| Error::InvalidParams(Box::new(err)))?;

let future = client.http.request(request);
// TODO: introduce `Executor` to allow bookkeeping of spawned tasks.
let handle =
tokio::spawn(async move { Response::new(future, Compression::None).finish().await });
// https://clickhouse.com/docs/en/sql-reference/syntax#identifiers
let sql = format!("INSERT INTO {}({}) FORMAT RowBinary", table, fields);

Ok(Self {
state: InsertState::NotStarted {
client: Box::new(client.clone()),
sql,
},
buffer: BytesMut::with_capacity(BUFFER_SIZE),
sender: Some(sender),
#[cfg(feature = "lz4")]
compression: client.compression,
send_timeout: None,
end_timeout: None,
sleep: Box::pin(tokio::time::sleep(Duration::new(0, 0))),
handle,
_marker: PhantomData,
})
}
Expand All @@ -128,13 +140,13 @@ impl<T> Insert<T> {
///
/// `send_timeout` restricts time on sending a data chunk to a socket.
/// `None` disables the timeout, it's a default.
/// It's roughly equivalent to `tokio::time::timeout(insert.write(..))`.
/// It's roughly equivalent to `tokio::time::timeout(insert.write(...))`.
///
/// `end_timeout` restricts time on waiting for a response from the CH
/// server. Thus, it includes all work needed to handle `INSERT` by the
/// CH server, e.g. handling all materialized views and so on.
/// `None` disables the timeout, it's a default.
/// It's roughly equivalent to `tokio::time::timeout(insert.end(..))`.
/// It's roughly equivalent to `tokio::time::timeout(insert.end(...))`.
///
/// These timeouts are much more performant (~x10) than wrapping `write()`
/// and `end()` calls into `tokio::time::timeout()`.
Expand All @@ -147,6 +159,15 @@ impl<T> Insert<T> {
self
}

/// Sets an option for this particular INSERT statement only.
///
/// Similar to [`Client::with_option`], but scoped to this insert.
///
/// # Panics
/// If called after the insert request is started, e.g., after [`Insert::write`].
#[track_caller]
pub fn with_option(self, name: impl Into<String>, value: impl Into<String>) -> Self {
    let mut insert = self;
    insert.state.with_option(name, value);
    insert
}

pub(crate) fn set_timeouts(
&mut self,
send_timeout: Option<Duration>,
Expand All @@ -172,7 +193,7 @@ impl<T> Insert<T> {
/// used anymore.
///
/// # Panics
/// If called after previous call returned an error.
/// If called after a previous call returned an error.
pub fn write<'a>(&'a mut self, row: &T) -> impl Future<Output = Result<()>> + 'a + Send
where
T: Serialize,
Expand All @@ -193,7 +214,11 @@ impl<T> Insert<T> {
where
T: Serialize,
{
assert!(self.sender.is_some(), "write() after error");
match self.state {
InsertState::NotStarted { .. } => self.init_request(),
InsertState::Active { .. } => Ok(()),
_ => panic!("write() after error"),
}?;

let old_buf_size = self.buffer.len();
let result = rowbinary::serialize_into(&mut self.buffer, row);
Expand All @@ -216,22 +241,19 @@ impl<T> Insert<T> {
if !self.buffer.is_empty() {
self.send_chunk().await?;
}

self.sender = None; // terminate the sender successfully
self.state.terminated();
self.wait_handle().await
}

async fn send_chunk(&mut self) -> Result<()> {
if self.sender.is_none() {
return Ok(());
}
debug_assert!(matches!(self.state, InsertState::Active { .. }));

// Hyper uses non-trivial and inefficient schema of buffering chunks.
// It's difficult to determine when allocations occur.
// So, instead we control it manually here and rely on the system allocator.
let chunk = self.take_and_prepare_chunk()?;

let sender = self.sender.as_mut().unwrap(); // checked above
let sender = self.state.sender().unwrap(); // checked above

let is_timed_out = match timeout!(self, send_timeout, sender.send(chunk)) {
Some(true) => return Ok(()),
Expand All @@ -255,15 +277,22 @@ impl<T> Insert<T> {
}

async fn wait_handle(&mut self) -> Result<()> {
match timeout!(self, end_timeout, &mut self.handle) {
Some(Ok(res)) => res,
Some(Err(err)) if err.is_panic() => panic::resume_unwind(err.into_panic()),
Some(Err(err)) => Err(Error::Custom(format!("unexpected error: {err}"))),
None => {
// We can do nothing useful here, so just shut down the background task.
self.handle.abort();
Err(Error::TimedOut)
match self.state.handle() {
Some(handle) => {
let result = match timeout!(self, end_timeout, &mut *handle) {
Some(Ok(res)) => res,
Some(Err(err)) if err.is_panic() => panic::resume_unwind(err.into_panic()),
Some(Err(err)) => Err(Error::Custom(format!("unexpected error: {err}"))),
None => {
// We can do nothing useful here, so just shut down the background task.
handle.abort();
Err(Error::TimedOut)
}
};
self.state = InsertState::Completed;
result
}
_ => Ok(()),
}
}

Expand All @@ -283,8 +312,64 @@ impl<T> Insert<T> {
Ok(mem::replace(&mut self.buffer, BytesMut::with_capacity(BUFFER_SIZE)).freeze())
}

/// Builds the outgoing INSERT request from the stored client and SQL, spawns
/// the task that processes the response, and switches the state to `Active`.
///
/// Must only be called while the state is `NotStarted`.
///
/// # Errors
/// Returns `Error::InvalidParams` if the client URL cannot be parsed or the
/// request cannot be built; the state is left unchanged in that case.
#[cold]
#[track_caller]
#[inline(never)]
fn init_request(&mut self) -> Result<()> {
    debug_assert!(matches!(self.state, InsertState::NotStarted { .. }));
    let (client, sql) = self.state.client_with_sql().unwrap(); // checked above

    let mut url = Url::parse(&client.url).map_err(|err| Error::InvalidParams(err.into()))?;

    // The serializer borrows `url` mutably, so keep it in its own scope.
    {
        let mut params = url.query_pairs_mut();
        params.clear();

        if let Some(database) = &client.database {
            params.append_pair("database", database);
        }

        params.append_pair("query", sql);

        if client.compression.is_lz4() {
            params.append_pair("decompress", "1");
        }

        for (name, value) in &client.options {
            params.append_pair(name, value);
        }
    }

    // Custom headers first, then the credentials (if any).
    let builder = client
        .headers
        .iter()
        .fold(Request::post(url.as_str()), |builder, (name, value)| {
            builder.header(name, value)
        });
    let builder = match &client.user {
        Some(user) => builder.header("X-ClickHouse-User", user),
        None => builder,
    };
    let builder = match &client.password {
        Some(password) => builder.header("X-ClickHouse-Key", password),
        None => builder,
    };

    let (sender, body) = RequestBody::chunked();
    let request = builder
        .body(body)
        .map_err(|err| Error::InvalidParams(Box::new(err)))?;

    let pending = client.http.request(request);
    // TODO: introduce `Executor` to allow bookkeeping of spawned tasks.
    let handle =
        tokio::spawn(async move { Response::new(pending, Compression::None).finish().await });

    self.state = InsertState::Active { handle, sender };
    Ok(())
}

fn abort(&mut self) {
if let Some(sender) = self.sender.take() {
if let Some(sender) = self.state.sender() {
sender.abort();
}
}
Expand Down
Loading

0 comments on commit c47ca82

Please sign in to comment.