Improve Spill Performance: mmap the spill files #15321

Open
Tracked by #15271
alamb opened this issue Mar 19, 2025 · 10 comments
Labels
enhancement New feature or request

Comments

@alamb
Contributor

alamb commented Mar 19, 2025

Is your feature request related to a problem or challenge?

Today, when DataFusion spills data to disk, it writes the spill files in the Arrow IPC format.

Here is the code:

pub(crate) fn spill_record_batches(
    batches: &[RecordBatch],
    path: PathBuf,
    schema: SchemaRef,
) -> Result<(usize, usize)> {
    let mut writer = IPCStreamWriter::new(path.as_ref(), schema.as_ref())?;
    for batch in batches {
        writer.write(batch)?;
    }
    writer.finish()?;
    debug!(
        "Spilled {} batches of total {} rows to disk, memory released {}",
        writer.num_batches,
        writer.num_rows,
        human_readable_size(writer.num_bytes),
    );
    Ok((writer.num_rows, writer.num_bytes))
}

fn read_spill(sender: Sender<Result<RecordBatch>>, path: &Path) -> Result<()> {
    let file = BufReader::new(File::open(path)?);
    let reader = StreamReader::try_new(file, None)?;
    for batch in reader {
        sender
            .blocking_send(batch.map_err(Into::into))
            .map_err(|e| exec_datafusion_err!("{e}"))?;
    }
    Ok(())
}
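For experimenting with the read path outside DataFusion, the write-then-read-back pattern above can be reduced to a std-only sketch. It is not DataFusion code: a "batch" is assumed to be a Vec<i64>, the length-prefixed encoding is a hypothetical stand-in for Arrow IPC, and std::sync::mpsc::SyncSender stands in for tokio's Sender::blocking_send. The helper names spill_batches and read_spill here are illustrative only.

```rust
use std::fs::File;
use std::io::{self, BufReader, BufWriter, Read, Write};
use std::path::Path;
use std::sync::mpsc::SyncSender;

/// Hypothetical stand-in for a RecordBatch: a single column of i64 values.
type Batch = Vec<i64>;

/// Writes each batch as a little-endian u32 row count followed by its rows.
/// Returns (rows written, bytes written), loosely mirroring spill_record_batches.
fn spill_batches(batches: &[Batch], path: &Path) -> io::Result<(usize, usize)> {
    let mut writer = BufWriter::new(File::create(path)?);
    let (mut rows, mut bytes) = (0usize, 0usize);
    for batch in batches {
        writer.write_all(&(batch.len() as u32).to_le_bytes())?;
        for value in batch {
            writer.write_all(&value.to_le_bytes())?;
        }
        rows += batch.len();
        bytes += 4 + batch.len() * 8;
    }
    writer.flush()?;
    Ok((rows, bytes))
}

/// Reads batches back with buffered file IO and sends them one at a time,
/// loosely mirroring read_spill. Meant to run on a dedicated thread.
fn read_spill(sender: SyncSender<io::Result<Batch>>, path: &Path) -> io::Result<()> {
    let mut reader = BufReader::new(File::open(path)?);
    let mut len_buf = [0u8; 4];
    loop {
        // EOF while reading the next length prefix ends the stream
        // (this sketch does not distinguish a truncated prefix).
        match reader.read_exact(&mut len_buf) {
            Ok(()) => {}
            Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(()),
            Err(e) => return Err(e),
        }
        let rows = u32::from_le_bytes(len_buf) as usize;
        let mut batch = Vec::with_capacity(rows);
        let mut value = [0u8; 8];
        for _ in 0..rows {
            reader.read_exact(&mut value)?;
            batch.push(i64::from_le_bytes(value));
        }
        // Like the original, a closed receiver is reported as an error.
        sender
            .send(Ok(batch))
            .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
    }
}
```

Running read_spill on its own thread keeps the blocking file IO off the async executor, which is presumably why the original uses blocking_send.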

The IPC reader currently reads spill files back into memory using regular buffered file IO (a BufReader over a File).

It is possible to use mmap to map the contents of the files into memory without copying them (zero-copy). Here is an example of how to do so:

https://github.com/apache/arrow-rs/blob/main/arrow/examples/zero_copy_ipc.rs

Describe the solution you'd like

I would like to see whether using mmap to read the spill files back in is faster.

Describe alternatives you've considered

  1. Use mmap to read spill files
  2. Add / use a benchmark showing the performance benefit of doing this
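For the benchmark item, the results later in this thread appear to come from Criterion. As a rough, std-only starting point, the sketch below compares two hypothetical read strategies over a file of raw little-endian i64 values: streaming it through a BufReader, and loading it into memory first and decoding from the slice. The second only approximates the access pattern of a memory-mapped file, since std has no mmap; with the memmap2 crate the slice would come from a Mmap instead. All names are illustrative.

```rust
use std::fs::{self, File};
use std::io::{self, BufReader, Read};
use std::path::Path;
use std::time::{Duration, Instant};

/// Strategy 1 (hypothetical): stream the file through a BufReader, 8 bytes at a time.
fn sum_buffered(path: &Path) -> io::Result<i64> {
    let mut reader = BufReader::new(File::open(path)?);
    let mut buf = [0u8; 8];
    let mut sum = 0i64;
    loop {
        match reader.read_exact(&mut buf) {
            Ok(()) => sum = sum.wrapping_add(i64::from_le_bytes(buf)),
            Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(sum),
            Err(e) => return Err(e),
        }
    }
}

/// Strategy 2 (hypothetical): read the whole file into memory, then decode
/// from the slice. With memmap2, the slice would come from a Mmap instead.
fn sum_in_memory(path: &Path) -> io::Result<i64> {
    let bytes = fs::read(path)?;
    Ok(bytes
        .chunks_exact(8)
        .map(|chunk| {
            let mut word = [0u8; 8];
            word.copy_from_slice(chunk);
            i64::from_le_bytes(word)
        })
        .fold(0i64, i64::wrapping_add))
}

/// Runs `f` `iters` times; returns the median wall-clock time and the last
/// result, so callers can check that both strategies agree.
fn median_time<T>(iters: usize, mut f: impl FnMut() -> T) -> (Duration, T) {
    assert!(iters > 0);
    let mut times = Vec::with_capacity(iters);
    let mut last = None;
    for _ in 0..iters {
        let start = Instant::now();
        let out = f();
        times.push(start.elapsed());
        last = Some(out);
    }
    times.sort();
    (times[iters / 2], last.unwrap())
}
```

For numbers worth reporting, a Criterion benchmark with warm-up and statistical comparison is preferable to a hand-rolled median like this.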

Additional context

@zebsme
Contributor

zebsme commented Mar 22, 2025

take

@alamb
Contributor Author

alamb commented Mar 22, 2025

BTW, I am not sure the code is in a great position to do this one yet. It might help to wait for @2010YOUY01 (or help him) to pull some of the spilling code into its own structure; see #15355.

@zebsme
Contributor

zebsme commented Mar 26, 2025

Hi @alamb, thanks for sharing your great example. Following your approach, I noticed that when I store an Arrow Buffer backed by the mmap as a field in the Decoder, running the tests produces the following output:

thread 'memory_limit::test_stringview_external_sort' panicked at datafusion/core/tests/memory_limit/mod.rs:468:32:
Query execution failed: ResourcesExhausted("Failed to allocate additional 6643090 bytes for ExternalSorterMerge[0] with 27430266 bytes already allocated for this reservation - 1410938 bytes remain available for the total pool")

It seems that the Drop implementation defined in memmap2 isn't being executed:

impl Drop for MmapInner {
    fn drop(&mut self) {
        let (ptr, len, _) = self.as_mmap_params();

        // Any errors during unmapping/closing are ignored as the only way
        // to report them would be through panicking which is highly discouraged
        // in Drop impls, c.f. https://github.com/rust-lang/lang-team/issues/97
        unsafe { libc::munmap(ptr, len as libc::size_t) };
    }
}

However, after switching to storing the mmap directly as a field, the related tests passed.
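One general Rust reason a Drop impl such as MmapInner's can appear not to run (offered as a possibility, not a diagnosis of this failure): when the mapping is wrapped in a reference-counted owner, as it is once an Arrow Buffer is built on top of it, munmap only runs after the last clone is dropped, so any batch or slice still alive keeps the whole region mapped. The sketch uses hypothetical FakeMapping and Batch types, with a flag standing in for the real munmap call.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for memmap2's MmapInner: records when "munmap" runs.
struct FakeMapping {
    unmapped: Arc<AtomicBool>,
}

impl Drop for FakeMapping {
    fn drop(&mut self) {
        // In memmap2, this is where libc::munmap would be called.
        self.unmapped.store(true, Ordering::SeqCst);
    }
}

/// Hypothetical stand-in for a batch whose column buffer points into the mapping.
struct Batch {
    _backing: Arc<FakeMapping>,
}
```

Dropping the reader's handle is not enough; the mapping is released only when the last batch referencing it goes away.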

@2010YOUY01
Contributor

This error is raised by DataFusion's internal memory tracking (the memory pool), not by the OS, so there is likely something wrong with the spill logic. Could you share the draft code?
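To illustrate the distinction: DataFusion operators reserve bytes from a memory pool with a fixed limit (conceptually similar to MemoryReservation::try_grow), and the pool rejects a request that would exceed that limit even when the OS could easily satisfy it. The sketch below is a hypothetical, heavily simplified pool, not DataFusion's MemoryPool API, that produces an error shaped like the one above.

```rust
use std::fmt;

/// Hypothetical simplified memory pool with a hard byte limit.
struct Pool {
    limit: usize,
    used: usize,
}

#[derive(Debug, PartialEq)]
struct ResourcesExhausted {
    requested: usize,
    available: usize,
}

impl fmt::Display for ResourcesExhausted {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "Failed to allocate additional {} bytes - {} bytes remain available for the total pool",
            self.requested, self.available
        )
    }
}

impl Pool {
    fn new(limit: usize) -> Self {
        Pool { limit, used: 0 }
    }

    /// Reserves `bytes`, failing if the pool's accounting would exceed `limit`.
    /// Nothing is actually allocated here: this is bookkeeping only.
    fn try_grow(&mut self, bytes: usize) -> Result<(), ResourcesExhausted> {
        let available = self.limit - self.used;
        if bytes > available {
            return Err(ResourcesExhausted { requested: bytes, available });
        }
        self.used += bytes;
        Ok(())
    }

    /// Returns previously reserved bytes to the pool.
    fn shrink(&mut self, bytes: usize) {
        self.used = self.used.saturating_sub(bytes);
    }
}
```

Because the check is pure bookkeeping, an error like this points at how reservations are grown and released in the spill and merge path rather than at the OS running out of memory.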

@zebsme
Contributor

zebsme commented Mar 26, 2025

For this issue, I've been experimenting with replacing StreamWriter/StreamReader with FileWriter/FileReader to evaluate the potential performance improvement from mmap.

However, replacing them directly seems to cause other problems; see #14868.

The errors above occur with my own FileWriter and FileReader implementation, so I don't think they affect our current spill implementation :)

@zebsme
Contributor

zebsme commented Apr 8, 2025

Hi @2010YOUY01, I created a draft here: zebsme#3.

This draft reproduces the errors, and applying the code changes in the comment resolves all the test failures.

I'm not sure whether munmap is responsible for the memory allocation failures. Could you please help review this?

@2010YOUY01
Contributor

2010YOUY01 commented Apr 8, 2025

I took a look, but the reason for those test failures isn’t obvious to me. However, I think the implementation requires two changes:

  1. Always use SpillManager's read utility to read spill files, instead of a lower-level utility function in SMJ. (I forgot to update the spill read path in #15405, "refactor: Use SpillManager for all spilling scenarios" 🤦‍♂. I planned to update it soon, but it's hard to refactor the SMJ code, so maybe we can ignore this and keep the existing hack.)
  2. We have to use IPCStreamWriter instead of IPCFileWriter; otherwise spilling dictionary types would no longer be supported, which is a regression (see #14868, "Use arrow IPC Stream format for spill files"), unless we can also support spilling dictionaries with IPCFileWriter.
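The dictionary restriction comes from the Arrow IPC specification: in the file format, each dictionary ID may have at most one non-delta dictionary batch (dictionary replacement is not supported), whereas the stream format lets a later dictionary batch replace an earlier one. arrow-rs's FileWriter reports an error when it detects such a replacement, and spilled batches of a dictionary column can carry different dictionaries. The sketch models only that rule with hypothetical Format and DictTracker types; it does not encode real IPC messages.

```rust
use std::collections::HashMap;

#[derive(Clone, Copy, PartialEq, Debug)]
enum Format {
    Stream,
    File,
}

/// Hypothetical model of the IPC dictionary rule: remembers the last
/// dictionary written for each dictionary id.
struct DictTracker {
    format: Format,
    written: HashMap<i64, Vec<String>>,
}

impl DictTracker {
    fn new(format: Format) -> Self {
        DictTracker { format, written: HashMap::new() }
    }

    /// Ok(true): a dictionary batch must be emitted.
    /// Ok(false): the identical dictionary was already written.
    /// Err: the format forbids replacing the dictionary.
    fn record(&mut self, dict_id: i64, values: &[String]) -> Result<bool, String> {
        if let Some(prev) = self.written.get(&dict_id) {
            if prev.as_slice() == values {
                return Ok(false);
            }
            if self.format == Format::File {
                return Err(format!(
                    "dictionary {dict_id} replaced: IPC file format allows one dictionary per id"
                ));
            }
        }
        self.written.insert(dict_id, values.to_vec());
        Ok(true)
    }
}
```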

@zebsme
Copy link
Contributor

zebsme commented Apr 8, 2025

Thanks for your reply @2010YOUY01. The benchmark shows that StreamReader with mmap brings no performance improvement over the current implementation:

spill_io/StreamReader/read_100/
                        time:   [7.1020 ms 7.2847 ms 7.4983 ms]
                        change: [-2.7305% +1.2574% +5.1899%] (p = 0.55 > 0.05)
                        No change in performance detected.

So there is no need to enable mmap for now, since we need the stream format here.

@2010YOUY01
Contributor

Maybe it's already using mmap 🤔 I'll double-check when I start looking into the performance. Thank you for the experiments.

@zebsme
Copy link
Contributor

zebsme commented Apr 8, 2025

StreamReader with mmap enabled:

use std::fs::File;
use std::io::Cursor;
use std::path::Path;

use arrow::ipc::reader::StreamReader;
use arrow::record_batch::RecordBatch;
use datafusion_common::{exec_datafusion_err, Result};
use tokio::sync::mpsc::Sender;

fn read_spill(sender: Sender<Result<RecordBatch>>, path: &Path) -> Result<()> {
    let file = File::open(path)?;
    // SAFETY: assumes the spill file is not modified or truncated while mapped.
    let mmap = unsafe { memmap2::Mmap::map(&file)? };
    // SAFETY: DataFusion's spill writer strictly follows Arrow IPC specifications
    // with validated schemas and buffers. Skip redundant validation during read
    // to speedup read operation. This is safe for DataFusion as input guaranteed to be correct when written.
    let reader = unsafe {
        StreamReader::try_new(Cursor::new(mmap), None)?.with_skip_validation(true)
    };
    for batch in reader {
        sender
            .blocking_send(batch.map_err(Into::into))
            .map_err(|e| exec_datafusion_err!("{e}"))?;
    }
    Ok(())
}
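A likely reason this mmap variant shows no speedup: StreamReader consumes its input through std::io::Read and reads each message into a freshly allocated buffer, so wrapping the Mmap in a Cursor still copies every byte and mostly trades read syscalls for page faults. Zero-copy decoding needs a decoder that builds arrays over the mapped memory itself, as the linked zero_copy_ipc.rs example does by creating an Arrow Buffer from the mapping; arrow-ipc's StreamDecoder, which decodes from a Buffer, may be the stream-format counterpart. The std-only sketch shows the difference on a hypothetical length-prefixed framing (not the real IPC message layout), with a Vec<u8> standing in for the mapped bytes.

```rust
use std::io::{self, Read};

/// Copying path, as with a Read-based reader: each frame is read into a new
/// Vec, even when the source bytes are already in memory.
fn frames_copying<R: Read>(mut r: R) -> io::Result<Vec<Vec<u8>>> {
    let mut frames = Vec::new();
    let mut len = [0u8; 4];
    loop {
        match r.read_exact(&mut len) {
            Ok(()) => {}
            // EOF while reading the next prefix: treat as end of input.
            Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(frames),
            Err(e) => return Err(e),
        }
        let mut frame = vec![0u8; u32::from_le_bytes(len) as usize];
        r.read_exact(&mut frame)?; // copies bytes out of the source
        frames.push(frame);
    }
}

/// Zero-copy path: frames are sub-slices of the mapped bytes; nothing is
/// copied. Returns None if the input is truncated.
fn frames_zero_copy(buf: &[u8]) -> Option<Vec<&[u8]>> {
    let mut frames = Vec::new();
    let mut pos = 0usize;
    while pos < buf.len() {
        let l = buf.get(pos..pos.checked_add(4)?)?;
        let len = u32::from_le_bytes([l[0], l[1], l[2], l[3]]) as usize;
        pos += 4;
        let end = pos.checked_add(len)?;
        frames.push(buf.get(pos..end)?);
        pos = end;
    }
    Some(frames)
}
```

Both functions yield the same frame contents; only the zero-copy version returns slices that point into the original buffer.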
