Skip to content

Change FieldSummary {upper,lower}_bound to ByteBuf #1369

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 102 additions & 67 deletions crates/iceberg/src/expr/visitors/manifest_evaluator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

use fnv::FnvHashSet;
use serde_bytes::ByteBuf;

use crate::expr::visitors::bound_predicate_visitor::{BoundPredicateVisitor, visit};
use crate::expr::{BoundPredicate, BoundReference};
Expand All @@ -42,13 +43,13 @@ impl ManifestEvaluator {
/// see if this `ManifestFile` could possibly contain data that matches
/// the scan's filter.
pub(crate) fn eval(&self, manifest_file: &ManifestFile) -> Result<bool> {
if manifest_file.partitions.is_empty() {
return Ok(true);
match &manifest_file.partitions {
Some(p) if !p.is_empty() => {
let mut evaluator = ManifestFilterVisitor::new(p);
visit(&mut evaluator, &self.partition_filter)
}
_ => Ok(true),
}

let mut evaluator = ManifestFilterVisitor::new(&manifest_file.partitions);

visit(&mut evaluator, &self.partition_filter)
}
}

Expand Down Expand Up @@ -154,9 +155,19 @@ impl BoundPredicateVisitor for ManifestFilterVisitor<'_> {
_predicate: &BoundPredicate,
) -> crate::Result<bool> {
let field = self.field_summary_for_reference(reference);

match &field.lower_bound {
Some(bound) if datum <= bound => ROWS_CANNOT_MATCH,
Some(_) => ROWS_MIGHT_MATCH,
Some(bound_bytes) => {
let bound = ManifestFilterVisitor::bytes_to_datum(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@liurenjie1024 what's the cost of datum_from_bytes and bytes_to_datum? Do we need to introduce a new type in between?

bound_bytes,
*reference.field().field_type.clone(),
);
if datum <= &bound {
ROWS_CANNOT_MATCH
} else {
ROWS_MIGHT_MATCH
}
}
None => ROWS_CANNOT_MATCH,
}
}
Expand All @@ -169,8 +180,17 @@ impl BoundPredicateVisitor for ManifestFilterVisitor<'_> {
) -> crate::Result<bool> {
let field = self.field_summary_for_reference(reference);
match &field.lower_bound {
Some(bound) if datum < bound => ROWS_CANNOT_MATCH,
Some(_) => ROWS_MIGHT_MATCH,
Some(bound_bytes) => {
let bound = ManifestFilterVisitor::bytes_to_datum(
bound_bytes,
*reference.field().field_type.clone(),
);
if datum < &bound {
ROWS_CANNOT_MATCH
} else {
ROWS_MIGHT_MATCH
}
}
None => ROWS_CANNOT_MATCH,
}
}
Expand All @@ -183,8 +203,17 @@ impl BoundPredicateVisitor for ManifestFilterVisitor<'_> {
) -> crate::Result<bool> {
let field = self.field_summary_for_reference(reference);
match &field.upper_bound {
Some(bound) if datum >= bound => ROWS_CANNOT_MATCH,
Some(_) => ROWS_MIGHT_MATCH,
Some(bound_bytes) => {
let bound = ManifestFilterVisitor::bytes_to_datum(
bound_bytes,
*reference.field().field_type.clone(),
);
if datum >= &bound {
ROWS_CANNOT_MATCH
} else {
ROWS_MIGHT_MATCH
}
}
None => ROWS_CANNOT_MATCH,
}
}
Expand All @@ -197,8 +226,17 @@ impl BoundPredicateVisitor for ManifestFilterVisitor<'_> {
) -> crate::Result<bool> {
let field = self.field_summary_for_reference(reference);
match &field.upper_bound {
Some(bound) if datum > bound => ROWS_CANNOT_MATCH,
Some(_) => ROWS_MIGHT_MATCH,
Some(bound_bytes) => {
let bound = ManifestFilterVisitor::bytes_to_datum(
bound_bytes,
*reference.field().field_type.clone(),
);
if datum > &bound {
ROWS_CANNOT_MATCH
} else {
ROWS_MIGHT_MATCH
}
}
None => ROWS_CANNOT_MATCH,
}
}
Expand All @@ -215,14 +253,22 @@ impl BoundPredicateVisitor for ManifestFilterVisitor<'_> {
return ROWS_CANNOT_MATCH;
}

