Skip to content

Commit 373b721

Browse files
committed
- replace IPCStreamWriter with IPCFileWriter
- remove test related to apache#14868
1 parent 46f4024 commit 373b721

File tree

7 files changed

+152
-128
lines changed

7 files changed

+152
-128
lines changed

Cargo.lock

Lines changed: 51 additions & 34 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ ahash = { version = "0.8", default-features = false, features = [
8787
"runtime-rng",
8888
] }
8989
apache-avro = { version = "0.17", default-features = false }
90-
arrow = { version = "54.2.1", features = [
90+
arrow = { version = "54.3.0", features = [
9191
"prettyprint",
9292
"chrono-tz",
9393
] }

datafusion/physical-plan/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ hashbrown = { workspace = true }
6060
indexmap = { workspace = true }
6161
itertools = { workspace = true, features = ["use_std"] }
6262
log = { workspace = true }
63+
memmap2 = "0.9.5"
6364
parking_lot = { workspace = true }
6465
pin-project-lite = "^0.2.7"
6566
tokio = { workspace = true }

datafusion/physical-plan/src/joins/sort_merge_join.rs

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,6 @@ use std::any::Any;
2424
use std::cmp::Ordering;
2525
use std::collections::{HashMap, VecDeque};
2626
use std::fmt::Formatter;
27-
use std::fs::File;
28-
use std::io::BufReader;
2927
use std::mem::size_of;
3028
use std::ops::Range;
3129
use std::pin::Pin;
@@ -51,8 +49,8 @@ use crate::projection::{
5149
use crate::spill::spill_manager::SpillManager;
5250
use crate::{
5351
metrics, DisplayAs, DisplayFormatType, Distribution, ExecutionPlan,
54-
ExecutionPlanProperties, PhysicalExpr, PlanProperties, RecordBatchStream,
55-
SendableRecordBatchStream, Statistics,
52+
ExecutionPlanProperties, IPCBufferDecoder, PhysicalExpr, PlanProperties,
53+
RecordBatchStream, SendableRecordBatchStream, Statistics,
5654
};
5755

5856
use arrow::array::{types::UInt64Type, *};
@@ -61,7 +59,6 @@ use arrow::compute::{
6159
};
6260
use arrow::datatypes::{DataType, SchemaRef, TimeUnit};
6361
use arrow::error::ArrowError;
64-
use arrow::ipc::reader::StreamReader;
6562
use datafusion_common::{
6663
exec_err, internal_err, not_impl_err, plan_err, DataFusionError, HashSet, JoinSide,
6764
JoinType, Result,
@@ -2279,15 +2276,13 @@ fn fetch_right_columns_from_batch_by_idxs(
22792276
let mut buffered_cols: Vec<ArrayRef> =
22802277
Vec::with_capacity(buffered_indices.len());
22812278

2282-
let file = BufReader::new(File::open(spill_file.path())?);
2283-
let reader = StreamReader::try_new(file, None)?;
2279+
let decoder = IPCBufferDecoder::new(spill_file.path());
22842280

2285-
for batch in reader {
2286-
batch?.columns().iter().for_each(|column| {
2281+
for i in 0..decoder.num_batches(){
2282+
decoder.get_batch(i)?.unwrap().columns().iter().for_each(|column| {
22872283
buffered_cols.extend(take(column, &buffered_indices, None))
22882284
});
22892285
}
2290-
22912286
Ok(buffered_cols)
22922287
}
22932288
// Invalid combination

datafusion/physical-plan/src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,8 @@ pub use crate::ordering::InputOrderMode;
5050
pub use crate::stream::EmptyRecordBatchStream;
5151
pub use crate::topk::TopK;
5252
pub use crate::visitor::{accept, visit_execution_plan, ExecutionPlanVisitor};
53-
53+
pub use spill::spill_manager::SpillManager;
54+
pub use spill::IPCBufferDecoder;
5455
mod ordering;
5556
mod render_tree;
5657
mod topk;

datafusion/physical-plan/src/spill/in_progress_spill_file.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,15 @@ use arrow::array::RecordBatch;
2424
use datafusion_common::exec_datafusion_err;
2525
use datafusion_execution::disk_manager::RefCountedTempFile;
2626

27-
use super::{spill_manager::SpillManager, IPCStreamWriter};
27+
use super::{spill_manager::SpillManager, IPCFileWriter};
2828

2929
/// Represents an in-progress spill file used for writing `RecordBatch`es to disk, created by `SpillManager`.
3030
/// Caller is able to use this struct to incrementally append in-memory batches to
3131
/// the file, and then finalize the file by calling the `finish` method.
3232
pub struct InProgressSpillFile {
3333
pub(crate) spill_writer: Arc<SpillManager>,
3434
/// Lazily initialized writer
35-
writer: Option<IPCStreamWriter>,
35+
writer: Option<IPCFileWriter>,
3636
/// Lazily initialized in-progress file, it will be moved out when the `finish` method is invoked
3737
in_progress_file: Option<RefCountedTempFile>,
3838
}
@@ -59,7 +59,7 @@ impl InProgressSpillFile {
5959
if self.writer.is_none() {
6060
let schema = batch.schema();
6161
if let Some(ref in_progress_file) = self.in_progress_file {
62-
self.writer = Some(IPCStreamWriter::new(
62+
self.writer = Some(IPCFileWriter::new(
6363
in_progress_file.path(),
6464
schema.as_ref(),
6565
)?);

0 commit comments

Comments
 (0)