Skip to content

Commit 6531341

Browse files
committed
functional way
1 parent 28a2820 commit 6531341

File tree

6 files changed

+47
-61
lines changed

6 files changed

+47
-61
lines changed

datafusion/physical-plan/src/filter.rs

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -410,18 +410,19 @@ impl ExecutionPlan for FilterExec {
410410
}
411411

412412
fn statistics_by_partition(&self) -> Result<Vec<Statistics>> {
413-
let input_stats = self.input.statistics_by_partition()?;
414-
let mut stats = Vec::with_capacity(input_stats.len());
415-
for input_stat in input_stats {
416-
let stat = Self::statistics_helper(
417-
self.schema(),
418-
input_stat,
419-
self.predicate(),
420-
self.default_selectivity,
421-
)?;
422-
stats.push(stat.project(self.projection.as_ref()));
423-
}
424-
Ok(stats)
413+
self.input
414+
.statistics_by_partition()?
415+
.into_iter()
416+
.map(|input_stat| {
417+
Self::statistics_helper(
418+
self.schema(),
419+
input_stat,
420+
self.predicate(),
421+
self.default_selectivity,
422+
)
423+
.map(|stat| stat.project(self.projection.as_ref()))
424+
})
425+
.collect()
425426
}
426427

427428
fn cardinality_effect(&self) -> CardinalityEffect {

datafusion/physical-plan/src/joins/cross_join.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -355,16 +355,15 @@ impl ExecutionPlan for CrossJoinExec {
355355

356356
// Summarize the `left_stats`
357357
let statistics = compute_summary_statistics(
358-
left_stats.iter(),
358+
left_stats.into_iter(),
359359
self.schema.fields().len(),
360360
|stats| Some(stats),
361361
);
362362

363-
let mut stats = Vec::new();
364-
for right in right_stats.iter() {
365-
stats.push(stats_cartesian_product(statistics.clone(), right.clone()));
366-
}
367-
Ok(stats)
363+
Ok(right_stats
364+
.into_iter()
365+
.map(|right| stats_cartesian_product(statistics.clone(), right))
366+
.collect())
368367
}
369368

370369
/// Tries to swap the projection with its input [`CrossJoinExec`]. If it can be done,

datafusion/physical-plan/src/limit.rs

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -348,19 +348,13 @@ impl ExecutionPlan for LocalLimitExec {
348348
}
349349

350350
fn statistics_by_partition(&self) -> Result<Vec<Statistics>> {
351-
let input_stats = self.input.statistics_by_partition()?;
352-
let mut stats = Vec::with_capacity(input_stats.len());
353-
for input_stat in input_stats {
354-
let stat = Statistics::with_fetch(
355-
input_stat,
356-
self.schema(),
357-
Some(self.fetch),
358-
0,
359-
1,
360-
)?;
361-
stats.push(stat);
362-
}
363-
Ok(stats)
351+
self.input
352+
.statistics_by_partition()?
353+
.into_iter()
354+
.map(|input_stat| {
355+
Statistics::with_fetch(input_stat, self.schema(), Some(self.fetch), 0, 1)
356+
})
357+
.collect()
364358
}
365359

366360
fn fetch(&self) -> Option<usize> {

datafusion/physical-plan/src/projection.rs

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -252,17 +252,18 @@ impl ExecutionPlan for ProjectionExec {
252252
}
253253

254254
fn statistics_by_partition(&self) -> Result<Vec<Statistics>> {
255-
let input_stats = self.input.statistics_by_partition()?;
256-
let mut stats = Vec::with_capacity(input_stats.len());
257-
for input_stat in input_stats {
258-
let stat = stats_projection(
259-
input_stat.clone(),
260-
self.expr.iter().map(|(e, _)| Arc::clone(e)),
261-
Arc::clone(&self.schema),
262-
);
263-
stats.push(stat);
264-
}
265-
Ok(stats)
255+
Ok(self
256+
.input
257+
.statistics_by_partition()?
258+
.into_iter()
259+
.map(|input_stats| {
260+
stats_projection(
261+
input_stats,
262+
self.expr.iter().map(|(e, _)| Arc::clone(e)),
263+
Arc::clone(&self.schema),
264+
)
265+
})
266+
.collect())
266267
}
267268

268269
fn supports_limit_pushdown(&self) -> bool {

datafusion/physical-plan/src/sorts/sort.rs

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1274,18 +1274,11 @@ impl ExecutionPlan for SortExec {
12741274
}
12751275

12761276
fn statistics_by_partition(&self) -> Result<Vec<Statistics>> {
1277-
let input_stats = self.input.statistics_by_partition()?;
1278-
let mut stats = Vec::with_capacity(input_stats.len());
1279-
for stat in input_stats {
1280-
stats.push(Statistics::with_fetch(
1281-
stat,
1282-
self.schema(),
1283-
self.fetch,
1284-
0,
1285-
1,
1286-
)?);
1287-
}
1288-
Ok(stats)
1277+
self.input
1278+
.statistics_by_partition()?
1279+
.into_iter()
1280+
.map(|stat| Statistics::with_fetch(stat, self.schema(), self.fetch, 0, 1))
1281+
.collect()
12891282
}
12901283

12911284
fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {

datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -365,13 +365,11 @@ impl ExecutionPlan for BoundedWindowAggExec {
365365
}
366366

367367
fn statistics_by_partition(&self) -> Result<Vec<Statistics>> {
368-
let input_stats = self.input.statistics_by_partition()?;
369-
let mut output_stats = Vec::with_capacity(input_stats.len());
370-
for stat in input_stats {
371-
let output_stat = self.statistics_helper(stat.clone())?;
372-
output_stats.push(output_stat);
373-
}
374-
Ok(output_stats)
368+
self.input
369+
.statistics_by_partition()?
370+
.into_iter()
371+
.map(|stat| self.statistics_helper(stat))
372+
.collect()
375373
}
376374
}
377375

0 commit comments

Comments
 (0)