Skip to content

Improve recursive unnest options API #12836

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Oct 20, 2024
Merged
1 change: 1 addition & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion datafusion/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ pub use scalar::{ScalarType, ScalarValue};
pub use schema_reference::SchemaReference;
pub use stats::{ColumnStatistics, Statistics};
pub use table_reference::{ResolvedTableReference, TableReference};
pub use unnest::UnnestOptions;
pub use unnest::{RecursionUnnestOption, UnnestOptions};
pub use utils::project_schema;

// These are hidden from docs purely to avoid polluting the public view of what this crate exports.
Expand Down
26 changes: 26 additions & 0 deletions datafusion/common/src/unnest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

//! [`UnnestOptions`] for unnesting structured types

use crate::Column;

/// Options for unnesting a column that contains a list type,
/// replicating values in the other, non nested rows.
///
Expand Down Expand Up @@ -60,17 +62,35 @@
/// └─────────┘ └─────┘ └─────────┘ └─────┘
/// c1 c2 c1 c2
/// ```
///
/// `recursions` instructs how a column should be unnested (e.g. unnesting a column multiple
/// times, with depth = 1 and depth = 2). Any unnested column not mentioned in these
/// options is inferred to be unnested with depth = 1
#[derive(Debug, Clone, PartialEq, PartialOrd, Hash, Eq)]
pub struct UnnestOptions {
/// Whether nulls in the input should be preserved. Defaults to `true`.
pub preserve_nulls: bool,
/// Explicit unnest instructions for columns that need to be unnested multiple
/// times (e.g. at different depths). Any unnested column that is not mentioned
/// here is unnested with depth = 1. Defaults to an empty list.
pub recursions: Vec<RecursionUnnestOption>,
}

/// Instruction on how to unnest a column (mostly one with a list type),
/// such as how to name the output and how many levels it should be unnested
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd)]
pub struct RecursionUnnestOption {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we please add documentation to this structure as it is public?

pub input_column: Column,
pub output_column: Column,
pub depth: usize,
}

impl Default for UnnestOptions {
/// Returns options that preserve nulls and declare no explicit recursions.
fn default() -> Self {
Self {
// default to true to maintain backwards compatible behavior
preserve_nulls: true,
// no explicit per-column recursion instructions by default
recursions: vec![],
}
}
}
Expand All @@ -87,4 +107,10 @@ impl UnnestOptions {
self.preserve_nulls = preserve_nulls;
self
}

/// Append one recursion instruction for the unnest operation.
///
/// Each call pushes `recursion` onto `recursions` rather than replacing the
/// list, so calls can be chained to register several instructions (e.g. the
/// same input column at different depths).
pub fn with_recursions(mut self, recursion: RecursionUnnestOption) -> Self {
self.recursions.push(recursion);
self
}
}
186 changes: 70 additions & 116 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ use crate::{
TableProviderFilterPushDown, TableSource, WriteOp,
};

use super::dml::InsertOp;
use super::plan::ColumnUnnestList;
use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef};
use datafusion_common::display::ToStringifiedPlan;
use datafusion_common::file_options::file_type::FileType;
Expand All @@ -54,9 +56,6 @@ use datafusion_common::{
};
use datafusion_expr_common::type_coercion::binary::type_union_resolution;

use super::dml::InsertOp;
use super::plan::{ColumnUnnestList, ColumnUnnestType};

/// Default table name for unnamed table
pub const UNNAMED_TABLE: &str = "?table?";

