Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 93 additions & 0 deletions arrow-array/src/builder/fixed_size_binary_dictionary_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,41 @@ where

DictionaryArray::from(unsafe { builder.build_unchecked() })
}

/// Builds the `DictionaryArray` without resetting the values builder or
/// the internal de-duplication map.
///
/// The advantage of doing this is that the values will represent the entire
/// set of what has been built so-far by this builder and ensures
/// consistency in the assignment of keys to values across multiple calls
/// to `finish_preserve_values`. This enables ipc writers to efficiently
/// emit delta dictionaries.
///
/// The downside to this is that building the record requires creating a
/// copy of the values, which can become slowly more expensive if the
/// dictionary grows.
///
/// Additionally, if record batches from multiple different dictionary
/// builders for the same column are fed into a single ipc writer, beware
/// that entire dictionaries are likely to be re-sent frequently even when
/// the majority of the values are not used by the current record batch.
pub fn finish_preserve_values(&mut self) -> DictionaryArray<K> {
let values = self.values_builder.finish_cloned();
let keys = self.keys_builder.finish();

let data_type = DataType::Dictionary(
Box::new(K::DATA_TYPE),
Box::new(FixedSizeBinary(self.byte_width)),
);

let builder = keys
.into_data()
.into_builder()
.data_type(data_type)
.child_data(vec![values.into_data()]);

DictionaryArray::from(unsafe { builder.build_unchecked() })
}
}

fn get_bytes(values: &FixedSizeBinaryBuilder, byte_width: i32, idx: usize) -> &[u8] {
Expand Down Expand Up @@ -508,4 +543,62 @@ mod tests {
);
}
}

#[test]
fn test_finish_preserve_values() {
// Create the first dictionary
let mut builder = FixedSizeBinaryDictionaryBuilder::<Int32Type>::new(3);
builder.append_value("aaa");
builder.append_value("bbb");
builder.append_value("ccc");
let dict = builder.finish_preserve_values();
assert_eq!(dict.keys().values(), &[0, 1, 2]);
let values = dict
.downcast_dict::<FixedSizeBinaryArray>()
.unwrap()
.into_iter()
.collect::<Vec<_>>();
assert_eq!(
values,
vec![
Some("aaa".as_bytes()),
Some("bbb".as_bytes()),
Some("ccc".as_bytes())
]
);

// Create a new dictionary
builder.append_value("ddd");
builder.append_value("eee");
let dict2 = builder.finish_preserve_values();

// Make sure the keys are assigned after the old ones and we have the
// right values
assert_eq!(dict2.keys().values(), &[3, 4]);
let values = dict2
.downcast_dict::<FixedSizeBinaryArray>()
.unwrap()
.into_iter()
.collect::<Vec<_>>();
assert_eq!(values, [Some("ddd".as_bytes()), Some("eee".as_bytes())]);

// Check that we have all of the expected values
let all_values = dict2
.values()
.as_any()
.downcast_ref::<FixedSizeBinaryArray>()
.unwrap()
.into_iter()
.collect::<Vec<_>>();
assert_eq!(
all_values,
[
Some("aaa".as_bytes()),
Some("bbb".as_bytes()),
Some("ccc".as_bytes()),
Some("ddd".as_bytes()),
Some("eee".as_bytes())
]
);
}
}
79 changes: 79 additions & 0 deletions arrow-array/src/builder/generic_bytes_dictionary_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,38 @@ where
DictionaryArray::from(unsafe { builder.build_unchecked() })
}

/// Builds the `DictionaryArray` without resetting the values builder or
/// the internal de-duplication map.
///
/// The advantage of doing this is that the values will represent the entire
/// set of what has been built so-far by this builder and ensures
/// consistency in the assignment of keys to values across multiple calls
/// to `finish_preserve_values`. This enables ipc writers to efficiently
/// emit delta dictionaries.
///
/// The downside to this is that building the record requires creating a
/// copy of the values, which can become slowly more expensive if the
/// dictionary grows.
///
/// Additionally, if record batches from multiple different dictionary
/// builders for the same column are fed into a single ipc writer, beware
/// that entire dictionaries are likely to be re-sent frequently even when
/// the majority of the values are not used by the current record batch.
pub fn finish_preserve_values(&mut self) -> DictionaryArray<K> {
let values = self.values_builder.finish_cloned();
let keys = self.keys_builder.finish();

let data_type = DataType::Dictionary(Box::new(K::DATA_TYPE), Box::new(T::DATA_TYPE));

let builder = keys
.into_data()
.into_builder()
.data_type(data_type)
.child_data(vec![values.into_data()]);

DictionaryArray::from(unsafe { builder.build_unchecked() })
}

