Skip to content

Commit 2cf6abd

Browse files
authored
fix: conflicting (#46)
1 parent eb01db1 commit 2cf6abd

File tree

2 files changed

+22
-0
lines changed

2 files changed

+22
-0
lines changed

datafusion/tests/test_dataframe.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -312,3 +312,14 @@ def test_except_all():
312312
df_a_e_b = df_a.except_all(df_b).sort(column("a").sort(ascending=True))
313313

314314
assert df_c.collect() == df_a_e_b.collect()
315+
316+
317+
def test_collect_partitioned():
318+
ctx = SessionContext()
319+
320+
batch = pa.RecordBatch.from_arrays(
321+
[pa.array([1, 2, 3]), pa.array([4, 5, 6])],
322+
names=["a", "b"],
323+
)
324+
325+
assert [[batch]] == ctx.create_dataframe([[batch]]).collect_partitioned()

src/dataframe.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,17 @@ impl PyDataFrame {
129129
batches.into_iter().map(|rb| rb.to_pyarrow(py)).collect()
130130
}
131131

132+
/// Executes this DataFrame and collects all results into a vector of vector of RecordBatch
133+
/// maintaining the input partitioning.
134+
fn collect_partitioned(&self, py: Python) -> PyResult<Vec<Vec<PyObject>>> {
135+
let batches = wait_for_future(py, self.df.collect_partitioned())?;
136+
137+
batches
138+
.into_iter()
139+
.map(|rbs| rbs.into_iter().map(|rb| rb.to_pyarrow(py)).collect())
140+
.collect()
141+
}
142+
132143
/// Print the result, 20 lines by default
133144
#[args(num = "20")]
134145
fn show(&self, py: Python, num: usize) -> PyResult<()> {

0 commit comments

Comments
 (0)