Expand Down Expand Up @@ -1181,7 +1180,7 @@ impl LogicalPlanBuilder {
) -> Result<Self> {
unnest_with_options(
Arc::unwrap_or_clone(self.plan),
vec![(column.into(), ColumnUnnestType::Inferred)],
vec![column.into()],
options,
)
.map(Self::new)
Expand All @@ -1192,26 +1191,6 @@ impl LogicalPlanBuilder {
self,
columns: Vec<Column>,
options: UnnestOptions,
) -> Result<Self> {
unnest_with_options(
Arc::unwrap_or_clone(self.plan),
columns
.into_iter()
.map(|c| (c, ColumnUnnestType::Inferred))
.collect(),
options,
)
.map(Self::new)
}

/// Unnest the given columns with the given [`UnnestOptions`]
/// if one column is a list type, it can be recursively and simultaneously
/// unnested into the desired recursion levels
/// e.g select unnest(list_col,depth=1), unnest(list_col,depth=2)
pub fn unnest_columns_recursive_with_options(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

self,
columns: Vec<(Column, ColumnUnnestType)>,
options: UnnestOptions,
) -> Result<Self> {
unnest_with_options(Arc::unwrap_or_clone(self.plan), columns, options)
.map(Self::new)
Expand Down Expand Up @@ -1589,14 +1568,12 @@ impl TableSource for LogicalTableSource {

/// Create a [`LogicalPlan::Unnest`] plan
pub fn unnest(input: LogicalPlan, columns: Vec<Column>) -> Result<LogicalPlan> {
let unnestings = columns
.into_iter()
.map(|c| (c, ColumnUnnestType::Inferred))
.collect();
unnest_with_options(input, unnestings, UnnestOptions::default())
unnest_with_options(input, columns, UnnestOptions::default())
}

pub fn get_unnested_list_datatype_recursive(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove this method, recursion specification can be declared using UnnestOptions

// Get the data type of a multi-dimensional type after unnesting it
// with a given depth
fn get_unnested_list_datatype_recursive(
data_type: &DataType,
depth: usize,
) -> Result<DataType> {
Expand All @@ -1615,27 +1592,6 @@ pub fn get_unnested_list_datatype_recursive(
internal_err!("trying to unnest on invalid data type {:?}", data_type)
}

/// Infer the unnest type based on the data type:
/// - list type: infer to unnest(list(col, depth=1))
/// - struct type: infer to unnest(struct)
fn infer_unnest_type(
col_name: &String,
data_type: &DataType,
) -> Result<ColumnUnnestType> {
match data_type {
DataType::List(_) | DataType::FixedSizeList(_, _) | DataType::LargeList(_) => {
Ok(ColumnUnnestType::List(vec![ColumnUnnestList {
output_column: Column::from_name(col_name),
depth: 1,
}]))
}
DataType::Struct(_) => Ok(ColumnUnnestType::Struct),
_ => {
internal_err!("trying to unnest on invalid data type {:?}", data_type)
}
}
}

pub fn get_struct_unnested_columns(
col_name: &String,
inner_fields: &Fields,
Expand Down Expand Up @@ -1724,20 +1680,15 @@ pub fn get_unnested_columns(
/// ```
pub fn unnest_with_options(
input: LogicalPlan,
columns_to_unnest: Vec<(Column, ColumnUnnestType)>,
columns_to_unnest: Vec<Column>,
options: UnnestOptions,
) -> Result<LogicalPlan> {
let mut list_columns: Vec<(usize, ColumnUnnestList)> = vec![];
let mut struct_columns = vec![];
let indices_to_unnest = columns_to_unnest
.iter()
.map(|col_unnesting| {
Ok((
input.schema().index_of_column(&col_unnesting.0)?,
col_unnesting,
))
})
.collect::<Result<HashMap<usize, &(Column, ColumnUnnestType)>>>()?;
.map(|c| Ok((input.schema().index_of_column(c)?, c)))
.collect::<Result<HashMap<usize, &Column>>>()?;

let input_schema = input.schema();

Expand All @@ -1762,51 +1713,59 @@ pub fn unnest_with_options(
.enumerate()
.map(|(index, (original_qualifier, original_field))| {
match indices_to_unnest.get(&index) {
Some((column_to_unnest, unnest_type)) => {
let mut inferred_unnest_type = unnest_type.clone();
if let ColumnUnnestType::Inferred = unnest_type {
inferred_unnest_type = infer_unnest_type(
Some(column_to_unnest) => {
let recursions_on_column = options
.recursions
.iter()
.filter(|p| -> bool { &p.input_column == *column_to_unnest })
.collect::<Vec<_>>();
let mut transformed_columns = recursions_on_column
.iter()
.map(|r| {
list_columns.push((
index,
ColumnUnnestList {
output_column: r.output_column.clone(),
depth: r.depth,
},
));
Ok(get_unnested_columns(
&r.output_column.name,
original_field.data_type(),
r.depth,
)?
.into_iter()
.next()
.unwrap()) // because unnesting a list column always result into one result
})
.collect::<Result<Vec<(Column, Arc<Field>)>>>()?;
if transformed_columns.is_empty() {
transformed_columns = get_unnested_columns(
&column_to_unnest.name,
original_field.data_type(),
1,
)?;
}
let transformed_columns: Vec<(Column, Arc<Field>)> =
match inferred_unnest_type {
ColumnUnnestType::Struct => {
match original_field.data_type() {
DataType::Struct(_) => {
struct_columns.push(index);
get_unnested_columns(
&column_to_unnest.name,
original_field.data_type(),
1,
)?
}
ColumnUnnestType::List(unnest_lists) => {
list_columns.extend(
unnest_lists
.iter()
.map(|ul| (index, ul.to_owned().clone())),
);
unnest_lists
.iter()
.map(
|ColumnUnnestList {
output_column,
depth,
}| {
get_unnested_columns(
&output_column.name,
original_field.data_type(),
*depth,
)
},
)
.collect::<Result<Vec<Vec<(Column, Arc<Field>)>>>>()?
.into_iter()
.flatten()
.collect::<Vec<_>>()
DataType::List(_)
| DataType::FixedSizeList(_, _)
| DataType::LargeList(_) => {
list_columns.push((
index,
ColumnUnnestList {
output_column: Column::from_name(
&column_to_unnest.name,
),
depth: 1,
},
));
}
_ => return internal_err!("Invalid unnest type"),
_ => {}
};
}

// new columns dependent on the same original index
dependency_indices
.extend(std::iter::repeat(index).take(transformed_columns.len()));
Expand Down Expand Up @@ -1855,7 +1814,7 @@ mod tests {
use crate::logical_plan::StringifiedPlan;
use crate::{col, expr, expr_fn::exists, in_subquery, lit, scalar_subquery};

use datafusion_common::SchemaError;
use datafusion_common::{RecursionUnnestOption, SchemaError};

#[test]
fn plan_builder_simple() -> Result<()> {
Expand Down Expand Up @@ -2263,24 +2222,19 @@ mod tests {

// Simultaneously unnesting a list (with different depth) and a struct column
let plan = nested_table_scan("test_table")?
.unnest_columns_recursive_with_options(
vec![
(
"stringss".into(),
ColumnUnnestType::List(vec![
ColumnUnnestList {
output_column: Column::from_name("stringss_depth_1"),
depth: 1,
},
ColumnUnnestList {
output_column: Column::from_name("stringss_depth_2"),
depth: 2,
},
]),
),
("struct_singular".into(), ColumnUnnestType::Inferred),
],
UnnestOptions::default(),
.unnest_columns_with_options(
vec!["stringss".into(), "struct_singular".into()],
UnnestOptions::default()
.with_recursions(RecursionUnnestOption {
input_column: "stringss".into(),
output_column: "stringss_depth_1".into(),
depth: 1,
})
.with_recursions(RecursionUnnestOption {
input_column: "stringss".into(),
output_column: "stringss_depth_2".into(),
depth: 2,
}),
)?
.build()?;

Expand Down
4 changes: 2 additions & 2 deletions datafusion/expr/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ pub use ddl::{
};
pub use dml::{DmlStatement, WriteOp};
pub use plan::{
projection_schema, Aggregate, Analyze, ColumnUnnestList, ColumnUnnestType, CrossJoin,
DescribeTable, Distinct, DistinctOn, EmptyRelation, Explain, Extension, Filter, Join,
projection_schema, Aggregate, Analyze, ColumnUnnestList, CrossJoin, DescribeTable,
Distinct, DistinctOn, EmptyRelation, Explain, Extension, Filter, Join,
JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType, Prepare,
Projection, RecursiveQuery, Repartition, Sort, StringifiedPlan, Subquery,
SubqueryAlias, TableScan, ToStringifiedPlan, Union, Unnest, Values, Window,
Expand Down
Loading