-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Add insert_or_update and get_payloads to map #12701
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Thanks @dmitrybugakov please help understand what are use cases for this functions in DF or downstream projects? |
Right but still not sure should it be part of DataFusion core. The mode function is in contrib package, however DF core is heavily using StringViews. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @dmitrybugakov -- my biggest concern with this PR is the duplication between insert_if_new
and insert_or_update
It seems like the biggest difference is when the observe_payload and update_payload functions are called (I had to study the code to see that observe_payload seems to be called also for newly inserted payloads as well as pre-exisitng ones)
what do you think instead of adding an entirely new copy, extending the existing insert_if_new
function to take 3 functions (instead of just two)
Something like this, perhaps:
pub fn insert_if_new<MP, OP>(
&mut self,
values: &ArrayRef,
make_payload_fn: MP,
observe_payload_fn: OP,
update_payload_fn: UP,
) where
MP: FnMut(Option<&[u8]>) -> V,
UP: FnMut(&mut V),
OP: FnMut(V),
{
?
FWIW I think having a structure like this (optimized maps that can emit arrow arrays) in the core of datafusion is a good idea given they are general purpose. I think this one is pretty general purpose |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @dmitrybugakov
I read this code and it all makes sense to me
If anyone is curious here is the use of get_payloads
in mode
: https://github.com/datafusion-contrib/datafusion-functions-extra/blob/dd6806037c6f3db949768485f9afdc8db0797d03/src/common/mode/bytes.rs#L81
The one last thing I would like to do before merging this is to run some performance benchmarks to make sure there are no regressions
for (value, &hash) in values.iter().zip(batch_hashes.iter()) { | ||
// Handle null value | ||
let Some(value) = value else { | ||
if let Some(&(payload, _)) = self.null.as_ref() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this code would be faster if you checked for nulls outside the loop rather than on each row
/// # Returns | ||
/// | ||
/// A vector of payloads for each value, or `None` if the value is not found. | ||
pub fn get_payloads(self, values: &ArrayRef) -> Vec<Option<V>> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe we could call this (and the other function) into_values()
as that is what it appears to be doing (consuming the map to get the values)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I considered that as well, but when comparing into_values
with into_state
(which literally converts the map to a state), using the name into_values
feels a bit off to me. Since it only returns the values’ state without changing the map.
@@ -178,8 +183,12 @@ where | |||
/// `make_payload_fn`: invoked for each value that is not already present | |||
/// to create the payload, in order of the values in `values` | |||
/// | |||
/// `observe_payload_fn`: invoked once, for each value in `values`, that was | |||
/// already present in the map, with corresponding payload value. | |||
/// `update_payload_fn`: invoked when the value is already present in the map. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Re-reading this code i wonder why the term "payload" is used rather than "value" -- it is a Map after all, so it has keys and values 🤔
Maybe it is because the values
was already used.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep, that might be the case. It could make sense to stick with the more traditional key/value naming.
@@ -0,0 +1,77 @@ | |||
// Licensed to the Apache Software Foundation (ASF) under one |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ArrowBytesViewMap insert_if_new - items: 1000000, null_density: 0.1, str_len: 20
time: [29.995 ms 30.207 ms 30.447 ms]
Found 16 outliers among 100 measurements (16.00%)
3 (3.00%) high mild
13 (13.00%) high severe
ArrowBytesViewMap get_payloads - items: 1000000, null_density: 0.1, str_len: 20
time: [7.6807 ms 7.7203 ms 7.7624 ms]
Found 4 outliers among 100 measurements (4.00%)
4 (4.00%) high mild
ArrowBytesViewMap insert_if_new - items: 1000000, null_density: 0.1, str_len: 50
time: [35.002 ms 35.305 ms 35.637 ms]
Found 7 outliers among 100 measurements (7.00%)
6 (6.00%) high mild
1 (1.00%) high severe
ArrowBytesViewMap get_payloads - items: 1000000, null_density: 0.1, str_len: 50
time: [8.7820 ms 8.8271 ms 8.8757 ms]
Found 7 outliers among 100 measurements (7.00%)
7 (7.00%) high mild
ArrowBytesViewMap insert_if_new - items: 1000000, null_density: 0.5, str_len: 20
time: [23.549 ms 23.660 ms 23.784 ms]
Found 11 outliers among 100 measurements (11.00%)
5 (5.00%) high mild
6 (6.00%) high severe
ArrowBytesViewMap get_payloads - items: 1000000, null_density: 0.5, str_len: 20
time: [11.704 ms 11.746 ms 11.793 ms]
Found 10 outliers among 100 measurements (10.00%)
6 (6.00%) high mild
4 (4.00%) high severe
ArrowBytesViewMap insert_if_new - items: 1000000, null_density: 0.5, str_len: 50
time: [26.712 ms 27.083 ms 27.475 ms]
ArrowBytesViewMap get_payloads - items: 1000000, null_density: 0.5, str_len: 50
time: [12.457 ms 12.516 ms 12.581 ms]
Found 5 outliers among 100 measurements (5.00%)
3 (3.00%) high mild
2 (2.00%) high severe
@@ -0,0 +1,74 @@ | |||
// Licensed to the Apache Software Foundation (ASF) under one |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ArrowBytesMap insert_if_new - items: 1000000, null_density: 0.1, str_len: 20
time: [26.761 ms 27.106 ms 27.472 ms]
Found 1 outliers among 100 measurements (1.00%)
1 (1.00%) high mild
ArrowBytesMap get_payloads - items: 1000000, null_density: 0.1, str_len: 20
time: [7.1595 ms 7.1766 ms 7.1950 ms]
Found 6 outliers among 100 measurements (6.00%)
1 (1.00%) low mild
3 (3.00%) high mild
2 (2.00%) high severe
ArrowBytesMap insert_if_new - items: 1000000, null_density: 0.1, str_len: 50
time: [31.591 ms 31.749 ms 31.929 ms]
Found 11 outliers among 100 measurements (11.00%)
6 (6.00%) high mild
5 (5.00%) high severe
ArrowBytesMap get_payloads - items: 1000000, null_density: 0.1, str_len: 50
time: [8.2213 ms 8.2395 ms 8.2596 ms]
Found 9 outliers among 100 measurements (9.00%)
4 (4.00%) high mild
5 (5.00%) high severe
ArrowBytesMap insert_if_new - items: 1000000, null_density: 0.5, str_len: 20
time: [22.149 ms 22.258 ms 22.378 ms]
Found 13 outliers among 100 measurements (13.00%)
6 (6.00%) high mild
7 (7.00%) high severe
ArrowBytesMap get_payloads - items: 1000000, null_density: 0.5, str_len: 20
time: [10.727 ms 10.783 ms 10.842 ms]
Found 1 outliers among 100 measurements (1.00%)
1 (1.00%) high mild
ArrowBytesMap insert_if_new - items: 1000000, null_density: 0.5, str_len: 50
time: [23.929 ms 24.083 ms 24.252 ms]
Found 17 outliers among 100 measurements (17.00%)
10 (10.00%) high mild
7 (7.00%) high severe
ArrowBytesMap get_payloads - items: 1000000, null_density: 0.5, str_len: 50
time: [11.141 ms 11.165 ms 11.194 ms]
Found 10 outliers among 100 measurements (10.00%)
3 (3.00%) high mild
7 (7.00%) high severe
f4ddf8c
to
46206f5
Compare
@@ -37,8 +37,17 @@ path = "src/lib.rs" | |||
|
|||
[dependencies] | |||
ahash = { workspace = true } | |||
arrow = { workspace = true } | |||
arrow = { workspace = true, features = ["test_utils"] } | |||
criterion = "0.5" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
might be, it makes sense to move criterion
to the workspace deps?
1eb1b66
to
44b5507
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@dmitrybugakov -- thank you, this is epic work
I was dreaming about this change last night, and spent some time reviewing the mode
implementation this morning
One of the main benefits of the BinaryMap
structures in DataFusion over a normal HashMap is that the keys (aka string values) can be emitted directly as an Arrow StringArray
without having to copy the values.
What I don't understand about mode
is that it actually seems to be just looking for the single largest value and returning it as a ScalarValue
So I guess I am trying to figure out, maybe we could implement mode
with just a normal HashMap
(or one from hashbrown) to make the code simpler 🤔 and avoid the need for this change in the datafusion core?
I really apoligize for not reviewing the mode
implementation more carefully before.
Thank you, @alamb That’s the reason I started using it.
|
Thank you for understanding - and I apologize for all the runaround just to result in a closed PR. I will try and do better next time |
Which issue does this PR close?
Closes #12594
Rationale for this change
What changes are included in this PR?
Are these changes tested?
Are there any user-facing changes?