@@ -41,7 +41,6 @@ use datafusion::arrow::compute::kernels::sort::SortOptions;
41
41
use datafusion:: arrow:: datatypes:: { DataType , Field , IntervalUnit , Schema } ;
42
42
use datafusion:: datasource:: empty:: EmptyTable ;
43
43
use datafusion:: datasource:: file_format:: csv:: CsvSink ;
44
- use datafusion:: datasource:: file_format:: file_compression_type:: FileCompressionType ;
45
44
use datafusion:: datasource:: file_format:: json:: JsonSink ;
46
45
use datafusion:: datasource:: file_format:: parquet:: ParquetSink ;
47
46
use datafusion:: datasource:: listing:: { ListingTableUrl , PartitionedFile } ;
@@ -95,7 +94,7 @@ use datafusion_common::file_options::json_writer::JsonWriterOptions;
95
94
use datafusion_common:: parsers:: CompressionTypeVariant ;
96
95
use datafusion_common:: stats:: Precision ;
97
96
use datafusion_common:: {
98
- internal_err, not_impl_err, Constraints , DataFusionError , Result , UnnestOptions ,
97
+ internal_err, not_impl_err, DataFusionError , Result , UnnestOptions ,
99
98
} ;
100
99
use datafusion_expr:: {
101
100
Accumulator , AccumulatorFactoryFunction , AggregateUDF , ColumnarValue , ScalarUDF ,
@@ -738,33 +737,23 @@ fn roundtrip_parquet_exec_with_pruning_predicate() -> Result<()> {
738
737
let mut options = TableParquetOptions :: new ( ) ;
739
738
options. global . pushdown_filters = true ;
740
739
741
- let source = Arc :: new (
740
+ let file_source = Arc :: new (
742
741
ParquetSource :: new ( options) . with_predicate ( Arc :: clone ( & file_schema) , predicate) ,
743
742
) ;
744
743
745
- let scan_config = FileScanConfig {
746
- object_store_url : ObjectStoreUrl :: local_filesystem ( ) ,
747
- file_schema,
748
- file_groups : vec ! [ vec![ PartitionedFile :: new(
749
- "/path/to/file.parquet" . to_string( ) ,
750
- 1024 ,
751
- ) ] ] ,
752
- constraints : Constraints :: empty ( ) ,
753
- statistics : Statistics {
754
- num_rows : Precision :: Inexact ( 100 ) ,
755
- total_byte_size : Precision :: Inexact ( 1024 ) ,
756
- column_statistics : Statistics :: unknown_column ( & Arc :: new ( Schema :: new ( vec ! [
757
- Field :: new( "col" , DataType :: Utf8 , false ) ,
758
- ] ) ) ) ,
759
- } ,
760
- projection : None ,
761
- limit : None ,
762
- table_partition_cols : vec ! [ ] ,
763
- output_ordering : vec ! [ ] ,
764
- file_compression_type : FileCompressionType :: UNCOMPRESSED ,
765
- new_lines_in_values : false ,
766
- file_source : source,
767
- } ;
744
+ let scan_config =
745
+ FileScanConfig :: new ( ObjectStoreUrl :: local_filesystem ( ) , file_schema, file_source)
746
+ . with_file_groups ( vec ! [ vec![ PartitionedFile :: new(
747
+ "/path/to/file.parquet" . to_string( ) ,
748
+ 1024 ,
749
+ ) ] ] )
750
+ . with_statistics ( Statistics {
751
+ num_rows : Precision :: Inexact ( 100 ) ,
752
+ total_byte_size : Precision :: Inexact ( 1024 ) ,
753
+ column_statistics : Statistics :: unknown_column ( & Arc :: new ( Schema :: new (
754
+ vec ! [ Field :: new( "col" , DataType :: Utf8 , false ) ] ,
755
+ ) ) ) ,
756
+ } ) ;
768
757
769
758
roundtrip_test ( scan_config. build ( ) )
770
759
}
@@ -777,9 +766,9 @@ async fn roundtrip_parquet_exec_with_table_partition_cols() -> Result<()> {
777
766
vec ! [ wrap_partition_value_in_dict( ScalarValue :: Int64 ( Some ( 0 ) ) ) ] ;
778
767
let schema = Arc :: new ( Schema :: new ( vec ! [ Field :: new( "col" , DataType :: Utf8 , false ) ] ) ) ;
779
768
780
- let source = Arc :: new ( ParquetSource :: default ( ) ) ;
769
+ let file_source = Arc :: new ( ParquetSource :: default ( ) ) ;
781
770
let scan_config =
782
- FileScanConfig :: new ( ObjectStoreUrl :: local_filesystem ( ) , schema, source )
771
+ FileScanConfig :: new ( ObjectStoreUrl :: local_filesystem ( ) , schema, file_source )
783
772
. with_projection ( Some ( vec ! [ 0 , 1 ] ) )
784
773
. with_file_group ( vec ! [ file_group] )
785
774
. with_table_partition_cols ( vec ! [ Field :: new(
@@ -801,34 +790,24 @@ fn roundtrip_parquet_exec_with_custom_predicate_expr() -> Result<()> {
801
790
inner : Arc :: new ( Column :: new ( "col" , 1 ) ) ,
802
791
} ) ;
803
792
804
- let source = Arc :: new (
793
+ let file_source = Arc :: new (
805
794
ParquetSource :: default ( )
806
795
. with_predicate ( Arc :: clone ( & file_schema) , custom_predicate_expr) ,
807
796
) ;
808
797
809
- let scan_config = FileScanConfig {
810
- object_store_url : ObjectStoreUrl :: local_filesystem ( ) ,
811
- file_schema,
812
- file_groups : vec ! [ vec![ PartitionedFile :: new(
813
- "/path/to/file.parquet" . to_string( ) ,
814
- 1024 ,
815
- ) ] ] ,
816
- constraints : Constraints :: empty ( ) ,
817
- statistics : Statistics {
818
- num_rows : Precision :: Inexact ( 100 ) ,
819
- total_byte_size : Precision :: Inexact ( 1024 ) ,
820
- column_statistics : Statistics :: unknown_column ( & Arc :: new ( Schema :: new ( vec ! [
821
- Field :: new( "col" , DataType :: Utf8 , false ) ,
822
- ] ) ) ) ,
823
- } ,
824
- projection : None ,
825
- limit : None ,
826
- table_partition_cols : vec ! [ ] ,
827
- output_ordering : vec ! [ ] ,
828
- file_compression_type : FileCompressionType :: UNCOMPRESSED ,
829
- new_lines_in_values : false ,
830
- file_source : source,
831
- } ;
798
+ let scan_config =
799
+ FileScanConfig :: new ( ObjectStoreUrl :: local_filesystem ( ) , file_schema, file_source)
800
+ . with_file_groups ( vec ! [ vec![ PartitionedFile :: new(
801
+ "/path/to/file.parquet" . to_string( ) ,
802
+ 1024 ,
803
+ ) ] ] )
804
+ . with_statistics ( Statistics {
805
+ num_rows : Precision :: Inexact ( 100 ) ,
806
+ total_byte_size : Precision :: Inexact ( 1024 ) ,
807
+ column_statistics : Statistics :: unknown_column ( & Arc :: new ( Schema :: new (
808
+ vec ! [ Field :: new( "col" , DataType :: Utf8 , false ) ] ,
809
+ ) ) ) ,
810
+ } ) ;
832
811
833
812
#[ derive( Debug , Clone , Eq ) ]
834
813
struct CustomPredicateExpr {
@@ -1608,24 +1587,18 @@ async fn roundtrip_projection_source() -> Result<()> {
1608
1587
1609
1588
let statistics = Statistics :: new_unknown ( & schema) ;
1610
1589
1611
- let source = ParquetSource :: default ( ) . with_statistics ( statistics. clone ( ) ) ;
1612
- let scan_config = FileScanConfig {
1613
- object_store_url : ObjectStoreUrl :: local_filesystem ( ) ,
1614
- file_groups : vec ! [ vec![ PartitionedFile :: new(
1615
- "/path/to/file.parquet" . to_string( ) ,
1616
- 1024 ,
1617
- ) ] ] ,
1618
- constraints : Constraints :: empty ( ) ,
1619
- statistics,
1620
- file_schema : schema. clone ( ) ,
1621
- projection : Some ( vec ! [ 0 , 1 , 2 ] ) ,
1622
- limit : None ,
1623
- table_partition_cols : vec ! [ ] ,
1624
- output_ordering : vec ! [ ] ,
1625
- file_compression_type : FileCompressionType :: UNCOMPRESSED ,
1626
- new_lines_in_values : false ,
1627
- file_source : source,
1628
- } ;
1590
+ let file_source = ParquetSource :: default ( ) . with_statistics ( statistics. clone ( ) ) ;
1591
+ let scan_config = FileScanConfig :: new (
1592
+ ObjectStoreUrl :: local_filesystem ( ) ,
1593
+ schema. clone ( ) ,
1594
+ file_source,
1595
+ )
1596
+ . with_file_groups ( vec ! [ vec![ PartitionedFile :: new(
1597
+ "/path/to/file.parquet" . to_string( ) ,
1598
+ 1024 ,
1599
+ ) ] ] )
1600
+ . with_statistics ( statistics)
1601
+ . with_projection ( Some ( vec ! [ 0 , 1 , 2 ] ) ) ;
1629
1602
1630
1603
let filter = Arc :: new (
1631
1604
FilterExec :: try_new (
0 commit comments