Skip to content

Commit 36f6e0f

Browse files
authored
Use PhysicalExtensionCodec consistently (#10075)
* Use PhysicalExtensionCodec consistently
* Use PhysicalExtensionCodec consistently also when serializing
* Add a test for window aggregation with UDF codec
* Commit binary incompatible changes
1 parent 03b314c commit 36f6e0f

File tree

4 files changed

+402
-394
lines changed

4 files changed

+402
-394
lines changed

datafusion/proto/src/physical_plan/from_proto.rs

Lines changed: 44 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,11 @@ use std::collections::HashMap;
2121
use std::convert::{TryFrom, TryInto};
2222
use std::sync::Arc;
2323

24-
use crate::common::proto_error;
25-
use crate::convert_required;
26-
use crate::logical_plan::{self, csv_writer_options_from_proto};
27-
use crate::protobuf::physical_expr_node::ExprType;
28-
use crate::protobuf::{self, copy_to_node};
29-
3024
use arrow::compute::SortOptions;
25+
use chrono::{TimeZone, Utc};
26+
use object_store::path::Path;
27+
use object_store::ObjectMeta;
28+
3129
use datafusion::arrow::datatypes::Schema;
3230
use datafusion::datasource::file_format::csv::CsvSink;
3331
use datafusion::datasource::file_format::json::JsonSink;
@@ -57,13 +55,15 @@ use datafusion_common::file_options::json_writer::JsonWriterOptions;
5755
use datafusion_common::parsers::CompressionTypeVariant;
5856
use datafusion_common::stats::Precision;
5957
use datafusion_common::{not_impl_err, DataFusionError, JoinSide, Result, ScalarValue};
60-
61-
use chrono::{TimeZone, Utc};
6258
use datafusion_expr::ScalarFunctionDefinition;
63-
use object_store::path::Path;
64-
use object_store::ObjectMeta;
6559

66-
use super::{DefaultPhysicalExtensionCodec, PhysicalExtensionCodec};
60+
use crate::common::proto_error;
61+
use crate::convert_required;
62+
use crate::logical_plan::{self, csv_writer_options_from_proto};
63+
use crate::protobuf::physical_expr_node::ExprType;
64+
use crate::protobuf::{self, copy_to_node};
65+
66+
use super::PhysicalExtensionCodec;
6767

6868
impl From<&protobuf::PhysicalColumn> for Column {
6969
fn from(c: &protobuf::PhysicalColumn) -> Column {
@@ -76,9 +76,10 @@ impl From<&protobuf::PhysicalColumn> for Column {
7676
/// # Arguments
7777
///
7878
/// * `proto` - Input proto with physical sort expression node
79-
/// * `registry` - A registry knows how to build logical expressions out of user-defined function' names
79+
/// * `registry` - A registry knows how to build logical expressions out of user-defined function names
8080
/// * `input_schema` - The Arrow schema for the input, used for determining expression data types
8181
/// when performing type coercion.
82+
/// * `codec` - An extension codec used to decode custom UDFs.
8283
pub fn parse_physical_sort_expr(
8384
proto: &protobuf::PhysicalSortExprNode,
8485
registry: &dyn FunctionRegistry,
@@ -102,9 +103,10 @@ pub fn parse_physical_sort_expr(
102103
/// # Arguments
103104
///
104105
/// * `proto` - Input proto with vector of physical sort expression node
105-
/// * `registry` - A registry knows how to build logical expressions out of user-defined function' names
106+
/// * `registry` - A registry knows how to build logical expressions out of user-defined function names
106107
/// * `input_schema` - The Arrow schema for the input, used for determining expression data types
107108
/// when performing type coercion.
109+
/// * `codec` - An extension codec used to decode custom UDFs.
108110
pub fn parse_physical_sort_exprs(
109111
proto: &[protobuf::PhysicalSortExprNode],
110112
registry: &dyn FunctionRegistry,
@@ -123,25 +125,26 @@ pub fn parse_physical_sort_exprs(
123125
///
124126
/// # Arguments
125127
///
126-
/// * `proto` - Input proto with physical window exprression node.
128+
/// * `proto` - Input proto with physical window expression node.
127129
/// * `name` - Name of the window expression.
128-
/// * `registry` - A registry knows how to build logical expressions out of user-defined function' names
130+
/// * `registry` - A registry knows how to build logical expressions out of user-defined function names
129131
/// * `input_schema` - The Arrow schema for the input, used for determining expression data types
130132
/// when performing type coercion.
133+
/// * `codec` - An extension codec used to decode custom UDFs.
131134
pub fn parse_physical_window_expr(
132135
proto: &protobuf::PhysicalWindowExprNode,
133136
registry: &dyn FunctionRegistry,
134137
input_schema: &Schema,
138+
codec: &dyn PhysicalExtensionCodec,
135139
) -> Result<Arc<dyn WindowExpr>> {
136-
let codec = DefaultPhysicalExtensionCodec {};
137140
let window_node_expr =
138-
parse_physical_exprs(&proto.args, registry, input_schema, &codec)?;
141+
parse_physical_exprs(&proto.args, registry, input_schema, codec)?;
139142

140143
let partition_by =
141-
parse_physical_exprs(&proto.partition_by, registry, input_schema, &codec)?;
144+
parse_physical_exprs(&proto.partition_by, registry, input_schema, codec)?;
142145

143146
let order_by =
144-
parse_physical_sort_exprs(&proto.order_by, registry, input_schema, &codec)?;
147+
parse_physical_sort_exprs(&proto.order_by, registry, input_schema, codec)?;
145148

146149
let window_frame = proto
147150
.window_frame
@@ -187,9 +190,10 @@ where
187190
/// # Arguments
188191
///
189192
/// * `proto` - Input proto with physical expression node
190-
/// * `registry` - A registry knows how to build logical expressions out of user-defined function' names
193+
/// * `registry` - A registry knows how to build logical expressions out of user-defined function names
191194
/// * `input_schema` - The Arrow schema for the input, used for determining expression data types
192195
/// when performing type coercion.
196+
/// * `codec` - An extension codec used to decode custom UDFs.
193197
pub fn parse_physical_expr(
194198
proto: &protobuf::PhysicalExprNode,
195199
registry: &dyn FunctionRegistry,
@@ -213,13 +217,15 @@ pub fn parse_physical_expr(
213217
registry,
214218
"left",
215219
input_schema,
220+
codec,
216221
)?,
217222
logical_plan::from_proto::from_proto_binary_op(&binary_expr.op)?,
218223
parse_required_physical_expr(
219224
binary_expr.r.as_deref(),
220225
registry,
221226
"right",
222227
input_schema,
228+
codec,
223229
)?,
224230
)),
225231
ExprType::AggregateExpr(_) => {
@@ -241,6 +247,7 @@ pub fn parse_physical_expr(
241247
registry,
242248
"expr",
243249
input_schema,
250+
codec,
244251
)?))
245252
}
246253
ExprType::IsNotNullExpr(e) => {
@@ -249,20 +256,23 @@ pub fn parse_physical_expr(
249256
registry,
250257
"expr",
251258
input_schema,
259+
codec,
252260
)?))
253261
}
254262
ExprType::NotExpr(e) => Arc::new(NotExpr::new(parse_required_physical_expr(
255263
e.expr.as_deref(),
256264
registry,
257265
"expr",
258266
input_schema,
267+
codec,
259268
)?)),
260269
ExprType::Negative(e) => {
261270
Arc::new(NegativeExpr::new(parse_required_physical_expr(
262271
e.expr.as_deref(),
263272
registry,
264273
"expr",
265274
input_schema,
275+
codec,
266276
)?))
267277
}
268278
ExprType::InList(e) => in_list(
@@ -271,6 +281,7 @@ pub fn parse_physical_expr(
271281
registry,
272282
"expr",
273283
input_schema,
284+
codec,
274285
)?,
275286
parse_physical_exprs(&e.list, registry, input_schema, codec)?,
276287
&e.negated,
@@ -290,12 +301,14 @@ pub fn parse_physical_expr(
290301
registry,
291302
"when_expr",
292303
input_schema,
304+
codec,
293305
)?,
294306
parse_required_physical_expr(
295307
e.then_expr.as_ref(),
296308
registry,
297309
"then_expr",
298310
input_schema,
311+
codec,
299312
)?,
300313
))
301314
})
@@ -311,6 +324,7 @@ pub fn parse_physical_expr(
311324
registry,
312325
"expr",
313326
input_schema,
327+
codec,
314328
)?,
315329
convert_required!(e.arrow_type)?,
316330
None,
@@ -321,6 +335,7 @@ pub fn parse_physical_expr(
321335
registry,
322336
"expr",
323337
input_schema,
338+
codec,
324339
)?,
325340
convert_required!(e.arrow_type)?,
326341
)),
@@ -371,12 +386,14 @@ pub fn parse_physical_expr(
371386
registry,
372387
"expr",
373388
input_schema,
389+
codec,
374390
)?,
375391
parse_required_physical_expr(
376392
like_expr.pattern.as_deref(),
377393
registry,
378394
"pattern",
379395
input_schema,
396+
codec,
380397
)?,
381398
)),
382399
};
@@ -389,9 +406,9 @@ fn parse_required_physical_expr(
389406
registry: &dyn FunctionRegistry,
390407
field: &str,
391408
input_schema: &Schema,
409+
codec: &dyn PhysicalExtensionCodec,
392410
) -> Result<Arc<dyn PhysicalExpr>> {
393-
let codec = DefaultPhysicalExtensionCodec {};
394-
expr.map(|e| parse_physical_expr(e, registry, input_schema, &codec))
411+
expr.map(|e| parse_physical_expr(e, registry, input_schema, codec))
395412
.transpose()?
396413
.ok_or_else(|| {
397414
DataFusionError::Internal(format!("Missing required field {field:?}"))
@@ -433,15 +450,15 @@ pub fn parse_protobuf_hash_partitioning(
433450
partitioning: Option<&protobuf::PhysicalHashRepartition>,
434451
registry: &dyn FunctionRegistry,
435452
input_schema: &Schema,
453+
codec: &dyn PhysicalExtensionCodec,
436454
) -> Result<Option<Partitioning>> {
437455
match partitioning {
438456
Some(hash_part) => {
439-
let codec = DefaultPhysicalExtensionCodec {};
440457
let expr = parse_physical_exprs(
441458
&hash_part.hash_expr,
442459
registry,
443460
input_schema,
444-
&codec,
461+
codec,
445462
)?;
446463

447464
Ok(Some(Partitioning::Hash(
@@ -456,6 +473,7 @@ pub fn parse_protobuf_hash_partitioning(
456473
pub fn parse_protobuf_file_scan_config(
457474
proto: &protobuf::FileScanExecConf,
458475
registry: &dyn FunctionRegistry,
476+
codec: &dyn PhysicalExtensionCodec,
459477
) -> Result<FileScanConfig> {
460478
let schema: Arc<Schema> = Arc::new(convert_required!(proto.schema)?);
461479
let projection = proto
@@ -489,7 +507,7 @@ pub fn parse_protobuf_file_scan_config(
489507
.collect::<Result<Vec<_>>>()?;
490508

491509
// Remove partition columns from the schema after recreating table_partition_cols
492-
// because the partition columns are not in the file. They are present to allow the
510+
// because the partition columns are not in the file. They are present to allow
493511
// the partition column types to be reconstructed after serde.
494512
let file_schema = Arc::new(Schema::new(
495513
schema
@@ -502,12 +520,11 @@ pub fn parse_protobuf_file_scan_config(
502520

503521
let mut output_ordering = vec![];
504522
for node_collection in &proto.output_ordering {
505-
let codec = DefaultPhysicalExtensionCodec {};
506523
let sort_expr = parse_physical_sort_exprs(
507524
&node_collection.physical_sort_expr_nodes,
508525
registry,
509526
&schema,
510-
&codec,
527+
codec,
511528
)?;
512529
output_ordering.push(sort_expr);
513530
}

0 commit comments

Comments
 (0)