Commit 23dccce

feat: Upgrade to DataFusion 46.0.0-rc2 (#1423)

1 parent 569ca71

14 files changed: +346, -283 lines

native/Cargo.lock

Lines changed: 234 additions & 201 deletions
(Generated file; diff not rendered.)

native/Cargo.toml

Lines changed: 16 additions & 16 deletions
@@ -31,26 +31,26 @@ license = "Apache-2.0"
 edition = "2021"
 
 # Comet uses the same minimum Rust version as DataFusion
-rust-version = "1.81"
+rust-version = "1.85"
 
 [workspace.dependencies]
-arrow = { version = "54.1.0", features = ["prettyprint", "ffi", "chrono-tz"] }
-arrow-array = { version = "54.1.0" }
-arrow-buffer = { version = "54.1.0" }
-arrow-data = { version = "54.1.0" }
-arrow-schema = { version = "54.1.0" }
+arrow = { version = "54.2.0", features = ["prettyprint", "ffi", "chrono-tz"] }
+arrow-array = { version = "54.2.0" }
+arrow-buffer = { version = "54.2.0" }
+arrow-data = { version = "54.2.0" }
+arrow-schema = { version = "54.2.0" }
 async-trait = { version = "0.1" }
 bytes = { version = "1.10.0" }
-parquet = { version = "54.1.0", default-features = false, features = ["experimental"] }
-datafusion = { version = "45.0.0", default-features = false, features = ["unicode_expressions", "crypto_expressions"] }
-datafusion-common = { version = "45.0.0", default-features = false }
-datafusion-functions = { version = "45.0.0", default-features = false, features = ["crypto_expressions"] }
-datafusion-functions-nested = { version = "45.0.0", default-features = false }
-datafusion-expr = { version = "45.0.0", default-features = false }
-datafusion-expr-common = { version = "45.0.0", default-features = false }
-datafusion-execution = { version = "45.0.0", default-features = false }
-datafusion-physical-plan = { version = "45.0.0", default-features = false }
-datafusion-physical-expr = { version = "45.0.0", default-features = false }
+parquet = { version = "54.2.0", default-features = false, features = ["experimental"] }
+datafusion = { git = "https://github.com/apache/datafusion", rev = "46.0.0-rc2", default-features = false, features = ["unicode_expressions", "crypto_expressions"] }
+datafusion-common = { git = "https://github.com/apache/datafusion", rev = "46.0.0-rc2", default-features = false }
+datafusion-datasource = { git = "https://github.com/apache/datafusion", rev = "46.0.0-rc2", default-features = false }
+datafusion-functions = { git = "https://github.com/apache/datafusion", rev = "46.0.0-rc2", default-features = false, features = ["crypto_expressions"] }
+datafusion-functions-nested = { git = "https://github.com/apache/datafusion", rev = "46.0.0-rc2", default-features = false }
+datafusion-expr = { git = "https://github.com/apache/datafusion", rev = "46.0.0-rc2", default-features = false }
+datafusion-expr-common = { git = "https://github.com/apache/datafusion", rev = "46.0.0-rc2", default-features = false }
+datafusion-execution = { git = "https://github.com/apache/datafusion", rev = "46.0.0-rc2", default-features = false }
+datafusion-physical-expr = { git = "https://github.com/apache/datafusion", rev = "46.0.0-rc2", default-features = false }
 datafusion-comet-spark-expr = { path = "spark-expr", version = "0.7.0" }
 datafusion-comet-proto = { path = "proto", version = "0.7.0" }
 chrono = { version = "0.4", default-features = false, features = ["clock"] }
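Note that the DataFusion crates are now consumed from a pinned git revision (rev = "46.0.0-rc2") rather than a crates.io release, presumably because at this point the 46.0.0 release candidate existed only as a tag in the DataFusion repository. The diff also adds the new datafusion-datasource crate, drops datafusion-physical-plan as a direct workspace dependency, bumps arrow/parquet from 54.1.0 to 54.2.0, and raises the minimum Rust version from 1.81 to 1.85 to match DataFusion.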

native/core/benches/bloom_filter_agg.rs

Lines changed: 5 additions & 3 deletions
@@ -21,9 +21,10 @@ use arrow_array::{ArrayRef, RecordBatch};
 use arrow_schema::SchemaRef;
 use comet::execution::expressions::bloom_filter_agg::BloomFilterAgg;
 use criterion::{black_box, criterion_group, criterion_main, Criterion};
+use datafusion::datasource::memory::MemorySourceConfig;
+use datafusion::datasource::source::DataSourceExec;
 use datafusion::physical_expr::PhysicalExpr;
 use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
-use datafusion::physical_plan::memory::MemoryExec;
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion_common::ScalarValue;
 use datafusion_execution::TaskContext;
@@ -88,8 +89,9 @@ async fn agg_test(
     mode: AggregateMode,
 ) {
     let schema = &partitions[0][0].schema();
-    let scan: Arc<dyn ExecutionPlan> =
-        Arc::new(MemoryExec::try_new(partitions, Arc::clone(schema), None).unwrap());
+    let scan: Arc<dyn ExecutionPlan> = Arc::new(DataSourceExec::new(Arc::new(
+        MemorySourceConfig::try_new(partitions, Arc::clone(schema), None).unwrap(),
+    )));
     let aggregate = create_aggregate(scan, c0.clone(), schema, aggregate_udf, alias, mode);
     let mut stream = aggregate
         .execute(0, Arc::new(TaskContext::default()))
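This is the first of several identical substitutions in this commit: DataFusion 46 folds the old MemoryExec operator into the generic DataSourceExec, which executes a MemorySourceConfig describing the in-memory partitions. A minimal sketch of the new construction, using only calls that appear in the diff (the memory_scan wrapper is hypothetical):

    use std::sync::Arc;

    use arrow_array::RecordBatch;
    use arrow_schema::SchemaRef;
    use datafusion::datasource::memory::MemorySourceConfig;
    use datafusion::datasource::source::DataSourceExec;
    use datafusion::error::Result;
    use datafusion::physical_plan::ExecutionPlan;

    // Build an in-memory scan the DataFusion 46 way; previously this was a
    // single MemoryExec::try_new(partitions, schema, None) call.
    fn memory_scan(
        partitions: &[Vec<RecordBatch>],
        schema: SchemaRef,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // The batches become a data source...
        let source = MemorySourceConfig::try_new(partitions, schema, None)?;
        // ...and DataSourceExec is the one physical operator that runs it.
        Ok(Arc::new(DataSourceExec::new(Arc::new(source))))
    }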

native/core/benches/shuffle_writer.rs

Lines changed: 6 additions & 2 deletions
@@ -20,9 +20,11 @@ use arrow_array::{builder::StringBuilder, RecordBatch};
 use arrow_schema::{DataType, Field, Schema};
 use comet::execution::shuffle::{CompressionCodec, ShuffleBlockWriter, ShuffleWriterExec};
 use criterion::{criterion_group, criterion_main, Criterion};
+use datafusion::datasource::memory::MemorySourceConfig;
+use datafusion::datasource::source::DataSourceExec;
 use datafusion::physical_plan::metrics::Time;
 use datafusion::{
-    physical_plan::{common::collect, memory::MemoryExec, ExecutionPlan},
+    physical_plan::{common::collect, ExecutionPlan},
     prelude::SessionContext,
 };
 use datafusion_physical_expr::{expressions::Column, Partitioning};
@@ -88,7 +90,9 @@ fn create_shuffle_writer_exec(compression_codec: CompressionCodec) -> ShuffleWri
     let schema = batches[0].schema();
     let partitions = &[batches];
     ShuffleWriterExec::try_new(
-        Arc::new(MemoryExec::try_new(partitions, schema, None).unwrap()),
+        Arc::new(DataSourceExec::new(Arc::new(
+            MemorySourceConfig::try_new(partitions, Arc::clone(&schema), None).unwrap(),
+        ))),
         Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], 16),
         compression_codec,
         "/tmp/data.out".to_string(),

native/core/src/execution/expressions/bloom_filter_agg.rs

Lines changed: 1 addition & 1 deletion
@@ -25,7 +25,7 @@ use arrow::array::ArrayRef;
 use arrow_array::BinaryArray;
 use datafusion::error::Result;
 use datafusion::physical_expr::PhysicalExpr;
-use datafusion_common::{downcast_value, DataFusionError, ScalarValue};
+use datafusion_common::{downcast_value, ScalarValue};
 use datafusion_expr::{
     function::{AccumulatorArgs, StateFieldsArgs},
     Accumulator, AggregateUDFImpl, Signature,

native/core/src/execution/planner.rs

Lines changed: 21 additions & 17 deletions
@@ -77,8 +77,8 @@ use crate::execution::spark_plan::SparkPlan;
 use crate::parquet::parquet_support::{register_object_store, SparkParquetOptions};
 use crate::parquet::schema_adapter::SparkSchemaAdapterFactory;
 use datafusion::datasource::listing::PartitionedFile;
-use datafusion::datasource::physical_plan::parquet::ParquetExecBuilder;
-use datafusion::datasource::physical_plan::FileScanConfig;
+use datafusion::datasource::physical_plan::{FileScanConfig, ParquetSource};
+use datafusion::datasource::source::DataSourceExec;
 use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
 use datafusion::physical_plan::filter::FilterExec as DataFusionFilterExec;
 use datafusion_comet_proto::{
@@ -1274,16 +1274,6 @@ impl PhysicalPlanner {
                 Field::new(field.name(), field.data_type().clone(), field.is_nullable())
             })
             .collect_vec();
-        let mut file_scan_config =
-            FileScanConfig::new(object_store_url, Arc::clone(&data_schema))
-                .with_file_groups(file_groups)
-                .with_table_partition_cols(partition_fields);
-
-        assert_eq!(
-            projection_vector.len(),
-            required_schema.fields.len() + partition_schema.fields.len()
-        );
-        file_scan_config = file_scan_config.with_projection(Some(projection_vector));
 
         let mut table_parquet_options = TableParquetOptions::new();
         // TODO: Maybe these are configs?
@@ -1297,17 +1287,31 @@ impl PhysicalPlanner {
         );
         spark_parquet_options.allow_cast_unsigned_ints = true;
 
-        let mut builder = ParquetExecBuilder::new(file_scan_config)
-            .with_table_parquet_options(table_parquet_options)
+        let mut parquet_source = ParquetSource::new(table_parquet_options)
             .with_schema_adapter_factory(Arc::new(SparkSchemaAdapterFactory::new(
                 spark_parquet_options,
             )));
 
         if let Some(filter) = cnf_data_filters {
-            builder = builder.with_predicate(filter);
+            parquet_source =
+                parquet_source.with_predicate(Arc::clone(&data_schema), filter);
         }
 
-        let scan = builder.build();
+        let mut file_scan_config = FileScanConfig::new(
+            object_store_url,
+            Arc::clone(&data_schema),
+            Arc::new(parquet_source),
+        )
+        .with_file_groups(file_groups)
+        .with_table_partition_cols(partition_fields);
+
+        assert_eq!(
+            projection_vector.len(),
+            required_schema.fields.len() + partition_schema.fields.len()
+        );
+        file_scan_config = file_scan_config.with_projection(Some(projection_vector));
+
+        let scan = DataSourceExec::new(Arc::new(file_scan_config));
         Ok((
             vec![],
             Arc::new(SparkPlan::new(spark_plan.plan_id, Arc::new(scan), vec![])),
@@ -1604,8 +1608,8 @@ impl PhysicalPlanner {
         let window_agg = Arc::new(BoundedWindowAggExec::try_new(
             window_expr?,
            Arc::clone(&child.native_plan),
-            partition_exprs.to_vec(),
             InputOrderMode::Sorted,
+            !partition_exprs.is_empty(),
         )?);
         Ok((
             scans,
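Two API changes drive this diff. First, ParquetExecBuilder is gone: the Parquet options, schema adapter factory, and pushed-down predicate now attach to a ParquetSource, which becomes the third argument of FileScanConfig::new, and the resulting config is executed by the generic DataSourceExec. Second, BoundedWindowAggExec::try_new no longer takes the partition expressions themselves but a boolean derived from them, hence !partition_exprs.is_empty(). A condensed sketch of the new scan construction, restricted to calls visible in this diff (the parquet_scan wrapper and its parameter list are illustrative):

    use std::sync::Arc;

    use arrow_schema::SchemaRef;
    use datafusion::datasource::listing::PartitionedFile;
    use datafusion::datasource::physical_plan::{FileScanConfig, ParquetSource};
    use datafusion::datasource::source::DataSourceExec;
    use datafusion::execution::object_store::ObjectStoreUrl;
    use datafusion::physical_expr::PhysicalExpr;
    use datafusion_common::config::TableParquetOptions;

    fn parquet_scan(
        object_store_url: ObjectStoreUrl,
        data_schema: SchemaRef,
        file_groups: Vec<Vec<PartitionedFile>>,
        predicate: Option<Arc<dyn PhysicalExpr>>,
    ) -> DataSourceExec {
        // Options that previously went through ParquetExecBuilder now
        // configure the ParquetSource directly.
        let mut parquet_source = ParquetSource::new(TableParquetOptions::new());
        if let Some(filter) = predicate {
            // with_predicate now also takes the file schema.
            parquet_source = parquet_source.with_predicate(Arc::clone(&data_schema), filter);
        }
        // FileScanConfig::new gained a third argument: the file source to scan.
        let config = FileScanConfig::new(object_store_url, data_schema, Arc::new(parquet_source))
            .with_file_groups(file_groups);
        // One physical operator for every data source.
        DataSourceExec::new(Arc::new(config))
    }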

native/core/src/execution/shuffle/shuffle_writer.rs

Lines changed: 5 additions & 2 deletions
@@ -909,8 +909,9 @@ mod test {
     use super::*;
     use crate::execution::shuffle::read_ipc_compressed;
     use arrow_schema::{DataType, Field, Schema};
+    use datafusion::datasource::memory::MemorySourceConfig;
+    use datafusion::datasource::source::DataSourceExec;
     use datafusion::physical_plan::common::collect;
-    use datafusion::physical_plan::memory::MemoryExec;
     use datafusion::prelude::SessionContext;
     use datafusion_execution::config::SessionConfig;
     use datafusion_execution::runtime_env::RuntimeEnvBuilder;
@@ -1152,7 +1153,9 @@ mod test {
 
         let partitions = &[batches];
         let exec = ShuffleWriterExec::try_new(
-            Arc::new(MemoryExec::try_new(partitions, batch.schema(), None).unwrap()),
+            Arc::new(DataSourceExec::new(Arc::new(
+                MemorySourceConfig::try_new(partitions, batch.schema(), None).unwrap(),
+            ))),
             Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], num_partitions),
             CompressionCodec::Zstd(1),
             "/tmp/data.out".to_string(),

native/core/src/parquet/mod.rs

Lines changed: 21 additions & 20 deletions
@@ -51,8 +51,8 @@ use crate::parquet::schema_adapter::SparkSchemaAdapterFactory;
 use arrow::buffer::{Buffer, MutableBuffer};
 use arrow_array::{Array, RecordBatch};
 use datafusion::datasource::listing::PartitionedFile;
-use datafusion::datasource::physical_plan::parquet::ParquetExecBuilder;
-use datafusion::datasource::physical_plan::FileScanConfig;
+use datafusion::datasource::physical_plan::{FileScanConfig, ParquetSource};
+use datafusion::datasource::source::DataSourceExec;
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion_comet_spark_expr::EvalMode;
 use datafusion_common::config::TableParquetOptions;
@@ -666,18 +666,6 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBat
         start + length,
     );
     partitioned_file.object_meta.location = object_store_path;
-    // We build the file scan config with the *required* schema so that the reader knows
-    // the output schema we want
-    let file_scan_config = FileScanConfig::new(object_store_url, Arc::new(required_schema_arrow))
-        .with_file(partitioned_file)
-        // TODO: (ARROW NATIVE) - do partition columns in native
-        //  - will need partition schema and partition values to do so
-        // .with_table_partition_cols(partition_fields)
-        ;
-    let mut table_parquet_options = TableParquetOptions::new();
-    // TODO: Maybe these are configs?
-    table_parquet_options.global.pushdown_filters = true;
-    table_parquet_options.global.reorder_filters = true;
     let session_timezone: String = env
         .get_string(&JString::from_raw(session_timezone))
         .unwrap()
@@ -687,16 +675,29 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBat
         SparkParquetOptions::new(EvalMode::Legacy, session_timezone.as_str(), false);
     spark_parquet_options.allow_cast_unsigned_ints = true;
 
-    let builder2 = ParquetExecBuilder::new(file_scan_config)
-        .with_table_parquet_options(table_parquet_options)
-        .with_schema_adapter_factory(Arc::new(SparkSchemaAdapterFactory::new(
-            spark_parquet_options,
-        )));
+    let mut table_parquet_options = TableParquetOptions::new();
+    // TODO: Maybe these are configs?
+    table_parquet_options.global.pushdown_filters = true;
+    table_parquet_options.global.reorder_filters = true;
+
+    let parquet_source = ParquetSource::new(table_parquet_options).with_schema_adapter_factory(
+        Arc::new(SparkSchemaAdapterFactory::new(spark_parquet_options)),
+    );
+
+    // We build the file scan config with the *required* schema so that the reader knows
+    // the output schema we want
+    let file_scan_config = FileScanConfig::new(object_store_url, Arc::new(required_schema_arrow), Arc::new(parquet_source))
+        .with_file(partitioned_file)
+        // TODO: (ARROW NATIVE) - do partition columns in native
+        //  - will need partition schema and partition values to do so
+        // .with_table_partition_cols(partition_fields)
+        ;
 
     //TODO: (ARROW NATIVE) - predicate pushdown??
     // builder = builder.with_predicate(filter);
 
-    let scan = builder2.build();
+    let scan = Arc::new(DataSourceExec::new(Arc::new(file_scan_config)));
+
     let ctx = TaskContext::default();
     let partition_index: usize = 0;
     batch_stream = Some(scan.execute(partition_index, Arc::new(ctx))?);

native/core/src/parquet/schema_adapter.rs

Lines changed: 16 additions & 10 deletions
@@ -279,12 +279,14 @@ mod test {
     use arrow_array::UInt32Array;
     use arrow_schema::SchemaRef;
     use datafusion::datasource::listing::PartitionedFile;
-    use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec};
+    use datafusion::datasource::physical_plan::{FileScanConfig, ParquetSource};
+    use datafusion::datasource::source::DataSourceExec;
     use datafusion::execution::object_store::ObjectStoreUrl;
     use datafusion::execution::TaskContext;
     use datafusion::physical_plan::ExecutionPlan;
     use datafusion_comet_spark_expr::test_common::file_util::get_temp_filename;
     use datafusion_comet_spark_expr::EvalMode;
+    use datafusion_common::config::TableParquetOptions;
     use datafusion_common::DataFusionError;
     use futures::StreamExt;
     use parquet::arrow::ArrowWriter;
@@ -341,19 +343,23 @@ mod test {
         writer.close()?;
 
         let object_store_url = ObjectStoreUrl::local_filesystem();
-        let file_scan_config = FileScanConfig::new(object_store_url, required_schema)
-            .with_file_groups(vec![vec![PartitionedFile::from_path(
-                filename.to_string(),
-            )?]]);
 
         let mut spark_parquet_options = SparkParquetOptions::new(EvalMode::Legacy, "UTC", false);
         spark_parquet_options.allow_cast_unsigned_ints = true;
 
-        let parquet_exec = ParquetExec::builder(file_scan_config)
-            .with_schema_adapter_factory(Arc::new(SparkSchemaAdapterFactory::new(
-                spark_parquet_options,
-            )))
-            .build();
+        let parquet_source = Arc::new(
+            ParquetSource::new(TableParquetOptions::new()).with_schema_adapter_factory(Arc::new(
+                SparkSchemaAdapterFactory::new(spark_parquet_options),
+            )),
+        );
+
+        let file_scan_config =
+            FileScanConfig::new(object_store_url, required_schema, parquet_source)
+                .with_file_groups(vec![vec![PartitionedFile::from_path(
+                    filename.to_string(),
+                )?]]);
+
+        let parquet_exec = DataSourceExec::new(Arc::new(file_scan_config));
 
         let mut stream = parquet_exec
             .execute(0, Arc::new(TaskContext::default()))

native/spark-expr/benches/aggregate.rs

Lines changed: 5 additions & 3 deletions
@@ -20,12 +20,13 @@ use arrow_array::builder::{Decimal128Builder, StringBuilder};
 use arrow_array::{ArrayRef, RecordBatch};
 use arrow_schema::SchemaRef;
 use criterion::{black_box, criterion_group, criterion_main, Criterion};
+use datafusion::datasource::memory::MemorySourceConfig;
+use datafusion::datasource::source::DataSourceExec;
 use datafusion::execution::TaskContext;
 use datafusion::functions_aggregate::average::avg_udaf;
 use datafusion::functions_aggregate::sum::sum_udaf;
 use datafusion::physical_expr::PhysicalExpr;
 use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
-use datafusion::physical_plan::memory::MemoryExec;
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion_comet_spark_expr::AvgDecimal;
 use datafusion_comet_spark_expr::SumDecimal;
@@ -119,8 +120,9 @@ async fn agg_test(
     alias: &str,
 ) {
     let schema = &partitions[0][0].schema();
-    let scan: Arc<dyn ExecutionPlan> =
-        Arc::new(MemoryExec::try_new(partitions, Arc::clone(schema), None).unwrap());
+    let scan: Arc<dyn ExecutionPlan> = Arc::new(DataSourceExec::new(Arc::new(
+        MemorySourceConfig::try_new(partitions, Arc::clone(schema), None).unwrap(),
+    )));
     let aggregate = create_aggregate(scan, c0.clone(), c1.clone(), schema, aggregate_udf, alias);
     let mut stream = aggregate
         .execute(0, Arc::new(TaskContext::default()))

native/spark-expr/src/agg_funcs/stddev.rs

Lines changed: 3 additions & 2 deletions
@@ -27,6 +27,7 @@ use datafusion_common::types::NativeType;
 use datafusion_common::{internal_err, Result, ScalarValue};
 use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs};
 use datafusion_expr::{AggregateUDFImpl, Signature, Volatility};
+use datafusion_expr_common::signature::Coercion;
 use datafusion_physical_expr::expressions::format_state_name;
 use datafusion_physical_expr::expressions::StatsType;
 
@@ -56,11 +57,11 @@ impl Stddev {
         Self {
             name: name.into(),
             signature: Signature::coercible(
-                vec![
+                vec![Coercion::new_exact(
                     datafusion_expr_common::signature::TypeSignatureClass::Native(Arc::new(
                         NativeType::Float64,
                     )),
-                ],
+                )],
                 Volatility::Immutable,
             ),
             stats_type,
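Signature::coercible changed shape in DataFusion 46: it now takes a Vec<Coercion> rather than a bare list of type-signature classes, with Coercion::new_exact used here as the closest equivalent of the old behavior. A minimal sketch of the updated constructor (the float64_signature wrapper is illustrative):

    use std::sync::Arc;

    use datafusion_common::types::NativeType;
    use datafusion_expr::{Signature, Volatility};
    use datafusion_expr_common::signature::{Coercion, TypeSignatureClass};

    // One Float64 argument, expressed through the new Coercion wrapper.
    fn float64_signature() -> Signature {
        Signature::coercible(
            vec![Coercion::new_exact(TypeSignatureClass::Native(Arc::new(
                NativeType::Float64,
            )))],
            Volatility::Immutable,
        )
    }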

native/spark-expr/src/agg_funcs/sum_decimal.rs

Lines changed: 5 additions & 3 deletions
@@ -465,9 +465,10 @@ mod tests {
     use arrow::datatypes::*;
     use arrow_array::builder::{Decimal128Builder, StringBuilder};
     use arrow_array::RecordBatch;
+    use datafusion::datasource::memory::MemorySourceConfig;
+    use datafusion::datasource::source::DataSourceExec;
     use datafusion::execution::TaskContext;
     use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
-    use datafusion::physical_plan::memory::MemoryExec;
     use datafusion::physical_plan::ExecutionPlan;
     use datafusion_common::Result;
     use datafusion_expr::AggregateUDF;
@@ -495,8 +496,9 @@ mod tests {
 
         let data_type = DataType::Decimal128(8, 2);
         let schema = Arc::clone(&partitions[0][0].schema());
-        let scan: Arc<dyn ExecutionPlan> =
-            Arc::new(MemoryExec::try_new(partitions, Arc::clone(&schema), None).unwrap());
+        let scan: Arc<dyn ExecutionPlan> = Arc::new(DataSourceExec::new(Arc::new(
+            MemorySourceConfig::try_new(partitions, Arc::clone(&schema), None).unwrap(),
+        )));
 
         let aggregate_udf = Arc::new(AggregateUDF::new_from_impl(SumDecimal::try_new(
             data_type.clone(),
