
Commit 452a8f4

fix: enable pruning by bloom filters for dictionary columns (#13768)
1 parent 57d1309 commit 452a8f4

3 files changed: +363 additions, −70 deletions

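For context: row-group pruning by bloom filter only applies when the writer actually stored bloom filters for the column, which is opt-in in the parquet crate. A minimal writer sketch, not part of this commit; the file name dict_test.parquet and the single-column schema are assumptions for illustration:

    use std::fs::File;
    use std::sync::Arc;

    use arrow::array::{DictionaryArray, Int32Array, StringArray};
    use arrow::record_batch::RecordBatch;
    use parquet::arrow::ArrowWriter;
    use parquet::file::properties::WriterProperties;

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        // One dictionary-encoded string column, mirroring the test data added below.
        let keys = Int32Array::from_iter(0..3);
        let values = StringArray::from(vec!["a", "b", "c"]);
        let dict = DictionaryArray::new(keys, Arc::new(values));
        let batch = RecordBatch::try_from_iter(vec![("utf8", Arc::new(dict) as _)])?;

        // Bloom filters are off by default; enable them so a reader has
        // something to prune row groups with.
        let props = WriterProperties::builder()
            .set_bloom_filter_enabled(true)
            .build();
        let file = File::create("dict_test.parquet")?;
        let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(props))?;
        writer.write(&batch)?;
        writer.close()?;
        Ok(())
    }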

datafusion/core/src/datasource/physical_plan/parquet/row_group_filter.rs

Lines changed: 89 additions & 64 deletions
@@ -228,6 +228,94 @@ struct BloomFilterStatistics {
     column_sbbf: HashMap<String, (Sbbf, Type)>,
 }
 
+impl BloomFilterStatistics {
+    /// Helper function for checking if [`Sbbf`] filter contains [`ScalarValue`].
+    ///
+    /// In case the type of scalar is not supported, returns `true`, assuming that the
+    /// value may be present.
+    fn check_scalar(sbbf: &Sbbf, value: &ScalarValue, parquet_type: &Type) -> bool {
+        match value {
+            ScalarValue::Utf8(Some(v))
+            | ScalarValue::Utf8View(Some(v))
+            | ScalarValue::LargeUtf8(Some(v)) => sbbf.check(&v.as_str()),
+            ScalarValue::Binary(Some(v))
+            | ScalarValue::BinaryView(Some(v))
+            | ScalarValue::LargeBinary(Some(v)) => sbbf.check(v),
+            ScalarValue::FixedSizeBinary(_size, Some(v)) => sbbf.check(v),
+            ScalarValue::Boolean(Some(v)) => sbbf.check(v),
+            ScalarValue::Float64(Some(v)) => sbbf.check(v),
+            ScalarValue::Float32(Some(v)) => sbbf.check(v),
+            ScalarValue::Int64(Some(v)) => sbbf.check(v),
+            ScalarValue::Int32(Some(v)) => sbbf.check(v),
+            ScalarValue::UInt64(Some(v)) => sbbf.check(v),
+            ScalarValue::UInt32(Some(v)) => sbbf.check(v),
+            ScalarValue::Decimal128(Some(v), p, s) => match parquet_type {
+                // https://github.com/apache/parquet-format/blob/eb4b31c1d64a01088d02a2f9aefc6c17c54cc6fc/Encodings.md?plain=1#L35-L42
+                // All physical types are little-endian
+                Type::INT32 => {
+                    if *p > 9 {
+                        // DECIMAL can be used to annotate the following types:
+                        //
+                        //   int32: for 1 <= precision <= 9
+                        //   int64: for 1 <= precision <= 18
+                        return true;
+                    }
+                    let b = (*v as i32).to_le_bytes();
+                    // Use Decimal constructor after https://github.com/apache/arrow-rs/issues/5325
+                    let decimal = Decimal::Int32 {
+                        value: b,
+                        precision: *p as i32,
+                        scale: *s as i32,
+                    };
+                    sbbf.check(&decimal)
+                }
+                Type::INT64 => {
+                    if *p > 18 {
+                        return true;
+                    }
+                    let b = (*v as i64).to_le_bytes();
+                    let decimal = Decimal::Int64 {
+                        value: b,
+                        precision: *p as i32,
+                        scale: *s as i32,
+                    };
+                    sbbf.check(&decimal)
+                }
+                Type::FIXED_LEN_BYTE_ARRAY => {
+                    // keep with from_bytes_to_i128
+                    let b = v.to_be_bytes().to_vec();
+                    // Use Decimal constructor after https://github.com/apache/arrow-rs/issues/5325
+                    let decimal = Decimal::Bytes {
+                        value: b.into(),
+                        precision: *p as i32,
+                        scale: *s as i32,
+                    };
+                    sbbf.check(&decimal)
+                }
+                _ => true,
+            },
+            // One more pattern match, since not all data types are supported
+            // inside of a Dictionary
+            ScalarValue::Dictionary(_, inner) => match inner.as_ref() {
+                ScalarValue::Int32(_)
+                | ScalarValue::Int64(_)
+                | ScalarValue::UInt32(_)
+                | ScalarValue::UInt64(_)
+                | ScalarValue::Float32(_)
+                | ScalarValue::Float64(_)
+                | ScalarValue::Utf8(_)
+                | ScalarValue::LargeUtf8(_)
+                | ScalarValue::Binary(_)
+                | ScalarValue::LargeBinary(_) => {
+                    BloomFilterStatistics::check_scalar(sbbf, inner, parquet_type)
+                }
+                _ => true,
+            },
+            _ => true,
+        }
+    }
+}
+
 impl PruningStatistics for BloomFilterStatistics {
     fn min_values(&self, _column: &Column) -> Option<ArrayRef> {
         None
@@ -268,70 +356,7 @@ impl PruningStatistics for BloomFilterStatistics {
 
         let known_not_present = values
             .iter()
-            .map(|value| {
-                match value {
-                    ScalarValue::Utf8(Some(v)) | ScalarValue::Utf8View(Some(v)) => {
-                        sbbf.check(&v.as_str())
-                    }
-                    ScalarValue::Binary(Some(v)) | ScalarValue::BinaryView(Some(v)) => {
-                        sbbf.check(v)
-                    }
-                    ScalarValue::FixedSizeBinary(_size, Some(v)) => sbbf.check(v),
-                    ScalarValue::Boolean(Some(v)) => sbbf.check(v),
-                    ScalarValue::Float64(Some(v)) => sbbf.check(v),
-                    ScalarValue::Float32(Some(v)) => sbbf.check(v),
-                    ScalarValue::Int64(Some(v)) => sbbf.check(v),
-                    ScalarValue::Int32(Some(v)) => sbbf.check(v),
-                    ScalarValue::UInt64(Some(v)) => sbbf.check(v),
-                    ScalarValue::UInt32(Some(v)) => sbbf.check(v),
-                    ScalarValue::Decimal128(Some(v), p, s) => match parquet_type {
-                        Type::INT32 => {
-                            //https://github.com/apache/parquet-format/blob/eb4b31c1d64a01088d02a2f9aefc6c17c54cc6fc/Encodings.md?plain=1#L35-L42
-                            // All physical type are little-endian
-                            if *p > 9 {
-                                //DECIMAL can be used to annotate the following types:
-                                //
-                                // int32: for 1 <= precision <= 9
-                                // int64: for 1 <= precision <= 18
-                                return true;
-                            }
-                            let b = (*v as i32).to_le_bytes();
-                            // Use Decimal constructor after https://github.com/apache/arrow-rs/issues/5325
-                            let decimal = Decimal::Int32 {
-                                value: b,
-                                precision: *p as i32,
-                                scale: *s as i32,
-                            };
-                            sbbf.check(&decimal)
-                        }
-                        Type::INT64 => {
-                            if *p > 18 {
-                                return true;
-                            }
-                            let b = (*v as i64).to_le_bytes();
-                            let decimal = Decimal::Int64 {
-                                value: b,
-                                precision: *p as i32,
-                                scale: *s as i32,
-                            };
-                            sbbf.check(&decimal)
-                        }
-                        Type::FIXED_LEN_BYTE_ARRAY => {
-                            // keep with from_bytes_to_i128
-                            let b = v.to_be_bytes().to_vec();
-                            // Use Decimal constructor after https://github.com/apache/arrow-rs/issues/5325
-                            let decimal = Decimal::Bytes {
-                                value: b.into(),
-                                precision: *p as i32,
-                                scale: *s as i32,
-                            };
-                            sbbf.check(&decimal)
-                        }
-                        _ => true,
-                    },
-                    _ => true,
-                }
-            })
+            .map(|value| BloomFilterStatistics::check_scalar(sbbf, value, parquet_type))
             // The row group doesn't contain any of the values if
             // all the checks are false
             .all(|v| !v);
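The key change is the ScalarValue::Dictionary arm in check_scalar: previously a pruning literal for a predicate on a dictionary-encoded column fell through to the catch-all `_ => true`, so the bloom filter could never rule a row group out. A minimal sketch of the shape of such a literal, not taken from the commit (the import path via datafusion::scalar and the example column type are assumptions):

    use arrow::datatypes::DataType;
    use datafusion::scalar::ScalarValue;

    fn main() {
        // The literal for `utf8_col = 'h'` when `utf8_col` has type
        // Dictionary(Int32, Utf8): the value arrives wrapped in ScalarValue::Dictionary.
        let literal = ScalarValue::Dictionary(
            Box::new(DataType::Int32),
            Box::new(ScalarValue::Utf8(Some("h".to_string()))),
        );

        // The new check_scalar helper unwraps the inner value and checks it
        // against the Sbbf, instead of returning "maybe present" unconditionally.
        if let ScalarValue::Dictionary(_, inner) = &literal {
            assert_eq!(inner.as_ref(), &ScalarValue::Utf8(Some("h".to_string())));
        }
    }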

datafusion/core/tests/parquet/mod.rs

Lines changed: 62 additions & 6 deletions
@@ -17,14 +17,14 @@
 
 //! Parquet integration tests
 use crate::parquet::utils::MetricsFinder;
-use arrow::array::Decimal128Array;
 use arrow::{
     array::{
         make_array, Array, ArrayRef, BinaryArray, Date32Array, Date64Array,
-        FixedSizeBinaryArray, Float64Array, Int16Array, Int32Array, Int64Array,
-        Int8Array, LargeBinaryArray, LargeStringArray, StringArray,
-        TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
-        TimestampSecondArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
+        Decimal128Array, DictionaryArray, FixedSizeBinaryArray, Float64Array, Int16Array,
+        Int32Array, Int64Array, Int8Array, LargeBinaryArray, LargeStringArray,
+        StringArray, TimestampMicrosecondArray, TimestampMillisecondArray,
+        TimestampNanosecondArray, TimestampSecondArray, UInt16Array, UInt32Array,
+        UInt64Array, UInt8Array,
     },
     datatypes::{DataType, Field, Schema},
     record_batch::RecordBatch,
@@ -64,7 +64,7 @@ fn init() {
 // ----------------------
 
 /// What data to use
-#[derive(Debug, Clone, Copy)]
+#[derive(Debug, Clone)]
 enum Scenario {
     Timestamps,
     Dates,
@@ -84,6 +84,7 @@ enum Scenario {
     WithNullValues,
     WithNullValuesPageLevel,
     UTF8,
+    Dictionary,
 }
 
 enum Unit {
@@ -740,6 +741,54 @@ fn make_utf8_batch(value: Vec<Option<&str>>) -> RecordBatch {
    .unwrap()
 }
 
+fn make_dictionary_batch(strings: Vec<&str>, integers: Vec<i32>) -> RecordBatch {
+    let keys = Int32Array::from_iter(0..strings.len() as i32);
+    let small_keys = Int16Array::from_iter(0..strings.len() as i16);
+
+    let utf8_values = StringArray::from(strings.clone());
+    let utf8_dict = DictionaryArray::new(keys.clone(), Arc::new(utf8_values));
+
+    let large_utf8 = LargeStringArray::from(strings.clone());
+    let large_utf8_dict = DictionaryArray::new(keys.clone(), Arc::new(large_utf8));
+
+    let binary =
+        BinaryArray::from_iter_values(strings.iter().cloned().map(|v| v.as_bytes()));
+    let binary_dict = DictionaryArray::new(keys.clone(), Arc::new(binary));
+
+    let large_binary =
+        LargeBinaryArray::from_iter_values(strings.iter().cloned().map(|v| v.as_bytes()));
+    let large_binary_dict = DictionaryArray::new(keys.clone(), Arc::new(large_binary));
+
+    let int32 = Int32Array::from_iter_values(integers.clone());
+    let int32_dict = DictionaryArray::new(small_keys.clone(), Arc::new(int32));
+
+    let int64 = Int64Array::from_iter_values(integers.iter().cloned().map(|v| v as i64));
+    let int64_dict = DictionaryArray::new(keys.clone(), Arc::new(int64));
+
+    let uint32 =
+        UInt32Array::from_iter_values(integers.iter().cloned().map(|v| v as u32));
+    let uint32_dict = DictionaryArray::new(small_keys.clone(), Arc::new(uint32));
+
+    let decimal = Decimal128Array::from_iter_values(
+        integers.iter().cloned().map(|v| (v * 100) as i128),
+    )
+    .with_precision_and_scale(6, 2)
+    .unwrap();
+    let decimal_dict = DictionaryArray::new(keys.clone(), Arc::new(decimal));
+
+    RecordBatch::try_from_iter(vec![
+        ("utf8", Arc::new(utf8_dict) as _),
+        ("large_utf8", Arc::new(large_utf8_dict) as _),
+        ("binary", Arc::new(binary_dict) as _),
+        ("large_binary", Arc::new(large_binary_dict) as _),
+        ("int32", Arc::new(int32_dict) as _),
+        ("int64", Arc::new(int64_dict) as _),
+        ("uint32", Arc::new(uint32_dict) as _),
+        ("decimal", Arc::new(decimal_dict) as _),
+    ])
+    .unwrap()
+}
+
 fn create_data_batch(scenario: Scenario) -> Vec<RecordBatch> {
     match scenario {
         Scenario::Timestamps => {
@@ -961,6 +1010,13 @@ fn create_data_batch(scenario: Scenario) -> Vec<RecordBatch> {
                 ]),
             ]
         }
+
+        Scenario::Dictionary => {
+            vec![
+                make_dictionary_batch(vec!["a", "b", "c", "d", "e"], vec![0, 1, 2, 5, 6]),
+                make_dictionary_batch(vec!["f", "g", "h", "i", "j"], vec![0, 1, 3, 8, 9]),
+            ]
+        }
     }
 }
 
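The two batches above hold disjoint dictionary values ("a".."e" vs "f".."j" and mostly disjoint integers), which is what makes bloom-filter pruning observable when each batch is written as its own row group. A hypothetical end-to-end query over such a file, not part of the commit (the table name, file path, and one-row-group-per-batch layout are assumptions):

    use datafusion::error::Result;
    use datafusion::prelude::*;

    #[tokio::main]
    async fn main() -> Result<()> {
        let ctx = SessionContext::new();
        // Assumed file written from the two dictionary batches above, with
        // bloom filters enabled and one row group per batch.
        ctx.register_parquet("t", "dict_test.parquet", ParquetReadOptions::default())
            .await?;

        // 'h' only occurs in the second batch; with this fix the first row
        // group's bloom filter can report "definitely not present", so that
        // row group is skipped instead of being decoded.
        let df = ctx.sql("SELECT utf8, int32 FROM t WHERE utf8 = 'h'").await?;
        df.show().await?;
        Ok(())
    }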
