Skip to content

Commit 9005585

Browse files
Deprecate LexOrderingRef and LexRequirementRef (#13233)
* converted LexOrderingRef to &LexOrdering * using LexOrdering::from_ref fn instead of directly cloning it * using as_ref instead of & * using as_ref * removed commented code * updated cargo lock * updated LexRequirementRef to &LexRequirement * fixed clippy issues * fixed taplo error for cargo.toml in physical-expr-common * removed commented code * fixed clippy errors * fixed clippy error * fixes * removed LexOrdering::from_ref instead using clone and created LexOrdering::empty() fn * Update mod.rs --------- Co-authored-by: Berkay Şahin <[email protected]> Co-authored-by: berkaysynnada <[email protected]>
1 parent 4f169ec commit 9005585

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+396
-303
lines changed

benchmarks/src/sort.rs

Lines changed: 18 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use crate::util::{AccessLogOpt, BenchmarkRun, CommonOpt};
2222

2323
use arrow::util::pretty;
2424
use datafusion::common::Result;
25-
use datafusion::physical_expr::{LexOrdering, LexOrderingRef, PhysicalSortExpr};
25+
use datafusion::physical_expr::{LexOrdering, PhysicalSortExpr};
2626
use datafusion::physical_plan::collect;
2727
use datafusion::physical_plan::sorts::sort::SortExec;
2828
use datafusion::prelude::{SessionConfig, SessionContext};
@@ -70,31 +70,28 @@ impl RunOpt {
7070
let sort_cases = vec![
7171
(
7272
"sort utf8",
73-
vec![PhysicalSortExpr {
73+
LexOrdering::new(vec![PhysicalSortExpr {
7474
expr: col("request_method", &schema)?,
7575
options: Default::default(),
76-
}],
76+
}]),
7777
),
7878
(
7979
"sort int",
80-
vec![PhysicalSortExpr {
81-
expr: col("request_bytes", &schema)?,
80+
LexOrdering::new(vec![PhysicalSortExpr {
81+
expr: col("response_bytes", &schema)?,
8282
options: Default::default(),
83-
}],
83+
}]),
8484
),
8585
(
8686
"sort decimal",
87-
vec![
88-
// sort decimal
89-
PhysicalSortExpr {
90-
expr: col("decimal_price", &schema)?,
91-
options: Default::default(),
92-
},
93-
],
87+
LexOrdering::new(vec![PhysicalSortExpr {
88+
expr: col("decimal_price", &schema)?,
89+
options: Default::default(),
90+
}]),
9491
),
9592
(
9693
"sort integer tuple",
97-
vec![
94+
LexOrdering::new(vec![
9895
PhysicalSortExpr {
9996
expr: col("request_bytes", &schema)?,
10097
options: Default::default(),
@@ -103,11 +100,11 @@ impl RunOpt {
103100
expr: col("response_bytes", &schema)?,
104101
options: Default::default(),
105102
},
106-
],
103+
]),
107104
),
108105
(
109106
"sort utf8 tuple",
110-
vec![
107+
LexOrdering::new(vec![
111108
// sort utf8 tuple
112109
PhysicalSortExpr {
113110
expr: col("service", &schema)?,
@@ -125,11 +122,11 @@ impl RunOpt {
125122
expr: col("image", &schema)?,
126123
options: Default::default(),
127124
},
128-
],
125+
]),
129126
),
130127
(
131128
"sort mixed tuple",
132-
vec![
129+
LexOrdering::new(vec![
133130
PhysicalSortExpr {
134131
expr: col("service", &schema)?,
135132
options: Default::default(),
@@ -142,7 +139,7 @@ impl RunOpt {
142139
expr: col("decimal_price", &schema)?,
143140
options: Default::default(),
144141
},
145-
],
142+
]),
146143
),
147144
];
148145
for (title, expr) in sort_cases {
@@ -170,13 +167,13 @@ impl RunOpt {
170167

171168
async fn exec_sort(
172169
ctx: &SessionContext,
173-
expr: LexOrderingRef<'_>,
170+
expr: &LexOrdering,
174171
test_file: &TestParquetFile,
175172
debug: bool,
176173
) -> Result<(usize, std::time::Duration)> {
177174
let start = Instant::now();
178175
let scan = test_file.create_scan(ctx, None).await?;
179-
let exec = Arc::new(SortExec::new(LexOrdering::new(expr.to_owned()), scan));
176+
let exec = Arc::new(SortExec::new(expr.clone(), scan));
180177
let task_ctx = ctx.task_ctx();
181178
let result = collect(exec, task_ctx).await?;
182179
let elapsed = start.elapsed();

datafusion-cli/Cargo.lock

Lines changed: 17 additions & 16 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/core/src/datasource/physical_plan/file_scan_config.rs

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ use arrow_schema::{DataType, Field, Schema, SchemaRef};
3535
use datafusion_common::stats::Precision;
3636
use datafusion_common::{exec_err, ColumnStatistics, DataFusionError, Statistics};
3737
use datafusion_physical_expr::LexOrdering;
38-
use datafusion_physical_expr_common::sort_expr::LexOrderingRef;
3938

4039
use log::warn;
4140

@@ -308,7 +307,7 @@ impl FileScanConfig {
308307
pub fn split_groups_by_statistics(
309308
table_schema: &SchemaRef,
310309
file_groups: &[Vec<PartitionedFile>],
311-
sort_order: LexOrderingRef,
310+
sort_order: &LexOrdering,
312311
) -> Result<Vec<Vec<PartitionedFile>>> {
313312
let flattened_files = file_groups.iter().flatten().collect::<Vec<_>>();
314313
// First Fit:
@@ -1113,17 +1112,19 @@ mod tests {
11131112
))))
11141113
.collect::<Vec<_>>(),
11151114
));
1116-
let sort_order = case
1117-
.sort
1118-
.into_iter()
1119-
.map(|expr| {
1120-
crate::physical_planner::create_physical_sort_expr(
1121-
&expr,
1122-
&DFSchema::try_from(table_schema.as_ref().clone())?,
1123-
&ExecutionProps::default(),
1124-
)
1125-
})
1126-
.collect::<Result<Vec<_>>>()?;
1115+
let sort_order = LexOrdering {
1116+
inner: case
1117+
.sort
1118+
.into_iter()
1119+
.map(|expr| {
1120+
crate::physical_planner::create_physical_sort_expr(
1121+
&expr,
1122+
&DFSchema::try_from(table_schema.as_ref().clone())?,
1123+
&ExecutionProps::default(),
1124+
)
1125+
})
1126+
.collect::<Result<Vec<_>>>()?,
1127+
};
11271128

11281129
let partitioned_files =
11291130
case.files.into_iter().map(From::from).collect::<Vec<_>>();

datafusion/core/src/datasource/physical_plan/statistics.rs

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ use arrow_array::RecordBatch;
3636
use arrow_schema::SchemaRef;
3737
use datafusion_common::{DataFusionError, Result};
3838
use datafusion_physical_expr::{expressions::Column, PhysicalSortExpr};
39-
use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexOrderingRef};
39+
use datafusion_physical_expr_common::sort_expr::LexOrdering;
4040

4141
/// A normalized representation of file min/max statistics that allows for efficient sorting & comparison.
4242
/// The min/max values are ordered by [`Self::sort_order`].
@@ -50,7 +50,7 @@ pub(crate) struct MinMaxStatistics {
5050
impl MinMaxStatistics {
5151
/// Sort order used to sort the statistics
5252
#[allow(unused)]
53-
pub fn sort_order(&self) -> LexOrderingRef {
53+
pub fn sort_order(&self) -> &LexOrdering {
5454
&self.sort_order
5555
}
5656

@@ -66,8 +66,8 @@ impl MinMaxStatistics {
6666
}
6767

6868
pub fn new_from_files<'a>(
69-
projected_sort_order: LexOrderingRef, // Sort order with respect to projected schema
70-
projected_schema: &SchemaRef, // Projected schema
69+
projected_sort_order: &LexOrdering, // Sort order with respect to projected schema
70+
projected_schema: &SchemaRef, // Projected schema
7171
projection: Option<&[usize]>, // Indices of projection in full table schema (None = all columns)
7272
files: impl IntoIterator<Item = &'a PartitionedFile>,
7373
) -> Result<Self> {
@@ -119,15 +119,17 @@ impl MinMaxStatistics {
119119
projected_schema
120120
.project(&(sort_columns.iter().map(|c| c.index()).collect::<Vec<_>>()))?,
121121
);
122-
let min_max_sort_order = sort_columns
123-
.iter()
124-
.zip(projected_sort_order.iter())
125-
.enumerate()
126-
.map(|(i, (col, sort))| PhysicalSortExpr {
127-
expr: Arc::new(Column::new(col.name(), i)),
128-
options: sort.options,
129-
})
130-
.collect::<Vec<_>>();
122+
let min_max_sort_order = LexOrdering {
123+
inner: sort_columns
124+
.iter()
125+
.zip(projected_sort_order.iter())
126+
.enumerate()
127+
.map(|(i, (col, sort))| PhysicalSortExpr {
128+
expr: Arc::new(Column::new(col.name(), i)),
129+
options: sort.options,
130+
})
131+
.collect::<Vec<_>>(),
132+
};
131133

132134
let (min_values, max_values): (Vec<_>, Vec<_>) = sort_columns
133135
.iter()
@@ -167,7 +169,7 @@ impl MinMaxStatistics {
167169
}
168170

169171
pub fn new(
170-
sort_order: LexOrderingRef,
172+
sort_order: &LexOrdering,
171173
schema: &SchemaRef,
172174
min_values: RecordBatch,
173175
max_values: RecordBatch,
@@ -257,7 +259,7 @@ impl MinMaxStatistics {
257259
Ok(Self {
258260
min_by_sort_order: min.map_err(|e| e.context("build min rows"))?,
259261
max_by_sort_order: max.map_err(|e| e.context("build max rows"))?,
260-
sort_order: LexOrdering::from_ref(sort_order),
262+
sort_order: sort_order.clone(),
261263
})
262264
}
263265

@@ -278,7 +280,7 @@ impl MinMaxStatistics {
278280
}
279281

280282
fn sort_columns_from_physical_sort_exprs(
281-
sort_order: LexOrderingRef,
283+
sort_order: &LexOrdering,
282284
) -> Option<Vec<&Column>> {
283285
sort_order
284286
.iter()

datafusion/core/src/physical_optimizer/enforce_distribution.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,12 +52,12 @@ use datafusion_physical_expr::utils::map_columns_before_projection;
5252
use datafusion_physical_expr::{
5353
physical_exprs_equal, EquivalenceProperties, PhysicalExpr, PhysicalExprRef,
5454
};
55-
use datafusion_physical_expr_common::sort_expr::LexOrdering;
5655
use datafusion_physical_optimizer::output_requirements::OutputRequirementExec;
5756
use datafusion_physical_optimizer::PhysicalOptimizerRule;
5857
use datafusion_physical_plan::windows::{get_best_fitting_window, BoundedWindowAggExec};
5958
use datafusion_physical_plan::ExecutionPlanProperties;
6059

60+
use datafusion_physical_expr_common::sort_expr::LexOrdering;
6161
use itertools::izip;
6262

6363
/// The `EnforceDistribution` rule ensures that distribution requirements are
@@ -936,7 +936,11 @@ fn add_spm_on_top(input: DistributionContext) -> DistributionContext {
936936

937937
let new_plan = if should_preserve_ordering {
938938
Arc::new(SortPreservingMergeExec::new(
939-
LexOrdering::from_ref(input.plan.output_ordering().unwrap_or(&[])),
939+
input
940+
.plan
941+
.output_ordering()
942+
.unwrap_or(&LexOrdering::default())
943+
.clone(),
940944
input.plan.clone(),
941945
)) as _
942946
} else {

0 commit comments

Comments
 (0)