Skip to content

Commit 46206f5

Browse files
committed
minor changes
1 parent b4d7d97 commit 46206f5

File tree

5 files changed

+216
-36
lines changed

5 files changed

+216
-36
lines changed

datafusion/physical-expr-common/Cargo.toml

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,17 @@ path = "src/lib.rs"
3737

3838
[dependencies]
3939
ahash = { workspace = true }
40-
arrow = { workspace = true }
40+
arrow = { workspace = true, features = ["test_utils"] }
41+
criterion = "0.5"
4142
datafusion-common = { workspace = true, default-features = true }
4243
datafusion-expr-common = { workspace = true }
4344
hashbrown = { workspace = true }
4445
rand = { workspace = true }
46+
47+
[[bench]]
48+
harness = false
49+
name = "binary_map"
50+
51+
[[bench]]
52+
harness = false
53+
name = "binary_view_map"
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::sync::Arc;
19+
20+
use arrow::array::ArrayRef;
21+
use arrow::util::bench_util::create_string_array_with_len;
22+
use criterion::{black_box, criterion_group, criterion_main, Criterion};
23+
use datafusion_physical_expr_common::binary_map::{ArrowBytesMap, OutputType};
24+
25+
fn benchmark_arrow_bytes_map(c: &mut Criterion) {
26+
let sizes = [100_000, 1_000_000];
27+
let null_densities = [0.1, 0.5];
28+
let string_lengths = [20, 50];
29+
30+
for &num_items in &sizes {
31+
for &null_density in &null_densities {
32+
for &str_len in &string_lengths {
33+
let array: ArrayRef = Arc::new(create_string_array_with_len::<i32>(
34+
num_items,
35+
null_density,
36+
str_len,
37+
));
38+
39+
c.bench_function(
40+
&format!(
41+
"ArrowBytesMap insert_if_new - items: {}, null_density: {:.1}, str_len: {}",
42+
num_items, null_density, str_len
43+
),
44+
|b| {
45+
b.iter(|| {
46+
let mut map = ArrowBytesMap::<i32, ()>::new(OutputType::Utf8);
47+
map.insert_if_new(black_box(&array), |_| {}, |_| {}, |_| {});
48+
black_box(&map);
49+
});
50+
},
51+
);
52+
53+
let mut map = ArrowBytesMap::<i32, u32>::new(OutputType::Utf8);
54+
map.insert_if_new(&array, |_| 1u32, |_| {}, |_| {});
55+
56+
c.bench_function(
57+
&format!(
58+
"ArrowBytesMap get_payloads - items: {}, null_density: {:.1}, str_len: {}",
59+
num_items, null_density, str_len
60+
),
61+
|b| {
62+
b.iter(|| {
63+
let payloads = map.take().get_payloads(black_box(&array));
64+
black_box(payloads);
65+
});
66+
},
67+
);
68+
}
69+
}
70+
}
71+
}
72+
73+
criterion_group!(benches, benchmark_arrow_bytes_map);
74+
criterion_main!(benches);
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::sync::Arc;
19+
20+
use arrow::array::ArrayRef;
21+
use arrow::util::bench_util::create_string_view_array_with_len;
22+
use criterion::{black_box, criterion_group, criterion_main, Criterion};
23+
use datafusion_physical_expr_common::{
24+
binary_map::OutputType, binary_view_map::ArrowBytesViewMap,
25+
};
26+
27+
fn benchmark_arrow_bytes_view_map(c: &mut Criterion) {
28+
let sizes = [100_000, 1_000_000];
29+
let null_densities = [0.1, 0.5];
30+
let string_lengths = [20, 50];
31+
32+
for &num_items in &sizes {
33+
for &null_density in &null_densities {
34+
for &str_len in &string_lengths {
35+
let array: ArrayRef = Arc::new(create_string_view_array_with_len(
36+
num_items,
37+
null_density,
38+
str_len,
39+
false,
40+
));
41+
42+
c.bench_function(
43+
&format!(
44+
"ArrowBytesViewMap insert_if_new - items: {}, null_density: {:.1}, str_len: {}",
45+
num_items, null_density, str_len
46+
),
47+
|b| {
48+
b.iter(|| {
49+
let mut map = ArrowBytesViewMap::<()>::new(OutputType::Utf8View);
50+
map.insert_if_new(black_box(&array), |_| {}, |_| {}, |_| {});
51+
black_box(&map);
52+
});
53+
},
54+
);
55+
56+
let mut map = ArrowBytesViewMap::<i32>::new(OutputType::Utf8View);
57+
map.insert_if_new(&array, |_| 1i32, |_| {}, |_| {});
58+
59+
c.bench_function(
60+
&format!(
61+
"ArrowBytesViewMap get_payloads - items: {}, null_density: {:.1}, str_len: {}",
62+
num_items, null_density, str_len
63+
),
64+
|b| {
65+
b.iter(|| {
66+
let payloads = map.take().get_payloads(black_box(&array));
67+
black_box(payloads);
68+
});
69+
},
70+
);
71+
}
72+
}
73+
}
74+
}
75+
76+
criterion_group!(benches, benchmark_arrow_bytes_view_map);
77+
criterion_main!(benches);

datafusion/physical-expr-common/src/binary_map.rs

Lines changed: 28 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -539,33 +539,21 @@ where
539539
let mut batch_hashes = vec![0u64; values.len()];
540540
batch_hashes.clear();
541541
batch_hashes.resize(values.len(), 0);
542-
create_hashes(&[values.clone()], &self.random_state, &mut batch_hashes).unwrap(); // Compute the hashes for the values
542+
create_hashes(&[values.clone()], &self.random_state, &mut batch_hashes).unwrap();
543543

