style: apply more rustfmt settings
loyd committed Jul 25, 2024
1 parent 869b217 commit f6ff213
Showing 10 changed files with 112 additions and 77 deletions.
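The exact rustfmt configuration isn't part of this diff, but the changes below (doc comments rewrapped to ~80 columns, merged `use` items, alphabetized associated types) are consistent with enabling a few unstable options. A plausible `rustfmt.toml` sketch; the option values are inferred from the diff, not taken from the repository:

```toml
# Hypothetical rustfmt.toml, reconstructed from the diff below.
wrap_comments = true          # rewrap comments to `comment_width` (see src/insert.rs)
comment_width = 80            # target width for the rewrapped doc comments
imports_granularity = "Crate" # merge `use` items per crate (see benches/select.rs)
reorder_impl_items = true     # associated types first, sorted (see src/rowbinary/ser.rs)
```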
3 changes: 1 addition & 2 deletions benches/select.rs
@@ -1,5 +1,4 @@
- use std::convert::Infallible;
- use std::mem;
+ use std::{convert::Infallible, mem};

use bytes::Bytes;
use criterion::{black_box, criterion_group, criterion_main, Criterion, Throughput};
11 changes: 6 additions & 5 deletions src/http_client.rs
@@ -6,12 +6,13 @@ use crate::request_body::RequestBody;

/// A trait for the underlying HTTP client.
///
- /// Firstly, for now it is implemented only for `hyper_util::client::legacy::Client`;
- /// it's impossible to use another HTTP client.
+ /// Firstly, for now it is implemented only for
+ /// `hyper_util::client::legacy::Client`; it's impossible to use another HTTP
+ /// client.
///
- /// Secondly, although it's stable in terms of semver, it will be changed in the future
- /// (e.g. to support more runtimes, not only tokio). Thus, prefer to open a feature
- /// request instead of implementing this trait manually.
+ /// Secondly, although it's stable in terms of semver, it will be changed in the
+ /// future (e.g. to support more runtimes, not only tokio). Thus, prefer to open
+ /// a feature request instead of implementing this trait manually.
#[sealed]
pub trait HttpClient: Send + Sync + 'static {
fn request(&self, req: Request<RequestBody>) -> ResponseFuture;
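Since the trait is `#[sealed]`, external implementations are rejected at compile time. A minimal, self-contained sketch of the sealed-trait pattern such an attribute expands to; the module and type names here are illustrative, not this crate's actual expansion:

```rust
mod private {
    // Crate-private supertrait: downstream crates cannot name it,
    // so they cannot implement anything that requires it.
    pub trait Sealed {}
}

pub trait HttpClient: private::Sealed + Send + Sync + 'static {
    fn request(&self);
}

pub struct DefaultClient;

impl private::Sealed for DefaultClient {}

impl HttpClient for DefaultClient {
    fn request(&self) { /* issue the HTTP request here */ }
}
```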
22 changes: 12 additions & 10 deletions src/insert.rs
@@ -126,14 +126,14 @@ impl<T> Insert<T> {
/// `None` disables the timeout, it's the default.
/// It's roughly equivalent to `tokio::time::timeout(insert.write(..))`.
///
- /// `end_timeout` restricts the time spent waiting for a response from the CH server.
- /// Thus, it includes all work needed to handle `INSERT` by the CH server,
- /// e.g. handling all materialized views and so on.
+ /// `end_timeout` restricts the time spent waiting for a response from the CH
+ /// server. Thus, it includes all work needed to handle `INSERT` by the
+ /// CH server, e.g. handling all materialized views and so on.
/// `None` disables the timeout, it's the default.
/// It's roughly equivalent to `tokio::time::timeout(insert.end(..))`.
///
- /// These timeouts are much more performant (~x10) than wrapping `write()` and `end()` calls
- /// into `tokio::time::timeout()`.
+ /// These timeouts are much more performant (~x10) than wrapping `write()`
+ /// and `end()` calls into `tokio::time::timeout()`.
pub fn with_timeouts(
mut self,
send_timeout: Option<Duration>,
@@ -153,7 +153,8 @@ impl<T> Insert<T> {
}

/// Serializes the provided row into an internal buffer.
- /// Once the buffer is full, it's sent to a background task writing to the socket.
+ /// Once the buffer is full, it's sent to a background task writing to the
+ /// socket.
///
/// Close to:
/// ```ignore
@@ -162,8 +163,9 @@
///
/// A returned future doesn't depend on the row's lifetime.
///
- /// Returns an error if the row cannot be serialized or the background task failed.
- /// Once failed, the whole `INSERT` is aborted and cannot be used anymore.
+ /// Returns an error if the row cannot be serialized or the background task
+ /// failed. Once failed, the whole `INSERT` is aborted and cannot be
+ /// used anymore.
///
/// # Panics
/// If called after previous call returned an error.
@@ -202,8 +204,8 @@

/// Ends `INSERT`, the server starts processing the data.
///
- /// Succeeds if the server returns 200, which means the `INSERT` was handled successfully,
- /// including all materialized views and quorum writes.
+ /// Succeeds if the server returns 200, which means the `INSERT` was handled
+ /// successfully, including all materialized views and quorum writes.
///
/// NOTE: If it isn't called, the whole `INSERT` is aborted.
pub async fn end(mut self) -> Result<()> {
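A hedged usage sketch of the `Insert` API documented above; the table name, row type, and timeout values are illustrative, and `Client::insert` is assumed to be the usual entry point:

```rust
use std::time::Duration;

use clickhouse::{error::Result, Client, Row};
use serde::Serialize;

#[derive(Row, Serialize)]
struct Event {
    ts: u32,
    msg: String,
}

async fn insert_events(client: &Client, events: &[Event]) -> Result<()> {
    let mut insert = client
        .insert("events")? // begins `INSERT INTO events`
        .with_timeouts(
            Some(Duration::from_secs(5)),  // send_timeout, applies to `write()`
            Some(Duration::from_secs(30)), // end_timeout, applies to `end()`
        );

    for event in events {
        insert.write(event).await?; // buffered, flushed by a background task
    }

    insert.end().await // 200 from the server means the INSERT was handled
}
```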
34 changes: 20 additions & 14 deletions src/inserter.rs
@@ -7,14 +7,14 @@ use crate::{error::Result, insert::Insert, row::Row, ticks::Ticks, Client};

/// Performs multiple consecutive `INSERT`s.
///
- /// By default, it doesn't end the current active `INSERT` automatically.
+ /// By default, it **doesn't** end the current active `INSERT` automatically.
/// Use `with_max_bytes`, `with_max_rows` and `with_period` to set limits.
- /// Alternatively, use `force_commit` to end an active `INSERT` whenever you want.
+ /// Alternatively, call `force_commit` to forcibly end an active `INSERT`.
///
/// Rows are sent progressively to spread network load.
///
- /// All rows written by [`Inserter::write()`] between [`Inserter::commit()`] calls
- /// are sent in one `INSERT` statement.
+ /// All rows written by [`Inserter::write()`] between [`Inserter::commit()`]
+ /// calls are sent in one `INSERT` statement.
#[must_use]
pub struct Inserter<T> {
client: Client,
@@ -83,32 +83,36 @@ where

/// The maximum number of uncompressed bytes in one `INSERT` statement.
///
- /// Note: ClickHouse inserts batches atomically only if all rows fit in the same partition
- /// and their number is less than [`max_insert_block_size`](https://clickhouse.tech/docs/en/operations/settings/settings/#settings-max_insert_block_size).
+ /// Note: ClickHouse inserts batches atomically only if all rows fit in the
+ /// same partition and their number is less than [`max_insert_block_size`].
///
/// Unlimited (`u64::MAX`) by default.
+ ///
+ /// [`max_insert_block_size`]: https://clickhouse.tech/docs/en/operations/settings/settings/#settings-max_insert_block_size
pub fn with_max_bytes(mut self, threshold: u64) -> Self {
self.set_max_bytes(threshold);
self
}

/// The maximum number of rows in one `INSERT` statement.
///
- /// Note: ClickHouse inserts batches atomically only if all rows fit in the same partition
- /// and their number is less than [`max_insert_block_size`](https://clickhouse.tech/docs/en/operations/settings/settings/#settings-max_insert_block_size).
+ /// Note: ClickHouse inserts batches atomically only if all rows fit in the
+ /// same partition and their number is less than [`max_insert_block_size`].
///
/// Unlimited (`u64::MAX`) by default.
+ ///
+ /// [`max_insert_block_size`]: https://clickhouse.tech/docs/en/operations/settings/settings/#settings-max_insert_block_size
pub fn with_max_rows(mut self, threshold: u64) -> Self {
self.set_max_rows(threshold);
self
}

/// The time between `INSERT`s.
///
- /// Note that [`Inserter`] doesn't spawn tasks or threads to check the elapsed time,
- /// all checks are performed only on [`Inserter::commit()`] calls.
- /// However, it's possible to use [`Inserter::time_left()`] and set up a timer
- /// to call [`Inserter::commit()`] to check the elapsed time again.
+ /// Note that [`Inserter`] doesn't spawn tasks or threads to check the
+ /// elapsed time, all checks are performed only on [`Inserter::commit()`].
+ /// However, it's possible to use [`Inserter::time_left()`] and set up a
+ /// timer to call [`Inserter::commit()`] to check the elapsed time again.
///
/// Extra ticks are skipped if the previous `INSERT` is still in progress:
/// ```text
@@ -122,14 +126,16 @@ where
self
}

- /// Adds a bias to the period. The actual period will be in the following range:
+ /// Adds a bias to the period, so the actual period is in the following range:
///
/// ```text
/// [period * (1 - bias), period * (1 + bias)]
/// ```
///
+ /// The `bias` parameter is clamped to the range `[0, 1]`.
///
- /// It helps to avoid producing a lot of `INSERT`s at the same time by multiple inserters.
+ /// It helps to avoid producing a lot of `INSERT`s at the same time by
+ /// multiple inserters.
pub fn with_period_bias(mut self, bias: f64) -> Self {
self.set_period_bias(bias);
self
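And a sketch of the `Inserter` knobs discussed above, assuming the crate's `inserter` feature is enabled; the limits, period, and bias values are illustrative:

```rust
use std::time::Duration;

use clickhouse::{error::Result, Client};

#[derive(clickhouse::Row, serde::Serialize)]
struct Event {
    ts: u32,
}

async fn run(client: &Client, events: &[Event]) -> Result<()> {
    let mut inserter = client
        .inserter::<Event>("events")? // assumed constructor name
        .with_max_rows(100_000) // end the active `INSERT` after 100k rows
        .with_period(Some(Duration::from_secs(10)))
        .with_period_bias(0.1); // actual period lands in [9s, 11s]

    for event in events {
        inserter.write(event)?; // sync here; some versions return a future
        inserter.commit().await?; // checks limits, may end the active INSERT
    }

    inserter.end().await?; // flush whatever remains in the last INSERT
    Ok(())
}
```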
17 changes: 11 additions & 6 deletions src/response.rs
@@ -97,13 +97,14 @@ async fn collect_bad_response(
Err(_) => return Error::BadResponse(stringify_status(status)),
};

- // Try to decompress the body, because CH compresses any responses, even with errors.
+ // Try to decompress the body, because CH uses compression even for errors.
let stream = stream::once(future::ready(Result::<_>::Ok(raw_bytes.slice(..))));
let stream = Decompress::new(stream, compression);

- // We're collecting already fetched chunks, thus only decompression errors can be here.
- // If decompression fails, we should try the raw body because it can be sent without
- // any compression if some proxy is used, which typically knows nothing about CH params.
+ // We're collecting already fetched chunks, thus only decompression errors can
+ // be here. If decompression fails, we should try the raw body because
+ // it can be sent without any compression if some proxy is used, which
+ // typically knows nothing about CH params.
let bytes = collect_bytes(stream).await.unwrap_or(raw_bytes);

let reason = String::from_utf8(bytes.into())
@@ -261,7 +262,8 @@
if let Some(err) = extract_exception(chunk) {
*self = Self::Exception(Some(err));

- // NOTE: now `chunk` can be empty, but it's ok for callers.
+ // NOTE: now `chunk` can be empty, but it's ok for
+ // callers.
}
}

@@ -273,9 +275,12 @@
}

// Format:
+ // ```
// <data>Code: <code>. DB::Exception: <desc> (version <version> (official build))\n
+ // ```
fn extract_exception(chunk: &mut Bytes) -> Option<Error> {
- // `))\n` is very rare in real data and occurs with a probability of ~6*10^-8 in random ones.
+ // `))\n` is very rare in real data, so it's a fast dirty check.
+ // In random data, it occurs with a probability of ~6*10^-8 only.
if chunk.ends_with(b"))\n") {
extract_exception_slow(chunk)
} else {
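A standalone sketch of that two-phase check; `maybe_extract_exception` and the marker search are illustrative, since `extract_exception_slow` itself isn't shown in this diff:

```rust
fn maybe_extract_exception(chunk: &[u8]) -> Option<String> {
    // Fast dirty check: real exceptions end with "(official build))\n".
    if !chunk.ends_with(b"))\n") {
        return None;
    }
    // Slow path: find the last "Code: " marker and take the tail.
    let needle: &[u8] = b"Code: ";
    let pos = chunk.windows(needle.len()).rposition(|w| w == needle)?;
    String::from_utf8(chunk[pos..].to_vec()).ok()
}
```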
23 changes: 17 additions & 6 deletions src/rowbinary/de.rs
@@ -8,7 +8,7 @@ use serde::{

use crate::error::{Error, Result};

- /// Deserializes a value from `buffer` with a message encoded in the RowBinary format.
+ /// Deserializes a value from `buffer` with a row encoded in `RowBinary`.
pub(crate) fn deserialize_from<'de, T: Deserialize<'de>>(
input: impl Buf,
temp_buf: &'de mut [u8],
@@ -78,24 +78,35 @@
impl<'de, 'a, B: Buf> Deserializer<'de> for &'a mut RowBinaryDeserializer<'de, B> {
type Error = Error;

+ #[inline]
+ fn deserialize_any<V: Visitor<'de>>(self, _: V) -> Result<V::Value> {
+ Err(Error::DeserializeAnyNotSupported)
+ }
+
impl_num!(i8, deserialize_i8, visit_i8, get_i8);
+
impl_num!(i16, deserialize_i16, visit_i16, get_i16_le);
+
impl_num!(i32, deserialize_i32, visit_i32, get_i32_le);
+
impl_num!(i64, deserialize_i64, visit_i64, get_i64_le);
+
impl_num!(i128, deserialize_i128, visit_i128, get_i128_le);
+
impl_num!(u8, deserialize_u8, visit_u8, get_u8);
+
impl_num!(u16, deserialize_u16, visit_u16, get_u16_le);
+
impl_num!(u32, deserialize_u32, visit_u32, get_u32_le);
+
impl_num!(u64, deserialize_u64, visit_u64, get_u64_le);
+
impl_num!(u128, deserialize_u128, visit_u128, get_u128_le);
+
impl_num!(f32, deserialize_f32, visit_f32, get_f32_le);
+
impl_num!(f64, deserialize_f64, visit_f64, get_f64_le);

- #[inline]
- fn deserialize_any<V: Visitor<'de>>(self, _: V) -> Result<V::Value> {
- Err(Error::DeserializeAnyNotSupported)
- }
-
#[inline]
fn deserialize_unit<V: Visitor<'de>>(self, visitor: V) -> Result<V::Value> {
// TODO: revise this.
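To make the wire layout concrete: a minimal hand-rolled decode of one `(UInt32, String)` row in the same format; the function name is illustrative, and this sketch only handles the single-byte LEB128 case (strings shorter than 128 bytes):

```rust
use bytes::Buf;

// Sketch: RowBinary stores numbers as fixed-width little-endian values
// and strings as a LEB128 length followed by raw bytes.
fn decode_row(buf: &mut &[u8]) -> (u32, String) {
    let id = buf.get_u32_le(); // UInt32: 4 bytes, little-endian
    let len = buf.get_u8() as usize; // string length, 1-byte LEB128 case
    let name = String::from_utf8(buf[..len].to_vec()).expect("valid UTF-8");
    buf.advance(len); // consume the string bytes
    (id, name)
}
```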
25 changes: 18 additions & 7 deletions src/rowbinary/ser.rs
@@ -31,27 +31,38 @@ macro_rules! impl_num {
}

impl<'a, B: BufMut> Serializer for &'a mut RowBinarySerializer<B> {
- type Ok = ();
type Error = Error;
+ type Ok = ();
+ type SerializeMap = Impossible<(), Error>;
type SerializeSeq = Self;
+ type SerializeStruct = Self;
+ type SerializeStructVariant = Impossible<(), Error>;
type SerializeTuple = Self;
type SerializeTupleStruct = Impossible<(), Error>;
type SerializeTupleVariant = Impossible<(), Error>;
- type SerializeMap = Impossible<(), Error>;
- type SerializeStruct = Self;
- type SerializeStructVariant = Impossible<(), Error>;

impl_num!(i8, serialize_i8, put_i8);
+
impl_num!(i16, serialize_i16, put_i16_le);
+
impl_num!(i32, serialize_i32, put_i32_le);
+
impl_num!(i64, serialize_i64, put_i64_le);
+
impl_num!(i128, serialize_i128, put_i128_le);
+
impl_num!(u8, serialize_u8, put_u8);
+
impl_num!(u16, serialize_u16, put_u16_le);
+
impl_num!(u32, serialize_u32, put_u32_le);
+
impl_num!(u64, serialize_u64, put_u64_le);
+
impl_num!(u128, serialize_u128, put_u128_le);
+
impl_num!(f32, serialize_f32, put_f32_le);
+
impl_num!(f64, serialize_f64, put_f64_le);

#[inline]
@@ -191,8 +202,8 @@
}

impl<'a, B: BufMut> SerializeStruct for &'a mut RowBinarySerializer<B> {
- type Ok = ();
type Error = Error;
+ type Ok = ();

#[inline]
fn serialize_field<T: Serialize + ?Sized>(&mut self, _: &'static str, value: &T) -> Result<()> {
@@ -206,8 +217,8 @@ impl<'a, B: BufMut> SerializeStruct for &'a mut RowBinarySerializer<B> {
}

impl<'a, B: BufMut> SerializeSeq for &'a mut RowBinarySerializer<B> {
- type Ok = ();
type Error = Error;
+ type Ok = ();

fn serialize_element<T: Serialize + ?Sized>(&mut self, value: &T) -> Result<()> {
value.serialize(&mut **self)
@@ -219,8 +230,8 @@ impl<'a, B: BufMut> SerializeSeq for &'a mut RowBinarySerializer<B> {
}

impl<'a, B: BufMut> SerializeTuple for &'a mut RowBinarySerializer<B> {
- type Ok = ();
type Error = Error;
+ type Ok = ();

#[inline]
fn serialize_element<T>(&mut self, value: &T) -> Result<()>
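The mirror image of the deserializer sketch above: hand-encoding one `(UInt32, String)` row, again limited to 1-byte lengths. With `id = 42` and `name = "01234"`, it yields `2a 00 00 00 05 30 31 32 33 34`, matching the `[String] 5 "01234"` bytes in the tests below.

```rust
use bytes::BufMut;

fn encode_row(buf: &mut Vec<u8>, id: u32, name: &str) {
    buf.put_u32_le(id); // UInt32: 4 bytes, little-endian
    debug_assert!(name.len() < 128, "sketch covers 1-byte LEB128 lengths only");
    buf.put_u8(name.len() as u8); // string length
    buf.put_slice(name.as_bytes()); // raw UTF-8 bytes
}
```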
40 changes: 20 additions & 20 deletions src/rowbinary/tests.rs
@@ -63,47 +63,47 @@ fn sample() -> Sample<'static> {
fn sample_serialized() -> Vec<u8> {
vec![
// [Int8] -42
- 0xd6, /**/
+ 0xd6, //
// [Int32] -3242
- 0x56, 0xf3, 0xff, 0xff, /**/
+ 0x56, 0xf3, 0xff, 0xff, //
// [Int64] -6442
- 0xd6, 0xe6, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, /**/
+ 0xd6, 0xe6, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, //
// [UInt8] 42
- 0x2a, /**/
+ 0x2a, //
// [UInt32] 3242
- 0xaa, 0x0c, 0x00, 0x00, /**/
+ 0xaa, 0x0c, 0x00, 0x00, //
// [UInt64] 6442
- 0x2a, 0x19, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, /**/
+ 0x2a, 0x19, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, //
// [Float32] 42.42
- 0x14, 0xae, 0x29, 0x42, /**/
+ 0x14, 0xae, 0x29, 0x42, //
// [Float64] 42.42
- 0xf6, 0x28, 0x5c, 0x8f, 0xc2, 0x35, 0x45, 0x40, /**/
+ 0xf6, 0x28, 0x5c, 0x8f, 0xc2, 0x35, 0x45, 0x40, //
// [DateTime] 2042-12-12 12:42:42
// (ts: 2301990162)
- 0x12, 0x95, 0x35, 0x89, /**/
+ 0x12, 0x95, 0x35, 0x89, //
// [DateTime64(3)] 2042-12-12 12:42:42'123
// (ts: 2301990162123)
- 0xcb, 0x4e, 0x4e, 0xf9, 0x17, 0x02, 0x00, 0x00, /**/
+ 0xcb, 0x4e, 0x4e, 0xf9, 0x17, 0x02, 0x00, 0x00, //
// [Decimal64(9)] 42.420000000
- 0x00, 0xd5, 0x6d, 0xe0, 0x09, 0x00, 0x00, 0x00, /**/
+ 0x00, 0xd5, 0x6d, 0xe0, 0x09, 0x00, 0x00, 0x00, //
// [Decimal128(9)] 42.420000000
- 0x00, 0xd5, 0x6d, 0xe0, 0x09, 0x00, 0x00, 0x00, /**/
- 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, /**/
+ 0x00, 0xd5, 0x6d, 0xe0, 0x09, 0x00, 0x00, 0x00, //
+ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, //
// [String] 5 "01234"
- 0x05, 0x30, 0x31, 0x32, 0x33, 0x34, /**/
+ 0x05, 0x30, 0x31, 0x32, 0x33, 0x34, //
// [String] 5 [0, 1, 2, 3, 4]
- 0x05, 0x00, 0x01, 0x02, 0x03, 0x04, /**/
+ 0x05, 0x00, 0x01, 0x02, 0x03, 0x04, //
// [Nullable(Decimal64(9))] NULL
- 0x01, /**/
+ 0x01, //
// [Nullable(DateTime)] 2042-12-12 12:42:42
// (ts: 2301990162)
- 0x00, 0x12, 0x95, 0x35, 0x89, /**/
+ 0x00, 0x12, 0x95, 0x35, 0x89, //
// [FixedString(4)] [b'B', b'T', b'C', 0]
- 0x42, 0x54, 0x43, 0x00, /**/
+ 0x42, 0x54, 0x43, 0x00, //
// [Array(Int32)] [-42, 42, -42, 42]
- 0x04, 0xd6, 0x2a, 0xd6, 0x2a, /**/
+ 0x04, 0xd6, 0x2a, 0xd6, 0x2a, //
// [Boolean] true
- 0x01, /**/
+ 0x01, //
]
}
