
Commit 4496d46

jordanhunt22 (Convex, Inc.) authored and Convex, Inc. committed
[Fix] Shutdown + join futures on retention_manager shutdown (#24515)
It turns out that if we don't join a handle after shutdown, it can still perform work after the worker that created it has been dropped. So, I refactored the retention manager code to join all of its handles on shutdown.

It's slightly concerning that we use this same pattern of calling `.shutdown()` in other places without joining the future, because we can't guarantee that the handle stops doing work before its creator goes away. Should we try to get rid of this pattern entirely? Intuitively, calling `.shutdown()` seems like it should terminate the thread synchronously, but that is not what actually happens.

GitOrigin-RevId: db38a2e748ebd958f6be2cde8258f7ae2f1ce00e
1 parent 7318d3c commit 4496d46
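
The fix hinges on pairing the shutdown signal with a join on the spawned task. The `common::runtime::shutdown_and_join` helper imported in the retention diff below is not shown in this commit, so the following is only a minimal sketch of the idea, assuming a tokio-style join handle plus a oneshot shutdown channel; the `TaskHandle` type and this `shutdown_and_join` signature are illustrative, not Convex's actual API.

use tokio::{sync::oneshot, task::JoinHandle};

/// Hypothetical handle pairing a spawned task with its shutdown signal.
struct TaskHandle {
    shutdown_tx: oneshot::Sender<()>,
    join: JoinHandle<()>,
}

/// Signal the task to stop, then wait for it to actually exit. The join is
/// what guarantees the task performs no further work after its owner is gone.
async fn shutdown_and_join(handle: TaskHandle) -> anyhow::Result<()> {
    // The task may already have exited and dropped its receiver; ignore that.
    let _ = handle.shutdown_tx.send(());
    handle.join.await?;
    Ok(())
}

With a helper like this, an owner can drain its handles and join each one in order, which is what the retention manager's new `shutdown` below does.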

File tree

3 files changed (+21 −18 lines)


crates/application/src/lib.rs

Lines changed: 1 addition & 1 deletion
@@ -2531,7 +2531,7 @@ impl<RT: Runtime> Application<RT> {
         self.actions_isolate.shutdown().await?;
         self.database_isolate.shutdown().await?;
         self.module_cache.shutdown();
-        self.database.shutdown();
+        self.database.shutdown().await?;
         tracing::info!("Application shut down");
         Ok(())
     }

crates/database/src/database.rs

Lines changed: 3 additions & 2 deletions
@@ -912,11 +912,12 @@ impl<RT: Runtime> Database<RT> {
         )
     }

-    pub fn shutdown(&self) {
+    pub async fn shutdown(&self) -> anyhow::Result<()> {
         self.committer.shutdown();
         self.subscriptions.shutdown();
-        self.retention_manager.shutdown();
+        self.retention_manager.shutdown().await?;
         tracing::info!("Database shutdown");
+        Ok(())
     }

     pub fn retention_validator(&self) -> Arc<dyn RetentionValidator> {

crates/database/src/retention.rs

Lines changed: 17 additions & 15 deletions
@@ -68,9 +68,9 @@ use common::{
     query::Order,
     runtime::{
         new_rate_limiter,
+        shutdown_and_join,
         Runtime,
         RuntimeInstant,
-        SpawnHandle,
     },
     sha256::Sha256,
     sync::split_rw_lock::{
@@ -172,25 +172,21 @@ impl Checkpoint {
 pub struct LeaderRetentionManager<RT: Runtime> {
     rt: RT,
     bounds_reader: Reader<SnapshotBounds>,
-    advance_min_snapshot_handle: Arc<Mutex<RT::Handle>>,
-    deletion_handle: Arc<Mutex<RT::Handle>>,
-    document_deletion_handle: Arc<Mutex<RT::Handle>>,
     index_table_id: TableIdAndTableNumber,
     checkpoint_reader: Reader<Checkpoint>,
     document_checkpoint_reader: Reader<Checkpoint>,
+    handles: Arc<Mutex<Vec<RT::Handle>>>,
 }

 impl<RT: Runtime> Clone for LeaderRetentionManager<RT> {
     fn clone(&self) -> Self {
         Self {
             rt: self.rt.clone(),
             bounds_reader: self.bounds_reader.clone(),
-            advance_min_snapshot_handle: self.advance_min_snapshot_handle.clone(),
-            deletion_handle: self.deletion_handle.clone(),
-            document_deletion_handle: self.deletion_handle.clone(),
             index_table_id: self.index_table_id,
             checkpoint_reader: self.checkpoint_reader.clone(),
             document_checkpoint_reader: self.document_checkpoint_reader.clone(),
+            handles: self.handles.clone(),
         }
     }
 }
@@ -342,19 +338,25 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
         Ok(Self {
             rt,
             bounds_reader,
-            advance_min_snapshot_handle: Arc::new(Mutex::new(advance_min_snapshot_handle)),
-            deletion_handle: Arc::new(Mutex::new(deletion_handle)),
-            document_deletion_handle: Arc::new(Mutex::new(document_deletion_handle)),
             index_table_id,
             checkpoint_reader,
             document_checkpoint_reader,
+            handles: Arc::new(Mutex::new(vec![
+                // Order matters because we need to shutdown the threads that have
+                // receivers before the senders
+                deletion_handle,
+                document_deletion_handle,
+                advance_min_snapshot_handle,
+            ])),
         })
     }

-    pub fn shutdown(&self) {
-        self.deletion_handle.lock().shutdown();
-        self.document_deletion_handle.lock().shutdown();
-        self.advance_min_snapshot_handle.lock().shutdown();
+    pub async fn shutdown(&self) -> anyhow::Result<()> {
+        let handles: Vec<_> = self.handles.lock().drain(..).collect();
+        for handle in handles.into_iter() {
+            shutdown_and_join(handle).await?;
+        }
+        Ok(())
     }

     /// Returns the timestamp which we would like to use as min_snapshot_ts.
@@ -1143,7 +1145,7 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
             if !is_working {
                 min_document_snapshot_ts = match min_document_snapshot_rx.changed().await {
                     Err(err) => {
-                        tracing::warn!("Failed to receive document snapshot: {}", err);
+                        report_error(&mut err.into());
                         // Fall back to polling if the channel is closed or falls over. This should
                         // really never happen.
                         Self::wait_with_jitter(&rt, *MAX_RETENTION_DELAY_SECONDS).await;
