Skip to content

Commit ea3834e

Browse files
Dandandan and claude committed
feat: morsel splitting and merging in SharedWorkSource
Split large files and merge small same-file ranges by byte range in the shared work queue, so idle sibling streams always have right-sized work to steal.

- Target morsel size of at least 1 MiB projected size
- Split: when queue depth < 2 * target_partitions and file >= 2 MiB, split in half and push the second half back
- Merge: when popped file < 1 MiB, absorb adjacent same-file entries until reaching 1 MiB
- Estimate projected size using per-column byte_size stats from PartitionedFile.statistics when available, otherwise fall back to file_size * (projected_cols / total_cols)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 5287210 commit ea3834e

File tree

1 file changed

+163
-12
lines changed

1 file changed

+163
-12
lines changed

datafusion/datasource/src/file_stream/work_source.rs

Lines changed: 163 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,15 @@
1818
use std::collections::VecDeque;
1919
use std::sync::Arc;
2020

21-
use crate::PartitionedFile;
2221
use crate::file_groups::FileGroup;
2322
use crate::file_scan_config::FileScanConfig;
23+
use crate::{FileRange, PartitionedFile};
2424
use parking_lot::Mutex;
2525

26+
/// Minimum morsel size in bytes. Morsels smaller than this are combined;
27+
/// files are only split when each half would be at least this large.
28+
const MIN_MORSEL_SIZE: usize = 1024 * 1024; // 1 MiB
29+
2630
/// Source of work for `ScanState`.
2731
///
2832
/// Streams that may share work across siblings use [`WorkSource::Shared`],
@@ -38,6 +42,9 @@ pub(super) enum WorkSource {
3842

3943
impl WorkSource {
4044
/// Pop the next file to plan from this work source.
45+
///
46+
/// For shared sources, large files may be split in half when the queue
47+
/// is running low, so idle siblings have work to steal.
4148
pub(super) fn pop_front(&mut self) -> Option<PartitionedFile> {
4249
match self {
4350
Self::Local(files) => files.pop_front(),
@@ -60,43 +67,187 @@ impl WorkSource {
6067
/// sibling streams for that execution. Whichever stream becomes idle first may
6168
/// take the next unopened file from the front of the queue.
6269
///
70+
/// When the queue is running low (fewer than `2 * target_partitions` items),
71+
/// large files are split in half so idle siblings have work to steal.
72+
/// Conversely, very small files are batched together so each stream processes
73+
/// at least ~1 MiB of data per round-trip.
74+
///
6375
/// It uses a [`Mutex`] internally to provide thread-safe access
6476
/// to the shared file queue.
6577
#[derive(Debug, Clone)]
6678
pub(crate) struct SharedWorkSource {
6779
inner: Arc<SharedWorkSourceInner>,
6880
}
6981

70-
#[derive(Debug, Default)]
82+
#[derive(Debug)]
pub(super) struct SharedWorkSourceInner {
    /// Unopened files waiting to be claimed by an idle sibling stream.
    files: Mutex<VecDeque<PartitionedFile>>,
    /// Number of sibling streams sharing this queue; the queue is considered
    /// "shallow" (and large files become split candidates) when fewer than
    /// `2 * target_partitions` entries remain.
    target_partitions: usize,
    /// Column indices (into the table schema) that the scan projects.
    /// `None` means all columns are read.
    projected_columns: Option<Vec<usize>>,
    /// Fallback ratio when per-column byte_size stats are absent.
    projection_ratio: f64,
}
7492

7593
impl SharedWorkSource {
76-
/// Create a shared work source containing the provided unopened files.
77-
pub(crate) fn new(files: impl IntoIterator<Item = PartitionedFile>) -> Self {
78-
let files = files.into_iter().collect();
94+
    /// Create a shared work source for the unopened files in `config`.
    ///
    /// Captures the scan's projection up front so the queue can later
    /// estimate the *projected* byte size of each file when deciding
    /// whether to split or merge morsels.
    pub(crate) fn from_config(config: &FileScanConfig) -> Self {
        // One file group per output partition, so the group count doubles as
        // the sibling-stream count used in the "shallow queue" check.
        let target_partitions = config.file_groups.len();
        // `.max(1)` guards against a zero-column schema so the ratio below
        // never divides by zero.
        let total_file_columns = config.file_schema().fields().len().max(1);
        let projected_columns = config.file_source.projection().map(|p| {
            p.expr_iter()
                .filter_map(|e| {
                    e.as_any()
                        .downcast_ref::<datafusion_physical_expr::expressions::Column>()
                        .map(|c| c.index())
                })
                // NOTE(review): non-`Column` projection expressions are
                // silently skipped here, so computed projections under-count
                // the columns they read — confirm that is acceptable for a
                // size *estimate*.
                .collect::<Vec<_>>()
        });
        // Fraction of the file's columns the scan reads; 1.0 when the scan
        // has no projection (all columns).
        let projection_ratio = projected_columns
            .as_ref()
            .map(|cols| cols.len() as f64 / total_file_columns as f64)
            .unwrap_or(1.0);

        let files = config
            .file_groups
            .iter()
            .flat_map(FileGroup::iter)
            .cloned()
            .collect();
        Self {
            inner: Arc::new(SharedWorkSourceInner {
                files: Mutex::new(files),
                target_partitions,
                projected_columns,
                projection_ratio,
            }),
        }
    }
85127

86-
/// Create a shared work source for the unopened files in `config`.
87-
pub(crate) fn from_config(config: &FileScanConfig) -> Self {
88-
Self::new(config.file_groups.iter().flat_map(FileGroup::iter).cloned())
89-
}
90-
91128
    /// Pop the next file from the shared work queue.
    ///
    /// **Splitting**: when the remaining queue depth is below
    /// `2 * target_partitions` and the file's projected size is at least
    /// `2 * MIN_MORSEL_SIZE`, it is split in half and the second half is
    /// pushed back onto the queue.
    ///
    /// **Merging**: when the popped file is below `MIN_MORSEL_SIZE`,
    /// adjacent queue entries that refer to the same underlying file are
    /// merged (their byte ranges are combined) until the merged result
    /// reaches `MIN_MORSEL_SIZE` or no more same-file entries remain.
    fn pop_front(&self) -> Option<PartitionedFile> {
        // Hold the lock for the entire operation so the split/merge
        // decisions observe a consistent queue state.
        let mut files = self.inner.files.lock();
        let mut file = files.pop_front()?;

        let projected_size = self.inner.projected_byte_size(&file);

        // Split large files when the queue is shallow. The second half goes
        // to the *back* of the queue so untouched files are handed out
        // before re-split halves.
        if files.len() < 2 * self.inner.target_partitions
            && projected_size >= 2 * MIN_MORSEL_SIZE
        {
            let (first, second) = split_file(file);
            files.push_back(second);
            return Some(first);
        }

        // Merge small same-file ranges until we reach MIN_MORSEL_SIZE.
        if projected_size < MIN_MORSEL_SIZE {
            merge_same_file(&mut file, &mut files, &self.inner, MIN_MORSEL_SIZE);
        }

        Some(file)
    }
97161

98162
/// Return the number of files still waiting in the shared queue.
99163
fn len(&self) -> usize {
100164
self.inner.files.lock().len()
101165
}
102166
}
167+
168+
impl SharedWorkSourceInner {
169+
/// Estimate the projected byte size for `file`.
170+
///
171+
/// Uses per-column `byte_size` from [`PartitionedFile::statistics`] when
172+
/// available, otherwise falls back to
173+
/// `raw_file_size * projection_ratio`.
174+
fn projected_byte_size(&self, file: &PartitionedFile) -> usize {
175+
if let (Some(cols), Some(stats)) =
176+
(&self.projected_columns, file.statistics.as_ref())
177+
{
178+
let col_stats = &stats.column_statistics;
179+
let sum: Option<usize> = cols
180+
.iter()
181+
.map(|&idx| {
182+
col_stats
183+
.get(idx)
184+
.and_then(|cs| cs.byte_size.get_value().copied())
185+
})
186+
.collect::<Option<Vec<_>>>()
187+
.map(|v| v.into_iter().sum());
188+
if let Some(size) = sum {
189+
return size;
190+
}
191+
}
192+
// Fallback: raw file/range size scaled by projection ratio.
193+
let raw = raw_file_byte_size(file);
194+
(raw as f64 * self.projection_ratio) as usize
195+
}
196+
}
197+
198+
/// Return the raw on-disk byte size of a [`PartitionedFile`].
199+
fn raw_file_byte_size(file: &PartitionedFile) -> usize {
200+
let (start, end) = file_range(file);
201+
(end - start) as usize
202+
}
203+
204+
/// Merge entries from `queue` into `file` while they refer to the same
205+
/// underlying path and the projected size stays below `target`.
206+
fn merge_same_file(
207+
file: &mut PartitionedFile,
208+
queue: &mut VecDeque<PartitionedFile>,
209+
inner: &SharedWorkSourceInner,
210+
target: usize,
211+
) {
212+
let path = &file.object_meta.location;
213+
while inner.projected_byte_size(file) < target {
214+
let same_file = queue
215+
.front()
216+
.is_some_and(|next| next.object_meta.location == *path);
217+
if !same_file {
218+
break;
219+
}
220+
let next = queue.pop_front().unwrap();
221+
// Extend the byte range to cover both entries.
222+
let (a_start, a_end) = file_range(file);
223+
let (b_start, b_end) = file_range(&next);
224+
file.range = Some(FileRange {
225+
start: a_start.min(b_start),
226+
end: a_end.max(b_end),
227+
});
228+
// Drop per-file statistics — they no longer match the merged range.
229+
file.statistics = None;
230+
}
231+
}
232+
233+
/// Return the effective (start, end) byte range of a file.
234+
fn file_range(file: &PartitionedFile) -> (i64, i64) {
235+
match &file.range {
236+
Some(range) => (range.start, range.end),
237+
None => (0, file.object_meta.size as i64),
238+
}
239+
}
240+
241+
/// Split a file into two halves by byte range.
242+
fn split_file(file: PartitionedFile) -> (PartitionedFile, PartitionedFile) {
243+
let (start, end) = file_range(&file);
244+
let mid = start + (end - start) / 2;
245+
246+
let mut first = file.clone();
247+
first.range = Some(FileRange { start, end: mid });
248+
249+
let mut second = file;
250+
second.range = Some(FileRange { start: mid, end });
251+
252+
(first, second)
253+
}

0 commit comments

Comments
 (0)