Skip to content

Commit 638b73b

Browse files
committed
Ray DataFusionDatasource fixes
1 parent 68f1891 commit 638b73b

File tree

1 file changed

+25
-22
lines changed

1 file changed

+25
-22
lines changed

src/dataframe.rs

Lines changed: 25 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ use datafusion::datasource::memory::DataSourceExec;
3636
use datafusion::datasource::TableProvider;
3737
use datafusion::datasource::physical_plan::FileScanConfig;
3838
use datafusion::datasource::source::DataSource;
39-
use datafusion::execution::SendableRecordBatchStream;
39+
use datafusion::execution::{SendableRecordBatchStream};
4040
use datafusion::parquet::basic::{BrotliLevel, Compression, GzipLevel, ZstdLevel};
4141
use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties};
4242
use datafusion::prelude::*;
@@ -708,17 +708,16 @@ impl PyDataFrame {
708708
}
709709

710710
fn distributed_plan(&self, py: Python<'_>) -> PyResult<DistributedPlan> {
711-
let future_plan = self.df.as_ref().clone().create_physical_plan();
712-
wait_for_future(py, future_plan)
713-
.map(DistributedPlan::new)
714-
.map_err(py_datafusion_err)
711+
let future_plan = DistributedPlan::try_new(self.df.as_ref());
712+
wait_for_future(py, future_plan).map_err(py_datafusion_err)
715713
}
716714

717715
}
718716

719717
#[pyclass(get_all)]
720718
#[derive(Debug, Clone)]
721719
pub struct DistributedPlan {
720+
repartition_file_min_size: usize,
722721
physical_plan: PyExecutionPlan,
723722
}
724723

@@ -755,13 +754,13 @@ impl DistributedPlan {
755754
}
756755
let updated_plan = self.plan().clone().transform_up(|node| {
757756
if let Some(exec) = node.as_any().downcast_ref::<DataSourceExec>() {
758-
// Remove redundant ranges from partition files because ParquetExec refuses to repartition
757+
// Remove redundant ranges from partition files because FileScanConfig refuses to repartition
759758
// if any file has a range defined (even when the range actually covers the entire file).
760759
// The EnforceDistribution optimizer rule adds ranges for both full and partial files,
761-
// so this tries to rever that to trigger a repartition when no files are actually split.
760+
// so this tries to revert that in order to trigger a repartition when no files are actually split.
762761
if let Some(file_scan) = exec.data_source().as_any().downcast_ref::<FileScanConfig>() {
763-
let mut file_groups = file_scan.file_groups.clone();
764-
for group in file_groups.iter_mut() {
762+
let mut range_free_file_scan = file_scan.clone();
763+
for group in range_free_file_scan.file_groups.iter_mut() {
765764
for file in group.iter_mut() {
766765
if let Some(range) = &file.range {
767766
if range.start == 0 && range.end == file.object_meta.size as i64 {
@@ -770,18 +769,14 @@ impl DistributedPlan {
770769
}
771770
}
772771
}
773-
if let Some(repartitioned) = file_scan.clone().with_file_groups(file_groups)
774-
.repartitioned(desired_parallelism, 10 * 1024 * 1024, None)? {
775-
Ok(Transformed::yes(Arc::new(DataSourceExec::new(repartitioned))))
776-
} else {
777-
Ok(Transformed::no(node))
772+
let ordering = range_free_file_scan.eq_properties().output_ordering();
773+
if let Some(repartitioned) = range_free_file_scan
774+
.repartitioned(desired_parallelism, self.repartition_file_min_size, ordering)? {
775+
return Ok(Transformed::yes(Arc::new(DataSourceExec::new(repartitioned))))
778776
}
779-
} else {
780-
Ok(Transformed::no(node))
781777
}
782-
} else {
783-
Ok(Transformed::no(node))
784778
}
779+
Ok(Transformed::no(node))
785780
}).map_err(py_datafusion_err)?.data;
786781
self.physical_plan = PyExecutionPlan::new(updated_plan);
787782
Ok(())
@@ -790,10 +785,18 @@ impl DistributedPlan {
790785

791786
impl DistributedPlan {
792787

793-
fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
794-
Self {
795-
physical_plan: PyExecutionPlan::new(plan)
796-
}
788+
async fn try_new(df: &DataFrame) -> Result<Self, DataFusionError> {
789+
let (mut session_state, logical_plan) = df.clone().into_parts();
790+
let repartition_file_min_size = session_state.config_options().optimizer.repartition_file_min_size;
791+
// Create the physical plan with a single partition, to ensure that no files are split into ranges.
792+
// Otherwise, any subsequent repartition attempt would fail (see the comment in `set_desired_parallelism`)
793+
session_state.config_mut().options_mut().execution.target_partitions = 1;
794+
let physical_plan = session_state.create_physical_plan(&logical_plan).await?;
795+
let physical_plan = PyExecutionPlan::new(physical_plan);
796+
Ok(Self {
797+
repartition_file_min_size,
798+
physical_plan,
799+
})
797800
}
798801

799802
fn plan(&self) -> &Arc<dyn ExecutionPlan> {

0 commit comments

Comments
 (0)