From b4cbb3916e11bf9e21d9a3c2bf360422470d4bf3 Mon Sep 17 00:00:00 2001 From: Paul Loyd Date: Sun, 20 Oct 2024 11:08:52 +0400 Subject: [PATCH] perf(query): improve SELECTs performance (#169) * Improve performance of `RowCursor::next()`. * Add `RowCursor::{decoded_bytes,received_bytes}` methods. --- CHANGELOG.md | 9 +- Cargo.toml | 2 +- benches/README.md | 42 +++++++ benches/common.rs | 86 ++++++++++++-- benches/insert.rs | 16 +-- benches/select.rs | 23 ++-- benches/select_numbers.rs | 33 ++++-- src/buflist.rs | 167 ---------------------------- src/bytes_ext.rs | 80 +++++++++++++ src/compression/lz4.rs | 96 ++++++++-------- src/cursor.rs | 228 +++++++++++++++++++++----------------- src/error.rs | 6 - src/lib.rs | 2 +- src/query.rs | 20 +--- src/response.rs | 56 +++++----- src/rowbinary/de.rs | 89 +++++++-------- src/rowbinary/tests.rs | 12 +- src/test/handlers.rs | 10 +- src/watch.rs | 4 + tests/it/compression.rs | 26 +---- tests/it/cursor_stats.rs | 51 +++++++++ tests/it/main.rs | 1 + 22 files changed, 569 insertions(+), 490 deletions(-) create mode 100644 benches/README.md delete mode 100644 src/buflist.rs create mode 100644 src/bytes_ext.rs create mode 100644 tests/it/cursor_stats.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 81e9ba7..f1afd9e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,16 +1,23 @@ # Changelog All notable changes to this project will be documented in this file. -The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), +The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased] - ReleaseDate +### Added +- query/cursor: add `RowCursor::{decoded_bytes,received_bytes}` methods ([#169]). + +### Changed +- query/cursor: improve performance of `RowCursor::next()` ([#169]). + ### Fixed - mock: work with the advanced time via `tokio::time::advance()` ([#165]). [#165]: https://github.com/ClickHouse/clickhouse-rs/pull/165 +[#169]: https://github.com/ClickHouse/clickhouse-rs/pull/169 ## [0.13.0] - 2024-09-27 ### Added diff --git a/Cargo.toml b/Cargo.toml index 05bc62f..8ce11e5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -93,7 +93,7 @@ url = "2.1.1" futures = "0.3.5" futures-channel = "0.3.30" static_assertions = "1.1" -sealed = "0.5" +sealed = "0.6" sha-1 = { version = "0.10", optional = true } serde_json = { version = "1.0.68", optional = true } lz4_flex = { version = "0.11.3", default-features = false, features = [ diff --git a/benches/README.md b/benches/README.md new file mode 100644 index 0000000..bd2ccf9 --- /dev/null +++ b/benches/README.md @@ -0,0 +1,42 @@ +# Benchmarks + +All cases are run with `cargo bench --bench `. + +## With a mocked server + +These benchmarks are run against a mocked server, which is a simple HTTP server that responds with a fixed response. This is useful to measure the overhead of the client itself: +* `select` checks throughput of `Client::query()`. +* `insert` checks throughput of `Client::insert()` and `Client::inserter()` (if the `inserter` features is enabled). + +### How to collect perf data + +The crate's code runs on the thread with the name `testee`: +```bash +cargo bench --bench & +perf record -p `ps -AT | grep testee | awk '{print $2}'` --call-graph dwarf,65528 --freq 5000 -g -- sleep 5 +perf script > perf.script +``` + +Then upload the `perf.script` file to [Firefox Profiler](https://profiler.firefox.com). + +## With a running ClickHouse server + +These benchmarks are run against a real ClickHouse server, so it must be started: +```bash +docker run -d -p 8123:8123 -p 9000:9000 --name ch clickhouse/clickhouse-server + +cargo bench --bench +``` + +Cases: +* `select_numbers` measures time of running a big SELECT query to the `system.numbers_mt` table. + +### How to collect perf data + +```bash +cargo bench --bench & +perf record -p `ps -AT | grep | awk '{print $2}'` --call-graph dwarf,65528 --freq 5000 -g -- sleep 5 +perf script > perf.script +``` + +Then upload the `perf.script` file to [Firefox Profiler](https://profiler.firefox.com). diff --git a/benches/common.rs b/benches/common.rs index 2572efe..637447a 100644 --- a/benches/common.rs +++ b/benches/common.rs @@ -1,6 +1,14 @@ #![allow(dead_code)] // typical for common test/bench modules :( -use std::{convert::Infallible, future::Future, net::SocketAddr, thread}; +use std::{ + convert::Infallible, + future::Future, + net::SocketAddr, + pin::Pin, + sync::atomic::{AtomicU32, Ordering}, + thread, + time::Duration, +}; use bytes::Bytes; use futures::stream::StreamExt; @@ -11,7 +19,13 @@ use hyper::{ service, Request, Response, }; use hyper_util::rt::{TokioIo, TokioTimer}; -use tokio::{net::TcpListener, runtime}; +use tokio::{ + net::TcpListener, + runtime, + sync::{mpsc, oneshot}, +}; + +use clickhouse::error::Result; pub(crate) struct ServerHandle; @@ -38,14 +52,7 @@ where } }; - thread::spawn(move || { - runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap() - .block_on(serving); - }); - + run_on_st_runtime("server", serving); ServerHandle } @@ -57,3 +64,62 @@ pub(crate) async fn skip_incoming(request: Request) { result.unwrap(); } } + +pub(crate) struct RunnerHandle { + tx: mpsc::UnboundedSender, +} + +struct Run { + future: Pin> + Send>>, + callback: oneshot::Sender>, +} + +impl RunnerHandle { + pub(crate) fn run( + &self, + f: impl Future> + Send + 'static, + ) -> Duration { + let (tx, rx) = oneshot::channel(); + + self.tx + .send(Run { + future: Box::pin(f), + callback: tx, + }) + .unwrap(); + + rx.blocking_recv().unwrap().unwrap() + } +} + +pub(crate) fn start_runner() -> RunnerHandle { + let (tx, mut rx) = mpsc::unbounded_channel::(); + + run_on_st_runtime("testee", async move { + while let Some(run) = rx.recv().await { + let result = run.future.await; + let _ = run.callback.send(result); + } + }); + + RunnerHandle { tx } +} + +fn run_on_st_runtime(name: &str, f: impl Future + Send + 'static) { + let name = name.to_string(); + thread::Builder::new() + .name(name.clone()) + .spawn(move || { + let no = AtomicU32::new(0); + runtime::Builder::new_current_thread() + .enable_all() + .thread_name_fn(move || { + let no = no.fetch_add(1, Ordering::Relaxed); + format!("{name}-{no}") + }) + .build() + .unwrap() + .block_on(f); + }) + .unwrap(); +} diff --git a/benches/insert.rs b/benches/insert.rs index 03a7fe3..f4ebc56 100644 --- a/benches/insert.rs +++ b/benches/insert.rs @@ -1,11 +1,14 @@ -use std::{future::Future, mem, time::Duration}; +use std::{ + future::Future, + mem, + time::{Duration, Instant}, +}; use bytes::Bytes; use criterion::{black_box, criterion_group, criterion_main, Criterion, Throughput}; use http_body_util::Empty; use hyper::{body::Incoming, Request, Response}; use serde::Serialize; -use tokio::{runtime::Runtime, time::Instant}; use clickhouse::{error::Result, Client, Compression, Row}; @@ -76,30 +79,29 @@ async fn run_inserter(client: Client, iters: u64) -> Re fn run(c: &mut Criterion, name: &str, port: u16, f: impl Fn(Client, u64) -> F) where - F: Future>, + F: Future> + Send + 'static, { let addr = format!("127.0.0.1:{port}").parse().unwrap(); let _server = common::start_server(addr, serve); + let runner = common::start_runner(); let mut group = c.benchmark_group(name); group.throughput(Throughput::Bytes(mem::size_of::() as u64)); group.bench_function("no compression", |b| { b.iter_custom(|iters| { - let rt = Runtime::new().unwrap(); let client = Client::default() .with_url(format!("http://{addr}")) .with_compression(Compression::None); - rt.block_on((f)(client, iters)).unwrap() + runner.run((f)(client, iters)) }) }); #[cfg(feature = "lz4")] group.bench_function("lz4", |b| { b.iter_custom(|iters| { - let rt = Runtime::new().unwrap(); let client = Client::default() .with_url(format!("http://{addr}")) .with_compression(Compression::Lz4); - rt.block_on((f)(client, iters)).unwrap() + runner.run((f)(client, iters)) }) }); group.finish(); diff --git a/benches/select.rs b/benches/select.rs index 49f8ab9..316015e 100644 --- a/benches/select.rs +++ b/benches/select.rs @@ -1,4 +1,8 @@ -use std::{convert::Infallible, mem}; +use std::{ + convert::Infallible, + mem, + time::{Duration, Instant}, +}; use bytes::Bytes; use criterion::{black_box, criterion_group, criterion_main, Criterion, Throughput}; @@ -9,7 +13,6 @@ use hyper::{ Request, Response, }; use serde::Deserialize; -use tokio::{runtime::Runtime, time::Instant}; use clickhouse::{error::Result, Client, Compression, Row}; @@ -47,6 +50,7 @@ fn select(c: &mut Criterion) { let addr = "127.0.0.1:6543".parse().unwrap(); let chunk = prepare_chunk(); let _server = common::start_server(addr, move |req| serve(req, chunk.clone())); + let runner = common::start_runner(); #[allow(dead_code)] #[derive(Debug, Row, Deserialize)] @@ -57,7 +61,8 @@ fn select(c: &mut Criterion) { d: u32, } - async fn run(client: Client, iters: u64) -> Result<()> { + async fn run(client: Client, iters: u64) -> Result { + let start = Instant::now(); let mut cursor = client .query("SELECT ?fields FROM some") .fetch::()?; @@ -66,32 +71,26 @@ fn select(c: &mut Criterion) { black_box(cursor.next().await?); } - Ok(()) + Ok(start.elapsed()) } let mut group = c.benchmark_group("select"); group.throughput(Throughput::Bytes(mem::size_of::() as u64)); group.bench_function("no compression", |b| { b.iter_custom(|iters| { - let rt = Runtime::new().unwrap(); let client = Client::default() .with_url(format!("http://{addr}")) .with_compression(Compression::None); - let start = Instant::now(); - rt.block_on(run(client, iters)).unwrap(); - start.elapsed() + runner.run(run(client, iters)) }) }); #[cfg(feature = "lz4")] group.bench_function("lz4", |b| { b.iter_custom(|iters| { - let rt = Runtime::new().unwrap(); let client = Client::default() .with_url(format!("http://{addr}")) .with_compression(Compression::Lz4); - let start = Instant::now(); - rt.block_on(run(client, iters)).unwrap(); - start.elapsed() + runner.run(run(client, iters)) }) }); group.finish(); diff --git a/benches/select_numbers.rs b/benches/select_numbers.rs index d08f3ac..869d6ba 100644 --- a/benches/select_numbers.rs +++ b/benches/select_numbers.rs @@ -1,15 +1,24 @@ use serde::Deserialize; -use clickhouse::{error::Result, Client, Compression, Row}; +use clickhouse::{Client, Compression, Row}; #[derive(Row, Deserialize)] struct Data { no: u64, } -async fn bench() -> u64 { +async fn bench(name: &str, compression: Compression) { + let start = std::time::Instant::now(); + let (sum, dec_mbytes, rec_mbytes) = tokio::spawn(do_bench(compression)).await.unwrap(); + assert_eq!(sum, 124999999750000000); + let elapsed = start.elapsed(); + let throughput = dec_mbytes / elapsed.as_secs_f64(); + println!("{name:>8} {elapsed:>7.3?} {throughput:>4.0} MiB/s {rec_mbytes:>4.0} MiB"); +} + +async fn do_bench(compression: Compression) -> (u64, f64, f64) { let client = Client::default() - .with_compression(Compression::None) + .with_compression(compression) .with_url("http://localhost:8123"); let mut cursor = client @@ -22,15 +31,17 @@ async fn bench() -> u64 { sum += row.no; } - sum + let dec_bytes = cursor.decoded_bytes(); + let dec_mbytes = dec_bytes as f64 / 1024.0 / 1024.0; + let recv_bytes = cursor.received_bytes(); + let recv_mbytes = recv_bytes as f64 / 1024.0 / 1024.0; + (sum, dec_mbytes, recv_mbytes) } #[tokio::main] -async fn main() -> Result<()> { - println!("Started"); - let start = std::time::Instant::now(); - let sum = tokio::spawn(bench()).await.unwrap(); - let elapsed = start.elapsed(); - println!("Done: elapsed={elapsed:?} sum={sum}"); - Ok(()) +async fn main() { + println!("compress elapsed throughput received"); + bench("none", Compression::None).await; + #[cfg(feature = "lz4")] + bench("lz4", Compression::Lz4).await; } diff --git a/src/buflist.rs b/src/buflist.rs deleted file mode 100644 index cd29bc2..0000000 --- a/src/buflist.rs +++ /dev/null @@ -1,167 +0,0 @@ -use std::collections::VecDeque; - -use bytes::Buf; - -#[derive(Debug, Default)] -pub(crate) struct BufList { - next_buf: Option, - bufs: VecDeque, - rem: usize, - cursor: usize, -} - -impl BufList { - #[inline] - pub(crate) fn push(&mut self, buf: T) { - let rem = buf.remaining(); - if rem == 0 { - return; - } - - if self.next_buf.is_none() { - self.next_buf = Some(buf); - } else { - self.bufs.push_back(buf); - } - - self.rem += rem; - } - - #[inline] - pub(crate) fn bufs_cnt(&self) -> usize { - self.next_buf.is_some() as usize + self.bufs.len() - } - - #[inline] - pub(crate) fn commit(&mut self) { - while self.cursor > 0 { - let front = self.next_buf.as_mut().unwrap(); - let rem = front.remaining(); - - if rem > self.cursor { - front.advance(self.cursor); - self.cursor = 0; - } else { - front.advance(rem); - self.cursor -= rem; - self.next_buf = self.bufs.pop_front(); - } - } - } - - pub(crate) fn rollback(&mut self) { - self.rem += self.cursor; - self.cursor = 0; - } - - #[cold] - fn chunk_slow(&self) -> &[u8] { - let mut cnt = self.cursor - self.next_buf.as_ref().map_or(0, |b| b.chunk().len()); - - for buf in &self.bufs { - let bytes = buf.chunk(); - if bytes.len() > cnt { - return &bytes[cnt..]; - } - cnt -= bytes.len(); - } - - b"" - } -} - -impl Buf for BufList { - #[inline] - fn remaining(&self) -> usize { - self.rem - } - - #[inline] - fn chunk(&self) -> &[u8] { - if let Some(buf) = &self.next_buf { - let bytes = buf.chunk(); - if bytes.len() > self.cursor { - return &bytes[self.cursor..]; - } - } - - self.chunk_slow() - } - - #[inline] - fn advance(&mut self, cnt: usize) { - self.rem -= cnt; - self.cursor += cnt; - } -} - -#[test] -fn it_advances() { - let mut list = BufList::<&[_]>::default(); - list.push(&[1, 2, 3]); - list.push(&[]); - list.push(&[4, 5, 6]); - list.push(&[7, 8, 9]); - list.push(&[]); - - assert_eq!(list.bufs_cnt(), 3); - list.advance(1); - list.commit(); - assert_eq!(list.bufs_cnt(), 3); - list.advance(4); - list.commit(); - assert_eq!(list.bufs_cnt(), 2); - list.advance(4); - list.commit(); - assert_eq!(list.bufs_cnt(), 0); -} - -#[test] -fn it_copies_to_slice() { - let mut list = BufList::<&[_]>::default(); - list.push(&[1, 2, 3]); - list.push(&[]); - list.push(&[4, 5, 6]); - list.push(&[7, 8, 9]); - list.push(&[]); - assert_eq!(list.bufs_cnt(), 3); - - let mut result = vec![0; 9]; - - list.copy_to_slice(&mut result[0..1]); - list.commit(); - assert_eq!(result, [1, 0, 0, 0, 0, 0, 0, 0, 0]); - assert_eq!(list.bufs_cnt(), 3); - - list.copy_to_slice(&mut result[1..5]); - list.commit(); - assert_eq!(result, [1, 2, 3, 4, 5, 0, 0, 0, 0]); - assert_eq!(list.bufs_cnt(), 2); - - list.copy_to_slice(&mut result[5..]); - list.commit(); - assert_eq!(result, [1, 2, 3, 4, 5, 6, 7, 8, 9]); - assert_eq!(list.bufs_cnt(), 0); -} - -#[test] -fn it_does_rollback() { - let mut list = BufList::<&[_]>::default(); - list.push(&[1, 2, 3]); - list.push(&[4, 5]); - - let mut result = vec![0; 5]; - list.copy_to_slice(&mut result[0..2]); - assert_eq!(result, [1, 2, 0, 0, 0]); - list.commit(); - list.copy_to_slice(&mut result[2..4]); - assert_eq!(result, [1, 2, 3, 4, 0]); - list.copy_to_slice(&mut result[4..]); - assert_eq!(result, [1, 2, 3, 4, 5]); - - list.rollback(); - - let mut result = vec![0; 3]; - list.copy_to_slice(&mut result); - assert_eq!(result, [3, 4, 5]); -} diff --git a/src/bytes_ext.rs b/src/bytes_ext.rs new file mode 100644 index 0000000..1911468 --- /dev/null +++ b/src/bytes_ext.rs @@ -0,0 +1,80 @@ +use bytes::{Bytes, BytesMut}; + +#[derive(Default)] +pub(crate) struct BytesExt { + bytes: Bytes, + cursor: usize, +} + +impl BytesExt { + #[inline(always)] + pub(crate) fn slice(&self) -> &[u8] { + &self.bytes[self.cursor..] + } + + #[inline(always)] + pub(crate) fn remaining(&self) -> usize { + self.bytes.len() - self.cursor + } + + #[inline(always)] + pub(crate) fn set_remaining(&mut self, n: usize) { + // We can use `bytes.advance()` here, but it's slower. + self.cursor = self.bytes.len() - n; + } + + #[cfg(any(test, feature = "lz4", feature = "watch"))] + #[inline(always)] + pub(crate) fn advance(&mut self, n: usize) { + // We can use `bytes.advance()` here, but it's slower. + self.cursor += n; + } + + #[inline(always)] + pub(crate) fn extend(&mut self, chunk: Bytes) { + if self.cursor == self.bytes.len() { + // Most of the time, we read the next chunk after consuming the previous one. + self.bytes = chunk; + self.cursor = 0; + } else { + // Some bytes are left in the buffer, we need to merge them with the next chunk. + self.extend_slow(chunk); + } + } + + #[cold] + #[inline(never)] + fn extend_slow(&mut self, chunk: Bytes) { + let total = self.remaining() + chunk.len(); + let mut new_bytes = BytesMut::with_capacity(total); + let capacity = new_bytes.capacity(); + new_bytes.extend_from_slice(self.slice()); + new_bytes.extend_from_slice(&chunk); + debug_assert_eq!(new_bytes.capacity(), capacity); + self.bytes = new_bytes.freeze(); + self.cursor = 0; + } +} + +#[test] +fn it_works() { + let mut bytes = BytesExt::default(); + assert!(bytes.slice().is_empty()); + assert_eq!(bytes.remaining(), 0); + + bytes.extend(Bytes::from_static(b"hello")); + assert_eq!(bytes.slice(), b"hello"); + assert_eq!(bytes.remaining(), 5); + + bytes.advance(3); + assert_eq!(bytes.slice(), b"lo"); + assert_eq!(bytes.remaining(), 2); + + bytes.extend(Bytes::from_static(b"l")); + assert_eq!(bytes.slice(), b"lol"); + assert_eq!(bytes.remaining(), 3); + + bytes.set_remaining(1); + assert_eq!(bytes.slice(), b"l"); + assert_eq!(bytes.remaining(), 1); +} diff --git a/src/compression/lz4.rs b/src/compression/lz4.rs index 928a8d5..8704e31 100644 --- a/src/compression/lz4.rs +++ b/src/compression/lz4.rs @@ -9,37 +9,38 @@ use futures::{ready, stream::Stream}; use lz4_flex::block; use crate::{ - buflist::BufList, + bytes_ext::BytesExt, error::{Error, Result}, + response::Chunk, }; const MAX_COMPRESSED_SIZE: u32 = 1024 * 1024 * 1024; pub(crate) struct Lz4Decoder { stream: S, - chunks: BufList, + bytes: BytesExt, meta: Option, - buffer: Vec, } impl Stream for Lz4Decoder where S: Stream> + Unpin, { - type Item = Result; + type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let meta = loop { - let size = self.chunks.remaining(); - let required_size = self.meta.as_ref().map_or(LZ4_META_SIZE, |m| { - m.compressed_size as usize - LZ4_HEADER_SIZE - }); + let size = self.bytes.remaining(); + let required_size = self + .meta + .as_ref() + .map_or(LZ4_META_SIZE, Lz4Meta::total_size); if size < required_size { let stream = Pin::new(&mut self.stream); match ready!(stream.poll_next(cx)) { Some(Ok(chunk)) => { - self.chunks.push(chunk); + self.bytes.extend(chunk); continue; } Some(Err(err)) => return Some(Err(err)).into(), @@ -51,7 +52,7 @@ where } } - assert!(size >= required_size); + debug_assert!(size >= required_size); match self.meta.take() { Some(meta) => break meta, @@ -59,16 +60,18 @@ where }; }; - let bytes = self.read_data(meta)?; - self.chunks.commit(); - Poll::Ready(Some(Ok(bytes))) + let data = self.read_data(&meta)?; + let net_size = meta.total_size(); + self.bytes.advance(net_size); + + Poll::Ready(Some(Ok(Chunk { data, net_size }))) } } // Meta = checksum + header // - [16b] checksum // - [ 1b] magic number (0x82) -// - [ 4b] compressed size +// - [ 4b] compressed size (data + header) // - [ 4b] uncompressed size const LZ4_CHECKSUM_SIZE: usize = 16; const LZ4_HEADER_SIZE: usize = 9; @@ -82,11 +85,15 @@ struct Lz4Meta { } impl Lz4Meta { - fn read(mut buffer: impl Buf) -> Result { - let checksum = buffer.get_u128_le(); - let magic = buffer.get_u8(); - let compressed_size = buffer.get_u32_le(); - let uncompressed_size = buffer.get_u32_le(); + fn total_size(&self) -> usize { + LZ4_CHECKSUM_SIZE + self.compressed_size as usize + } + + fn read(mut bytes: &[u8]) -> Result { + let checksum = bytes.get_u128_le(); + let magic = bytes.get_u8(); + let compressed_size = bytes.get_u32_le(); + let uncompressed_size = bytes.get_u32_le(); if magic != LZ4_MAGIC { return Err(Error::Decompression("incorrect magic number".into())); @@ -103,11 +110,11 @@ impl Lz4Meta { }) } - fn write_checksum(&self, mut buffer: impl BufMut) { + fn write_checksum(&self, mut buffer: &mut [u8]) { buffer.put_u128_le(self.checksum); } - fn write_header(&self, mut buffer: impl BufMut) { + fn write_header(&self, mut buffer: &mut [u8]) { buffer.put_u8(LZ4_MAGIC); buffer.put_u32_le(self.compressed_size); buffer.put_u32_le(self.uncompressed_size); @@ -118,36 +125,28 @@ impl Lz4Decoder { pub(crate) fn new(stream: S) -> Self { Self { stream, - chunks: BufList::default(), + bytes: BytesExt::default(), meta: None, - buffer: Vec::new(), } } fn read_meta(&mut self) -> Result { - assert!(self.chunks.remaining() >= LZ4_META_SIZE); - Lz4Meta::read(&mut self.chunks) + Lz4Meta::read(self.bytes.slice()) } - fn read_data(&mut self, meta: Lz4Meta) -> Result { - assert!(self.chunks.remaining() >= meta.compressed_size as usize - LZ4_HEADER_SIZE); + fn read_data(&mut self, meta: &Lz4Meta) -> Result { + let total_size = meta.total_size(); + let bytes = &self.bytes.slice()[..total_size]; - self.buffer.resize(meta.compressed_size as usize, 0); - meta.write_header(&mut self.buffer[..]); - - // TODO: if we have a whole frame in one chunk, extra copying can be avoided. - self.chunks - .copy_to_slice(&mut self.buffer[LZ4_HEADER_SIZE..]); - - let actual_checksum = calc_checksum(&self.buffer); + let actual_checksum = calc_checksum(&bytes[LZ4_CHECKSUM_SIZE..]); if actual_checksum != meta.checksum { return Err(Error::Decompression("checksum mismatch".into())); } - let mut uncompressed = vec![0u8; meta.uncompressed_size as usize]; - let len = decompress(&self.buffer[LZ4_HEADER_SIZE..], &mut uncompressed)?; - debug_assert_eq!(len as u32, meta.uncompressed_size); + let uncompressed = block::decompress_size_prepended(&bytes[(LZ4_META_SIZE - 4)..]) + .map_err(|err| Error::Decompression(err.into()))?; + debug_assert_eq!(uncompressed.len() as u32, meta.uncompressed_size); Ok(uncompressed.into()) } } @@ -157,10 +156,6 @@ fn calc_checksum(buffer: &[u8]) -> u128 { hash.rotate_right(64) } -fn decompress(compressed: &[u8], uncompressed: &mut [u8]) -> Result { - block::decompress_into(compressed, uncompressed).map_err(|err| Error::Decompression(err.into())) -} - pub(crate) fn compress(uncompressed: &[u8]) -> Result { let max_compressed_size = block::get_maximum_output_size(uncompressed.len()); @@ -195,9 +190,12 @@ async fn it_decompresses() { ]; let source = vec![ - 245_u8, 5, 222, 235, 225, 158, 59, 108, 225, 31, 65, 215, 66, 66, 36, 92, 130, 34, 0, 0, 0, - 23, 0, 0, 0, 240, 8, 1, 0, 2, 255, 255, 255, 255, 0, 1, 1, 1, 115, 6, 83, 116, 114, 105, - 110, 103, 3, 97, 98, 99, + 245_u8, 5, 222, 235, 225, 158, 59, 108, 225, 31, 65, 215, 66, 66, 36, 92, // checksum + 0x82, // magic number + 34, 0, 0, 0, // compressed size (data + header) + 23, 0, 0, 0, // uncompressed size + 240, 8, 1, 0, 2, 255, 255, 255, 255, 0, 1, 1, 1, 115, 6, 83, 116, 114, 105, 110, 103, 3, + 97, 98, 99, ]; async fn test(chunks: &[&[u8]], expected: &[u8]) { @@ -209,8 +207,12 @@ async fn it_decompresses() { .collect::>(), ); let mut decoder = Lz4Decoder::new(stream); - let actual = decoder.try_next().await.unwrap(); - assert_eq!(actual.as_deref(), Some(expected)); + let actual = decoder.try_next().await.unwrap().unwrap(); + assert_eq!(actual.data, expected); + assert_eq!( + actual.net_size, + chunks.iter().map(|s| s.len()).sum::() + ); } // 1 chunk. diff --git a/src/cursor.rs b/src/cursor.rs index 9cf2f32..a58603c 100644 --- a/src/cursor.rs +++ b/src/cursor.rs @@ -5,123 +5,147 @@ use futures::TryStreamExt; use serde::Deserialize; use crate::{ - buflist::BufList, + bytes_ext::BytesExt, error::{Error, Result}, - response::Response, + response::{Chunks, Response, ResponseFuture}, rowbinary, }; -const INITIAL_BUFFER_SIZE: usize = 1024; - // === RawCursor === -struct RawCursor { - response: Response, - pending: BufList, +struct RawCursor(RawCursorInner); + +enum RawCursorInner { + Waiting(ResponseFuture), + Loading(RawCursorLoading), +} + +struct RawCursorLoading { + chunks: Chunks, + net_size: u64, + data_size: u64, } impl RawCursor { fn new(response: Response) -> Self { - Self { - response, - pending: BufList::default(), - } + Self(RawCursorInner::Waiting(response.into_future())) } - #[inline(always)] - async fn next( - &mut self, - mut f: impl FnMut(&mut BufList) -> ControlFlow, - ) -> Result> { - let chunks = if let Some(chunks) = self.response.chunks() { - chunks - } else { - self.response.chunks_slow().await? + async fn next(&mut self) -> Result> { + if matches!(self.0, RawCursorInner::Waiting(_)) { + self.resolve().await?; + } + + let state = match &mut self.0 { + RawCursorInner::Loading(state) => state, + RawCursorInner::Waiting(_) => unreachable!(), }; - loop { - match f(&mut self.pending) { - ControlFlow::Yield(value) => { - self.pending.commit(); - return Ok(Some(value)); - } - #[cfg(feature = "watch")] - ControlFlow::Skip => { - self.pending.commit(); - continue; - } - ControlFlow::Retry => { - self.pending.rollback(); - continue; - } - ControlFlow::Err(Error::NotEnoughData) => { - self.pending.rollback(); - } - ControlFlow::Err(err) => return Err(err), + match state.chunks.try_next().await { + Ok(Some(chunk)) => { + state.net_size += chunk.net_size as u64; + state.data_size += chunk.data.len() as u64; + Ok(Some(chunk.data)) } + Ok(None) => Ok(None), + Err(err) => Err(err), + } + } - match chunks.try_next().await? { - Some(chunk) => self.pending.push(chunk), - None if self.pending.bufs_cnt() > 0 => return Err(Error::NotEnoughData), - None => return Ok(None), - } + async fn resolve(&mut self) -> Result<()> { + if let RawCursorInner::Waiting(future) = &mut self.0 { + let chunks = future.await; + self.0 = RawCursorInner::Loading(RawCursorLoading { + chunks: chunks?, + net_size: 0, + data_size: 0, + }); + } + Ok(()) + } + + fn received_bytes(&self) -> u64 { + match &self.0 { + RawCursorInner::Waiting(_) => 0, + RawCursorInner::Loading(state) => state.net_size, } } -} -enum ControlFlow { - Yield(T), - #[cfg(feature = "watch")] - Skip, - Retry, - Err(Error), + fn decoded_bytes(&self) -> u64 { + match &self.0 { + RawCursorInner::Waiting(_) => 0, + RawCursorInner::Loading(state) => state.data_size, + } + } } // XXX: it was a workaround for https://github.com/rust-lang/rust/issues/51132, // but introduced #24 and must be fixed. -fn workaround_51132<'a, T>(ptr: &mut T) -> &'a mut T { +fn workaround_51132<'a, T: ?Sized>(ptr: &T) -> &'a T { // SAFETY: actually, it leads to unsoundness, see #24 - unsafe { &mut *(ptr as *mut T) } + unsafe { &*(ptr as *const T) } } -// === RowBinaryCursor === +// === RowCursor === -pub(crate) struct RowBinaryCursor { +/// A cursor that emits rows. +#[must_use] +pub struct RowCursor { raw: RawCursor, - buffer: Vec, + bytes: BytesExt, _marker: PhantomData, } -impl RowBinaryCursor { +impl RowCursor { pub(crate) fn new(response: Response) -> Self { Self { raw: RawCursor::new(response), - buffer: vec![0; INITIAL_BUFFER_SIZE], + bytes: BytesExt::default(), _marker: PhantomData, } } - pub(crate) async fn next<'a, 'b: 'a>(&'a mut self) -> Result> + /// Emits the next row. + /// + /// An result is unspecified if it's called after `Err` is returned. + pub async fn next<'a, 'b: 'a>(&'a mut self) -> Result> where T: Deserialize<'b>, { - let buffer = &mut self.buffer; - - self.raw - .next(|pending| { - match rowbinary::deserialize_from(pending, &mut workaround_51132(buffer)[..]) { - Ok(value) => ControlFlow::Yield(value), - Err(Error::TooSmallBuffer(need)) => { - let new_len = (buffer.len() + need) - .checked_next_power_of_two() - .expect("oom"); - buffer.resize(new_len, 0); - ControlFlow::Retry - } - Err(err) => ControlFlow::Err(err), + loop { + let mut slice = workaround_51132(self.bytes.slice()); + + match rowbinary::deserialize_from(&mut slice) { + Ok(value) => { + self.bytes.set_remaining(slice.len()); + return Ok(Some(value)); } - }) - .await + Err(Error::NotEnoughData) => {} + Err(err) => return Err(err), + } + + match self.raw.next().await? { + Some(chunk) => self.bytes.extend(chunk), + None => return Ok(None), + } + } + } + + /// Returns the total size in bytes received from the CH server since + /// the cursor was created. + /// + /// This method counts only size without HTTP headers for now. + /// It can be changed in the future without notice. + #[inline] + pub fn received_bytes(&self) -> u64 { + self.raw.received_bytes() + } + + /// Returns the total size in bytes decompressed since the cursor was + /// created. + #[inline] + pub fn decoded_bytes(&self) -> u64 { + self.raw.decoded_bytes() } } @@ -130,6 +154,7 @@ impl RowBinaryCursor { #[cfg(feature = "watch")] pub(crate) struct JsonCursor { raw: RawCursor, + bytes: BytesExt, line: String, _marker: PhantomData, } @@ -146,10 +171,13 @@ enum JsonRow { #[cfg(feature = "watch")] impl JsonCursor { + const INITIAL_BUFFER_SIZE: usize = 1024; + pub(crate) fn new(response: Response) -> Self { Self { raw: RawCursor::new(response), - line: String::with_capacity(INITIAL_BUFFER_SIZE), + bytes: BytesExt::default(), + line: String::with_capacity(Self::INITIAL_BUFFER_SIZE), _marker: PhantomData, } } @@ -161,32 +189,28 @@ impl JsonCursor { use bytes::Buf; use std::io::BufRead; - let line = &mut self.line; - - self.raw - .next(|pending| { - line.clear(); - match pending.reader().read_line(line) { - Ok(_) => { - let line = workaround_51132(line); - if let Some(line) = line.strip_suffix('\n') { - match serde_json::from_str(line) { - Ok(JsonRow::Row(value)) => ControlFlow::Yield(value), - Ok(JsonRow::Progress { .. }) => ControlFlow::Skip, - // TODO: another reason? - Err(err) => ControlFlow::Err(Error::BadResponse(err.to_string())), - } - } else { - ControlFlow::Err(Error::NotEnoughData) - } - } - Err(err) => { - // Actually, it's an unreachable branch, because - // `bytes::buf::Reader` doesn't fail. - ControlFlow::Err(Error::Custom(err.to_string())) - } + loop { + self.line.clear(); + + let read = match self.bytes.slice().reader().read_line(&mut self.line) { + Ok(read) => read, + Err(err) => return Err(Error::Custom(err.to_string())), + }; + + if let Some(line) = self.line.strip_suffix('\n') { + self.bytes.advance(read); + + match serde_json::from_str(workaround_51132(line)) { + Ok(JsonRow::Row(value)) => return Ok(Some(value)), + Ok(JsonRow::Progress { .. }) => continue, + Err(err) => return Err(Error::BadResponse(err.to_string())), } - }) - .await + } + + match self.raw.next().await? { + Some(chunk) => self.bytes.extend(chunk), + None => return Ok(None), + } + } } } diff --git a/src/error.rs b/src/error.rs index 85e60d7..118b32b 100644 --- a/src/error.rs +++ b/src/error.rs @@ -38,12 +38,6 @@ pub enum Error { BadResponse(String), #[error("timeout expired")] TimedOut, - - // Internally handled errors, not part of public API. - // XXX: move to another error? - #[error("internal error: too small buffer, need another {0} bytes")] - #[doc(hidden)] - TooSmallBuffer(usize), } assert_impl_all!(Error: StdError, Send, Sync); diff --git a/src/lib.rs b/src/lib.rs index 6df4a50..f1e4eb3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -23,7 +23,7 @@ pub mod test; #[cfg(feature = "watch")] pub mod watch; -mod buflist; +mod bytes_ext; mod compression; mod cursor; mod headers; diff --git a/src/query.rs b/src/query.rs index 926fdb5..107c1a3 100644 --- a/src/query.rs +++ b/src/query.rs @@ -3,10 +3,9 @@ use serde::Deserialize; use std::fmt::Display; use url::Url; -use crate::headers::with_request_headers; use crate::{ - cursor::RowBinaryCursor, error::{Error, Result}, + headers::with_request_headers, request_body::RequestBody, response::Response, row::Row, @@ -16,6 +15,8 @@ use crate::{ const MAX_QUERY_LEN_TO_USE_GET: usize = 8192; +pub use crate::cursor::RowCursor; + #[must_use] #[derive(Clone)] pub struct Query { @@ -86,7 +87,7 @@ impl Query { self.sql.append(" FORMAT RowBinary"); let response = self.do_execute(true)?; - Ok(RowCursor(RowBinaryCursor::new(response))) + Ok(RowCursor::new(response)) } /// Executes the query and returns just a single row. @@ -196,16 +197,3 @@ impl Query { self } } - -/// A cursor that emits rows. -pub struct RowCursor(RowBinaryCursor); - -impl RowCursor { - /// Emits the next row. - pub async fn next<'a, 'b: 'a>(&'a mut self) -> Result> - where - T: Deserialize<'b>, - { - self.0.next().await - } -} diff --git a/src/response.rs b/src/response.rs index 4a8823a..b500ba6 100644 --- a/src/response.rs +++ b/src/response.rs @@ -15,7 +15,7 @@ use hyper::{ body::{Body as _, Incoming}, StatusCode, }; -use hyper_util::client::legacy::ResponseFuture; +use hyper_util::client::legacy::ResponseFuture as HyperResponseFuture; #[cfg(feature = "lz4")] use crate::compression::lz4::Lz4Decoder; @@ -29,13 +29,15 @@ use crate::{ pub(crate) enum Response { // Headers haven't been received yet. // `Box<_>` improves performance by reducing the size of the whole future. - Waiting(Pin> + Send>>), + Waiting(ResponseFuture), // Headers have been received, streaming the body. Loading(Chunks), } +pub(crate) type ResponseFuture = Pin> + Send>>; + impl Response { - pub(crate) fn new(response: ResponseFuture, compression: Compression) -> Self { + pub(crate) fn new(response: HyperResponseFuture, compression: Compression) -> Self { Self::Waiting(Box::pin(async move { let response = response.await?; let status = response.status(); @@ -52,27 +54,21 @@ impl Response { })) } - #[inline] - pub(crate) fn chunks(&mut self) -> Option<&mut Chunks> { + pub(crate) fn into_future(self) -> ResponseFuture { match self { - Self::Waiting(_) => None, - Self::Loading(chunks) => Some(chunks), + Self::Waiting(future) => future, + Self::Loading(_) => panic!("response is already streaming"), } } - #[cold] - #[inline(never)] - pub(crate) async fn chunks_slow(&mut self) -> Result<&mut Chunks> { - loop { + pub(crate) async fn finish(&mut self) -> Result<()> { + let chunks = loop { match self { Self::Waiting(future) => *self = Self::Loading(future.await?), - Self::Loading(chunks) => break Ok(chunks), + Self::Loading(chunks) => break chunks, } - } - } + }; - pub(crate) async fn finish(&mut self) -> Result<()> { - let chunks = self.chunks_slow().await?; while chunks.try_next().await?.is_some() {} Ok(()) } @@ -99,7 +95,7 @@ async fn collect_bad_response( // Try to decompress the body, because CH uses compression even for errors. let stream = stream::once(future::ready(Result::<_>::Ok(raw_bytes.slice(..)))); - let stream = Decompress::new(stream, compression); + let stream = Decompress::new(stream, compression).map_ok(|chunk| chunk.data); // We're collecting already fetched chunks, thus only decompression errors can // be here. If decompression is failed, we should try the raw body because @@ -138,6 +134,11 @@ fn stringify_status(status: StatusCode) -> String { // === Chunks === +pub(crate) struct Chunk { + pub(crate) data: Bytes, + pub(crate) net_size: usize, +} + // * Uses `Option<_>` to make this stream fused. // * Uses `Box<_>` in order to reduce the size of cursors. pub(crate) struct Chunks(Option>>>); @@ -152,7 +153,7 @@ impl Chunks { } impl Stream for Chunks { - type Item = Result; + type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { // `take()` prevents from use after caught panic. @@ -224,11 +225,17 @@ impl Stream for Decompress where S: Stream> + Unpin, { - type Item = Result; + type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { match &mut *self { - Self::Plain(stream) => Pin::new(stream).poll_next(cx).map_err(Into::into), + Self::Plain(stream) => Pin::new(stream) + .poll_next(cx) + .map_ok(|bytes| Chunk { + net_size: bytes.len(), + data: bytes, + }) + .map_err(Into::into), #[cfg(feature = "lz4")] Self::Lz4(stream) => Pin::new(stream).poll_next(cx), } @@ -250,9 +257,9 @@ impl DetectDbException { impl Stream for DetectDbException where - S: Stream> + Unpin, + S: Stream> + Unpin, { - type Item = Result; + type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { match &mut *self { @@ -260,11 +267,10 @@ where let mut res = Pin::new(stream).poll_next(cx); if let Poll::Ready(Some(Ok(chunk))) = &mut res { - if let Some(err) = extract_exception(chunk) { + if let Some(err) = extract_exception(&mut chunk.data) { *self = Self::Exception(Some(err)); - // NOTE: now `chunk` can be empty, but it's ok for - // callers. + // NOTE: `chunk` can be empty, but it's ok for callers. } } diff --git a/src/rowbinary/de.rs b/src/rowbinary/de.rs index 202ec88..b0d56c8 100644 --- a/src/rowbinary/de.rs +++ b/src/rowbinary/de.rs @@ -8,43 +8,32 @@ use serde::{ use crate::error::{Error, Result}; -/// Deserializes a value from `buffer` with a row encoded in `RowBinary`. -pub(crate) fn deserialize_from<'de, T: Deserialize<'de>>( - input: impl Buf, - temp_buf: &'de mut [u8], -) -> Result { - let mut deserializer = RowBinaryDeserializer { input, temp_buf }; +/// Deserializes a value from `input` with a row encoded in `RowBinary`. +/// +/// It accepts _a reference to_ a byte slice because it somehow leads to a more +/// performant generated code than `(&[u8]) -> Result<(T, usize)>` and even +/// `(&[u8], &mut Option) -> Result`. +pub(crate) fn deserialize_from<'data, T: Deserialize<'data>>(input: &mut &'data [u8]) -> Result { + let mut deserializer = RowBinaryDeserializer { input }; T::deserialize(&mut deserializer) } /// A deserializer for the RowBinary format. /// /// See https://clickhouse.com/docs/en/interfaces/formats#rowbinary for details. -struct RowBinaryDeserializer<'de, B> { - input: B, - temp_buf: &'de mut [u8], +struct RowBinaryDeserializer<'cursor, 'data> { + input: &'cursor mut &'data [u8], } -impl<'de, B: Buf> RowBinaryDeserializer<'de, B> { +impl<'cursor, 'data> RowBinaryDeserializer<'cursor, 'data> { fn read_vec(&mut self, size: usize) -> Result> { - ensure_size(&mut self.input, size)?; - let mut vec = vec![0; size]; - self.input.copy_to_slice(&mut vec[..]); - Ok(vec) + Ok(self.read_slice(size)?.to_vec()) } - fn read_slice(&mut self, size: usize) -> Result<&'de [u8]> { + fn read_slice(&mut self, size: usize) -> Result<&'data [u8]> { ensure_size(&mut self.input, size)?; - - if self.temp_buf.len() < size { - return Err(Error::TooSmallBuffer(size - self.temp_buf.len())); - } - - let temp_buf = mem::take(&mut self.temp_buf); - let (slice, rest) = temp_buf.split_at_mut(size); - self.temp_buf = rest; - self.input.copy_to_slice(slice); - + let slice = &self.input[..size]; + self.input.advance(size); Ok(slice) } @@ -67,7 +56,7 @@ fn ensure_size(buffer: impl Buf, size: usize) -> Result<()> { macro_rules! impl_num { ($ty:ty, $deser_method:ident, $visitor_method:ident, $reader_method:ident) => { #[inline] - fn $deser_method>(self, visitor: V) -> Result { + fn $deser_method>(self, visitor: V) -> Result { ensure_size(&mut self.input, mem::size_of::<$ty>())?; let value = self.input.$reader_method(); visitor.$visitor_method(value) @@ -75,7 +64,7 @@ macro_rules! impl_num { }; } -impl<'de, 'a, B: Buf> Deserializer<'de> for &'a mut RowBinaryDeserializer<'de, B> { +impl<'cursor, 'data> Deserializer<'data> for &mut RowBinaryDeserializer<'cursor, 'data> { type Error = Error; impl_num!(i8, deserialize_i8, visit_i8, get_i8); @@ -103,23 +92,23 @@ impl<'de, 'a, B: Buf> Deserializer<'de> for &'a mut RowBinaryDeserializer<'de, B impl_num!(f64, deserialize_f64, visit_f64, get_f64_le); #[inline] - fn deserialize_any>(self, _: V) -> Result { + fn deserialize_any>(self, _: V) -> Result { Err(Error::DeserializeAnyNotSupported) } #[inline] - fn deserialize_unit>(self, visitor: V) -> Result { + fn deserialize_unit>(self, visitor: V) -> Result { // TODO: revise this. visitor.visit_unit() } #[inline] - fn deserialize_char>(self, _: V) -> Result { + fn deserialize_char>(self, _: V) -> Result { panic!("character types are unsupported: `char`"); } #[inline] - fn deserialize_bool>(self, visitor: V) -> Result { + fn deserialize_bool>(self, visitor: V) -> Result { ensure_size(&mut self.input, 1)?; match self.input.get_u8() { 0 => visitor.visit_bool(false), @@ -129,7 +118,7 @@ impl<'de, 'a, B: Buf> Deserializer<'de> for &'a mut RowBinaryDeserializer<'de, B } #[inline] - fn deserialize_str>(self, visitor: V) -> Result { + fn deserialize_str>(self, visitor: V) -> Result { let size = self.read_size()?; let slice = self.read_slice(size)?; let str = str::from_utf8(slice).map_err(Error::from)?; @@ -137,7 +126,7 @@ impl<'de, 'a, B: Buf> Deserializer<'de> for &'a mut RowBinaryDeserializer<'de, B } #[inline] - fn deserialize_string>(self, visitor: V) -> Result { + fn deserialize_string>(self, visitor: V) -> Result { let size = self.read_size()?; let vec = self.read_vec(size)?; let string = String::from_utf8(vec).map_err(|err| Error::from(err.utf8_error()))?; @@ -145,20 +134,20 @@ impl<'de, 'a, B: Buf> Deserializer<'de> for &'a mut RowBinaryDeserializer<'de, B } #[inline] - fn deserialize_bytes>(self, visitor: V) -> Result { + fn deserialize_bytes>(self, visitor: V) -> Result { let size = self.read_size()?; let slice = self.read_slice(size)?; visitor.visit_borrowed_bytes(slice) } #[inline] - fn deserialize_byte_buf>(self, visitor: V) -> Result { + fn deserialize_byte_buf>(self, visitor: V) -> Result { let size = self.read_size()?; visitor.visit_byte_buf(self.read_vec(size)?) } #[inline] - fn deserialize_enum>( + fn deserialize_enum>( self, name: &'static str, _variants: &'static [&'static str], @@ -168,18 +157,18 @@ impl<'de, 'a, B: Buf> Deserializer<'de> for &'a mut RowBinaryDeserializer<'de, B } #[inline] - fn deserialize_tuple>(self, len: usize, visitor: V) -> Result { - struct Access<'de, 'a, B> { - deserializer: &'a mut RowBinaryDeserializer<'de, B>, + fn deserialize_tuple>(self, len: usize, visitor: V) -> Result { + struct Access<'de, 'cursor, 'data> { + deserializer: &'de mut RowBinaryDeserializer<'cursor, 'data>, len: usize, } - impl<'de, 'a, B: Buf> SeqAccess<'de> for Access<'de, 'a, B> { + impl<'de, 'cursor, 'data> SeqAccess<'data> for Access<'de, 'cursor, 'data> { type Error = Error; fn next_element_seed(&mut self, seed: T) -> Result> where - T: DeserializeSeed<'de>, + T: DeserializeSeed<'data>, { if self.len > 0 { self.len -= 1; @@ -202,7 +191,7 @@ impl<'de, 'a, B: Buf> Deserializer<'de> for &'a mut RowBinaryDeserializer<'de, B } #[inline] - fn deserialize_option>(self, visitor: V) -> Result { + fn deserialize_option>(self, visitor: V) -> Result { ensure_size(&mut self.input, 1)?; match self.input.get_u8() { @@ -213,18 +202,18 @@ impl<'de, 'a, B: Buf> Deserializer<'de> for &'a mut RowBinaryDeserializer<'de, B } #[inline] - fn deserialize_seq>(self, visitor: V) -> Result { + fn deserialize_seq>(self, visitor: V) -> Result { let len = self.read_size()?; self.deserialize_tuple(len, visitor) } #[inline] - fn deserialize_map>(self, _visitor: V) -> Result { + fn deserialize_map>(self, _visitor: V) -> Result { panic!("maps are unsupported, use `Vec<(A, B)>` instead"); } #[inline] - fn deserialize_struct>( + fn deserialize_struct>( self, _name: &str, fields: &'static [&'static str], @@ -234,12 +223,12 @@ impl<'de, 'a, B: Buf> Deserializer<'de> for &'a mut RowBinaryDeserializer<'de, B } #[inline] - fn deserialize_identifier>(self, _visitor: V) -> Result { + fn deserialize_identifier>(self, _visitor: V) -> Result { panic!("identifiers are unsupported"); } #[inline] - fn deserialize_newtype_struct>( + fn deserialize_newtype_struct>( self, _name: &str, visitor: V, @@ -248,7 +237,7 @@ impl<'de, 'a, B: Buf> Deserializer<'de> for &'a mut RowBinaryDeserializer<'de, B } #[inline] - fn deserialize_unit_struct>( + fn deserialize_unit_struct>( self, name: &'static str, _visitor: V, @@ -257,7 +246,7 @@ impl<'de, 'a, B: Buf> Deserializer<'de> for &'a mut RowBinaryDeserializer<'de, B } #[inline] - fn deserialize_tuple_struct>( + fn deserialize_tuple_struct>( self, name: &'static str, _len: usize, @@ -267,7 +256,7 @@ impl<'de, 'a, B: Buf> Deserializer<'de> for &'a mut RowBinaryDeserializer<'de, B } #[inline] - fn deserialize_ignored_any>(self, _visitor: V) -> Result { + fn deserialize_ignored_any>(self, _visitor: V) -> Result { panic!("ignored types are unsupported"); } diff --git a/src/rowbinary/tests.rs b/src/rowbinary/tests.rs index e7c89d4..2865cbe 100644 --- a/src/rowbinary/tests.rs +++ b/src/rowbinary/tests.rs @@ -116,20 +116,16 @@ fn it_serializes() { #[test] fn it_deserializes() { - use bytes::buf::Buf; - let input = sample_serialized(); - let mut temp_buf = [0; 1024]; for i in 0..input.len() { - let (left, right) = input.split_at(i); + let (mut left, mut right) = input.split_at(i); // It shouldn't panic. - let _: Result, _> = super::deserialize_from(left, &mut temp_buf); - let _: Result, _> = super::deserialize_from(right, &mut temp_buf); + let _: Result, _> = super::deserialize_from(&mut left); + let _: Result, _> = super::deserialize_from(&mut right); - let buf = left.chain(right); - let actual: Sample<'_> = super::deserialize_from(buf, &mut temp_buf).unwrap(); + let actual: Sample<'_> = super::deserialize_from(&mut input.as_slice()).unwrap(); assert_eq!(actual, sample()); } } diff --git a/src/test/handlers.rs b/src/test/handlers.rs index a06d96c..8da4b0e 100644 --- a/src/test/handlers.rs +++ b/src/test/handlers.rs @@ -1,6 +1,6 @@ use std::marker::PhantomData; -use bytes::{Buf, Bytes}; +use bytes::Bytes; use futures::channel::oneshot; use hyper::{Request, Response, StatusCode}; use sealed::sealed; @@ -88,12 +88,12 @@ where where C: Default + Extend, { - let mut buffer = self.rx.await.expect("query canceled"); + let bytes = self.rx.await.expect("query canceled"); + let slice = &mut (&bytes[..]); let mut result = C::default(); - while buffer.has_remaining() { - let row: T = - rowbinary::deserialize_from(&mut buffer, &mut []).expect("failed to deserialize"); + while !slice.is_empty() { + let row: T = rowbinary::deserialize_from(slice).expect("failed to deserialize"); result.extend(std::iter::once(row)); } diff --git a/src/watch.rs b/src/watch.rs index 8833c13..1914399 100644 --- a/src/watch.rs +++ b/src/watch.rs @@ -158,6 +158,8 @@ impl Row for EventPayload { impl EventCursor { /// Emits the next version. + /// + /// An result is unspecified if it's called after `Err` is returned. pub async fn next(&mut self) -> Result> { Ok(self.0.next().await?.map(|payload| payload.version)) } @@ -181,6 +183,8 @@ impl Row for RowPayload { impl RowCursor { /// Emits the next row. + /// + /// An result is unspecified if it's called after `Err` is returned. pub async fn next<'a, 'b: 'a>(&'a mut self) -> Result> where T: Deserialize<'b> + Row, diff --git a/tests/it/compression.rs b/tests/it/compression.rs index c0c1be1..9f826c4 100644 --- a/tests/it/compression.rs +++ b/tests/it/compression.rs @@ -1,36 +1,20 @@ -use serde::{Deserialize, Serialize}; +use clickhouse::{Client, Compression}; -use clickhouse::{Client, Compression, Row}; +use crate::{create_simple_table, SimpleRow}; async fn check(client: Client) { - #[derive(Debug, Row, Serialize, Deserialize)] - struct MyRow<'a> { - no: u32, - name: &'a str, - } - - client - .query( - " - CREATE TABLE test(no UInt32, name LowCardinality(String)) - ENGINE = MergeTree - ORDER BY no - ", - ) - .execute() - .await - .unwrap(); + create_simple_table(&client, "test").await; let mut insert = client.insert("test").unwrap(); for i in 0..200_000 { - insert.write(&MyRow { no: i, name: "foo" }).await.unwrap(); + insert.write(&SimpleRow::new(i, "foo")).await.unwrap(); } insert.end().await.unwrap(); // Check data. let (sum_no, sum_len) = client - .query("SELECT sum(no), sum(length(name)) FROM test") + .query("SELECT sum(id), sum(length(data)) FROM test") .fetch_one::<(u64, u64)>() .await .unwrap(); diff --git a/tests/it/cursor_stats.rs b/tests/it/cursor_stats.rs new file mode 100644 index 0000000..7ae43bd --- /dev/null +++ b/tests/it/cursor_stats.rs @@ -0,0 +1,51 @@ +use clickhouse::{Client, Compression}; + +use crate::{create_simple_table, SimpleRow}; + +async fn check(client: Client, expected_ratio: f64) { + create_simple_table(&client, "test").await; + + let mut insert = client.insert("test").unwrap(); + for i in 0..1_000 { + insert.write(&SimpleRow::new(i, "foobar")).await.unwrap(); + } + insert.end().await.unwrap(); + + let mut cursor = client + .query("SELECT * FROM test") + .fetch::() + .unwrap(); + + let mut received = cursor.received_bytes(); + let mut decoded = cursor.decoded_bytes(); + assert_eq!(received, 0); + assert_eq!(decoded, 0); + + while cursor.next().await.unwrap().is_some() { + assert!(cursor.received_bytes() >= received); + assert!(cursor.decoded_bytes() >= decoded); + received = cursor.received_bytes(); + decoded = cursor.decoded_bytes(); + } + + assert_eq!(decoded, 15000); + assert_eq!(cursor.received_bytes(), dbg!(received)); + assert_eq!(cursor.decoded_bytes(), dbg!(decoded)); + assert_eq!( + (decoded as f64 / received as f64 * 10.).round() / 10., + expected_ratio + ); +} + +#[tokio::test] +async fn none() { + let client = prepare_database!().with_compression(Compression::None); + check(client, 1.0).await; +} + +#[cfg(feature = "lz4")] +#[tokio::test] +async fn lz4() { + let client = prepare_database!().with_compression(Compression::Lz4); + check(client, 3.7).await; +} diff --git a/tests/it/main.rs b/tests/it/main.rs index 647f620..3efe24f 100644 --- a/tests/it/main.rs +++ b/tests/it/main.rs @@ -56,6 +56,7 @@ async fn flush_query_log(client: &Client) { mod compression; mod cursor_error; +mod cursor_stats; mod insert; mod inserter; mod ip;