@@ -30,11 +30,12 @@ use datafusion::arrow::util::pretty;
3030use datafusion:: common:: stats:: Precision ;
3131use datafusion:: common:: { DFSchema , DataFusionError , Statistics , UnnestOptions } ;
3232use datafusion:: common:: tree_node:: { Transformed , TreeNode } ;
33- use datafusion:: config:: { ConfigOptions , CsvOptions , TableParquetOptions } ;
33+ use datafusion:: config:: { CsvOptions , TableParquetOptions } ;
3434use datafusion:: dataframe:: { DataFrame , DataFrameWriteOptions } ;
35+ use datafusion:: datasource:: memory:: DataSourceExec ;
3536use datafusion:: datasource:: TableProvider ;
36- use datafusion:: execution :: runtime_env :: RuntimeEnvBuilder ;
37- use datafusion:: datasource:: physical_plan :: ParquetExec ;
37+ use datafusion:: datasource :: physical_plan :: FileScanConfig ;
38+ use datafusion:: datasource:: source :: DataSource ;
3839use datafusion:: execution:: SendableRecordBatchStream ;
3940use datafusion:: parquet:: basic:: { BrotliLevel , Compression , GzipLevel , ZstdLevel } ;
4041use datafusion:: physical_plan:: { ExecutionPlan , ExecutionPlanProperties } ;
@@ -753,25 +754,28 @@ impl DistributedPlan {
753754 return Ok ( ( ) )
754755 }
755756 let updated_plan = self . plan ( ) . clone ( ) . transform_up ( |node| {
756- if let Some ( parquet ) = node. as_any ( ) . downcast_ref :: < ParquetExec > ( ) {
757+ if let Some ( exec ) = node. as_any ( ) . downcast_ref :: < DataSourceExec > ( ) {
757758 // Remove redundant ranges from partition files because ParquetExec refuses to repartition
758759 // if any file has a range defined (even when the range actually covers the entire file).
759760 // The EnforceDistribution optimizer rule adds ranges for both full and partial files,
760761 // so this tries to rever that to trigger a repartition when no files are actually split.
761- let mut file_groups = parquet. base_config ( ) . file_groups . clone ( ) ;
762- for group in file_groups. iter_mut ( ) {
763- for file in group. iter_mut ( ) {
764- if let Some ( range) = & file. range {
765- if range. start == 0 && range. end == file. object_meta . size as i64 {
766- file. range = None ; // remove redundant range
762+ if let Some ( file_scan) = exec. data_source ( ) . as_any ( ) . downcast_ref :: < FileScanConfig > ( ) {
763+ let mut file_groups = file_scan. file_groups . clone ( ) ;
764+ for group in file_groups. iter_mut ( ) {
765+ for file in group. iter_mut ( ) {
766+ if let Some ( range) = & file. range {
767+ if range. start == 0 && range. end == file. object_meta . size as i64 {
768+ file. range = None ; // remove redundant range
769+ }
767770 }
768771 }
769772 }
770- }
771- if let Some ( repartitioned) = parquet. clone ( ) . into_builder ( ) . with_file_groups ( file_groups)
772- . build_arc ( )
773- . repartitioned ( desired_parallelism, & ConfigOptions :: default ( ) ) ? {
774- Ok ( Transformed :: yes ( repartitioned) )
773+ if let Some ( repartitioned) = file_scan. clone ( ) . with_file_groups ( file_groups)
774+ . repartitioned ( desired_parallelism, 10 * 1024 * 1024 , None ) ? {
775+ Ok ( Transformed :: yes ( Arc :: new ( DataSourceExec :: new ( repartitioned) ) ) )
776+ } else {
777+ Ok ( Transformed :: no ( node) )
778+ }
775779 } else {
776780 Ok ( Transformed :: no ( node) )
777781 }
0 commit comments