if let Some(lower_bound) = &field.lower_bound {
if lower_bound > datum {
if let Some(lower_bound_bytes) = &field.lower_bound {
let lower_bound = ManifestFilterVisitor::bytes_to_datum(
lower_bound_bytes,
*reference.field().field_type.clone(),
);
if &lower_bound > datum {
return ROWS_CANNOT_MATCH;
}
}

if let Some(upper_bound) = &field.upper_bound {
if upper_bound < datum {
if let Some(upper_bound_bytes) = &field.upper_bound {
let upper_bound = ManifestFilterVisitor::bytes_to_datum(
upper_bound_bytes,
*reference.field().field_type.clone(),
);
if &upper_bound < datum {
return ROWS_CANNOT_MATCH;
}
}
Expand Down Expand Up @@ -260,23 +306,15 @@ impl BoundPredicateVisitor for ManifestFilterVisitor<'_> {
let prefix_len = prefix.len();

if let Some(lower_bound) = &field.lower_bound {
let lower_bound_str = ManifestFilterVisitor::datum_as_str(
lower_bound,
"Cannot perform starts_with on non-string lower bound",
)?;
let min_len = lower_bound_str.len().min(prefix_len);
if prefix.as_bytes().lt(&lower_bound_str.as_bytes()[..min_len]) {
let min_len = lower_bound.len().min(prefix_len);
if prefix.as_bytes().lt(&lower_bound[..min_len]) {
return ROWS_CANNOT_MATCH;
}
}

if let Some(upper_bound) = &field.upper_bound {
let upper_bound_str = ManifestFilterVisitor::datum_as_str(
upper_bound,
"Cannot perform starts_with on non-string upper bound",
)?;
let min_len = upper_bound_str.len().min(prefix_len);
if prefix.as_bytes().gt(&upper_bound_str.as_bytes()[..min_len]) {
let min_len = upper_bound.len().min(prefix_len);
if prefix.as_bytes().gt(&upper_bound[..min_len]) {
return ROWS_CANNOT_MATCH;
}
}
Expand Down Expand Up @@ -305,35 +343,19 @@ impl BoundPredicateVisitor for ManifestFilterVisitor<'_> {
// not_starts_with will match unless all values must start with the prefix. This happens when
// the lower and upper bounds both start with the prefix.
if let Some(lower_bound) = &field.lower_bound {
let lower_bound_str = ManifestFilterVisitor::datum_as_str(
lower_bound,
"Cannot perform not_starts_with on non-string lower bound",
)?;

// if lower is shorter than the prefix then lower doesn't start with the prefix
if prefix_len > lower_bound_str.len() {
if prefix_len > lower_bound.len() {
return ROWS_MIGHT_MATCH;
}

if prefix
.as_bytes()
.eq(&lower_bound_str.as_bytes()[..prefix_len])
{
if prefix.as_bytes().eq(&lower_bound[..prefix_len]) {
if let Some(upper_bound) = &field.upper_bound {
let upper_bound_str = ManifestFilterVisitor::datum_as_str(
upper_bound,
"Cannot perform not_starts_with on non-string upper bound",
)?;

// if upper is shorter than the prefix then upper can't start with the prefix
if prefix_len > upper_bound_str.len() {
if prefix_len > upper_bound.len() {
return ROWS_MIGHT_MATCH;
}

if prefix
.as_bytes()
.eq(&upper_bound_str.as_bytes()[..prefix_len])
{
if prefix.as_bytes().eq(&upper_bound[..prefix_len]) {
return ROWS_CANNOT_MATCH;
}
}
Expand All @@ -359,13 +381,21 @@ impl BoundPredicateVisitor for ManifestFilterVisitor<'_> {
}

if let Some(lower_bound) = &field.lower_bound {
if literals.iter().all(|datum| lower_bound > datum) {
let lower_bound = ManifestFilterVisitor::bytes_to_datum(
lower_bound,
*reference.field().clone().field_type,
);
if literals.iter().all(|datum| &lower_bound > datum) {
return ROWS_CANNOT_MATCH;
}
}

if let Some(upper_bound) = &field.upper_bound {
if literals.iter().all(|datum| upper_bound < datum) {
let upper_bound = ManifestFilterVisitor::bytes_to_datum(
upper_bound,
*reference.field().clone().field_type,
);
if literals.iter().all(|datum| &upper_bound < datum) {
return ROWS_CANNOT_MATCH;
}
}
Expand Down Expand Up @@ -414,6 +444,11 @@ impl ManifestFilterVisitor<'_> {
};
Ok(bound)
}

fn bytes_to_datum(bytes: &ByteBuf, t: Type) -> Datum {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit should this be in Datum alongside try_from_bytes?

let p = t.as_primitive_type().unwrap();
Datum::try_from_bytes(bytes, p.clone()).unwrap()
}
}

#[cfg(test)]
Expand Down Expand Up @@ -520,8 +555,8 @@ mod test {
FieldSummary {
contains_null: false,
contains_nan: None,
lower_bound: Some(Datum::int(INT_MIN_VALUE)),
upper_bound: Some(Datum::int(INT_MAX_VALUE)),
lower_bound: Some(Datum::int(INT_MIN_VALUE).to_bytes().unwrap()),
upper_bound: Some(Datum::int(INT_MAX_VALUE).to_bytes().unwrap()),
},
// all_nulls_missing_nan
FieldSummary {
Expand All @@ -534,22 +569,22 @@ mod test {
FieldSummary {
contains_null: true,
contains_nan: None,
lower_bound: Some(Datum::string(STRING_MIN_VALUE)),
upper_bound: Some(Datum::string(STRING_MAX_VALUE)),
lower_bound: Some(Datum::string(STRING_MIN_VALUE).to_bytes().unwrap()),
upper_bound: Some(Datum::string(STRING_MAX_VALUE).to_bytes().unwrap()),
},
// no_nulls
FieldSummary {
contains_null: false,
contains_nan: None,
lower_bound: Some(Datum::string(STRING_MIN_VALUE)),
upper_bound: Some(Datum::string(STRING_MAX_VALUE)),
lower_bound: Some(Datum::string(STRING_MIN_VALUE).to_bytes().unwrap()),
upper_bound: Some(Datum::string(STRING_MAX_VALUE).to_bytes().unwrap()),
},
// float
FieldSummary {
contains_null: true,
contains_nan: None,
lower_bound: Some(Datum::float(0.0)),
upper_bound: Some(Datum::float(20.0)),
lower_bound: Some(Datum::float(0.0).to_bytes().unwrap()),
upper_bound: Some(Datum::float(20.0).to_bytes().unwrap()),
},
// all_nulls_double
FieldSummary {
Expand Down Expand Up @@ -583,8 +618,8 @@ mod test {
FieldSummary {
contains_null: false,
contains_nan: Some(false),
lower_bound: Some(Datum::float(0.0)),
upper_bound: Some(Datum::float(20.0)),
lower_bound: Some(Datum::float(0.0).to_bytes().unwrap()),
upper_bound: Some(Datum::float(20.0).to_bytes().unwrap()),
},
// all_nulls_missing_nan_float
FieldSummary {
Expand All @@ -597,15 +632,15 @@ mod test {
FieldSummary {
contains_null: true,
contains_nan: None,
lower_bound: Some(Datum::string(STRING_MIN_VALUE)),
upper_bound: Some(Datum::string(STRING_MIN_VALUE)),
lower_bound: Some(Datum::string(STRING_MIN_VALUE).to_bytes().unwrap()),
upper_bound: Some(Datum::string(STRING_MIN_VALUE).to_bytes().unwrap()),
},
// no_nulls_same_value_a
FieldSummary {
contains_null: false,
contains_nan: None,
lower_bound: Some(Datum::string(STRING_MIN_VALUE)),
upper_bound: Some(Datum::string(STRING_MIN_VALUE)),
lower_bound: Some(Datum::string(STRING_MIN_VALUE).to_bytes().unwrap()),
upper_bound: Some(Datum::string(STRING_MIN_VALUE).to_bytes().unwrap()),
},
]
}
Expand All @@ -625,7 +660,7 @@ mod test {
added_rows_count: None,
existing_rows_count: None,
deleted_rows_count: None,
partitions,
partitions: Some(partitions),
key_metadata: vec![],
}
}
Expand Down
33 changes: 28 additions & 5 deletions crates/iceberg/src/inspect/manifests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use futures::{StreamExt, stream};
use crate::Result;
use crate::arrow::schema_to_arrow_schema;
use crate::scan::ArrowRecordBatchStream;
use crate::spec::{FieldSummary, ListType, NestedField, PrimitiveType, StructType, Type};
use crate::spec::{Datum, FieldSummary, ListType, NestedField, PrimitiveType, StructType, Type};
use crate::table::Table;

/// Manifests table.
Expand Down Expand Up @@ -181,7 +181,20 @@ impl<'a> ManifestsTable<'a> {
.append_value(manifest.existing_files_count.unwrap_or(0) as i32);
deleted_delete_files_count
.append_value(manifest.deleted_files_count.unwrap_or(0) as i32);
self.append_partition_summaries(&mut partition_summaries, &manifest.partitions);

let spec = self
.table
.metadata()
.partition_spec_by_id(manifest.partition_spec_id)
.unwrap();
let spec_struct = spec
.partition_type(self.table.metadata().current_schema())
.unwrap();
self.append_partition_summaries(
&mut partition_summaries,
&manifest.partitions.clone().unwrap_or_else(Vec::new),
spec_struct,
);
}
}

Expand Down Expand Up @@ -230,9 +243,10 @@ impl<'a> ManifestsTable<'a> {
&self,
builder: &mut GenericListBuilder<i32, StructBuilder>,
partitions: &[FieldSummary],
partition_struct: StructType,
) {
let partition_summaries_builder = builder.values();
for summary in partitions {
for (summary, field) in partitions.iter().zip(partition_struct.fields()) {
partition_summaries_builder
.field_builder::<BooleanBuilder>(0)
.unwrap()
Expand All @@ -241,14 +255,23 @@ impl<'a> ManifestsTable<'a> {
.field_builder::<BooleanBuilder>(1)
.unwrap()
.append_option(summary.contains_nan);

partition_summaries_builder
.field_builder::<StringBuilder>(2)
.unwrap()
.append_option(summary.lower_bound.as_ref().map(|v| v.to_string()));
.append_option(summary.lower_bound.as_ref().map(|v| {
Datum::try_from_bytes(v, field.field_type.as_primitive_type().unwrap().clone())
.unwrap()
.to_string()
}));
partition_summaries_builder
.field_builder::<StringBuilder>(3)
.unwrap()
.append_option(summary.upper_bound.as_ref().map(|v| v.to_string()));
.append_option(summary.upper_bound.as_ref().map(|v| {
Datum::try_from_bytes(v, field.field_type.as_primitive_type().unwrap().clone())
.unwrap()
.to_string()
}));
partition_summaries_builder.append(true);
}
builder.append(true);
Expand Down
Loading
Loading