Skip to content

Commit bf46f33

Browse files
authored
Reuse bulk serialization helpers for protobuf (#12179)
Reuse the `parse_exprs` and `serialize_exprs` bulk helpers instead of hand-written `iter().map(...).collect()` chains, reducing code duplication and improving readability.
1 parent a616e88 commit bf46f33

File tree

1 file changed

+60
-145
lines changed
  • datafusion/proto/src/logical_plan

1 file changed

+60
-145
lines changed

datafusion/proto/src/logical_plan/mod.rs

Lines changed: 60 additions & 145 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ use crate::{
2929
},
3030
};
3131

32-
use crate::protobuf::{proto_error, FromProtoError, ToProtoError};
32+
use crate::protobuf::{proto_error, ToProtoError};
3333
use arrow::datatypes::{DataType, Schema, SchemaRef};
3434
#[cfg(feature = "parquet")]
3535
use datafusion::datasource::file_format::parquet::ParquetFormat;
@@ -66,11 +66,10 @@ use datafusion_expr::{
6666
};
6767
use datafusion_expr::{AggregateUDF, Unnest};
6868

69+
use self::to_proto::{serialize_expr, serialize_exprs};
6970
use prost::bytes::BufMut;
7071
use prost::Message;
7172

72-
use self::to_proto::serialize_expr;
73-
7473
pub mod file_formats;
7574
pub mod from_proto;
7675
pub mod to_proto;
@@ -273,13 +272,7 @@ impl AsLogicalPlan for LogicalPlanNode {
273272
values
274273
.values_list
275274
.chunks_exact(n_cols)
276-
.map(|r| {
277-
r.iter()
278-
.map(|expr| {
279-
from_proto::parse_expr(expr, ctx, extension_codec)
280-
})
281-
.collect::<Result<Vec<_>, FromProtoError>>()
282-
})
275+
.map(|r| from_proto::parse_exprs(r, ctx, extension_codec))
283276
.collect::<Result<Vec<_>, _>>()
284277
.map_err(|e| e.into())
285278
}?;
@@ -288,11 +281,8 @@ impl AsLogicalPlan for LogicalPlanNode {
288281
LogicalPlanType::Projection(projection) => {
289282
let input: LogicalPlan =
290283
into_logical_plan!(projection.input, ctx, extension_codec)?;
291-
let expr: Vec<Expr> = projection
292-
.expr
293-
.iter()
294-
.map(|expr| from_proto::parse_expr(expr, ctx, extension_codec))
295-
.collect::<Result<Vec<_>, _>>()?;
284+
let expr: Vec<Expr> =
285+
from_proto::parse_exprs(&projection.expr, ctx, extension_codec)?;
296286

297287
let new_proj = project(input, expr)?;
298288
match projection.optional_alias.as_ref() {
@@ -324,26 +314,17 @@ impl AsLogicalPlan for LogicalPlanNode {
324314
LogicalPlanType::Window(window) => {
325315
let input: LogicalPlan =
326316
into_logical_plan!(window.input, ctx, extension_codec)?;
327-
let window_expr = window
328-
.window_expr
329-
.iter()
330-
.map(|expr| from_proto::parse_expr(expr, ctx, extension_codec))
331-
.collect::<Result<Vec<Expr>, _>>()?;
317+
let window_expr =
318+
from_proto::parse_exprs(&window.window_expr, ctx, extension_codec)?;
332319
LogicalPlanBuilder::from(input).window(window_expr)?.build()
333320
}
334321
LogicalPlanType::Aggregate(aggregate) => {
335322
let input: LogicalPlan =
336323
into_logical_plan!(aggregate.input, ctx, extension_codec)?;
337-
let group_expr = aggregate
338-
.group_expr
339-
.iter()
340-
.map(|expr| from_proto::parse_expr(expr, ctx, extension_codec))
341-
.collect::<Result<Vec<Expr>, _>>()?;
342-
let aggr_expr = aggregate
343-
.aggr_expr
344-
.iter()
345-
.map(|expr| from_proto::parse_expr(expr, ctx, extension_codec))
346-
.collect::<Result<Vec<Expr>, _>>()?;
324+
let group_expr =
325+
from_proto::parse_exprs(&aggregate.group_expr, ctx, extension_codec)?;
326+
let aggr_expr =
327+
from_proto::parse_exprs(&aggregate.aggr_expr, ctx, extension_codec)?;
347328
LogicalPlanBuilder::from(input)
348329
.aggregate(group_expr, aggr_expr)?
349330
.build()
@@ -361,20 +342,16 @@ impl AsLogicalPlan for LogicalPlanNode {
361342
projection = Some(column_indices);
362343
}
363344

364-
let filters = scan
365-
.filters
366-
.iter()
367-
.map(|expr| from_proto::parse_expr(expr, ctx, extension_codec))
368-
.collect::<Result<Vec<_>, _>>()?;
345+
let filters =
346+
from_proto::parse_exprs(&scan.filters, ctx, extension_codec)?;
369347

370348
let mut all_sort_orders = vec![];
371349
for order in &scan.file_sort_order {
372-
let file_sort_order = order
373-
.logical_expr_nodes
374-
.iter()
375-
.map(|expr| from_proto::parse_expr(expr, ctx, extension_codec))
376-
.collect::<Result<Vec<_>, _>>()?;
377-
all_sort_orders.push(file_sort_order)
350+
all_sort_orders.push(from_proto::parse_exprs(
351+
&order.logical_expr_nodes,
352+
ctx,
353+
extension_codec,
354+
)?)
378355
}
379356

380357
let file_format: Arc<dyn FileFormat> =
@@ -475,11 +452,8 @@ impl AsLogicalPlan for LogicalPlanNode {
475452
projection = Some(column_indices);
476453
}
477454

478-
let filters = scan
479-
.filters
480-
.iter()
481-
.map(|expr| from_proto::parse_expr(expr, ctx, extension_codec))
482-
.collect::<Result<Vec<_>, _>>()?;
455+
let filters =
456+
from_proto::parse_exprs(&scan.filters, ctx, extension_codec)?;
483457

484458
let table_name =
485459
from_table_reference(scan.table_name.as_ref(), "CustomScan")?;
@@ -502,11 +476,8 @@ impl AsLogicalPlan for LogicalPlanNode {
502476
LogicalPlanType::Sort(sort) => {
503477
let input: LogicalPlan =
504478
into_logical_plan!(sort.input, ctx, extension_codec)?;
505-
let sort_expr: Vec<Expr> = sort
506-
.expr
507-
.iter()
508-
.map(|expr| from_proto::parse_expr(expr, ctx, extension_codec))
509-
.collect::<Result<Vec<Expr>, _>>()?;
479+
let sort_expr: Vec<Expr> =
480+
from_proto::parse_exprs(&sort.expr, ctx, extension_codec)?;
510481
LogicalPlanBuilder::from(input).sort(sort_expr)?.build()
511482
}
512483
LogicalPlanType::Repartition(repartition) => {
@@ -525,12 +496,7 @@ impl AsLogicalPlan for LogicalPlanNode {
525496
hash_expr: pb_hash_expr,
526497
partition_count,
527498
}) => Partitioning::Hash(
528-
pb_hash_expr
529-
.iter()
530-
.map(|expr| {
531-
from_proto::parse_expr(expr, ctx, extension_codec)
532-
})
533-
.collect::<Result<Vec<_>, _>>()?,
499+
from_proto::parse_exprs(pb_hash_expr, ctx, extension_codec)?,
534500
*partition_count as usize,
535501
),
536502
PartitionMethod::RoundRobin(partition_count) => {
@@ -570,12 +536,11 @@ impl AsLogicalPlan for LogicalPlanNode {
570536

571537
let mut order_exprs = vec![];
572538
for expr in &create_extern_table.order_exprs {
573-
let order_expr = expr
574-
.logical_expr_nodes
575-
.iter()
576-
.map(|expr| from_proto::parse_expr(expr, ctx, extension_codec))
577-
.collect::<Result<Vec<Expr>, _>>()?;
578-
order_exprs.push(order_expr)
539+
order_exprs.push(from_proto::parse_exprs(
540+
&expr.logical_expr_nodes,
541+
ctx,
542+
extension_codec,
543+
)?);
579544
}
580545

581546
let mut column_defaults =
@@ -693,16 +658,10 @@ impl AsLogicalPlan for LogicalPlanNode {
693658
LogicalPlanBuilder::from(input).limit(skip, fetch)?.build()
694659
}
695660
LogicalPlanType::Join(join) => {
696-
let left_keys: Vec<Expr> = join
697-
.left_join_key
698-
.iter()
699-
.map(|expr| from_proto::parse_expr(expr, ctx, extension_codec))
700-
.collect::<Result<Vec<_>, _>>()?;
701-
let right_keys: Vec<Expr> = join
702-
.right_join_key
703-
.iter()
704-
.map(|expr| from_proto::parse_expr(expr, ctx, extension_codec))
705-
.collect::<Result<Vec<_>, _>>()?;
661+
let left_keys: Vec<Expr> =
662+
from_proto::parse_exprs(&join.left_join_key, ctx, extension_codec)?;
663+
let right_keys: Vec<Expr> =
664+
from_proto::parse_exprs(&join.right_join_key, ctx, extension_codec)?;
706665
let join_type =
707666
protobuf::JoinType::try_from(join.join_type).map_err(|_| {
708667
proto_error(format!(
@@ -804,27 +763,20 @@ impl AsLogicalPlan for LogicalPlanNode {
804763
LogicalPlanType::DistinctOn(distinct_on) => {
805764
let input: LogicalPlan =
806765
into_logical_plan!(distinct_on.input, ctx, extension_codec)?;
807-
let on_expr = distinct_on
808-
.on_expr
809-
.iter()
810-
.map(|expr| from_proto::parse_expr(expr, ctx, extension_codec))
811-
.collect::<Result<Vec<Expr>, _>>()?;
812-
let select_expr = distinct_on
813-
.select_expr
814-
.iter()
815-
.map(|expr| from_proto::parse_expr(expr, ctx, extension_codec))
816-
.collect::<Result<Vec<Expr>, _>>()?;
766+
let on_expr =
767+
from_proto::parse_exprs(&distinct_on.on_expr, ctx, extension_codec)?;
768+
let select_expr = from_proto::parse_exprs(
769+
&distinct_on.select_expr,
770+
ctx,
771+
extension_codec,
772+
)?;
817773
let sort_expr = match distinct_on.sort_expr.len() {
818774
0 => None,
819-
_ => Some(
820-
distinct_on
821-
.sort_expr
822-
.iter()
823-
.map(|expr| {
824-
from_proto::parse_expr(expr, ctx, extension_codec)
825-
})
826-
.collect::<Result<Vec<Expr>, _>>()?,
827-
),
775+
_ => Some(from_proto::parse_exprs(
776+
&distinct_on.sort_expr,
777+
ctx,
778+
extension_codec,
779+
)?),
828780
};
829781
LogicalPlanBuilder::from(input)
830782
.distinct_on(on_expr, select_expr, sort_expr)?
@@ -943,11 +895,8 @@ impl AsLogicalPlan for LogicalPlanNode {
943895
} else {
944896
values[0].len()
945897
} as u64;
946-
let values_list = values
947-
.iter()
948-
.flatten()
949-
.map(|v| serialize_expr(v, extension_codec))
950-
.collect::<Result<Vec<_>, _>>()?;
898+
let values_list =
899+
serialize_exprs(values.iter().flatten(), extension_codec)?;
951900
Ok(protobuf::LogicalPlanNode {
952901
logical_plan_type: Some(LogicalPlanType::Values(
953902
protobuf::ValuesNode {
@@ -982,10 +931,8 @@ impl AsLogicalPlan for LogicalPlanNode {
982931
};
983932
let schema: protobuf::Schema = schema.as_ref().try_into()?;
984933

985-
let filters: Vec<protobuf::LogicalExprNode> = filters
986-
.iter()
987-
.map(|filter| serialize_expr(filter, extension_codec))
988-
.collect::<Result<Vec<_>, _>>()?;
934+
let filters: Vec<protobuf::LogicalExprNode> =
935+
serialize_exprs(filters, extension_codec)?;
989936

990937
if let Some(listing_table) = source.downcast_ref::<ListingTable>() {
991938
let any = listing_table.options().format.as_any();
@@ -1037,10 +984,7 @@ impl AsLogicalPlan for LogicalPlanNode {
1037984
let mut exprs_vec: Vec<LogicalExprNodeCollection> = vec![];
1038985
for order in &options.file_sort_order {
1039986
let expr_vec = LogicalExprNodeCollection {
1040-
logical_expr_nodes: order
1041-
.iter()
1042-
.map(|expr| serialize_expr(expr, extension_codec))
1043-
.collect::<Result<Vec<_>, ToProtoError>>()?,
987+
logical_expr_nodes: serialize_exprs(order, extension_codec)?,
1044988
};
1045989
exprs_vec.push(expr_vec);
1046990
}
@@ -1118,10 +1062,7 @@ impl AsLogicalPlan for LogicalPlanNode {
11181062
extension_codec,
11191063
)?,
11201064
)),
1121-
expr: expr
1122-
.iter()
1123-
.map(|expr| serialize_expr(expr, extension_codec))
1124-
.collect::<Result<Vec<_>, ToProtoError>>()?,
1065+
expr: serialize_exprs(expr, extension_codec)?,
11251066
optional_alias: None,
11261067
},
11271068
))),
@@ -1173,22 +1114,13 @@ impl AsLogicalPlan for LogicalPlanNode {
11731114
)?;
11741115
let sort_expr = match sort_expr {
11751116
None => vec![],
1176-
Some(sort_expr) => sort_expr
1177-
.iter()
1178-
.map(|expr| serialize_expr(expr, extension_codec))
1179-
.collect::<Result<Vec<_>, _>>()?,
1117+
Some(sort_expr) => serialize_exprs(sort_expr, extension_codec)?,
11801118
};
11811119
Ok(protobuf::LogicalPlanNode {
11821120
logical_plan_type: Some(LogicalPlanType::DistinctOn(Box::new(
11831121
protobuf::DistinctOnNode {
1184-
on_expr: on_expr
1185-
.iter()
1186-
.map(|expr| serialize_expr(expr, extension_codec))
1187-
.collect::<Result<Vec<_>, _>>()?,
1188-
select_expr: select_expr
1189-
.iter()
1190-
.map(|expr| serialize_expr(expr, extension_codec))
1191-
.collect::<Result<Vec<_>, _>>()?,
1122+
on_expr: serialize_exprs(on_expr, extension_codec)?,
1123+
select_expr: serialize_exprs(select_expr, extension_codec)?,
11921124
sort_expr,
11931125
input: Some(Box::new(input)),
11941126
},
@@ -1207,10 +1139,7 @@ impl AsLogicalPlan for LogicalPlanNode {
12071139
logical_plan_type: Some(LogicalPlanType::Window(Box::new(
12081140
protobuf::WindowNode {
12091141
input: Some(Box::new(input)),
1210-
window_expr: window_expr
1211-
.iter()
1212-
.map(|expr| serialize_expr(expr, extension_codec))
1213-
.collect::<Result<Vec<_>, _>>()?,
1142+
window_expr: serialize_exprs(window_expr, extension_codec)?,
12141143
},
12151144
))),
12161145
})
@@ -1230,14 +1159,8 @@ impl AsLogicalPlan for LogicalPlanNode {
12301159
logical_plan_type: Some(LogicalPlanType::Aggregate(Box::new(
12311160
protobuf::AggregateNode {
12321161
input: Some(Box::new(input)),
1233-
group_expr: group_expr
1234-
.iter()
1235-
.map(|expr| serialize_expr(expr, extension_codec))
1236-
.collect::<Result<Vec<_>, _>>()?,
1237-
aggr_expr: aggr_expr
1238-
.iter()
1239-
.map(|expr| serialize_expr(expr, extension_codec))
1240-
.collect::<Result<Vec<_>, _>>()?,
1162+
group_expr: serialize_exprs(group_expr, extension_codec)?,
1163+
aggr_expr: serialize_exprs(aggr_expr, extension_codec)?,
12411164
},
12421165
))),
12431166
})
@@ -1335,10 +1258,8 @@ impl AsLogicalPlan for LogicalPlanNode {
13351258
input.as_ref(),
13361259
extension_codec,
13371260
)?;
1338-
let selection_expr: Vec<protobuf::LogicalExprNode> = expr
1339-
.iter()
1340-
.map(|expr| serialize_expr(expr, extension_codec))
1341-
.collect::<Result<Vec<_>, ToProtoError>>()?;
1261+
let selection_expr: Vec<protobuf::LogicalExprNode> =
1262+
serialize_exprs(expr, extension_codec)?;
13421263
Ok(protobuf::LogicalPlanNode {
13431264
logical_plan_type: Some(LogicalPlanType::Sort(Box::new(
13441265
protobuf::SortNode {
@@ -1367,10 +1288,7 @@ impl AsLogicalPlan for LogicalPlanNode {
13671288
let pb_partition_method = match partitioning_scheme {
13681289
Partitioning::Hash(exprs, partition_count) => {
13691290
PartitionMethod::Hash(protobuf::HashRepartition {
1370-
hash_expr: exprs
1371-
.iter()
1372-
.map(|expr| serialize_expr(expr, extension_codec))
1373-
.collect::<Result<Vec<_>, ToProtoError>>()?,
1291+
hash_expr: serialize_exprs(exprs, extension_codec)?,
13741292
partition_count: *partition_count as u64,
13751293
})
13761294
}
@@ -1419,10 +1337,7 @@ impl AsLogicalPlan for LogicalPlanNode {
14191337
let mut converted_order_exprs: Vec<LogicalExprNodeCollection> = vec![];
14201338
for order in order_exprs {
14211339
let temp = LogicalExprNodeCollection {
1422-
logical_expr_nodes: order
1423-
.iter()
1424-
.map(|expr| serialize_expr(expr, extension_codec))
1425-
.collect::<Result<Vec<_>, ToProtoError>>()?,
1340+
logical_expr_nodes: serialize_exprs(order, extension_codec)?,
14261341
};
14271342
converted_order_exprs.push(temp);
14281343
}

0 commit comments

Comments
 (0)