Skip to content

Commit 750cb30

Browse files
committed
fixup! [HSTACK] Building blocks for Ray DataFusionDatasource
1 parent 64bb603 commit 750cb30

File tree

3 files changed

+14
-15
lines changed

3 files changed

+14
-15
lines changed

python/datafusion/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -29,7 +29,7 @@
2929
from . import functions, object_store, substrait
3030

3131
# The following imports are okay to remain as opaque to the user.
32-
from ._internal import Config
32+
from ._internal import Config, partition_stream
3333
from .catalog import Catalog, Database, Table
3434
from .common import (
3535
DFSchema,
@@ -81,6 +81,7 @@
8181
"functions",
8282
"object_store",
8383
"substrait",
84+
"partition_stream",
8485
]
8586

8687

python/datafusion/dataframe.py

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -797,8 +797,8 @@ def count(self) -> int:
797797
"""
798798
return self.df.count()
799799

800-
def distributed_plan(self, num_shards: int):
801-
return self.df.distributed_plan(num_shards)
800+
def distributed_plan(self):
801+
return self.df.distributed_plan()
802802

803803
@deprecated("Use :py:func:`unnest_columns` instead.")
804804
def unnest_column(self, column: str, preserve_nulls: bool = True) -> DataFrame:

src/dataframe.rs

Lines changed: 10 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -29,7 +29,7 @@ use datafusion::arrow::pyarrow::{PyArrowType, ToPyArrow};
2929
use datafusion::arrow::util::pretty;
3030
use datafusion::common::stats::Precision;
3131
use datafusion::common::{DFSchema, DataFusionError, UnnestOptions};
32-
use datafusion::config::{ConfigOptions, CsvOptions, TableParquetOptions};
32+
use datafusion::config::{CsvOptions, TableParquetOptions};
3333
use datafusion::dataframe::{DataFrame, DataFrameWriteOptions};
3434
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
3535
use datafusion::execution::SendableRecordBatchStream;
@@ -669,10 +669,10 @@ impl PyDataFrame {
669669
Ok(wait_for_future(py, self.df.as_ref().clone().count())?)
670670
}
671671

672-
fn distributed_plan(&self, parallelism: usize, py: Python<'_>) -> PyResult<DistributedPlan> {
672+
fn distributed_plan(&self, py: Python<'_>) -> PyResult<DistributedPlan> {
673673
let future_plan = self.df.as_ref().clone().create_physical_plan();
674674
let physical_plan = wait_for_future(py, future_plan).map_err(py_datafusion_err)?;
675-
DistributedPlan::try_new(physical_plan, parallelism).map_err(py_datafusion_err)
675+
DistributedPlan::try_new(physical_plan).map_err(py_datafusion_err)
676676
}
677677

678678
}
@@ -682,7 +682,7 @@ impl PyDataFrame {
682682
pub struct DistributedPlan {
683683
physical_plan: Vec<u8>,
684684
schema: PyDFSchema,
685-
partitions: usize,
685+
num_partitions: usize,
686686
num_bytes: Option<usize>,
687687
num_rows: Option<usize>,
688688
}
@@ -693,29 +693,27 @@ fn codec() -> &'static dyn PhysicalExtensionCodec {
693693
}
694694

695695
impl DistributedPlan {
696-
fn try_new(plan: Arc<dyn ExecutionPlan>, parallelism: usize) -> Result<Self, DataFusionError> {
696+
fn try_new(plan: Arc<dyn ExecutionPlan>) -> Result<Self, DataFusionError> {
697697
fn extract(prec: Precision<usize>) -> Option<usize> {
698698
match prec {
699699
Precision::Exact(n) => Some(n),
700700
_ => None,
701701
}
702702
}
703703
let (num_bytes, num_rows) = if let Ok(stats) = plan.statistics() {
704-
let num_bytes = extract(stats.total_byte_size);
705-
let num_rows = extract(stats.num_rows);
706-
(num_bytes, num_rows)
704+
let bytes = extract(stats.total_byte_size);
705+
let rows = extract(stats.num_rows);
706+
(bytes, rows)
707707
} else {
708708
(None, None)
709709
};
710710

711711
let schema = DFSchema::try_from(plan.schema())
712712
.map(PyDFSchema::from)?;
713-
let plan = plan.repartitioned(parallelism, &ConfigOptions::default())?
714-
.unwrap_or(plan);
715-
let partitions = plan.properties().partitioning.partition_count();
713+
let num_partitions = plan.properties().partitioning.partition_count();
716714
let physical_plan = PhysicalPlanNode::try_from_physical_plan(plan, codec())?
717715
.encode_to_vec();
718-
Ok(Self { physical_plan, schema, partitions, num_bytes, num_rows })
716+
Ok(Self { physical_plan, schema, num_partitions, num_bytes, num_rows })
719717
}
720718

721719
}

0 commit comments

Comments
 (0)