Skip to content

Commit 6bfcf50

Browse files
authored
Merge branch 'ClickHouse:main' into development
2 parents b12ad8f + 93bfe03 commit 6bfcf50

File tree

5 files changed

+165
-34
lines changed

5 files changed

+165
-34
lines changed

Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,10 @@ undocumented_unsafe_blocks = "warn"
3737
all-features = true
3838
rustdoc-args = ["--cfg", "docsrs"]
3939

40+
[[bench]]
41+
name = "select_market_data"
42+
harness = false
43+
4044
[[bench]]
4145
name = "select_nyc_taxi_data"
4246
harness = false

benches/common_select.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ macro_rules! impl_benchmark_row_no_access_type {
3535
($type:ty, $id_field:ident) => {
3636
impl WithId for $type {
3737
fn id(&self) -> u64 {
38-
self.$id_field
38+
self.$id_field as u64
3939
}
4040
}
4141

benches/mocked_insert.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ where
9595

9696
let mut group = c.benchmark_group(name);
9797
group.throughput(Throughput::Bytes(mem::size_of::<SomeRow>() as u64));
98-
group.bench_function("no compression", |b| {
98+
group.bench_function("uncompressed", |b| {
9999
b.iter_custom(|iters| {
100100
let client = Client::default()
101101
.with_url(format!("http://{addr}"))

benches/mocked_select.rs

Lines changed: 46 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,11 @@ mod common;
2121
async fn serve(
2222
request: Request<Incoming>,
2323
compression: Compression,
24+
use_rbwnat: bool,
2425
) -> Response<impl Body<Data = Bytes, Error = Infallible>> {
2526
common::skip_incoming(request).await;
2627

27-
let write_schema = async move {
28+
let maybe_schema = if use_rbwnat {
2829
let schema = vec![
2930
Column::new("a".to_string(), DataTypeNode::UInt64),
3031
Column::new("b".to_string(), DataTypeNode::Int64),
@@ -42,12 +43,15 @@ async fn serve(
4243
_ => unreachable!(),
4344
};
4445

45-
Ok(Frame::data(buffer))
46+
Some(buffer)
47+
} else {
48+
None
4649
};
4750

48-
let chunk = prepare_chunk();
49-
let stream =
50-
stream::once(write_schema).chain(stream::repeat(chunk).map(|chunk| Ok(Frame::data(chunk))));
51+
let stream = stream::iter(maybe_schema)
52+
.chain(stream::repeat(prepare_chunk()))
53+
.map(|chunk| Ok(Frame::data(chunk)));
54+
5155
Response::new(StreamBody::new(stream))
5256
}
5357

@@ -75,8 +79,8 @@ fn prepare_chunk() -> Bytes {
7579
const ADDR: SocketAddr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 6523));
7680

7781
fn select(c: &mut Criterion) {
78-
async fn start_server(compression: Compression) -> common::ServerHandle {
79-
common::start_server(ADDR, move |req| serve(req, compression)).await
82+
async fn start_server(compression: Compression, use_rbwnat: bool) -> common::ServerHandle {
83+
common::start_server(ADDR, move |req| serve(req, compression, use_rbwnat)).await
8084
}
8185

8286
let runner = common::start_runner();
@@ -89,8 +93,16 @@ fn select(c: &mut Criterion) {
8993
d: u32,
9094
}
9195

92-
async fn select_rows(client: Client, iters: u64, compression: Compression) -> Result<Duration> {
93-
let _server = start_server(compression).await;
96+
async fn select_rows(
97+
client: Client,
98+
iters: u64,
99+
compression: Compression,
100+
validation: bool,
101+
) -> Result<Duration> {
102+
let client = client
103+
.with_compression(compression)
104+
.with_validation(validation);
105+
let _server = start_server(compression, validation).await;
94106

95107
let mut sum = SomeRow::default();
96108
let start = Instant::now();
@@ -119,7 +131,8 @@ fn select(c: &mut Criterion) {
119131
min_size: u64,
120132
compression: Compression,
121133
) -> Result<Duration> {
122-
let _server = start_server(compression).await;
134+
let client = client.with_compression(compression);
135+
let _server = start_server(compression, false).await;
123136

124137
let start = Instant::now();
125138
let mut cursor = client
@@ -137,23 +150,30 @@ fn select(c: &mut Criterion) {
137150

138151
let mut group = c.benchmark_group("rows");
139152
group.throughput(Throughput::Bytes(size_of::<SomeRow>() as u64));
140-
group.bench_function("uncompressed", |b| {
153+
group.bench_function("validation=off/uncompressed", |b| {
141154
b.iter_custom(|iters| {
142-
let compression = Compression::None;
143-
let client = Client::default()
144-
.with_url(format!("http://{ADDR}"))
145-
.with_compression(compression);
146-
runner.run(select_rows(client, iters, compression))
155+
let client = Client::default().with_url(format!("http://{ADDR}"));
156+
runner.run(select_rows(client, iters, Compression::None, false))
147157
})
148158
});
149159
#[cfg(feature = "lz4")]
150-
group.bench_function("lz4", |b| {
160+
group.bench_function("validation=off/lz4", |b| {
161+
b.iter_custom(|iters| {
162+
let client = Client::default().with_url(format!("http://{ADDR}"));
163+
runner.run(select_rows(client, iters, Compression::Lz4, false))
164+
})
165+
});
166+
group.bench_function("validation=on/uncompressed", |b| {
167+
b.iter_custom(|iters| {
168+
let client = Client::default().with_url(format!("http://{ADDR}"));
169+
runner.run(select_rows(client, iters, Compression::None, true))
170+
})
171+
});
172+
#[cfg(feature = "lz4")]
173+
group.bench_function("validation=on/lz4", |b| {
151174
b.iter_custom(|iters| {
152-
let compression = Compression::Lz4;
153-
let client = Client::default()
154-
.with_url(format!("http://{ADDR}"))
155-
.with_compression(compression);
156-
runner.run(select_rows(client, iters, compression))
175+
let client = Client::default().with_url(format!("http://{ADDR}"));
176+
runner.run(select_rows(client, iters, Compression::Lz4, true))
157177
})
158178
});
159179
group.finish();
@@ -163,21 +183,15 @@ fn select(c: &mut Criterion) {
163183
group.throughput(Throughput::Bytes(MIB));
164184
group.bench_function("uncompressed", |b| {
165185
b.iter_custom(|iters| {
166-
let compression = Compression::None;
167-
let client = Client::default()
168-
.with_url(format!("http://{ADDR}"))
169-
.with_compression(compression);
170-
runner.run(select_bytes(client, iters * MIB, compression))
186+
let client = Client::default().with_url(format!("http://{ADDR}"));
187+
runner.run(select_bytes(client, iters * MIB, Compression::None))
171188
})
172189
});
173190
#[cfg(feature = "lz4")]
174191
group.bench_function("lz4", |b| {
175192
b.iter_custom(|iters| {
176-
let compression = Compression::None;
177-
let client = Client::default()
178-
.with_url(format!("http://{ADDR}"))
179-
.with_compression(compression);
180-
runner.run(select_bytes(client, iters * MIB, compression))
193+
let client = Client::default().with_url(format!("http://{ADDR}"));
194+
runner.run(select_bytes(client, iters * MIB, Compression::Lz4))
181195
})
182196
});
183197
group.finish();

benches/select_market_data.rs

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
use crate::common_select::{
2+
do_select_bench, print_header, print_results, BenchmarkRow, WithAccessType, WithId,
3+
};
4+
use clickhouse::{Client, Compression, Row};
5+
use serde::{Deserialize, Serialize};
6+
use serde_repr::{Deserialize_repr, Serialize_repr};
7+
8+
mod common_select;
9+
10+
#[derive(Row, Serialize, Deserialize)]
11+
struct L2Update {
12+
instrument_id: u32,
13+
received_time: Timestamp,
14+
exchange_time: Option<Timestamp>,
15+
sequence_no: Option<u64>,
16+
trace_id: TraceId,
17+
side: Side,
18+
price: f64,
19+
amount: f64,
20+
is_eot: bool,
21+
}
22+
23+
#[derive(Serialize, Deserialize)]
24+
struct Timestamp(i64);
25+
26+
#[derive(Serialize, Deserialize)]
27+
struct TraceId(u64);
28+
29+
#[derive(Serialize_repr, Deserialize_repr)]
30+
#[repr(i8)]
31+
enum Side {
32+
Bid = 0,
33+
Ask = 1,
34+
}
35+
36+
impl_benchmark_row_no_access_type!(L2Update, instrument_id);
37+
38+
async fn prepare_data() {
39+
let client = Client::default().with_url("http://localhost:8123");
40+
41+
client
42+
.query(
43+
r#"
44+
CREATE TABLE IF NOT EXISTS l2_book_log
45+
(
46+
`instrument_id` UInt32 CODEC(T64, Default),
47+
`received_time` DateTime64(9) CODEC(DoubleDelta, Default),
48+
`exchange_time` Nullable(DateTime64(9)) CODEC(DoubleDelta, Default),
49+
`sequence_no` Nullable(UInt64) CODEC(DoubleDelta, Default),
50+
`trace_id` UInt64 CODEC(DoubleDelta, Default),
51+
`side` Enum8('Bid' = 0, 'Ask' = 1),
52+
`price` Float64,
53+
`amount` Float64,
54+
`is_eot` Bool
55+
)
56+
ENGINE = MergeTree
57+
PRIMARY KEY (instrument_id, received_time)
58+
"#,
59+
)
60+
.execute()
61+
.await
62+
.unwrap();
63+
64+
let len = client
65+
.query("SELECT count() FROM l2_book_log")
66+
.fetch_one::<usize>()
67+
.await
68+
.unwrap();
69+
70+
if len > 0 {
71+
return;
72+
}
73+
74+
let mut insert = client.insert("l2_book_log").unwrap();
75+
76+
for i in 0..10_000_000 {
77+
insert
78+
.write(&L2Update {
79+
instrument_id: 42,
80+
received_time: Timestamp(1749888780458000000 + 11_111 * i as i64),
81+
exchange_time: Some(Timestamp(1749888780458000000 + 10_101 * i as i64)),
82+
trace_id: TraceId(1749888780458000000 + i),
83+
sequence_no: Some(i),
84+
side: if i % 10 >= 5 { Side::Bid } else { Side::Ask },
85+
price: 54321. + 100. * (i as f64).sin(),
86+
amount: 100. + 100. * (i as f64).sin(),
87+
is_eot: i % 10 == 0,
88+
})
89+
.await
90+
.unwrap();
91+
}
92+
93+
insert.end().await.unwrap();
94+
}
95+
96+
async fn bench(compression: Compression, validation: bool) {
97+
let stats =
98+
do_select_bench::<L2Update>("SELECT * FROM l2_book_log", compression, validation).await;
99+
assert_eq!(stats.result, 420000000);
100+
print_results::<L2Update>(&stats, compression, validation);
101+
}
102+
103+
#[tokio::main]
104+
async fn main() {
105+
prepare_data().await;
106+
print_header(None);
107+
#[cfg(feature = "lz4")]
108+
bench(Compression::Lz4, false).await;
109+
#[cfg(feature = "lz4")]
110+
bench(Compression::Lz4, true).await;
111+
bench(Compression::None, false).await;
112+
bench(Compression::None, true).await;
113+
}

0 commit comments

Comments
 (0)