Remove waits from blocking threads reading spill files. #15654

Merged 1 commit on Apr 12, 2025
202 changes: 187 additions & 15 deletions datafusion/physical-plan/src/spill/mod.rs
@@ -23,28 +23,161 @@ pub(crate) mod spill_manager;
use std::fs::File;
use std::io::BufReader;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::ptr::NonNull;
use std::sync::Arc;
use std::task::{Context, Poll};

use arrow::array::ArrayData;
use arrow::datatypes::{Schema, SchemaRef};
use arrow::ipc::{reader::StreamReader, writer::StreamWriter};
use arrow::record_batch::RecordBatch;
use tokio::sync::mpsc::Sender;

use datafusion_common::{exec_datafusion_err, HashSet, Result};

fn read_spill(sender: Sender<Result<RecordBatch>>, path: &Path) -> Result<()> {
let file = BufReader::new(File::open(path)?);
// SAFETY: DataFusion's spill writer strictly follows the Arrow IPC specification
// with validated schemas and buffers. Skip redundant validation during read
// to speed up the read operation. This is safe for DataFusion as the input is guaranteed to be correct when written.
let reader = unsafe { StreamReader::try_new(file, None)?.with_skip_validation(true) };
for batch in reader {
sender
.blocking_send(batch.map_err(Into::into))
.map_err(|e| exec_datafusion_err!("{e}"))?;

use datafusion_common::{exec_datafusion_err, DataFusionError, HashSet, Result};
use datafusion_common_runtime::SpawnedTask;
use datafusion_execution::disk_manager::RefCountedTempFile;
use datafusion_execution::RecordBatchStream;
use futures::{FutureExt as _, Stream};

/// Stream that reads spill files from disk, where each batch is read in a spawned blocking task.
/// It reads one batch at a time and does not do any buffering; to buffer data, use [`crate::common::spawn_buffered`].
///
/// A simpler solution would be spawning a long-running blocking task for each
/// file read (instead of each batch). This approach does not work because when
/// the number of concurrent reads exceeds the Tokio thread pool limit,
/// deadlocks can occur and block progress.
struct SpillReaderStream {
[Review comment] Contributor: I recommend to add a 'why' comment here.

[Review comment] Contributor: Strongly agree.

[Review comment] Author: done

schema: SchemaRef,
state: SpillReaderStreamState,
}
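The doc comment above explains why each batch is read in its own short blocking task rather than one long-running task per file. That design can be illustrated with a plain-std sketch (hypothetical `interleave_on_one_worker` helper; a std thread stands in for Tokio's blocking pool): a single worker, like `max_blocking_threads(1)`, can still complete several logical streams as long as each submitted job reads only one batch and returns.

```rust
use std::sync::mpsc;
use std::thread;

type Job = Box<dyn FnOnce() + Send>;

// Simulates a blocking pool with a single worker thread draining a queue of
// short per-batch jobs. Because each job finishes after a single "batch",
// multiple logical spill streams all make progress on one worker; a
// long-running task per *file* would instead pin the worker and stall the
// other streams.
fn interleave_on_one_worker(streams: usize, batches: usize) -> Vec<(usize, usize)> {
    let (tx, rx) = mpsc::channel::<Job>();
    // The single "blocking thread".
    let worker = thread::spawn(move || {
        for job in rx {
            job();
        }
    });

    let (out_tx, out_rx) = mpsc::channel();
    // Each logical stream submits one short job per batch.
    for stream_id in 0..streams {
        for batch in 0..batches {
            let out = out_tx.clone();
            tx.send(Box::new(move || {
                out.send((stream_id, batch)).unwrap();
            }))
            .unwrap();
        }
    }
    drop(tx);
    drop(out_tx);
    worker.join().unwrap();
    out_rx.iter().collect()
}
```

This is only a model of the scheduling argument, not DataFusion code: the point is that every stream's jobs complete even though there is one worker.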

/// When we poll for the next batch, we will get back both the batch and the reader,
/// so we can call `next` again.
type NextRecordBatchResult = Result<(StreamReader<BufReader<File>>, Option<RecordBatch>)>;

enum SpillReaderStreamState {
/// Initial state: the stream was not initialized yet
/// and the file was not opened
Uninitialized(RefCountedTempFile),

/// A read is in progress in a spawned blocking task for which we hold the handle.
ReadInProgress(SpawnedTask<NextRecordBatchResult>),

/// A read has finished and we wait to be polled again in order to start reading the next batch.
Waiting(StreamReader<BufReader<File>>),

/// The stream has finished, successfully or not.
Done,
}
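Transitions out of `Uninitialized` and `Waiting` must move an owned resource (the temp file or the reader) into a spawned task. A minimal std-only sketch of that ownership-taking pattern, with hypothetical `State`/`Machine` types rather than DataFusion's:

```rust
use std::mem;

enum State {
    // Stands in for RefCountedTempFile.
    Uninitialized(String),
    Done,
}

struct Machine {
    state: State,
}

impl Machine {
    // Takes the owned resource out of the state machine by swapping `Done`
    // in with `mem::replace`, mirroring the `let ... else` transitions in
    // `poll_next_inner`.
    fn take_file(&mut self) -> Option<String> {
        let State::Uninitialized(file) = mem::replace(&mut self.state, State::Done) else {
            return None; // already consumed
        };
        Some(file)
    }
}
```

The swap is what makes it possible to move the resource into a `'static` closure; the real code uses `unreachable!()` in the else branch because the enclosing `match` already proved the variant.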

impl SpillReaderStream {
fn new(schema: SchemaRef, spill_file: RefCountedTempFile) -> Self {
Self {
schema,
state: SpillReaderStreamState::Uninitialized(spill_file),
}
}

fn poll_next_inner(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Option<Result<RecordBatch>>> {
match &mut self.state {
SpillReaderStreamState::Uninitialized(_) => {
// Temporarily replace with `Done` to be able to pass the file to the task.
[Review comment] Contributor: Another pattern for this that could avoid the unreachable might be to change the original match to something like

    // temporarily mark as done:
    let state = std::mem::replace(&mut self.state, SpillReaderStreamState::Done);

    // Now you can match on an owned state
    match state {
        ...
    }

[Review comment] Author (@rluvaton, Apr 9, 2025): The problem with this is that it is easier to accidentally leave that `Done` state in place; for example, the `futures::ready!` macro returns from inside it in case of `Pending`, so we would not be able to use it, and it is prone to errors.

[Review comment] Contributor: That is fair -- I don't have a strong preference about which pattern to use, I was just mentioning an alternate pattern as a possibility.

let SpillReaderStreamState::Uninitialized(spill_file) =
std::mem::replace(&mut self.state, SpillReaderStreamState::Done)
else {
unreachable!()
};

let task = SpawnedTask::spawn_blocking(move || {
let file = BufReader::new(File::open(spill_file.path())?);
// SAFETY: DataFusion's spill writer strictly follows the Arrow IPC specification
// with validated schemas and buffers. Skip redundant validation during read
// to speed up the read operation. This is safe for DataFusion as the input is guaranteed to be correct when written.
let mut reader = unsafe {
StreamReader::try_new(file, None)?.with_skip_validation(true)
};

let next_batch = reader.next().transpose()?;

Ok((reader, next_batch))
});

self.state = SpillReaderStreamState::ReadInProgress(task);

// Poll again immediately so the inner task is polled and the waker is
// registered.
self.poll_next_inner(cx)
}

SpillReaderStreamState::ReadInProgress(task) => {
let result = futures::ready!(task.poll_unpin(cx))
.unwrap_or_else(|err| Err(DataFusionError::External(Box::new(err))));

match result {
Ok((reader, batch)) => {
match batch {
Some(batch) => {
self.state = SpillReaderStreamState::Waiting(reader);

Poll::Ready(Some(Ok(batch)))
}
None => {
// Stream is done
self.state = SpillReaderStreamState::Done;

Poll::Ready(None)
}
}
}
Err(err) => {
self.state = SpillReaderStreamState::Done;

Poll::Ready(Some(Err(err)))
}
}
}

SpillReaderStreamState::Waiting(_) => {
// Temporarily replace with `Done` to be able to pass the reader to the task.
let SpillReaderStreamState::Waiting(mut reader) =
std::mem::replace(&mut self.state, SpillReaderStreamState::Done)
else {
unreachable!()
};

let task = SpawnedTask::spawn_blocking(move || {
let next_batch = reader.next().transpose()?;

Ok((reader, next_batch))
});

self.state = SpillReaderStreamState::ReadInProgress(task);

// Poll again immediately so the inner task is polled and the waker is
// registered.
self.poll_next_inner(cx)
}

SpillReaderStreamState::Done => Poll::Ready(None),
}
}
}
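The `futures::ready!` call in the `ReadInProgress` arm early-returns `Poll::Pending` from the enclosing function, which is why (as the review thread notes) a state left temporarily as `Done` would be easy to leak. A rough sketch of what the macro expands to, using only `std::task::Poll` and a hypothetical `ready_sketch!`/`step` pair:

```rust
use std::task::Poll;

// Approximates the expansion of `futures::ready!`: unwrap `Ready`, or
// early-return `Pending` from the *enclosing function*.
macro_rules! ready_sketch {
    ($e:expr) => {
        match $e {
            Poll::Ready(v) => v,
            Poll::Pending => return Poll::Pending,
        }
    };
}

// Any state mutated before this macro invocation would stay in place on the
// early `Pending` return path.
fn step(input: Poll<i32>) -> Poll<i32> {
    let v = ready_sketch!(input);
    Poll::Ready(v + 1)
}
```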

impl Stream for SpillReaderStream {
type Item = Result<RecordBatch>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.get_mut().poll_next_inner(cx)
}
}

impl RecordBatchStream for SpillReaderStream {
fn schema(&self) -> SchemaRef {
Arc::clone(&self.schema)
}
}

/// Spill the `RecordBatch` to disk as smaller batches
@@ -205,6 +338,7 @@ mod tests {
use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
use datafusion_execution::runtime_env::RuntimeEnv;
use futures::StreamExt as _;

use std::sync::Arc;

@@ -604,4 +738,42 @@

Ok(())
}

#[test]
fn test_reading_more_spills_than_tokio_blocking_threads() -> Result<()> {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.max_blocking_threads(1)
.build()
.unwrap()
.block_on(async {
let batch = build_table_i32(
("a2", &vec![0, 1, 2]),
("b2", &vec![3, 4, 5]),
("c2", &vec![4, 5, 6]),
);

let schema = batch.schema();

// Construct SpillManager
let env = Arc::new(RuntimeEnv::default());
let metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
let spill_manager = SpillManager::new(env, metrics, Arc::clone(&schema));
let batches: [_; 10] = std::array::from_fn(|_| batch.clone());

let spill_file_1 = spill_manager
.spill_record_batch_and_finish(&batches, "Test1")?
.unwrap();
let spill_file_2 = spill_manager
.spill_record_batch_and_finish(&batches, "Test2")?
.unwrap();

let mut stream_1 = spill_manager.read_spill_as_stream(spill_file_1)?;
let mut stream_2 = spill_manager.read_spill_as_stream(spill_file_2)?;
stream_1.next().await;
stream_2.next().await;

Ok(())
})
}
}
16 changes: 6 additions & 10 deletions datafusion/physical-plan/src/spill/spill_manager.rs
@@ -27,10 +27,9 @@ use datafusion_common::Result;
use datafusion_execution::disk_manager::RefCountedTempFile;
use datafusion_execution::SendableRecordBatchStream;

use crate::metrics::SpillMetrics;
use crate::stream::RecordBatchReceiverStream;
use crate::{common::spawn_buffered, metrics::SpillMetrics};

use super::{in_progress_spill_file::InProgressSpillFile, read_spill};
use super::{in_progress_spill_file::InProgressSpillFile, SpillReaderStream};

/// The `SpillManager` is responsible for the following tasks:
/// - Reading and writing `RecordBatch`es to raw files based on the provided configurations.
@@ -126,14 +125,11 @@ impl SpillManager {
&self,
spill_file_path: RefCountedTempFile,
) -> Result<SendableRecordBatchStream> {
let mut builder = RecordBatchReceiverStream::builder(
let stream = Box::pin(SpillReaderStream::new(
Arc::clone(&self.schema),
self.batch_read_buffer_capacity,
);
let sender = builder.tx();
spill_file_path,
));

builder.spawn_blocking(move || read_spill(sender, spill_file_path.path()));

Ok(builder.build())
Ok(spawn_buffered(stream, self.batch_read_buffer_capacity))
}
}
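The `spawn_buffered` call above restores read-ahead: the stream itself reads one batch at a time, and buffering up to `batch_read_buffer_capacity` batches happens in a separate task. A rough std-threads analogue (hypothetical `spawn_buffered_sketch`, a bounded channel standing in for the Tokio task and channel):

```rust
use std::sync::mpsc;
use std::thread;

// Producer fills a bounded channel of `capacity` items while the consumer
// reads, so "disk reads" can overlap with downstream work; the producer
// blocks once the buffer is full, bounding memory use.
fn spawn_buffered_sketch<T: Send + 'static>(
    items: Vec<T>,
    capacity: usize,
) -> mpsc::Receiver<T> {
    let (tx, rx) = mpsc::sync_channel(capacity);
    thread::spawn(move || {
        for item in items {
            if tx.send(item).is_err() {
                break; // receiver dropped; stop producing
            }
        }
    });
    rx
}
```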