feat: add multi level merge for sorting #15608


Closed

Conversation

rluvaton
Contributor

@rluvaton rluvaton commented Apr 6, 2025

Which issue does this PR close?

Rationale for this change

To be able to sort any number of spill files without exceeding the tokio blocking thread limit

What changes are included in this PR?

Currently the PR is in draft as I want your opinions about the design.

  1. Created a new stream that uses the SortPreservingMergeStream underneath
  2. Changed StreamingMergeBuilder to use the new stream when there are spill files and a spill manager
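The core idea, as I understand it, is to merge spill files in bounded groups so the number of concurrently open spill streams (each occupying a tokio blocking thread) never exceeds the limit, producing intermediate spill files until one sorted output remains. A minimal sketch of just that grouping arithmetic, with a hypothetical `max_fan_in` parameter standing in for the blocking-thread budget:

```rust
/// Simulate a multi-level merge plan: each pass merges groups of at most
/// `max_fan_in` spill files into one intermediate spill file, repeating
/// until a single sorted output remains. Returns the number of passes.
/// (`max_fan_in` is an illustrative parameter, not the PR's actual API.)
fn merge_passes(mut num_spill_files: usize, max_fan_in: usize) -> usize {
    assert!(max_fan_in >= 2);
    let mut passes = 0;
    while num_spill_files > 1 {
        // Every group of up to `max_fan_in` files collapses into one file.
        num_spill_files = num_spill_files.div_ceil(max_fan_in);
        passes += 1;
    }
    passes
}

fn main() {
    // 1000 spill files with a fan-in of 128: one pass leaves 8 files,
    // a second pass merges those into the final sorted stream.
    assert_eq!(merge_passes(1000, 128), 2);
    assert_eq!(merge_passes(100, 128), 1);
    assert_eq!(merge_passes(1, 128), 0);
    println!("ok");
}
```

The point of the extra passes is that each one only ever holds `max_fan_in` spill streams open at once, regardless of the total number of spill files.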

Are these changes tested?

Existing tests

Are there any user-facing changes?

Yes, kinda

Comment on lines +94 to +95
// TODO - add a check to see the actual number of available blocking threads
let max_blocking_threads = max_blocking_threads.unwrap_or(128);
Contributor Author

Currently tokio doesn't expose the maximum number of blocking threads, and you have no way to know whether you've reached the limit, as spawn_blocking doesn't indicate that the current pool is full.
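For context, the cap is a runtime-construction setting (tokio's own default is 512) and there is no getter for it afterwards, which is why the snippet above falls back to a hard-coded 128. A configuration sketch of where the limit comes from:

```rust
// Runtime-configuration sketch: the blocking pool size is fixed when the
// runtime is built and cannot be queried later, so downstream code has to
// guess a conservative fallback (128 in the snippet above).
fn main() -> std::io::Result<()> {
    let runtime = tokio::runtime::Builder::new_multi_thread()
        .max_blocking_threads(128) // tokio defaults to 512 if unset
        .enable_all()
        .build()?;

    runtime.block_on(async {
        // spawn_blocking queues work on the blocking pool; when all threads
        // are busy, tasks simply wait -- no error or signal is returned.
        let v = tokio::task::spawn_blocking(|| 40 + 2).await.unwrap();
        assert_eq!(v, 42);
    });
    Ok(())
}
```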

Comment on lines +126 to +132
// Only in-memory stream
(0, 1) => Ok(sorted_streams.into_iter().next().unwrap()),

// Only single sorted spill file so stream it
(1, 0) => self
    .spill_manager
    .read_spill_as_stream(self.sorted_spill_files.drain(..).next().unwrap()),
Contributor Author

Need to add metrics in these cases

Comment on lines +145 to +161
pub fn with_spill_manager(mut self, spill_manager: SpillManager) -> Self {
    self.spill_manager = Some(spill_manager);
    self
}

pub fn with_sorted_spill_files(
    mut self,
    sorted_spill_files: Vec<RefCountedTempFile>,
) -> Self {
    self.sorted_spill_files = sorted_spill_files;
    self
}

pub fn with_max_blocking_threads(mut self, max_blocking_threads: usize) -> Self {
    self.max_blocking_threads = Some(max_blocking_threads);
    self
}
Contributor Author

@rluvaton rluvaton Apr 6, 2025

Need a better API than this; the current one is confusing because these setters are interdependent.
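One hypothetical way to make the dependency explicit is a single setter that takes all the spill-related values together, so the builder can never hold spill files without a spill manager. All names below are made up for illustration, not the actual DataFusion API:

```rust
// Hypothetical alternative: bundle the interdependent settings into one
// struct so a caller can't provide spill files without a spill manager.
// Stand-in types replace the real DataFusion ones for self-containment.

struct SpillManager;       // stand-in for datafusion's SpillManager
struct RefCountedTempFile; // stand-in for a sorted spill file handle

struct SpillConfig {
    spill_manager: SpillManager,
    sorted_spill_files: Vec<RefCountedTempFile>,
    max_blocking_threads: usize,
}

#[derive(Default)]
struct StreamingMergeBuilder {
    spill: Option<SpillConfig>,
}

impl StreamingMergeBuilder {
    /// One setter: spilling is either configured completely, or not at all.
    fn with_spill(mut self, spill: SpillConfig) -> Self {
        self.spill = Some(spill);
        self
    }
}

fn main() {
    let builder = StreamingMergeBuilder::default().with_spill(SpillConfig {
        spill_manager: SpillManager,
        sorted_spill_files: vec![],
        max_blocking_threads: 128,
    });
    assert!(builder.spill.is_some());
    println!("ok");
}
```

With this shape, the "spill files set but no spill manager" state is unrepresentable, so the builder's `build()` has one fewer invalid combination to check.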

Comment on lines +179 to +192
return Ok(Box::pin(MultiLevelSortPreservingMergeStream::new(
    spill_manager.unwrap(),
    schema.clone().unwrap(),
    sorted_spill_files,
    streams,
    expressions.clone(),
    metrics.clone().unwrap(),
    batch_size.unwrap(),
    reservation.unwrap(),
    max_blocking_threads,
    fetch,
    enable_round_robin_tie_breaker,
)?));
}
Contributor Author

Will remove all the unwraps; first I want to know what you think about the solution.
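For the cleanup, the usual pattern is to turn each `unwrap()` on an optional builder field into a proper error via `ok_or_else`. A sketch using plain std types for self-containment (in DataFusion the error type would be `DataFusionError::Internal`; the function and field names here are simplified stand-ins):

```rust
// Sketch of replacing `unwrap()` on optional builder fields with real
// errors. `build_stream` and its String fields are illustrative stand-ins
// for the actual builder fields and stream type.

fn build_stream(
    spill_manager: Option<String>,
    schema: Option<String>,
) -> Result<String, String> {
    // `ok_or_else` turns a missing field into an Err instead of a panic.
    let spill_manager = spill_manager
        .ok_or_else(|| "spill_manager is required when spill files are set".to_string())?;
    let schema = schema.ok_or_else(|| "schema is required".to_string())?;
    Ok(format!("stream({spill_manager}, {schema})"))
}

fn main() {
    assert!(build_stream(None, Some("schema".into())).is_err());
    assert_eq!(
        build_stream(Some("mgr".into()), Some("schema".into())).unwrap(),
        "stream(mgr, schema)"
    );
    println!("ok");
}
```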

@2010YOUY01
Contributor

I think this is a similar problem to #14692; will check this out soon

@rluvaton
Contributor Author

rluvaton commented Apr 9, 2025

I saw the PR from @ashdnazg and it is a better solution in my opinion, as it does not need to know the number of available tokio tasks and simply makes reading spills work

and although multi-level merge sort should be implemented, @2010YOUY01's implementation (#15610), with an adaptation for row_hash in AggregateExec to use it, might be the better way, so closing this

row_hash code to update:

fn update_merged_stream(&mut self) -> Result<()> {
    let Some(batch) = self.emit(EmitTo::All, true)? else {
        return Ok(());
    };
    // clear up memory for streaming_merge
    self.clear_all();
    self.update_memory_reservation()?;
    let mut streams: Vec<SendableRecordBatchStream> = vec![];
    let expr = self.spill_state.spill_expr.clone();
    let schema = batch.schema();
    streams.push(Box::pin(RecordBatchStreamAdapter::new(
        Arc::clone(&schema),
        futures::stream::once(futures::future::lazy(move |_| {
            sort_batch(&batch, expr.as_ref(), None)
        })),
    )));
    for spill in self.spill_state.spills.drain(..) {
        let stream = self.spill_state.spill_manager.read_spill_as_stream(spill)?;
        streams.push(stream);
    }
    self.spill_state.is_stream_merging = true;
    self.input = StreamingMergeBuilder::new()
        .with_streams(streams)
        .with_schema(schema)
        .with_expressions(self.spill_state.spill_expr.as_ref())
        .with_metrics(self.baseline_metrics.clone())
        .with_batch_size(self.batch_size)
        .with_reservation(self.reservation.new_empty())
        .build()?;
    self.input_done = false;
    self.group_ordering = GroupOrdering::Full(GroupOrderingFull::new());
    Ok(())
}

@rluvaton rluvaton closed this Apr 9, 2025
@rluvaton rluvaton deleted the add-spill-to-sort-preserving-merge-stream branch April 9, 2025 10:06

Successfully merging this pull request may close these issues.

Reduce number of tokio blocking threads in SortExec spill