/// Returns the current null buffer as a slice
pub fn validity_slice(&self) -> Option<&[u8]> {
self.keys_builder.validity_slice()
Expand Down Expand Up @@ -1006,4 +1038,51 @@ mod tests {

assert_eq!(values, [None, None]);
}

#[test]
fn test_finish_preserve_values() {
// Create the first dictionary
let mut builder = GenericByteDictionaryBuilder::<Int32Type, Utf8Type>::new();
builder.append("a").unwrap();
builder.append("b").unwrap();
builder.append("c").unwrap();
let dict = builder.finish_preserve_values();
assert_eq!(dict.keys().values(), &[0, 1, 2]);
assert_eq!(dict.values().len(), 3);
let values = dict
.downcast_dict::<GenericByteArray<Utf8Type>>()
.unwrap()
.into_iter()
.collect::<Vec<_>>();
assert_eq!(values, [Some("a"), Some("b"), Some("c")]);

// Create a new dictionary
builder.append("d").unwrap();
builder.append("e").unwrap();
let dict2 = builder.finish_preserve_values();

// Make sure the keys are assigned after the old ones and we have the
// right values
assert_eq!(dict2.keys().values(), &[3, 4]);
let values = dict2
.downcast_dict::<GenericByteArray<Utf8Type>>()
.unwrap()
.into_iter()
.collect::<Vec<_>>();
assert_eq!(values, [Some("d"), Some("e")]);

// Check that we have all of the expected values
assert_eq!(dict2.values().len(), 5);
let all_values = dict2
.values()
.as_any()
.downcast_ref::<StringArray>()
.unwrap()
.into_iter()
.collect::<Vec<_>>();
assert_eq!(
all_values,
[Some("a"), Some("b"), Some("c"), Some("d"), Some("e"),]
);
}
}
73 changes: 73 additions & 0 deletions arrow-array/src/builder/primitive_dictionary_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,38 @@ where
DictionaryArray::from(unsafe { builder.build_unchecked() })
}

/// Builds the `DictionaryArray` without resetting the values builder or
/// the internal de-duplication map.
///
/// The advantage of doing this is that the values will represent the entire
/// set of what has been built so-far by this builder and ensures
/// consistency in the assignment of keys to values across multiple calls
/// to `finish_preserve_values`. This enables ipc writers to efficiently
/// emit delta dictionaries.
///
/// The downside to this is that building the record requires creating a
/// copy of the values, which can become slowly more expensive if the
/// dictionary grows.
///
/// Additionally, if record batches from multiple different dictionary
/// builders for the same column are fed into a single ipc writer, beware
/// that entire dictionaries are likely to be re-sent frequently even when
/// the majority of the values are not used by the current record batch.
pub fn finish_preserve_values(&mut self) -> DictionaryArray<K> {
let values = self.values_builder.finish_cloned();
let keys = self.keys_builder.finish();

let data_type = DataType::Dictionary(Box::new(K::DATA_TYPE), Box::new(V::DATA_TYPE));

let builder = keys
.into_data()
.into_builder()
.data_type(data_type)
.child_data(vec![values.into_data()]);

DictionaryArray::from(unsafe { builder.build_unchecked() })
}

/// Returns the current dictionary values buffer as a slice
pub fn values_slice(&self) -> &[V::Native] {
self.values_builder.values_slice()
Expand Down Expand Up @@ -817,4 +849,45 @@ mod tests {
);
}
}

#[test]
fn test_finish_preserve_values() {
// Create the first dictionary
let mut builder = PrimitiveDictionaryBuilder::<UInt8Type, UInt32Type>::new();
builder.append(10).unwrap();
builder.append(20).unwrap();
let array = builder.finish_preserve_values();
assert_eq!(array.keys(), &UInt8Array::from(vec![Some(0), Some(1)]));
let values: &[u32] = array
.values()
.as_any()
.downcast_ref::<UInt32Array>()
.unwrap()
.values();
assert_eq!(values, &[10, 20]);

// Create a new dictionary
builder.append(30).unwrap();
builder.append(40).unwrap();
let array2 = builder.finish_preserve_values();

// Make sure the keys are assigned after the old ones
// and that we have the right values
assert_eq!(array2.keys(), &UInt8Array::from(vec![Some(2), Some(3)]));
let values = array2
.downcast_dict::<UInt32Array>()
.unwrap()
.into_iter()
.collect::<Vec<_>>();
assert_eq!(values, vec![Some(30), Some(40)]);

// Check that we have all of the expected values
let all_values: &[u32] = array2
.values()
.as_any()
.downcast_ref::<UInt32Array>()
.unwrap()
.values();
assert_eq!(all_values, &[10, 20, 30, 40]);
}
}
1 change: 1 addition & 0 deletions arrow-ipc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ arrow-array = { workspace = true }
arrow-buffer = { workspace = true }
arrow-data = { workspace = true }
arrow-schema = { workspace = true }
arrow-select = { workspace = true}
flatbuffers = { version = "25.2.10", default-features = false }
lz4_flex = { version = "0.11", default-features = false, features = ["std", "frame"], optional = true }
zstd = { version = "0.13.0", default-features = false, optional = true }
Expand Down
5 changes: 4 additions & 1 deletion arrow-ipc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,16 @@ pub mod writer;

mod compression;

#[cfg(test)]
mod tests;

#[allow(mismatched_lifetime_syntaxes)]
#[allow(clippy::redundant_closure)]
#[allow(clippy::needless_lifetimes)]
#[allow(clippy::extra_unused_lifetimes)]
#[allow(clippy::redundant_static_lifetimes)]
#[allow(clippy::redundant_field_names)]
#[allow(non_camel_case_types)]
#[allow(mismatched_lifetime_syntaxes)]
#[allow(missing_docs)] // Because this is autogenerated
pub mod gen;

Expand Down
Loading
Loading