Skip to content

Commit a6993eb

Browse files
committed
Use PhysicalExtensionCodec consistently
1 parent 671cef8 commit a6993eb

File tree

3 files changed

+151
-88
lines changed

3 files changed

+151
-88
lines changed

datafusion/proto/src/physical_plan/from_proto.rs

Lines changed: 72 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -76,9 +76,10 @@ impl From<&protobuf::PhysicalColumn> for Column {
7676
/// # Arguments
7777
///
7878
/// * `proto` - Input proto with physical sort expression node
79-
/// * `registry` - A registry knows how to build logical expressions out of user-defined function' names
79+
/// * `registry` - A registry knows how to build logical expressions out of user-defined function names
8080
/// * `input_schema` - The Arrow schema for the input, used for determining expression data types
8181
/// when performing type coercion.
82+
/// * `codec` - An extension codec used to decode custom UDFs.
8283
pub fn parse_physical_sort_expr(
8384
proto: &protobuf::PhysicalSortExprNode,
8485
registry: &dyn FunctionRegistry,
@@ -102,9 +103,10 @@ pub fn parse_physical_sort_expr(
102103
/// # Arguments
103104
///
104105
/// * `proto` - Input proto with vector of physical sort expression node
105-
/// * `registry` - A registry knows how to build logical expressions out of user-defined function' names
106+
/// * `registry` - A registry knows how to build logical expressions out of user-defined function names
106107
/// * `input_schema` - The Arrow schema for the input, used for determining expression data types
107108
/// when performing type coercion.
109+
/// * `codec` - An extension codec used to decode custom UDFs.
108110
pub fn parse_physical_sort_exprs(
109111
proto: &[protobuf::PhysicalSortExprNode],
110112
registry: &dyn FunctionRegistry,
@@ -123,25 +125,39 @@ pub fn parse_physical_sort_exprs(
123125
///
124126
/// # Arguments
125127
///
126-
/// * `proto` - Input proto with physical window exprression node.
128+
/// * `proto` - Input proto with physical window expression node.
127129
/// * `name` - Name of the window expression.
128-
/// * `registry` - A registry knows how to build logical expressions out of user-defined function' names
130+
/// * `registry` - A registry knows how to build logical expressions out of user-defined function names
129131
/// * `input_schema` - The Arrow schema for the input, used for determining expression data types
130132
/// when performing type coercion.
131133
pub fn parse_physical_window_expr(
132134
proto: &protobuf::PhysicalWindowExprNode,
133135
registry: &dyn FunctionRegistry,
134136
input_schema: &Schema,
135137
) -> Result<Arc<dyn WindowExpr>> {
136-
let codec = DefaultPhysicalExtensionCodec {};
138+
parse_physical_window_expr_ext(
139+
proto,
140+
registry,
141+
input_schema,
142+
&DefaultPhysicalExtensionCodec {},
143+
)
144+
}
145+
146+
// TODO: Make this the public function on next major release.
147+
pub(crate) fn parse_physical_window_expr_ext(
148+
proto: &protobuf::PhysicalWindowExprNode,
149+
registry: &dyn FunctionRegistry,
150+
input_schema: &Schema,
151+
codec: &dyn PhysicalExtensionCodec,
152+
) -> Result<Arc<dyn WindowExpr>> {
137153
let window_node_expr =
138-
parse_physical_exprs(&proto.args, registry, input_schema, &codec)?;
154+
parse_physical_exprs(&proto.args, registry, input_schema, codec)?;
139155

140156
let partition_by =
141-
parse_physical_exprs(&proto.partition_by, registry, input_schema, &codec)?;
157+
parse_physical_exprs(&proto.partition_by, registry, input_schema, codec)?;
142158

143159
let order_by =
144-
parse_physical_sort_exprs(&proto.order_by, registry, input_schema, &codec)?;
160+
parse_physical_sort_exprs(&proto.order_by, registry, input_schema, codec)?;
145161

146162
let window_frame = proto
147163
.window_frame
@@ -187,9 +203,10 @@ where
187203
/// # Arguments
188204
///
189205
/// * `proto` - Input proto with physical expression node
190-
/// * `registry` - A registry knows how to build logical expressions out of user-defined function' names
206+
/// * `registry` - A registry knows how to build logical expressions out of user-defined function names
191207
/// * `input_schema` - The Arrow schema for the input, used for determining expression data types
192208
/// when performing type coercion.
209+
/// * `codec` - An extension codec used to decode custom UDFs.
193210
pub fn parse_physical_expr(
194211
proto: &protobuf::PhysicalExprNode,
195212
registry: &dyn FunctionRegistry,
@@ -213,13 +230,15 @@ pub fn parse_physical_expr(
213230
registry,
214231
"left",
215232
input_schema,
233+
codec,
216234
)?,
217235
logical_plan::from_proto::from_proto_binary_op(&binary_expr.op)?,
218236
parse_required_physical_expr(
219237
binary_expr.r.as_deref(),
220238
registry,
221239
"right",
222240
input_schema,
241+
codec,
223242
)?,
224243
)),
225244
ExprType::AggregateExpr(_) => {
@@ -241,6 +260,7 @@ pub fn parse_physical_expr(
241260
registry,
242261
"expr",
243262
input_schema,
263+
codec,
244264
)?))
245265
}
246266
ExprType::IsNotNullExpr(e) => {
@@ -249,20 +269,23 @@ pub fn parse_physical_expr(
249269
registry,
250270
"expr",
251271
input_schema,
272+
codec,
252273
)?))
253274
}
254275
ExprType::NotExpr(e) => Arc::new(NotExpr::new(parse_required_physical_expr(
255276
e.expr.as_deref(),
256277
registry,
257278
"expr",
258279
input_schema,
280+
codec,
259281
)?)),
260282
ExprType::Negative(e) => {
261283
Arc::new(NegativeExpr::new(parse_required_physical_expr(
262284
e.expr.as_deref(),
263285
registry,
264286
"expr",
265287
input_schema,
288+
codec,
266289
)?))
267290
}
268291
ExprType::InList(e) => in_list(
@@ -271,6 +294,7 @@ pub fn parse_physical_expr(
271294
registry,
272295
"expr",
273296
input_schema,
297+
codec,
274298
)?,
275299
parse_physical_exprs(&e.list, registry, input_schema, codec)?,
276300
&e.negated,
@@ -290,12 +314,14 @@ pub fn parse_physical_expr(
290314
registry,
291315
"when_expr",
292316
input_schema,
317+
codec,
293318
)?,
294319
parse_required_physical_expr(
295320
e.then_expr.as_ref(),
296321
registry,
297322
"then_expr",
298323
input_schema,
324+
codec,
299325
)?,
300326
))
301327
})
@@ -311,6 +337,7 @@ pub fn parse_physical_expr(
311337
registry,
312338
"expr",
313339
input_schema,
340+
codec,
314341
)?,
315342
convert_required!(e.arrow_type)?,
316343
None,
@@ -321,6 +348,7 @@ pub fn parse_physical_expr(
321348
registry,
322349
"expr",
323350
input_schema,
351+
codec,
324352
)?,
325353
convert_required!(e.arrow_type)?,
326354
)),
@@ -371,12 +399,14 @@ pub fn parse_physical_expr(
371399
registry,
372400
"expr",
373401
input_schema,
402+
codec,
374403
)?,
375404
parse_required_physical_expr(
376405
like_expr.pattern.as_deref(),
377406
registry,
378407
"pattern",
379408
input_schema,
409+
codec,
380410
)?,
381411
)),
382412
};
@@ -389,9 +419,9 @@ fn parse_required_physical_expr(
389419
registry: &dyn FunctionRegistry,
390420
field: &str,
391421
input_schema: &Schema,
422+
codec: &dyn PhysicalExtensionCodec,
392423
) -> Result<Arc<dyn PhysicalExpr>> {
393-
let codec = DefaultPhysicalExtensionCodec {};
394-
expr.map(|e| parse_physical_expr(e, registry, input_schema, &codec))
424+
expr.map(|e| parse_physical_expr(e, registry, input_schema, codec))
395425
.transpose()?
396426
.ok_or_else(|| {
397427
DataFusionError::Internal(format!("Missing required field {field:?}"))
@@ -433,15 +463,29 @@ pub fn parse_protobuf_hash_partitioning(
433463
partitioning: Option<&protobuf::PhysicalHashRepartition>,
434464
registry: &dyn FunctionRegistry,
435465
input_schema: &Schema,
466+
) -> Result<Option<Partitioning>> {
467+
parse_protobuf_hash_partitioning_ext(
468+
partitioning,
469+
registry,
470+
input_schema,
471+
&DefaultPhysicalExtensionCodec {},
472+
)
473+
}
474+
475+
// TODO: Make this the public function on next major release.
476+
fn parse_protobuf_hash_partitioning_ext(
477+
partitioning: Option<&protobuf::PhysicalHashRepartition>,
478+
registry: &dyn FunctionRegistry,
479+
input_schema: &Schema,
480+
codec: &dyn PhysicalExtensionCodec,
436481
) -> Result<Option<Partitioning>> {
437482
match partitioning {
438483
Some(hash_part) => {
439-
let codec = DefaultPhysicalExtensionCodec {};
440484
let expr = parse_physical_exprs(
441485
&hash_part.hash_expr,
442486
registry,
443487
input_schema,
444-
&codec,
488+
codec,
445489
)?;
446490

447491
Ok(Some(Partitioning::Hash(
@@ -456,6 +500,19 @@ pub fn parse_protobuf_hash_partitioning(
456500
pub fn parse_protobuf_file_scan_config(
457501
proto: &protobuf::FileScanExecConf,
458502
registry: &dyn FunctionRegistry,
503+
) -> Result<FileScanConfig> {
504+
parse_protobuf_file_scan_config_ext(
505+
proto,
506+
registry,
507+
&DefaultPhysicalExtensionCodec {},
508+
)
509+
}
510+
511+
// TODO: Make this the public function on next major release.
512+
pub(crate) fn parse_protobuf_file_scan_config_ext(
513+
proto: &protobuf::FileScanExecConf,
514+
registry: &dyn FunctionRegistry,
515+
codec: &dyn PhysicalExtensionCodec,
459516
) -> Result<FileScanConfig> {
460517
let schema: Arc<Schema> = Arc::new(convert_required!(proto.schema)?);
461518
let projection = proto
@@ -489,7 +546,7 @@ pub fn parse_protobuf_file_scan_config(
489546
.collect::<Result<Vec<_>>>()?;
490547

491548
// Remove partition columns from the schema after recreating table_partition_cols
492-
// because the partition columns are not in the file. They are present to allow the
549+
// because the partition columns are not in the file. They are present to allow
493550
// the partition column types to be reconstructed after serde.
494551
let file_schema = Arc::new(Schema::new(
495552
schema
@@ -502,12 +559,11 @@ pub fn parse_protobuf_file_scan_config(
502559

503560
let mut output_ordering = vec![];
504561
for node_collection in &proto.output_ordering {
505-
let codec = DefaultPhysicalExtensionCodec {};
506562
let sort_expr = parse_physical_sort_exprs(
507563
&node_collection.physical_sort_expr_nodes,
508564
registry,
509565
&schema,
510-
&codec,
566+
codec,
511567
)?;
512568
output_ordering.push(sort_expr);
513569
}

datafusion/proto/src/physical_plan/mod.rs

Lines changed: 27 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -19,22 +19,8 @@ use std::convert::TryInto;
1919
use std::fmt::Debug;
2020
use std::sync::Arc;
2121

22-
use self::from_proto::parse_physical_window_expr;
23-
use self::to_proto::serialize_physical_expr;
24-
25-
use crate::common::{byte_to_string, proto_error, str_to_byte};
26-
use crate::convert_required;
27-
use crate::physical_plan::from_proto::{
28-
parse_physical_expr, parse_physical_sort_expr, parse_physical_sort_exprs,
29-
parse_protobuf_file_scan_config,
30-
};
31-
use crate::protobuf::physical_aggregate_expr_node::AggregateFunction;
32-
use crate::protobuf::physical_expr_node::ExprType;
33-
use crate::protobuf::physical_plan_node::PhysicalPlanType;
34-
use crate::protobuf::repartition_exec_node::PartitionMethod;
35-
use crate::protobuf::{
36-
self, window_agg_exec_node, PhysicalPlanNode, PhysicalSortExprNodeCollection,
37-
};
22+
use prost::bytes::BufMut;
23+
use prost::Message;
3824

3925
use datafusion::arrow::compute::SortOptions;
4026
use datafusion::arrow::datatypes::SchemaRef;
@@ -79,8 +65,22 @@ use datafusion::physical_plan::{
7965
use datafusion_common::{internal_err, not_impl_err, DataFusionError, Result};
8066
use datafusion_expr::ScalarUDF;
8167

82-
use prost::bytes::BufMut;
83-
use prost::Message;
68+
use crate::common::{byte_to_string, proto_error, str_to_byte};
69+
use crate::convert_required;
70+
use crate::physical_plan::from_proto::{
71+
parse_physical_expr, parse_physical_sort_expr, parse_physical_sort_exprs,
72+
parse_physical_window_expr_ext, parse_protobuf_file_scan_config,
73+
parse_protobuf_file_scan_config_ext,
74+
};
75+
use crate::protobuf::physical_aggregate_expr_node::AggregateFunction;
76+
use crate::protobuf::physical_expr_node::ExprType;
77+
use crate::protobuf::physical_plan_node::PhysicalPlanType;
78+
use crate::protobuf::repartition_exec_node::PartitionMethod;
79+
use crate::protobuf::{
80+
self, window_agg_exec_node, PhysicalPlanNode, PhysicalSortExprNodeCollection,
81+
};
82+
83+
use self::to_proto::serialize_physical_expr;
8484

8585
pub mod from_proto;
8686
pub mod to_proto;
@@ -188,9 +188,10 @@ impl AsExecutionPlan for PhysicalPlanNode {
188188
}
189189
}
190190
PhysicalPlanType::CsvScan(scan) => Ok(Arc::new(CsvExec::new(
191-
parse_protobuf_file_scan_config(
191+
parse_protobuf_file_scan_config_ext(
192192
scan.base_conf.as_ref().unwrap(),
193193
registry,
194+
extension_codec,
194195
)?,
195196
scan.has_header,
196197
str_to_byte(&scan.delimiter, "delimiter")?,
@@ -230,12 +231,13 @@ impl AsExecutionPlan for PhysicalPlanNode {
230231
Default::default(),
231232
)))
232233
}
233-
PhysicalPlanType::AvroScan(scan) => {
234-
Ok(Arc::new(AvroExec::new(parse_protobuf_file_scan_config(
234+
PhysicalPlanType::AvroScan(scan) => Ok(Arc::new(AvroExec::new(
235+
parse_protobuf_file_scan_config_ext(
235236
scan.base_conf.as_ref().unwrap(),
236237
registry,
237-
)?)))
238-
}
238+
extension_codec,
239+
)?,
240+
))),
239241
PhysicalPlanType::CoalesceBatches(coalesce_batches) => {
240242
let input: Arc<dyn ExecutionPlan> = into_physical_plan(
241243
&coalesce_batches.input,
@@ -334,10 +336,11 @@ impl AsExecutionPlan for PhysicalPlanNode {
334336
.window_expr
335337
.iter()
336338
.map(|window_expr| {
337-
parse_physical_window_expr(
339+
parse_physical_window_expr_ext(
338340
window_expr,
339341
registry,
340342
input_schema.as_ref(),
343+
extension_codec,
341344
)
342345
})
343346
.collect::<Result<Vec<_>, _>>()?;

0 commit comments

Comments
 (0)