feat: handle partition filters via kernel expressions
Signed-off-by: Robert Pack <[email protected]>
roeap committed Jan 3, 2025
1 parent 556be92 commit b946f92
Showing 12 changed files with 519 additions and 408 deletions.
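
At a high level, this commit removes the hand-rolled partition-value parsing from `LogicalFile`/`FileStatsAccessor` and introduces a `LogDataView` whose `with_partition_filter` evaluates a delta-kernel `Expression` over the parsed partition values. A minimal usage sketch, assuming a `LogDataView` is already at hand and using only methods introduced below (the helper name and the import path for `Expression` are illustrative assumptions):

use delta_kernel::expressions::Expression;

// Hypothetical helper: the caller supplies a kernel expression over the partition columns.
// `with_partition_filter` keeps only the log entries whose partition values satisfy the
// predicate; `iter` then yields one `LogFileView` per surviving file.
fn filtered_paths(view: LogDataView, predicate: &Expression) -> DeltaResult<Vec<String>> {
    let filtered = view.with_partition_filter(Some(predicate))?;
    Ok(filtered.iter().map(|f| f.path().to_string()).collect())
}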
1 change: 1 addition & 0 deletions crates/core/src/kernel/arrow/mod.rs
@@ -11,6 +11,7 @@ use lazy_static::lazy_static;
pub(crate) mod extract;
pub(crate) mod json;

pub(crate) const LIST_ARRAY_ROOT: &str = "element";
const MAP_ROOT_DEFAULT: &str = "key_value";
const MAP_KEY_DEFAULT: &str = "key";
const MAP_VALUE_DEFAULT: &str = "value";
290 changes: 197 additions & 93 deletions crates/core/src/kernel/snapshot/log_data.rs
@@ -1,7 +1,7 @@
use std::borrow::Cow;
use std::collections::HashMap;
use std::sync::Arc;

use arrow::array::AsArray;
use arrow_array::{Array, Int32Array, Int64Array, MapArray, RecordBatch, StringArray, StructArray};
use arrow_select::filter::filter_record_batch;
use chrono::{DateTime, Utc};
@@ -26,22 +26,10 @@ const COL_MIN_VALUES: &str = "minValues";
const COL_MAX_VALUES: &str = "maxValues";
const COL_NULL_COUNT: &str = "nullCount";

pub(crate) type PartitionFields<'a> = Arc<IndexMap<&'a str, &'a StructField>>;
pub(crate) type PartitionValues<'a> = IndexMap<&'a str, Scalar>;

pub(crate) trait PartitionsExt {
fn hive_partition_path(&self) -> String;
}

impl PartitionsExt for IndexMap<&str, Scalar> {
fn hive_partition_path(&self) -> String {
self.iter()
.map(|(k, v)| format!("{k}={}", v.serialize_encoded()))
.collect::<Vec<_>>()
.join("/")
}
}

impl PartitionsExt for IndexMap<String, Scalar> {
fn hive_partition_path(&self) -> String {
self.iter()
@@ -163,8 +151,6 @@ pub struct LogicalFile<'a> {

/// Pointer to a specific row in the log data.
index: usize,
/// Schema fields the table is partitioned by.
partition_fields: PartitionFields<'a>,
}

impl LogicalFile<'_> {
@@ -214,61 +200,6 @@ impl LogicalFile<'_> {
})
}

/// The partition values for this logical file.
pub fn partition_values(&self) -> DeltaResult<PartitionValues<'_>> {
if self.partition_fields.is_empty() {
return Ok(IndexMap::new());
}
let map_value = self.partition_values.value(self.index);
let keys = map_value
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.ok_or(DeltaTableError::generic(
"expected partition values key field to be of type string",
))?;
let values = map_value
.column(1)
.as_any()
.downcast_ref::<StringArray>()
.ok_or(DeltaTableError::generic(
"expected partition values value field to be of type string",
))?;

let values = keys
.iter()
.zip(values.iter())
.map(|(k, v)| {
let (key, field) = self.partition_fields.get_key_value(k.unwrap()).unwrap();
let field_type = match field.data_type() {
DataType::Primitive(p) => Ok(p),
_ => Err(DeltaTableError::generic(
"nested partitioning values are not supported",
)),
}?;
Ok((
*key,
v.map(|vv| field_type.parse_scalar(vv))
.transpose()?
.unwrap_or(Scalar::Null(field.data_type().clone())),
))
})
.collect::<DeltaResult<HashMap<_, _>>>()?;

// NOTE: we recreate the map as a IndexMap to ensure the order of the keys is consistently
// the same as the order of partition fields.
self.partition_fields
.iter()
.map(|(k, f)| {
let val = values
.get(*k)
.cloned()
.unwrap_or(Scalar::Null(f.data_type.clone()));
Ok((*k, val))
})
.collect::<DeltaResult<IndexMap<_, _>>>()
}

/// Defines a deletion vector
pub fn deletion_vector(&self) -> Option<DeletionVectorView<'_>> {
self.deletion_vector.as_ref().and_then(|arr| {
@@ -346,11 +277,11 @@ impl LogicalFile<'_> {
impl<'a> TryFrom<&LogicalFile<'a>> for ObjectMeta {
type Error = DeltaTableError;

fn try_from(file_stats: &LogicalFile<'a>) -> Result<Self, Self::Error> {
fn try_from(value: &LogicalFile<'a>) -> Result<Self, Self::Error> {
Ok(ObjectMeta {
location: file_stats.object_store_path(),
size: file_stats.size() as usize,
last_modified: file_stats.modification_datetime()?,
location: value.object_store_path(),
size: value.size() as usize,
last_modified: value.modification_datetime()?,
version: None,
e_tag: None,
})
@@ -359,7 +290,6 @@ impl<'a> TryFrom<&LogicalFile<'a>> for ObjectMeta {

/// Helper for processing data from the materialized Delta log.
pub struct FileStatsAccessor<'a> {
partition_fields: PartitionFields<'a>,
data: Arc<RecordBatch>,
sizes: &'a Int64Array,
stats: &'a StructArray,
@@ -425,22 +355,6 @@ impl<'a> FileStatsAccessor<'a> {
let partition_values = extract_and_cast::<MapArray>(data, "add.partitionValues")?;
let partition_values_parsed =
extract_and_cast_opt::<StructArray>(data, "add.partitionValues_parsed");
let partition_fields = Arc::new(
metadata
.partition_columns
.iter()
.map(|c| {
Ok((
c.as_str(),
schema
.field(c.as_str())
.ok_or(DeltaTableError::PartitionError {
partition: c.clone(),
})?,
))
})
.collect::<DeltaResult<IndexMap<_, _>>>()?,
);
let deletion_vector = extract_and_cast_opt::<StructArray>(data, "add.deletionVector");
let deletion_vector = deletion_vector.and_then(|dv| {
if dv.null_count() == dv.len() {
@@ -463,7 +377,6 @@ impl<'a> FileStatsAccessor<'a> {
});

Ok(Self {
partition_fields,
data: Arc::new(result),
sizes,
stats,
@@ -486,7 +399,6 @@ impl<'a> FileStatsAccessor<'a> {
data: self.data.clone(),
partition_values: self.partition_values,
partition_values_parsed: self.partition_values_parsed.clone(),
partition_fields: self.partition_fields.clone(),
stats: self.stats,
deletion_vector: self.deletion_vector.clone(),
index,
@@ -508,6 +420,198 @@ impl<'a> Iterator for FileStatsAccessor<'a> {
}
}

pub struct LogFileView<'a> {
data: &'a RecordBatch,
curr: Option<usize>,
}

impl LogFileView<'_> {
fn index(&self) -> usize {
self.curr.expect("index initialized")
}

/// Path to the files storage location.
pub fn path(&self) -> Cow<'_, str> {
percent_decode_str(pick::<StringArray>(&self.data, 0).value(self.index()))
.decode_utf8_lossy()
}

/// An object store [`Path`] to the file.
///
/// This tries to parse the file string and, if that fails, returns the string as-is.
// TODO assert consistent handling of the paths encoding when reading log data so this logic can be removed.
pub fn object_store_path(&self) -> Path {
let path = self.path();
// Try to preserve percent encoding if possible
match Path::parse(path.as_ref()) {
Ok(path) => path,
Err(_) => Path::from(path.as_ref()),
}
}

/// File size stored on disk.
pub fn size(&self) -> i64 {
pick::<Int64Array>(&self.data, 1).value(self.index())
}

/// Last modified time of the file.
pub fn modification_time(&self) -> i64 {
pick::<Int64Array>(&self.data, 2).value(self.index())
}

/// Datetime of the last modification time of the file.
pub fn modification_datetime(&self) -> DeltaResult<chrono::DateTime<Utc>> {
DateTime::from_timestamp_millis(self.modification_time()).ok_or(DeltaTableError::from(
crate::protocol::ProtocolError::InvalidField(format!(
"invalid modification_time: {:?}",
self.modification_time()
)),
))
}

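/// Partition values of this file, read from the parsed `partition_values` struct column as a kernel struct scalar.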
pub fn partition_values(&self) -> Option<StructData> {
self.data
.column_by_name("partition_values")
.and_then(|c| c.as_struct_opt())
.and_then(|arr| match Scalar::from_array(arr, self.index()) {
Some(Scalar::Struct(s)) => Some(s),
_ => None,
})
}

fn stats(&self) -> Option<&StructArray> {
self.data
.column_by_name("stats")
.and_then(|c| c.as_struct_opt())
}

/// The number of records stored in the data file.
pub fn num_records(&self) -> Option<usize> {
self.stats().and_then(|c| {
c.column_by_name(COL_NUM_RECORDS)
.and_then(|c| c.as_any().downcast_ref::<Int64Array>())
.map(|a| a.value(self.index()) as usize)
})
}

/// Struct containing all available null counts for the columns in this file.
pub fn null_counts(&self) -> Option<Scalar> {
self.stats().and_then(|c| {
c.column_by_name(COL_NULL_COUNT)
.and_then(|c| Scalar::from_array(c.as_ref(), self.index()))
})
}

/// Struct containing all available min values for the columns in this file.
pub fn min_values(&self) -> Option<Scalar> {
self.stats().and_then(|c| {
c.column_by_name(COL_MIN_VALUES)
.and_then(|c| Scalar::from_array(c.as_ref(), self.index()))
})
}

/// Struct containing all available max values for the columns in this file.
pub fn max_values(&self) -> Option<Scalar> {
self.stats().and_then(|c| {
c.column_by_name(COL_MAX_VALUES)
.and_then(|c| Scalar::from_array(c.as_ref(), self.index()))
})
}
}

impl<'a> Iterator for LogFileView<'a> {
type Item = LogFileView<'a>;

fn next(&mut self) -> Option<Self::Item> {
if self.data.num_rows() < 1 {
return None;
}
if self.curr.is_some() && self.index() >= self.data.num_rows() - 1 {
return None;
}
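// Start at row 0 on the first call, then advance one row per subsequent call.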
self.curr = self.curr.map(|c| c + 1).or(Some(0));
Some(Self {
data: self.data,
curr: self.curr,
})
}
}

impl<'a> TryFrom<&LogFileView<'a>> for ObjectMeta {
type Error = DeltaTableError;

fn try_from(value: &LogFileView<'a>) -> Result<Self, Self::Error> {
Ok(ObjectMeta {
location: value.object_store_path(),
size: value.size() as usize,
last_modified: value.modification_datetime()?,
version: None,
e_tag: None,
})
}
}

pub struct LogDataView {
data: RecordBatch,
metadata: Metadata,
schema: StructType,
}

impl LogDataView {
pub(crate) fn new(data: RecordBatch, metadata: Metadata, schema: StructType) -> Self {
Self {
data,
metadata,
schema,
}
}

fn partition_data(&self) -> Option<RecordBatch> {
self.data
.column_by_name("partition_values")
.and_then(|c| c.as_any().downcast_ref::<StructArray>())
.map(|c| c.into())
}

pub fn with_partition_filter(self, predicate: Option<&Expression>) -> DeltaResult<Self> {
if let (Some(pred), Some(data)) = (predicate, self.partition_data()) {
let data = ArrowEngineData::new(data);
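// Evaluate the predicate over the partition values with the kernel's arrow expression
// handler; the result is a single boolean column used to filter the log entries below.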
let evaluator = ARROW_HANDLER.get_evaluator(
Arc::new(data.record_batch().schema_ref().as_ref().try_into()?),
pred.clone(),
DataType::BOOLEAN,
);
let result = ArrowEngineData::try_from_engine_data(evaluator.evaluate(&data)?)?;
let filter = result.record_batch().column(0).as_boolean();
return Ok(Self {
data: filter_record_batch(&self.data, filter)?,
metadata: self.metadata,
schema: self.schema,
});
}
Ok(self)
}

pub fn iter(&self) -> impl Iterator<Item = LogFileView<'_>> {
LogFileView {
data: &self.data,
curr: None,
}
}
}

impl<'a> IntoIterator for &'a LogDataView {
type Item = LogFileView<'a>;
type IntoIter = LogFileView<'a>;

fn into_iter(self) -> Self::IntoIter {
LogFileView {
data: &self.data,
curr: None,
}
}
}

/// Provides semantic access to the log data.
///
/// This is a helper struct that provides access to the log data in a more semantic way