Skip to content

Commit 0d92ce2

Browse files
committed
perf(query): improve performance
1 parent 3ac194e commit 0d92ce2

File tree

6 files changed

+95
-16
lines changed

6 files changed

+95
-16
lines changed

Diff for: CHANGELOG.md

+5
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77
<!-- next-header -->
88

99
## [Unreleased] - ReleaseDate
10+
### Added
11+
- query: `Query::fetch_optional()`.
12+
13+
### Changed
14+
- query: increase performance up to 40%.
1015

1116
## [0.11.3] - 2023-02-19
1217
### Added

Diff for: Cargo.toml

+4
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@ rust-version = "1.60"
1414
all-features = true
1515
rustdoc-args = ["--cfg", "docsrs"]
1616

17+
[[bench]]
18+
name = "select_numbers"
19+
harness = false
20+
1721
[[bench]]
1822
name = "insert"
1923
harness = false

Diff for: benches/select_numbers.rs

+36
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
use serde::Deserialize;
2+
3+
use clickhouse::{error::Result, Client, Compression, Row};
4+
5+
#[derive(Row, Deserialize)]
6+
struct Data {
7+
no: u64,
8+
}
9+
10+
async fn bench() -> u64 {
11+
let client = Client::default()
12+
.with_compression(Compression::None)
13+
.with_url("http://localhost:8123");
14+
15+
let mut cursor = client
16+
.query("SELECT number FROM system.numbers_mt LIMIT 500000000")
17+
.fetch::<Data>()
18+
.unwrap();
19+
20+
let mut sum = 0;
21+
while let Some(row) = cursor.next().await.unwrap() {
22+
sum += row.no;
23+
}
24+
25+
sum
26+
}
27+
28+
#[tokio::main]
29+
async fn main() -> Result<()> {
30+
println!("Started");
31+
let start = std::time::Instant::now();
32+
let sum = tokio::spawn(bench()).await.unwrap();
33+
let elapsed = start.elapsed();
34+
println!("Done: elapsed={elapsed:?} sum={sum}");
35+
Ok(())
36+
}

Diff for: src/buflist.rs

+33-13
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,9 @@ use std::collections::VecDeque;
22

33
use bytes::Buf;
44

5-
#[derive(Default)]
5+
#[derive(Debug, Default)]
66
pub(crate) struct BufList<T> {
7+
next_buf: Option<T>,
78
bufs: VecDeque<T>,
89
rem: usize,
910
cursor: usize,
@@ -13,21 +14,28 @@ impl<T: Buf> BufList<T> {
1314
#[inline]
1415
pub(crate) fn push(&mut self, buf: T) {
1516
let rem = buf.remaining();
17+
if rem == 0 {
18+
return;
19+
}
1620

17-
if rem > 0 {
21+
if self.next_buf.is_none() {
22+
self.next_buf = Some(buf);
23+
} else {
1824
self.bufs.push_back(buf);
19-
self.rem += rem;
2025
}
26+
27+
self.rem += rem;
2128
}
2229

2330
#[inline]
2431
pub(crate) fn bufs_cnt(&self) -> usize {
25-
self.bufs.len()
32+
self.next_buf.is_some() as usize + self.bufs.len()
2633
}
2734

35+
#[inline]
2836
pub(crate) fn commit(&mut self) {
2937
while self.cursor > 0 {
30-
let front = &mut self.bufs[0];
38+
let front = self.next_buf.as_mut().unwrap();
3139
let rem = front.remaining();
3240

3341
if rem > self.cursor {
@@ -36,7 +44,7 @@ impl<T: Buf> BufList<T> {
3644
} else {
3745
front.advance(rem);
3846
self.cursor -= rem;
39-
self.bufs.pop_front();
47+
self.next_buf = self.bufs.pop_front();
4048
}
4149
}
4250
}
@@ -45,6 +53,21 @@ impl<T: Buf> BufList<T> {
4553
self.rem += self.cursor;
4654
self.cursor = 0;
4755
}
56+
57+
#[cold]
58+
fn chunk_slow(&self) -> &[u8] {
59+
let mut cnt = self.cursor - self.next_buf.as_ref().map_or(0, |b| b.chunk().len());
60+
61+
for buf in &self.bufs {
62+
let bytes = buf.chunk();
63+
if bytes.len() > cnt {
64+
return &bytes[cnt..];
65+
}
66+
cnt -= bytes.len();
67+
}
68+
69+
b""
70+
}
4871
}
4972

5073
impl<T: Buf> Buf for BufList<T> {
@@ -55,17 +78,14 @@ impl<T: Buf> Buf for BufList<T> {
5578

5679
#[inline]
5780
fn chunk(&self) -> &[u8] {
58-
let mut cnt = self.cursor;
59-
60-
for buf in &self.bufs {
81+
if let Some(buf) = &self.next_buf {
6182
let bytes = buf.chunk();
62-
if bytes.len() > cnt {
63-
return &bytes[cnt..];
83+
if bytes.len() > self.cursor {
84+
return &bytes[self.cursor..];
6485
}
65-
cnt -= bytes.len();
6686
}
6787

68-
b""
88+
self.chunk_slow()
6989
}
7090

7191
#[inline]

Diff for: src/cursor.rs

+5-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,11 @@ impl RawCursor {
3333
&mut self,
3434
mut f: impl FnMut(&mut BufList<Bytes>) -> ControlFlow<T>,
3535
) -> Result<Option<T>> {
36-
let chunks = self.response.chunks().await?;
36+
let chunks = if let Some(chunks) = self.response.chunks() {
37+
chunks
38+
} else {
39+
self.response.chunks_slow().await?
40+
};
3741

3842
loop {
3943
match f(&mut self.pending) {

Diff for: src/response.rs

+12-2
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,17 @@ impl Response {
4040
}))
4141
}
4242

43-
pub(crate) async fn chunks(&mut self) -> Result<&mut Chunks<Body>> {
43+
#[inline]
44+
pub(crate) fn chunks(&mut self) -> Option<&mut Chunks<Body>> {
45+
match self {
46+
Self::Waiting(_) => None,
47+
Self::Loading(chunks) => Some(chunks),
48+
}
49+
}
50+
51+
#[cold]
52+
#[inline(never)]
53+
pub(crate) async fn chunks_slow(&mut self) -> Result<&mut Chunks<Body>> {
4454
loop {
4555
match self {
4656
Self::Waiting(future) => *self = Self::Loading(future.await?),
@@ -50,7 +60,7 @@ impl Response {
5060
}
5161

5262
pub(crate) async fn finish(&mut self) -> Result<()> {
53-
let chunks = self.chunks().await?;
63+
let chunks = self.chunks_slow().await?;
5464
while chunks.try_next().await?.is_some() {}
5565
Ok(())
5666
}

0 commit comments

Comments
 (0)