Skip to content

Commit cb1b053

Browse files
authored
Use spawn_blocking for the fallback commit in identity_dis/connected (#2730)
1 parent 6eef698 commit cb1b053

File tree

6 files changed

+70
-66
lines changed

6 files changed

+70
-66
lines changed

crates/core/src/client/client_connection.rs

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use crate::error::DBError;
99
use crate::host::module_host::ClientConnectedError;
1010
use crate::host::{ModuleHost, NoSuchModule, ReducerArgs, ReducerCallError, ReducerCallResult};
1111
use crate::messages::websocket::Subscribe;
12+
use crate::util::asyncify;
1213
use crate::util::prometheus_handle::IntGaugeExt;
1314
use crate::worker_metrics::WORKER_METRICS;
1415
use bytes::Bytes;
@@ -302,24 +303,22 @@ impl ClientConnection {
302303

303304
pub async fn subscribe_single(&self, subscription: SubscribeSingle, timer: Instant) -> Result<(), DBError> {
304305
let me = self.clone();
305-
tokio::task::spawn_blocking(move || {
306+
asyncify(move || {
306307
me.module
307308
.subscriptions()
308309
.add_single_subscription(me.sender, subscription, timer, None)
309310
})
310311
.await
311-
.unwrap() // TODO: is unwrapping right here?
312312
}
313313

314314
pub async fn unsubscribe(&self, request: Unsubscribe, timer: Instant) -> Result<(), DBError> {
315315
let me = self.clone();
316-
tokio::task::spawn_blocking(move || {
316+
asyncify(move || {
317317
me.module
318318
.subscriptions()
319319
.remove_single_subscription(me.sender, request, timer)
320320
})
321321
.await
322-
.unwrap() // TODO: is unwrapping right here?
323322
}
324323

325324
pub async fn subscribe_multi(
@@ -328,13 +327,12 @@ impl ClientConnection {
328327
timer: Instant,
329328
) -> Result<Option<ExecutionMetrics>, DBError> {
330329
let me = self.clone();
331-
tokio::task::spawn_blocking(move || {
330+
asyncify(move || {
332331
me.module
333332
.subscriptions()
334333
.add_multi_subscription(me.sender, request, timer, None)
335334
})
336335
.await
337-
.unwrap() // TODO: is unwrapping right here?
338336
}
339337

340338
pub async fn unsubscribe_multi(
@@ -343,24 +341,22 @@ impl ClientConnection {
343341
timer: Instant,
344342
) -> Result<Option<ExecutionMetrics>, DBError> {
345343
let me = self.clone();
346-
tokio::task::spawn_blocking(move || {
344+
asyncify(move || {
347345
me.module
348346
.subscriptions()
349347
.remove_multi_subscription(me.sender, request, timer)
350348
})
351349
.await
352-
.unwrap() // TODO: is unwrapping right here?
353350
}
354351

355352
pub async fn subscribe(&self, subscription: Subscribe, timer: Instant) -> Result<ExecutionMetrics, DBError> {
356353
let me = self.clone();
357-
tokio::task::spawn_blocking(move || {
354+
asyncify(move || {
358355
me.module
359356
.subscriptions()
360357
.add_legacy_subscriber(me.sender, subscription, timer, None)
361358
})
362359
.await
363-
.unwrap()
364360
}
365361

366362
pub fn one_off_query_json(&self, query: &str, message_id: &[u8], timer: Instant) -> Result<(), anyhow::Error> {

crates/core/src/database_logger.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ use tokio::sync::broadcast;
66

77
use spacetimedb_paths::server::{ModuleLogPath, ModuleLogsDir};
88

9+
use crate::util::asyncify;
10+
911
pub struct DatabaseLogger {
1012
inner: Mutex<DatabaseLoggerInner>,
1113
pub tx: broadcast::Sender<bytes::Bytes>,
@@ -176,22 +178,20 @@ impl DatabaseLogger {
176178
}
177179
// if there's none for today, read the directory and
178180
let logs_dir = path.popped();
179-
return tokio::task::spawn_blocking(move || match logs_dir.most_recent()? {
181+
return asyncify(move || match logs_dir.most_recent()? {
180182
Some(newest_log_file) => std::fs::read_to_string(newest_log_file),
181183
None => Ok(String::new()),
182184
})
183185
.await
184-
.unwrap()
185186
.expect("couldn't read log file");
186187
};
187188

188189
if num_lines == 0 {
189190
return String::new();
190191
}
191192

192-
tokio::task::spawn_blocking(move || read_latest_lines(logs_dir, num_lines))
193+
asyncify(move || read_latest_lines(logs_dir, num_lines))
193194
.await
194-
.unwrap()
195195
.expect("couldn't read log file")
196196
}
197197

crates/core/src/db/relational_db.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use crate::db::datastore::system_tables::{StModuleRow, WASM_MODULE};
1919
use crate::error::{DBError, DatabaseError, TableError};
2020
use crate::execution_context::{ReducerContext, Workload};
2121
use crate::messages::control_db::HostType;
22-
use crate::util::spawn_rayon;
22+
use crate::util::{asyncify, spawn_rayon};
2323
use anyhow::{anyhow, Context};
2424
use fs2::FileExt;
2525
use futures::channel::mpsc;
@@ -157,15 +157,14 @@ impl SnapshotWorkerActor {
157157
let start_time = std::time::Instant::now();
158158
let committed_state = self.committed_state.clone();
159159
let snapshot_repo = self.repo.clone();
160-
let res = tokio::task::spawn_blocking(move || {
160+
let res = asyncify(move || {
161161
Locking::take_snapshot_internal(&committed_state, &snapshot_repo).inspect(|opts| {
162162
if let Some(opts) = opts {
163163
Locking::compress_older_snapshot_internal(&snapshot_repo, opts.0);
164164
}
165165
})
166166
})
167-
.await
168-
.unwrap();
167+
.await;
169168
match res {
170169
Err(e) => {
171170
log::error!(

crates/core/src/host/host_controller.rs

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use crate::module_host_context::ModuleCreationContext;
1313
use crate::replica_context::ReplicaContext;
1414
use crate::subscription::module_subscription_actor::ModuleSubscriptions;
1515
use crate::subscription::module_subscription_manager::SubscriptionManager;
16-
use crate::util::spawn_rayon;
16+
use crate::util::{asyncify, spawn_rayon};
1717
use anyhow::{anyhow, ensure, Context};
1818
use async_trait::async_trait;
1919
use durability::{Durability, EmptyHistory};
@@ -282,13 +282,11 @@ impl HostController {
282282
trace!("using database {}/{}", database.database_identity, replica_id);
283283
let module = self.get_or_launch_module_host(database, replica_id).await?;
284284
let on_panic = self.unregister_fn(replica_id);
285-
let result = tokio::task::spawn_blocking(move || f(&module.replica_ctx().relational_db))
286-
.await
287-
.unwrap_or_else(|e| {
288-
warn!("database operation panicked");
289-
on_panic();
290-
std::panic::resume_unwind(e.into_panic())
291-
});
285+
scopeguard::defer_on_unwind!({
286+
warn!("database operation panicked");
287+
on_panic();
288+
});
289+
let result = asyncify(move || f(&module.replica_ctx().relational_db)).await;
292290
Ok(result)
293291
}
294292

@@ -539,11 +537,7 @@ async fn make_replica_ctx(
539537
let Some(subscriptions) = downgraded.upgrade() else {
540538
break;
541539
};
542-
tokio::task::spawn_blocking(move || {
543-
subscriptions.write().remove_dropped_clients();
544-
})
545-
.await
546-
.unwrap();
540+
asyncify(move || subscriptions.write().remove_dropped_clients()).await
547541
}
548542
});
549543

crates/core/src/host/module_host.rs

Lines changed: 33 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use crate::sql::ast::SchemaViewer;
1515
use crate::subscription::module_subscription_actor::ModuleSubscriptions;
1616
use crate::subscription::tx::DeltaTx;
1717
use crate::subscription::{execute_plan, record_exec_metrics};
18+
use crate::util::asyncify;
1819
use crate::util::lending_pool::{LendingPool, LentResource, PoolClosed};
1920
use crate::vm::check_row_limit;
2021
use crate::worker_metrics::WORKER_METRICS;
@@ -535,10 +536,7 @@ impl ModuleHost {
535536
pub async fn disconnect_client(&self, client_id: ClientActorId) {
536537
log::trace!("disconnecting client {}", client_id);
537538
let this = self.clone();
538-
let _ = tokio::task::spawn_blocking(move || {
539-
this.subscriptions().remove_subscriber(client_id);
540-
})
541-
.await;
539+
asyncify(move || this.subscriptions().remove_subscriber(client_id)).await;
542540
// ignore NoSuchModule; if the module's already closed, that's fine
543541
if let Err(e) = self
544542
.call_identity_disconnected(client_id.identity, client_id.connection_id)
@@ -622,18 +620,18 @@ impl ModuleHost {
622620
timestamp: Timestamp::now(),
623621
arg_bsatn: Bytes::new(),
624622
});
625-
self.inner
626-
.replica_ctx()
627-
.relational_db
628-
.with_auto_commit(workload, |mut_tx| {
623+
624+
let stdb = self.inner.replica_ctx().relational_db.clone();
625+
asyncify(move || {
626+
stdb.with_auto_commit(workload, |mut_tx| {
629627
mut_tx.insert_st_client(caller_identity, caller_connection_id)
630628
})
631-
.inspect_err(|e| {
632-
log::error!(
633-
"`call_identity_connected`: fallback transaction to insert into `st_client` failed: {e:#?}"
634-
);
635-
})
636-
.map_err(Into::into)
629+
})
630+
.await
631+
.inspect_err(|e| {
632+
log::error!("`call_identity_connected`: fallback transaction to insert into `st_client` failed: {e:#?}")
633+
})
634+
.map_err(Into::into)
637635
}
638636
}
639637

@@ -660,7 +658,7 @@ impl ModuleHost {
660658
let reducer_lookup = self.info.module_def.lifecycle_reducer(Lifecycle::OnDisconnect);
661659

662660
// A fallback transaction that deletes the client from `st_client`.
663-
let fallback = || {
661+
let fallback = || async {
664662
let reducer_name = reducer_lookup
665663
.as_ref()
666664
.map(|(_, def)| &*def.name)
@@ -673,24 +671,24 @@ impl ModuleHost {
673671
timestamp: Timestamp::now(),
674672
arg_bsatn: Bytes::new(),
675673
});
676-
self.inner
677-
.replica_ctx()
678-
.relational_db
679-
.with_auto_commit(workload, |mut_tx| {
680-
mut_tx.delete_st_client(caller_identity, caller_connection_id, self.info.database_identity)
681-
})
682-
.inspect_err(|e| {
683-
log::error!(
684-
"`call_identity_disconnected`: fallback transaction to delete from `st_client` failed: {e}"
685-
);
686-
})
687-
.map_err(|err| {
688-
InvalidReducerArguments {
689-
err: err.into(),
690-
reducer: reducer_name.into(),
691-
}
692-
.into()
674+
let stdb = self.inner.replica_ctx().relational_db.clone();
675+
let database_identity = self.info.database_identity;
676+
asyncify(move || {
677+
stdb.with_auto_commit(workload, |mut_tx| {
678+
mut_tx.delete_st_client(caller_identity, caller_connection_id, database_identity)
693679
})
680+
})
681+
.await
682+
.map_err(|err| {
683+
log::error!(
684+
"`call_identity_disconnected`: fallback transaction to delete from `st_client` failed: {err}"
685+
);
686+
InvalidReducerArguments {
687+
err: err.into(),
688+
reducer: reducer_name.into(),
689+
}
690+
.into()
691+
})
694692
};
695693

696694
if let Some((reducer_id, reducer_def)) = reducer_lookup {
@@ -720,12 +718,12 @@ impl ModuleHost {
720718
match result {
721719
Err(e) => {
722720
log::error!("call_reducer_inner of client_disconnected failed: {e:#?}");
723-
fallback()
721+
fallback().await
724722
}
725723
Ok(ReducerCallResult {
726724
outcome: ReducerOutcome::Failed(_) | ReducerOutcome::BudgetExceeded,
727725
..
728-
}) => fallback(),
726+
}) => fallback().await,
729727

730728
// If it succeeded, as mentioend above, `st_client` is already updated.
731729
Ok(ReducerCallResult {
@@ -736,7 +734,7 @@ impl ModuleHost {
736734
} else {
737735
// The module doesn't define a `client_disconnected` reducer.
738736
// Commit a transaction to update `st_clients`.
739-
fallback()
737+
fallback().await
740738
}
741739
}
742740

crates/core/src/util/mod.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,23 @@ pub fn spawn_rayon<R: Send + 'static>(f: impl FnOnce() -> R + Send + 'static) ->
3232
rx.map(|res| res.unwrap().unwrap_or_else(|err| std::panic::resume_unwind(err)))
3333
}
3434

35+
/// Ergonomic wrapper for `tokio::task::spawn_blocking(f).await`.
36+
///
37+
/// If `f` panics, it will be bubbled up to the calling task.
38+
pub async fn asyncify<F, R>(f: F) -> R
39+
where
40+
F: FnOnce() -> R + Send + 'static,
41+
R: Send + 'static,
42+
{
43+
tokio::task::spawn_blocking(f)
44+
.await
45+
.unwrap_or_else(|e| match e.try_into_panic() {
46+
Ok(panic_payload) => std::panic::resume_unwind(panic_payload),
47+
// the only other variant is cancelled, which shouldn't happen because we don't cancel it.
48+
Err(e) => panic!("Unexpected JoinError: {e}"),
49+
})
50+
}
51+
3552
/// Await `fut`, while also polling `also`.
3653
pub async fn also_poll<Fut: Future>(fut: Fut, also: impl Future<Output = ()>) -> Fut::Output {
3754
let mut also = pin!(also.fuse());

0 commit comments

Comments
 (0)