Skip to content

Commit b41d607

Browse files
committed
Remove waits from blocking threads reading spill files.
Fixes #15323. The previous design of reading spill files was a `push` design, spawning long lived blocking tasks which repeatedly read records, send them and wait until they are received. This design had an issue where progress wasn't guaranteed (i.e., there was a deadlock) if there were more spill files than the blocking thread pool in tokio which were all waited for together. To solve this, the design is changed to a `pull` design, where blocking tasks are spawned for every read, removing waiting on the IO threads and guaranteeing progress. While there might be an added overhead for repeatedly calling `spawn_blocking`, it's probably insignificant compared to the IO cost of reading from the disk.
1 parent 18a80f0 commit b41d607

File tree

2 files changed

+193
-25
lines changed

2 files changed

+193
-25
lines changed

datafusion/physical-plan/src/spill/mod.rs

Lines changed: 187 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -23,28 +23,161 @@ pub(crate) mod spill_manager;
2323
use std::fs::File;
2424
use std::io::BufReader;
2525
use std::path::{Path, PathBuf};
26+
use std::pin::Pin;
2627
use std::ptr::NonNull;
28+
use std::sync::Arc;
29+
use std::task::{Context, Poll};
2730

2831
use arrow::array::ArrayData;
2932
use arrow::datatypes::{Schema, SchemaRef};
3033
use arrow::ipc::{reader::StreamReader, writer::StreamWriter};
3134
use arrow::record_batch::RecordBatch;
32-
use tokio::sync::mpsc::Sender;
33-
34-
use datafusion_common::{exec_datafusion_err, HashSet, Result};
35-
36-
fn read_spill(sender: Sender<Result<RecordBatch>>, path: &Path) -> Result<()> {
37-
let file = BufReader::new(File::open(path)?);
38-
// SAFETY: DataFusion's spill writer strictly follows Arrow IPC specifications
39-
// with validated schemas and buffers. Skip redundant validation during read
40-
// to speedup read operation. This is safe for DataFusion as input guaranteed to be correct when written.
41-
let reader = unsafe { StreamReader::try_new(file, None)?.with_skip_validation(true) };
42-
for batch in reader {
43-
sender
44-
.blocking_send(batch.map_err(Into::into))
45-
.map_err(|e| exec_datafusion_err!("{e}"))?;
35+
36+
use datafusion_common::{exec_datafusion_err, DataFusionError, HashSet, Result};
37+
use datafusion_common_runtime::SpawnedTask;
38+
use datafusion_execution::disk_manager::RefCountedTempFile;
39+
use datafusion_execution::RecordBatchStream;
40+
use futures::{FutureExt as _, Stream};
41+
42+
/// Stream that reads spill files from disk where each batch is read in a spawned blocking task
43+
/// It will read one batch at a time and will not do any buffering, to buffer data use [`crate::common::spawn_buffered`]
44+
///
45+
/// A simpler solution would be spawning a long-running blocking task for each
46+
/// file read (instead of each batch). This approach does not work because when
47+
/// the number of concurrent reads exceeds the Tokio thread pool limit,
48+
/// deadlocks can occur and block progress.
49+
struct SpillReaderStream {
50+
schema: SchemaRef,
51+
state: SpillReaderStreamState,
52+
}
53+
54+
/// When we poll for the next batch, we will get back both the batch and the reader,
55+
/// so we can call `next` again.
56+
type NextRecordBatchResult = Result<(StreamReader<BufReader<File>>, Option<RecordBatch>)>;
57+
58+
enum SpillReaderStreamState {
59+
/// Initial state: the stream was not initialized yet
60+
/// and the file was not opened
61+
Uninitialized(RefCountedTempFile),
62+
63+
/// A read is in progress in a spawned blocking task for which we hold the handle.
64+
ReadInProgress(SpawnedTask<NextRecordBatchResult>),
65+
66+
/// A read has finished and we wait for being polled again in order to start reading the next batch.
67+
Waiting(StreamReader<BufReader<File>>),
68+
69+
/// The stream has finished, successfully or not.
70+
Done,
71+
}
72+
73+
impl SpillReaderStream {
74+
fn new(schema: SchemaRef, spill_file: RefCountedTempFile) -> Self {
75+
Self {
76+
schema,
77+
state: SpillReaderStreamState::Uninitialized(spill_file),
78+
}
79+
}
80+
81+
fn poll_next_inner(
82+
&mut self,
83+
cx: &mut Context<'_>,
84+
) -> Poll<Option<Result<RecordBatch>>> {
85+
match &mut self.state {
86+
SpillReaderStreamState::Uninitialized(_) => {
87+
// Temporarily replace with `Done` to be able to pass the file to the task.
88+
let SpillReaderStreamState::Uninitialized(spill_file) =
89+
std::mem::replace(&mut self.state, SpillReaderStreamState::Done)
90+
else {
91+
unreachable!()
92+
};
93+
94+
let task = SpawnedTask::spawn_blocking(move || {
95+
let file = BufReader::new(File::open(spill_file.path())?);
96+
// SAFETY: DataFusion's spill writer strictly follows Arrow IPC specifications
97+
// with validated schemas and buffers. Skip redundant validation during read
98+
// to speedup read operation. This is safe for DataFusion as input guaranteed to be correct when written.
99+
let mut reader = unsafe {
100+
StreamReader::try_new(file, None)?.with_skip_validation(true)
101+
};
102+
103+
let next_batch = reader.next().transpose()?;
104+
105+
Ok((reader, next_batch))
106+
});
107+
108+
self.state = SpillReaderStreamState::ReadInProgress(task);
109+
110+
// Poll again immediately so the inner task is polled and the waker is
111+
// registered.
112+
self.poll_next_inner(cx)
113+
}
114+
115+
SpillReaderStreamState::ReadInProgress(task) => {
116+
let result = futures::ready!(task.poll_unpin(cx))
117+
.unwrap_or_else(|err| Err(DataFusionError::External(Box::new(err))));
118+
119+
match result {
120+
Ok((reader, batch)) => {
121+
match batch {
122+
Some(batch) => {
123+
self.state = SpillReaderStreamState::Waiting(reader);
124+
125+
Poll::Ready(Some(Ok(batch)))
126+
}
127+
None => {
128+
// Stream is done
129+
self.state = SpillReaderStreamState::Done;
130+
131+
Poll::Ready(None)
132+
}
133+
}
134+
}
135+
Err(err) => {
136+
self.state = SpillReaderStreamState::Done;
137+
138+
Poll::Ready(Some(Err(err)))
139+
}
140+
}
141+
}
142+
143+
SpillReaderStreamState::Waiting(_) => {
144+
// Temporarily replace with `Done` to be able to pass the file to the task.
145+
let SpillReaderStreamState::Waiting(mut reader) =
146+
std::mem::replace(&mut self.state, SpillReaderStreamState::Done)
147+
else {
148+
unreachable!()
149+
};
150+
151+
let task = SpawnedTask::spawn_blocking(move || {
152+
let next_batch = reader.next().transpose()?;
153+
154+
Ok((reader, next_batch))
155+
});
156+
157+
self.state = SpillReaderStreamState::ReadInProgress(task);
158+
159+
// Poll again immediately so the inner task is polled and the waker is
160+
// registered.
161+
self.poll_next_inner(cx)
162+
}
163+
164+
SpillReaderStreamState::Done => Poll::Ready(None),
165+
}
166+
}
167+
}
168+
169+
impl Stream for SpillReaderStream {
170+
type Item = Result<RecordBatch>;
171+
172+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
173+
self.get_mut().poll_next_inner(cx)
174+
}
175+
}
176+
177+
impl RecordBatchStream for SpillReaderStream {
178+
fn schema(&self) -> SchemaRef {
179+
Arc::clone(&self.schema)
46180
}
47-
Ok(())
48181
}
49182

50183
/// Spill the `RecordBatch` to disk as smaller batches
@@ -205,6 +338,7 @@ mod tests {
205338
use arrow::record_batch::RecordBatch;
206339
use datafusion_common::Result;
207340
use datafusion_execution::runtime_env::RuntimeEnv;
341+
use futures::StreamExt as _;
208342

209343
use std::sync::Arc;
210344

@@ -604,4 +738,42 @@ mod tests {
604738

605739
Ok(())
606740
}
741+
742+
#[test]
743+
fn test_reading_more_spills_than_tokio_blocking_threads() -> Result<()> {
744+
tokio::runtime::Builder::new_current_thread()
745+
.enable_all()
746+
.max_blocking_threads(1)
747+
.build()
748+
.unwrap()
749+
.block_on(async {
750+
let batch = build_table_i32(
751+
("a2", &vec![0, 1, 2]),
752+
("b2", &vec![3, 4, 5]),
753+
("c2", &vec![4, 5, 6]),
754+
);
755+
756+
let schema = batch.schema();
757+
758+
// Construct SpillManager
759+
let env = Arc::new(RuntimeEnv::default());
760+
let metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
761+
let spill_manager = SpillManager::new(env, metrics, Arc::clone(&schema));
762+
let batches: [_; 10] = std::array::from_fn(|_| batch.clone());
763+
764+
let spill_file_1 = spill_manager
765+
.spill_record_batch_and_finish(&batches, "Test1")?
766+
.unwrap();
767+
let spill_file_2 = spill_manager
768+
.spill_record_batch_and_finish(&batches, "Test2")?
769+
.unwrap();
770+
771+
let mut stream_1 = spill_manager.read_spill_as_stream(spill_file_1)?;
772+
let mut stream_2 = spill_manager.read_spill_as_stream(spill_file_2)?;
773+
stream_1.next().await;
774+
stream_2.next().await;
775+
776+
Ok(())
777+
})
778+
}
607779
}

datafusion/physical-plan/src/spill/spill_manager.rs

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,9 @@ use datafusion_common::Result;
2727
use datafusion_execution::disk_manager::RefCountedTempFile;
2828
use datafusion_execution::SendableRecordBatchStream;
2929

30-
use crate::metrics::SpillMetrics;
31-
use crate::stream::RecordBatchReceiverStream;
30+
use crate::{common::spawn_buffered, metrics::SpillMetrics};
3231

33-
use super::{in_progress_spill_file::InProgressSpillFile, read_spill};
32+
use super::{in_progress_spill_file::InProgressSpillFile, SpillReaderStream};
3433

3534
/// The `SpillManager` is responsible for the following tasks:
3635
/// - Reading and writing `RecordBatch`es to raw files based on the provided configurations.
@@ -126,14 +125,11 @@ impl SpillManager {
126125
&self,
127126
spill_file_path: RefCountedTempFile,
128127
) -> Result<SendableRecordBatchStream> {
129-
let mut builder = RecordBatchReceiverStream::builder(
128+
let stream = Box::pin(SpillReaderStream::new(
130129
Arc::clone(&self.schema),
131-
self.batch_read_buffer_capacity,
132-
);
133-
let sender = builder.tx();
130+
spill_file_path,
131+
));
134132

135-
builder.spawn_blocking(move || read_spill(sender, spill_file_path.path()));
136-
137-
Ok(builder.build())
133+
Ok(spawn_buffered(stream, self.batch_read_buffer_capacity))
138134
}
139135
}

0 commit comments

Comments
 (0)