Skip to content

Commit 089a42a

Browse files
authored
Minor: Add Column::from(Tableref, &FieldRef), Expr::from(Column) and Expr::from(Tableref, &FieldRef) (#10178)
* Minor: Add `Column::from(Tableref, &FieldRef)` * Add Expr::from() * fix docs * Fix doc test
1 parent da82cec commit 089a42a

File tree

12 files changed

+68
-45
lines changed

12 files changed

+68
-45
lines changed

benchmarks/src/tpch/convert.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -88,9 +88,7 @@ impl ConvertOpt {
8888
.schema()
8989
.iter()
9090
.take(schema.fields.len() - 1)
91-
.map(|(qualifier, field)| {
92-
Expr::Column(Column::from((qualifier, field.as_ref())))
93-
})
91+
.map(Expr::from)
9492
.collect();
9593

9694
csv = csv.select(selection)?;

datafusion/common/src/column.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
//! Column
1919
20-
use arrow_schema::Field;
20+
use arrow_schema::{Field, FieldRef};
2121

2222
use crate::error::_schema_err;
2323
use crate::utils::{parse_identifiers_normalized, quote_identifier};
@@ -63,6 +63,8 @@ impl Column {
6363
}
6464

6565
/// Create Column from unqualified name.
66+
///
67+
/// Alias for `Column::new_unqualified`
6668
pub fn from_name(name: impl Into<String>) -> Self {
6769
Self {
6870
relation: None,
@@ -346,6 +348,13 @@ impl From<(Option<&TableReference>, &Field)> for Column {
346348
}
347349
}
348350

351+
/// Create a column, use qualifier and field name
352+
impl From<(Option<&TableReference>, &FieldRef)> for Column {
353+
fn from((relation, field): (Option<&TableReference>, &FieldRef)) -> Self {
354+
Self::new(relation.cloned(), field.name())
355+
}
356+
}
357+
349358
impl FromStr for Column {
350359
type Err = Infallible;
351360

datafusion/core/src/dataframe/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1332,7 +1332,7 @@ impl DataFrame {
13321332
col_exists = true;
13331333
new_column.clone()
13341334
} else {
1335-
col(Column::from((qualifier, field.as_ref())))
1335+
col(Column::from((qualifier, field)))
13361336
}
13371337
})
13381338
.collect();
@@ -1402,9 +1402,9 @@ impl DataFrame {
14021402
.iter()
14031403
.map(|(qualifier, field)| {
14041404
if qualifier.eq(&qualifier_rename) && field.as_ref() == field_rename {
1405-
col(Column::from((qualifier, field.as_ref()))).alias(new_name)
1405+
col(Column::from((qualifier, field))).alias(new_name)
14061406
} else {
1407-
col(Column::from((qualifier, field.as_ref())))
1407+
col(Column::from((qualifier, field)))
14081408
}
14091409
})
14101410
.collect::<Vec<_>>();

datafusion/core/src/physical_planner.rs

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1261,15 +1261,8 @@ impl DefaultPhysicalPlanner {
12611261

12621262
// Remove temporary projected columns
12631263
if left_projected || right_projected {
1264-
let final_join_result = join_schema
1265-
.iter()
1266-
.map(|(qualifier, field)| {
1267-
Expr::Column(datafusion_common::Column::from((
1268-
qualifier,
1269-
field.as_ref(),
1270-
)))
1271-
})
1272-
.collect::<Vec<_>>();
1264+
let final_join_result =
1265+
join_schema.iter().map(Expr::from).collect::<Vec<_>>();
12731266
let projection = LogicalPlan::Projection(Projection::try_new(
12741267
final_join_result,
12751268
Arc::new(new_join),

datafusion/expr/src/expr.rs

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use crate::{
3232
Signature,
3333
};
3434

35-
use arrow::datatypes::DataType;
35+
use arrow::datatypes::{DataType, FieldRef};
3636
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
3737
use datafusion_common::{
3838
internal_err, plan_err, Column, DFSchema, Result, ScalarValue, TableReference,
@@ -84,6 +84,29 @@ use sqlparser::ast::NullTreatment;
8484
/// assert_eq!(binary_expr.op, Operator::Eq);
8585
/// }
8686
/// ```
87+
///
88+
/// ## Return a list of [`Expr::Column`] from a schema's columns
89+
/// ```
90+
/// # use arrow::datatypes::{DataType, Field, Schema};
91+
/// # use datafusion_common::{DFSchema, Column};
92+
/// # use datafusion_expr::Expr;
93+
///
94+
/// let arrow_schema = Schema::new(vec![
95+
/// Field::new("c1", DataType::Int32, false),
96+
/// Field::new("c2", DataType::Float64, false),
97+
/// ]);
98+
/// let df_schema = DFSchema::try_from_qualified_schema("t1", &arrow_schema).unwrap();
99+
///
100+
/// // Form a list of expressions for each item in the schema
101+
/// let exprs: Vec<_> = df_schema.iter()
102+
/// .map(Expr::from)
103+
/// .collect();
104+
///
105+
/// assert_eq!(exprs, vec![
106+
/// Expr::from(Column::from_qualified_name("t1.c1")),
107+
/// Expr::from(Column::from_qualified_name("t1.c2")),
108+
/// ]);
109+
/// ```
87110
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
88111
pub enum Expr {
89112
/// An expression with a specific name.
@@ -190,6 +213,23 @@ impl Default for Expr {
190213
}
191214
}
192215

216+
/// Create an [`Expr`] from a [`Column`]
217+
impl From<Column> for Expr {
218+
fn from(value: Column) -> Self {
219+
Expr::Column(value)
220+
}
221+
}
222+
223+
/// Create an [`Expr`] from an optional qualifier and a [`FieldRef`]. This is
224+
/// useful for creating [`Expr`] from a [`DFSchema`].
225+
///
226+
/// See example on [`Expr`]
227+
impl<'a> From<(Option<&'a TableReference>, &'a FieldRef)> for Expr {
228+
fn from(value: (Option<&'a TableReference>, &'a FieldRef)) -> Self {
229+
Expr::from(Column::from(value))
230+
}
231+
}
232+
193233
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
194234
pub struct Unnest {
195235
pub expr: Box<Expr>,

datafusion/expr/src/expr_rewriter/mod.rs

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -218,13 +218,7 @@ pub fn coerce_plan_expr_for_schema(
218218
Ok(LogicalPlan::Projection(projection))
219219
}
220220
_ => {
221-
let exprs: Vec<Expr> = plan
222-
.schema()
223-
.iter()
224-
.map(|(qualifier, field)| {
225-
Expr::Column(Column::from((qualifier, field.as_ref())))
226-
})
227-
.collect();
221+
let exprs: Vec<Expr> = plan.schema().iter().map(Expr::from).collect();
228222

229223
let new_exprs = coerce_exprs_for_schema(exprs, plan.schema(), schema)?;
230224
let add_project = new_exprs.iter().any(|expr| expr.try_into_col().is_err());

datafusion/expr/src/logical_plan/builder.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1577,7 +1577,7 @@ pub fn unnest_with_options(
15771577
return Ok(input);
15781578
}
15791579
};
1580-
qualified_columns.push(Column::from((unnest_qualifier, unnested_field.as_ref())));
1580+
qualified_columns.push(Column::from((unnest_qualifier, &unnested_field)));
15811581
unnested_fields.insert(index, unnested_field);
15821582
}
15831583

datafusion/expr/src/utils.rs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -356,12 +356,7 @@ fn get_exprs_except_skipped(
356356
columns_to_skip: HashSet<Column>,
357357
) -> Vec<Expr> {
358358
if columns_to_skip.is_empty() {
359-
schema
360-
.iter()
361-
.map(|(qualifier, field)| {
362-
Expr::Column(Column::from((qualifier, field.as_ref())))
363-
})
364-
.collect::<Vec<Expr>>()
359+
schema.iter().map(Expr::from).collect::<Vec<Expr>>()
365360
} else {
366361
schema
367362
.columns()
@@ -855,7 +850,7 @@ pub fn expr_as_column_expr(expr: &Expr, plan: &LogicalPlan) -> Result<Expr> {
855850
match expr {
856851
Expr::Column(col) => {
857852
let (qualifier, field) = plan.schema().qualified_field_from_column(col)?;
858-
Ok(Expr::Column(Column::from((qualifier, field))))
853+
Ok(Expr::from(Column::from((qualifier, field))))
859854
}
860855
_ => Ok(Expr::Column(Column::from_name(expr.display_name()?))),
861856
}

datafusion/optimizer/src/common_subexpr_eliminate.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -506,7 +506,7 @@ fn build_common_expr_project_plan(
506506

507507
for (qualifier, field) in input.schema().iter() {
508508
if fields_set.insert(qualified_name(qualifier, field.name())) {
509-
project_exprs.push(Expr::Column(Column::from((qualifier, field.as_ref()))));
509+
project_exprs.push(Expr::from((qualifier, field)));
510510
}
511511
}
512512

@@ -525,10 +525,7 @@ fn build_recover_project_plan(
525525
schema: &DFSchema,
526526
input: LogicalPlan,
527527
) -> Result<LogicalPlan> {
528-
let col_exprs = schema
529-
.iter()
530-
.map(|(qualifier, field)| Expr::Column(Column::from((qualifier, field.as_ref()))))
531-
.collect();
528+
let col_exprs = schema.iter().map(Expr::from).collect();
532529
Ok(LogicalPlan::Projection(Projection::try_new(
533530
col_exprs,
534531
Arc::new(input),

datafusion/optimizer/src/replace_distinct_aggregate.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ impl OptimizerRule for ReplaceDistinctWithAggregate {
127127
.skip(on_expr.len())
128128
.zip(schema.iter())
129129
.map(|((new_qualifier, new_field), (old_qualifier, old_field))| {
130-
Ok(col(Column::from((new_qualifier, new_field.as_ref())))
130+
Ok(col(Column::from((new_qualifier, new_field)))
131131
.alias_qualified(old_qualifier.cloned(), old_field.name()))
132132
})
133133
.collect::<Result<Vec<Expr>>>()?;

datafusion/sql/src/expr/mod.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ use sqlparser::ast::{ArrayAgg, Expr as SQLExpr, JsonOperator, TrimWhereField, Va
2121
use sqlparser::parser::ParserError::ParserError;
2222

2323
use datafusion_common::{
24-
internal_datafusion_err, internal_err, not_impl_err, plan_err, Column, DFSchema,
25-
Result, ScalarValue,
24+
internal_datafusion_err, internal_err, not_impl_err, plan_err, DFSchema, Result,
25+
ScalarValue,
2626
};
2727
use datafusion_expr::expr::AggregateFunctionDefinition;
2828
use datafusion_expr::expr::InList;
@@ -142,9 +142,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
142142
}
143143
_ => false,
144144
}) {
145-
Some((qualifier, df_field)) => {
146-
Expr::Column(Column::from((qualifier, df_field.as_ref())))
147-
}
145+
Some((qualifier, df_field)) => Expr::from((qualifier, df_field)),
148146
None => Expr::Column(col),
149147
}
150148
}

datafusion/sql/src/statement.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1307,8 +1307,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
13071307
))
13081308
} else {
13091309
datafusion_expr::Expr::Column(Column::from((
1310-
qualifier,
1311-
field.as_ref(),
1310+
qualifier, field,
13121311
)))
13131312
}
13141313
}

0 commit comments

Comments
 (0)