544544
// Step 2: Get payloads for each value
545545
let values = values.as_bytes::<B>();
546-
assert_eq!(values.len(), batch_hashes.len()); // Ensure hash count matches value count
546+
assert_eq!(values.len(), batch_hashes.len());
547547

548548
let mut payloads = Vec::with_capacity(values.len());
549549

550-
for (value, &hash) in values.iter().zip(batch_hashes.iter()) {
551-
// Handle null value
552-
let Some(value) = value else {
553-
if let Some(&(payload, _)) = self.null.as_ref() {
554-
payloads.push(Some(payload));
555-
} else {
556-
payloads.push(None);
557-
}
558-
continue;
559-
};
560-
550+
let process_value = |value: &B::Native, hash: u64| -> Option<V> {
561551
let value: &[u8] = value.as_ref();
562552
let value_len = O::usize_as(value.len());
563553

564-
// Small value optimization
565-
let payload = if value.len() <= SHORT_VALUE_LEN {
566-
let inline = value.iter().fold(0usize, |acc, &x| acc << 8 | x as usize);
554+
if value.len() <= SHORT_VALUE_LEN {
555+
let inline = value.iter().fold(0usize, |acc, &x| (acc << 8) | x as usize);
567556

568-
// Check if the value is already present in the set
569557
let entry = self.map.get(hash, |header| {
570558
if header.len != value_len {
571559
return false;
@@ -575,7 +563,6 @@ where
575563

576564
entry.map(|entry| entry.payload)
577565
} else {
578-
// Handle larger values
579566
let entry = self.map.get(hash, |header| {
580567
if header.len != value_len {
581568
return false;
@@ -586,9 +573,30 @@ where
586573
});
587574

588575
entry.map(|entry| entry.payload)
589-
};
576+
}
577+
};
590578

591-
payloads.push(payload);
579+
if let Some(validity_bitmap) = values.nulls() {
580+
let null_payload = self.null.as_ref().map(|&(payload, _)| payload);
581+
let validity_iter = validity_bitmap.iter();
582+
583+
for ((value_opt, &hash), is_valid) in
584+
values.iter().zip(batch_hashes.iter()).zip(validity_iter)
585+
{
586+
if is_valid {
587+
let value = value_opt.unwrap(); // Safe to unwrap since is_valid is true
588+
let payload = process_value(value, hash);
589+
payloads.push(payload);
590+
} else {
591+
payloads.push(null_payload);
592+
}
593+
}
594+
} else {
595+
for (value_opt, &hash) in values.iter().zip(batch_hashes.iter()) {
596+
let value = value_opt.unwrap(); // Safe to unwrap because there are no nulls
597+
let payload = process_value(value, hash);
598+
payloads.push(payload);
599+
}
592600
}
593601

594602
payloads

datafusion/physical-expr-common/src/binary_view_map.rs

Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -360,34 +360,46 @@ where
360360
{
361361
// Step 1: Compute hashes
362362
let mut batch_hashes = vec![0u64; values.len()];
363-
create_hashes(&[values.clone()], &self.random_state, &mut batch_hashes).unwrap(); // Compute the hashes for the values
363+
create_hashes(&[values.clone()], &self.random_state, &mut batch_hashes).unwrap();
364364

365365
// Step 2: Get payloads for each value
366366
let values = values.as_byte_view::<B>();
367-
assert_eq!(values.len(), batch_hashes.len()); // Ensure hash count matches value count
367+
assert_eq!(values.len(), batch_hashes.len());
368368

369369
let mut payloads = Vec::with_capacity(values.len());
370370

371-
for (value, &hash) in values.iter().zip(batch_hashes.iter()) {
372-
// Handle null value
373-
let Some(value) = value else {
374-
if let Some(&(payload, _)) = self.null.as_ref() {
375-
payloads.push(Some(payload));
376-
} else {
377-
payloads.push(None);
378-
}
379-
continue;
380-
};
381-
371+
let process_value = |value: &B::Native, hash: u64| -> Option<V> {
382372
let value: &[u8] = value.as_ref();
383373

384374
let entry = self.map.get(hash, |header| {
385375
let v = self.builder.get_value(header.view_idx);
386376
v.len() == value.len() && v == value
387377
});
388378

389-
let payload = entry.map(|e| e.payload);
390-
payloads.push(payload);
379+
entry.map(|e| e.payload)
380+
};
381+
382+
if let Some(validity_bitmap) = values.nulls() {
383+
let null_payload = self.null.as_ref().map(|&(payload, _)| payload);
384+
let validity_iter = validity_bitmap.iter();
385+
386+
for ((value_opt, &hash), is_valid) in
387+
values.iter().zip(batch_hashes.iter()).zip(validity_iter)
388+
{
389+
if is_valid {
390+
let value = value_opt.unwrap(); // Safe to unwrap since is_valid is true
391+
let payload = process_value(value, hash);
392+
payloads.push(payload);
393+
} else {
394+
payloads.push(null_payload);
395+
}
396+
}
397+
} else {
398+
for (value_opt, &hash) in values.iter().zip(batch_hashes.iter()) {
399+
let value = value_opt.unwrap(); // Safe to unwrap because there are no nulls
400+
let payload = process_value(value, hash);
401+
payloads.push(payload);
402+
}
391403
}
392404

393405
payloads

0 commit comments

Comments
 (0)