feat: arrow-ipc delta dictionary support #8001
Conversation
I worked on adding delta dictionary support a while ago with the help of claude code, but I never ended up having any time to polish it up for review. Here's the branch, feel free to steal whatever you want: https://github.com/polarsignals/arrow-rs/tree/asubiotto-delta
Thank you for this contribution @JakeDern 🙏
I need some pointers on the best way to test this using only rust, but am happy to implement any suggestions 🙂. The validation that I did so far involved using the Go ipc writer to dump stream data to a file which I then read from rust:
It looks to me like the pre-existing integration suite may already have this
The data is here (it is a git submodule) https://github.com/apache/arrow-testing/tree/master/data/arrow-ipc-stream/integration
Perhaps you could follow the model of a test like this:
`fn read_0_1_7()`
And read the content of https://github.com/apache/arrow-testing/tree/master/data/arrow-ipc-stream/integration/4.0.0-shareddict (instead of 0.17.1)
@asubiotto @alamb Thanks for the pointers! It looks like the test data in 4.0.0-shareddict may not contain any delta dictionaries, as this test passes on main as well as on my branch:

```rust
#[test]
fn read_stream_delta_dictionary() {
    let testdata = arrow_test_data();
    let path = "generated_shared_dict";
    let version = "4.0.0-shareddict";
    verify_arrow_file(&testdata, version, path);
    verify_arrow_stream(&testdata, version, path);
}
```

I'll take a crack at cleaning up the writer code from @asubiotto and incorporating it into this PR, then we can produce the data for testing as well.
@asubiotto @alamb I have an initial implementation of both writer and reader ready for some feedback. Thanks @asubiotto for the starter code, some of the additional tests were especially helpful! Something I'm curious to get thoughts on after digging through both the Go and Rust writer code is that the ability of the ipc writer in the rust code to produce delta dictionaries is very limited. Both the ipc writers in Go and Rust look at dictionaries on incoming RecordBatches one at a time, comparing the new dictionary with the old to determine whether the new is a superset of the old. I thought this was odd because the following test I wrote in Rust produces 1 delta dictionary whereas I thought it would produce 2:

```rust
#[test]
fn test_deltas() {
    // Dictionary resets at ["C", "D"]
    let batches: &[&[&str]] = &[&["A"], &["A", "B"], &["C", "D"], &["A", "B", "E"]];
    run_test(batches, false);
}
```

However, since the writer only looks at the dictionary values of the last RecordBatch, it has to completely reset when it hits `["C", "D"]`, so the final `["A", "B", "E"]` batch cannot be written as a delta either.

This was inconsistent with what I'd seen in Go, where similar code did produce two delta dictionaries despite seeming to follow the same/similar algorithm. So I dug a bit more and wrote some Go code with two different dictionary builders feeding a single writer, and then read it from rust. Batches 0, 2, and 3 are written using one dictionary builder and batch 1 is written using another:

```go
dictType := &arrow.DictionaryType{
    IndexType: arrow.PrimitiveTypes.Int16,
    ValueType: arrow.BinaryTypes.String,
    Ordered:   false,
}
schema := arrow.NewSchema([]arrow.Field{
    {Name: "foo", Type: dictType},
}, nil)
buf := bytes.NewBuffer([]byte{})
writer := ipc.NewWriter(buf, ipc.WithSchema(schema), ipc.WithDictionaryDeltas(true))
allocator := memory.NewGoAllocator()

dict_builder := array.NewDictionaryBuilder(allocator, dictType)
builder := array.NewStringBuilder(allocator)
builder.AppendStringValues([]string{"A", "B", "C"}, []bool{})
dict_builder.AppendArray(array.NewStringData(builder.NewArray().Data()))
record := array.NewRecord(schema, []arrow.Array{
    dict_builder.NewArray(),
}, 3)
if err := writer.Write(record); err != nil {
    panic(err)
}

// Reset builder
dict_builder2 := array.NewDictionaryBuilder(allocator, dictType)
builder.AppendStringValues([]string{"A", "B", "D"}, []bool{})
dict_builder2.AppendArray(array.NewStringData(builder.NewArray().Data()))
record2 := array.NewRecord(schema, []arrow.Array{
    dict_builder2.NewArray(),
}, 3)
if err := writer.Write(record2); err != nil {
    panic(err)
}

builder.AppendStringValues([]string{"A", "B", "E"}, []bool{})
dict_builder.AppendArray(array.NewStringData(builder.NewArray().Data()))
record3 := array.NewRecord(schema, []arrow.Array{
    dict_builder.NewArray(),
}, 3)
if err := writer.Write(record3); err != nil {
    panic(err)
}

builder.AppendStringValues([]string{"A", "B", "D"}, []bool{})
dict_builder.AppendArray(array.NewStringData(builder.NewArray().Data()))
record4 := array.NewRecord(schema, []arrow.Array{
    dict_builder.NewArray(),
}, 3)
if err := writer.Write(record4); err != nil {
    panic(err)
}

// write buf out to ~/delta_test/delta.arrow
if err := os.WriteFile("/home/jakedern/delta_test/delta2.arrow", buf.Bytes(), 0644); err != nil {
    panic(fmt.Errorf("failed to write delta file: %w", err))
}
```

Reading the resulting stream from rust (via some dbg! statements I added in reader.rs) gives:

```
[arrow-ipc/src/reader.rs:717:9] batch.isDelta() = false
[arrow-ipc/src/reader.rs:722:9] &dictionary_values = StringArray
[
"A",
"B",
"C",
]
[arrow-ipc/src/reader.rs:717:9] batch.isDelta() = false
[arrow-ipc/src/reader.rs:722:9] &dictionary_values = StringArray
[
"A",
"B",
"D",
]
[arrow-ipc/src/reader.rs:717:9] batch.isDelta() = false
[arrow-ipc/src/reader.rs:722:9] &dictionary_values = StringArray
[
"A",
"B",
"C",
"E",
]
[arrow-ipc/src/reader.rs:717:9] batch.isDelta() = true
[arrow-ipc/src/reader.rs:725:5] &dictionary_values = StringArray
[
"D",
]
```

What we see is that writing a RecordBatch from a different builder basically resets the writer's view of the dictionary: batch 2 from the original builder has to be re-sent as a full dictionary, and only batch 3 comes through as a delta. The takeaway is that in Go we need some cooperation between the builder and the ipc writer to get delta dictionaries a reasonable amount of the time. If we shovel RecordBatches from different builders into the same ipc writer then we get bad behavior for delta dictionaries. In rust, we don't have that cooperation today. My questions are:

Curious to hear any thoughts, thank you both for the help so far!
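As an aside for anyone following the thread, the superset comparison described above boils down to roughly the following. This is a minimal sketch with hypothetical names (`DictAction`, `classify_dictionary`), restricted to string dictionaries and ignoring nulls; it is not the actual arrow-rs writer code.

```rust
use arrow_array::{Array, StringArray};

/// What the writer should emit for a dictionary id it has already sent.
enum DictAction {
    /// Old and new values are identical: nothing to send.
    Unchanged,
    /// New values start with the old values: send only the suffix as a delta.
    Delta(StringArray),
    /// New values diverge from the old values: resend the whole dictionary.
    Replace(StringArray),
}

/// Hypothetical superset check; nulls are ignored for brevity.
fn classify_dictionary(old: &StringArray, new: &StringArray) -> DictAction {
    if new.len() < old.len() {
        return DictAction::Replace(new.clone());
    }
    // Element-by-element prefix comparison (the step flagged as potentially slow below).
    let is_prefix = (0..old.len()).all(|i| old.value(i) == new.value(i));
    if !is_prefix {
        DictAction::Replace(new.clone())
    } else if new.len() == old.len() {
        DictAction::Unchanged
    } else {
        // New is a strict superset of old: only the appended suffix goes on the wire.
        DictAction::Delta(new.slice(old.len(), new.len() - old.len()))
    }
}
```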
Based on comments above, it seems like maybe what we need is a way to efficiently detect if a value is already in the dictionary. The dictionary builders all keep some kind of internal state that allows efficient lookup of this, for example.
Maybe we could refactor this to be something that's reusable by the IPC writer?
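For illustration, the kind of reusable lookup state being suggested might look something like the plain string interner below; the `ValueInterner` type and its methods are hypothetical, not an existing arrow-rs API.

```rust
use std::collections::HashMap;

/// Hypothetical shared interning state: maps a dictionary value to its key.
/// The dictionary builders keep something conceptually similar internally so
/// that appending an existing value is a hash lookup instead of a scan.
#[derive(Default)]
struct ValueInterner {
    lookup: HashMap<String, usize>,
    values: Vec<String>,
}

impl ValueInterner {
    /// Returns the key for `value`, inserting it if it has not been seen yet.
    fn intern(&mut self, value: &str) -> usize {
        if let Some(&key) = self.lookup.get(value) {
            return key;
        }
        let key = self.values.len();
        self.values.push(value.to_string());
        self.lookup.insert(value.to_string(), key);
        key
    }

    /// Values appended since `mark`, i.e. a candidate delta dictionary payload.
    fn delta_since(&self, mark: usize) -> &[String] {
        &self.values[mark..]
    }
}
```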
Exposing builder state makes the most sense to me. Thanks for taking this on @JakeDern!
@asubiotto the approach I opted to take is to allow accumulating values only on the builder, via a new `finish_preserve_values` method. I also did a little bit of refactoring to get better visibility into the messages that the reader sees. Since we're trying to improve the conditions under which delta dictionaries are emitted (optimization), we need this visibility to test precisely rather than relying on heuristics like the size of the underlying stream. Feedback would be greatly appreciated! If this approach seems reasonable then I can add the same method to the other dictionary builders.
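To make the intended usage concrete, here is a sketch of a caller producing two batches from the same builder. The exact signature and return type of `finish_preserve_values` are assumptions here (same shape as `finish()`, but keeping the accumulated values), and the writer configuration is only described in a comment rather than spelled out.

```rust
use std::sync::Arc;

use arrow_array::builder::StringDictionaryBuilder;
use arrow_array::types::Int32Type;
use arrow_array::RecordBatch;
use arrow_schema::{ArrowError, DataType, Field, Schema};

fn main() -> Result<(), ArrowError> {
    let schema = Arc::new(Schema::new(vec![Field::new(
        "tag",
        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
        false,
    )]));

    let mut builder = StringDictionaryBuilder::<Int32Type>::new();

    // First batch: dictionary is ["A", "B"].
    builder.append_value("A");
    builder.append_value("B");
    // Assumed behaviour of the new API: like `finish()`, but the builder keeps the
    // values it has interned so far, so the next batch's dictionary is a superset
    // of this one and the writer can emit a delta instead of a full reset.
    let batch1 = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(builder.finish_preserve_values())],
    )?;

    // Second batch: dictionary becomes ["A", "B", "C"]; only "C" is new.
    builder.append_value("A");
    builder.append_value("C");
    let batch2 = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(builder.finish_preserve_values())],
    )?;

    // With a stream writer configured to emit dictionary deltas (the writer-side
    // support added in this PR), feeding batch1 then batch2 should produce
    // ["A", "B"] followed by a delta containing just ["C"].
    let _ = (batch1, batch2);
    Ok(())
}
```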
Thank you for this PR @JakeDern -- I went through it carefully and I found the code well structured, the APIs well documented, and everything thoroughly tested. Very nice 🏆
The only thing blocking this PR from merging now in my opinion is the breaking API change (see inline comments). I added the appropriate labels.
I left some other suggestions but none that I think would prevent this PR from merging (we could move the tests later or never). Let me know what you think.
cc @viirya or @brancz and @thinkharderdev and @tustvold who I think may be interested in this feature
```rust
let existing_len = old.len();
let new_len = new.len();
if existing_len == new_len {
    if *old == *new {
```
this compares the entire array element by element, which could be slow. However, it appears to be what the current code does too, so if it turns out to be too slow we can potentially make it faster in the future
Would the faster way be to do some direct comparison of the bytes of the underlying array data? Honestly the underlying data model is still a bit confusing to me 😅, so I don't know if that's a reasonable thing to do or not. But happy to take any direction here
I am not sure now to be honest 😬
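For what it's worth, one possible shape of the byte-level comparison floated above, sketched for `StringArray` only: compare the old array's offsets and value bytes against the corresponding prefix of the new array's buffers. The helper name is hypothetical, and it assumes neither array is sliced and ignores null buffers.

```rust
use arrow_array::{Array, StringArray};

/// Hypothetical prefix check that compares raw buffers instead of going
/// element by element. Assumes neither array is sliced; nulls are ignored.
fn is_prefix_by_buffers(old: &StringArray, new: &StringArray) -> bool {
    if old.len() > new.len() {
        return false;
    }
    // Offsets: old has old.len() + 1 offsets, which must match the first
    // old.len() + 1 offsets of new.
    let old_offsets = old.value_offsets();
    let new_offsets = &new.value_offsets()[..old.len() + 1];
    if old_offsets != new_offsets {
        return false;
    }
    // Values: the bytes covered by the old offsets must be identical.
    let old_end = old_offsets[old.len()] as usize;
    old.value_data()[..old_end] == new.value_data()[..old_end]
}
```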
Ok @alamb - I made a pretty good effort to resolve the comments, added/updated a few more tests, and also removed the breaking API change in favor of the deprecation strategy that you mentioned. Should be ready for more feedback! The only thing I did not resolve was the efficiency of the element-by-element comparison, but if you have a suggestion for improvement there let me know.
Thanks @JakeDern -- this PR now has a conflict that needs to be resolved before I can merge it. Is there any way you can merge up and resolve the conflicts?
@alamb Should be good now!
@alamb Since I've removed the breaking change in favor of deprecating the old API and adding a new one, do you think this can make it for the August release? I'm not sure what the RAT check that's failing is, and I couldn't figure out how to view its output.
this CI check ensures that the Apache license header is at the top of all files.
Looks good to me -- I think we just need to add the license header to tests.rs and this will be good to go (for the August release)
Thanks again @JakeDern
```rust
pub use stream::*;

use arrow_select::concat;
```
yeah, this is unfortunate -- to get concat we need to add this dependency, but I don't really see a way around it
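For context, the reader-side use of `concat` amounts to extending the cached dictionary values with the delta values, roughly as in the sketch below; the function and its cache handling are simplified stand-ins rather than the exact `read_dictionary_impl` code.

```rust
use std::collections::HashMap;

use arrow_array::ArrayRef;
use arrow_schema::ArrowError;
use arrow_select::concat::concat;

/// Sketch of applying a dictionary batch to the reader's dictionary cache:
/// a delta batch extends the cached values, a non-delta batch replaces them.
fn apply_dictionary_batch(
    cache: &mut HashMap<i64, ArrayRef>,
    id: i64,
    is_delta: bool,
    values: ArrayRef,
) -> Result<(), ArrowError> {
    let new_values = match (is_delta, cache.get(&id)) {
        // Delta: append the new values onto the values seen so far.
        (true, Some(existing)) => concat(&[existing.as_ref(), values.as_ref()])?,
        // Full dictionary batch (or a delta for an id we have not seen yet,
        // which the real reader may choose to reject instead).
        _ => values,
    };
    cache.insert(id, new_values);
    Ok(())
}
```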
```rust
/// batch or dictionary batch requires access to stream state such as schema
/// and the full dictionary cache.
#[derive(Debug)]
#[allow(dead_code)]
```
Why is this dead code? Maybe it should be `#[cfg(test)]` instead?
Yeah, this is used by that `next_ipc_message` API that I added. It's used by both test and library code, but some of the fields are only accessed in tests, hence the dead code. I'd like to rely completely on this API for the StreamReader implementation, including in the constructor for getting the schema, but there is further refactoring that I think we would need to make that possible. Currently the constructor uses just the `MessageReader` directly.
I think I'm probably fine with this for now.
FYI, it looks like the `arrow_schema::Schema` contained in the `Schema` variant of this enum is never read. Should it be read in the test? If not, we could also do something like this:
```rust
pub(crate) enum IpcMessage {
    Schema,
    RecordBatch(RecordBatch),
    #[allow(dead_code)]
    DictionaryBatch {
        id: i64,
        is_delta: bool,
        values: ArrayRef,
    },
}
```
(and of course, modify `next_ipc_message` and `run_sequence_test` accordingly)
Another option could be to have the `maybe_next` method of `StreamReader` actually drive the `MessageReader`, and have a separate driver in the `run_sequence_test` function which also drives it and produces these `IpcMessage`s. If we do that, we could maybe also remove the `RecordBatch` contained in the `RecordBatch` variant, since it's never read in the test.
The downside of that, of course, is we duplicate some of the dict handling logic here (arrow-rs/arrow-ipc/src/reader.rs, lines 1591 to 1612 in 34b4c74):
```rust
let dict_values = get_dictionary_values(
    &body.into(),
    dict,
    &self.schema,
    &mut self.dictionaries_by_id,
    &version,
    false,
    self.skip_validation.clone(),
)?;
update_dictionaries(
    &mut self.dictionaries_by_id,
    dict.isDelta(),
    dict.id(),
    dict_values.clone(),
)?;
IpcMessage::DictionaryBatch {
    id: dict.id(),
    is_delta: (dict.isDelta()),
    values: (dict_values),
}
```
Also FYI @albertlockett perhaps you would have some time to review this PR as well
Absolutely @alamb it would be my pleasure! I'll take a look first thing tomorrow morning
Overall this looks good to me @JakeDern!
I had a few small comments, but nothing that I think should block this.
One general thought I had is: I wonder if eventually we'd want to produce deltas for record batches where the dictionaries of subsequent batches fed into the writer aren't necessarily extensions of the previous dict. (I think this has maybe been called out elsewhere, but I'll call it out here for posterity.)
That would allow us to produce IPC dict deltas without having to copy the values each time we call `.finish_preserve_values()`. Of course, it makes the logic in `writer::compare_dictionaries` more complicated (and maybe as we change the implementation, we could also fix the performance issue identified here), and it also means that we need to adjust the dictionary keys when writing the subsequent batches.
Obviously this is a larger piece of work than what this change is trying to achieve, but maybe we could create a follow up issue to open the discussion? cc @alamb
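To illustrate what adjusting the dictionary keys could involve, here is a rough sketch over plain strings (all names hypothetical): look up each incoming dictionary value against the values already sent, emit only genuinely new values as the delta, and rewrite the batch's keys so they index into the combined dictionary.

```rust
use std::collections::HashMap;

/// Hypothetical remapping step for non-superset dictionaries: given the values
/// already sent on the stream and an incoming batch's dictionary + keys, compute
/// the delta values that still need to be sent and rewrite the keys so they
/// index into the combined (sent + delta) dictionary.
fn remap_dictionary(
    sent: &[String],
    incoming_values: &[String],
    incoming_keys: &[usize],
) -> (Vec<String>, Vec<usize>) {
    // Index of every value the receiver already knows about.
    let mut lookup: HashMap<&str, usize> = sent
        .iter()
        .enumerate()
        .map(|(i, v)| (v.as_str(), i))
        .collect();

    // Values in the incoming dictionary the receiver has not seen yet.
    let mut delta: Vec<String> = Vec::new();
    // Maps an incoming key to a key in the combined dictionary.
    let mut key_map: Vec<usize> = Vec::with_capacity(incoming_values.len());
    for value in incoming_values {
        let combined_key = *lookup.entry(value.as_str()).or_insert_with(|| {
            delta.push(value.clone());
            sent.len() + delta.len() - 1
        });
        key_map.push(combined_key);
    }

    let remapped_keys = incoming_keys.iter().map(|&k| key_map[k]).collect();
    (delta, remapped_keys)
}
```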
@albertlockett Thanks for the review, I'll try to go through it this afternoon!
Yeah, I 100% agree -- the implementation that I have is very similar to the Go implementation, but I think there's opportunity to be more efficient. The naive way, I think, would be to have the IPC writer track all the values and re-intern them, but while that would get us nice deltas it would be super expensive. An alternative I was thinking about is to invert the relationship and let the IPC writer drive the dictionary builders itself.
That way the IPC writer will know for sure that every time it seals a batch it has the exact delta for every dictionary, since it owns the record batch production and is controlling what the dictionary builders do. But like you say it's a bigger change and better suited to a follow-up IMO.
I agree -- I suggest we file it as its own new ticket so the idea isn't lost in the comments on this PR
@albertlockett Not sure why I can't reply directly to this comment, but it's a good observation and I actually spent a fair bit of time thinking about it and pulling on various threads here, but the required change stack got too deep (or had unwanted duplications like you mentioned). There are some chicken-and-egg problems if you look into how we use RecordBatchReader and the state required, including the dictionaries and schema. Definitely want to continue iterating here over time and also look at the FileReader, which has some of its own logic I wasn't able to account for either. I can remove the fields that aren't read if you think it's worth it, but the computation to produce those fields (including schema) is already done, so I thought it would be somewhat harmless to leave it so the ability is advertised to others who might leverage it. In the future I hope we can fully consolidate logic for interpreting IPC messages in one place.
Oh OK, thanks @JakeDern. Yeah that's good context. I'm OK with how this is, no need to change!
Thank you @JakeDern and @albertlockett for the work and conversation so far on this PR. Since @albertlockett and I think it is ready to go, I will merge it in. We can always improve the code as additional follow on work. Thanks again for your patience and diligence
Which issue does this PR close?
Rationale for this change
Delta dictionaries are not supported by either the arrow-ipc reader or writer. Other languages like Go have delta dictionary support, so ipc streams produced by those languages sometimes include delta dictionaries.
This PR adds reader and writer support so that we can consume streams with those messages in rust.
What changes are included in this PR?
- Modified `read_dictionary_impl` to support delta dictionaries by concatenating the dictionaries if `isDelta()` is true
- Added a new `finish_preserve_values` API to `GenericBytesDictionaryBuilder` which allows for accumulating the total set of values built by the builder
- Refactored the `StreamReader` to de-couple parsing individual IPC messages from producing the next record batch; this enables better testing via inspection of the individual messages in the stream
- Added a new `MessageReader` type to handle reading metadata lengths, headers, and message bodies

Are these changes tested?
Yes, unit testing suites were added to cover delta functionality specifically. Existing unit tests are expected to also guard against regressions due to refactors.
Are there any user-facing changes?
Yes, there is a new `finish_preserve_values` public method for `GenericBytesDictionaryBuilder`. If we want to move forward with this approach then this will be added to the other dictionary builders too.