Skip to content

Commit d680098

Browse files
goffrie and Convex, Inc.
authored and
Convex, Inc.
committed
Fix shutdown race on instance startup (#36382)
GitOrigin-RevId: 278d6249ee14b8cd73eaaa9a701b4592f567a59d
1 parent 8f9f55e commit d680098

File tree

4 files changed

+41
-52
lines changed

4 files changed

+41
-52
lines changed

Cargo.lock

Lines changed: 3 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/common/src/shutdown.rs

Lines changed: 32 additions & 40 deletions
Original file line number | Diff line number | Diff line change
@@ -1,71 +1,63 @@
11
use std::sync::Arc;
22

33
use parking_lot::Mutex;
4-
use tokio::sync::mpsc;
4+
use tokio::sync::oneshot;
55

66
use crate::errors::report_error_sync;
77

8-
// Used by the database to signal it has encountered a fatal error.
8+
/// Used by the database to signal it has encountered a fatal error.
99
#[derive(Clone)]
1010
pub struct ShutdownSignal {
11-
shutdown_tx: Option<Arc<Mutex<Option<mpsc::UnboundedSender<ShutdownMessage>>>>>,
12-
instance_name: String,
13-
generation_id: u64,
11+
mode: Mode,
1412
}
1513

16-
#[derive(Debug)]
17-
pub struct ShutdownMessage {
18-
pub error: anyhow::Error,
19-
pub instance_name: String,
20-
pub generation_id: u64,
14+
/// Indicates what to do when an error is reported.
15+
#[derive(Clone)]
16+
enum Mode {
17+
Panic,
18+
/// If the `Option` inside the mutex is `Some`, the next fatal error will be
19+
/// sent to that sender. Otherwise, signalling will do nothing (under the
20+
/// presumption that an earlier error was reported and removed the sender).
21+
Notify(Arc<Mutex<Option<oneshot::Sender<anyhow::Error>>>>),
2122
}
2223

2324
impl ShutdownSignal {
24-
pub fn new(
25-
shutdown_tx: mpsc::UnboundedSender<ShutdownMessage>,
26-
instance_name: String,
27-
generation_id: u64,
28-
) -> Self {
25+
/// Creates a new ShutdownSignal that sends the first encountered error to
26+
/// the provided oneshot sender.
27+
pub fn new(shutdown_tx: oneshot::Sender<anyhow::Error>) -> Self {
2928
Self {
30-
shutdown_tx: Some(Arc::new(Mutex::new(Some(shutdown_tx)))),
31-
instance_name,
32-
generation_id,
29+
mode: Mode::Notify(Arc::new(Mutex::new(Some(shutdown_tx)))),
3330
}
3431
}
3532

33+
/// Signals that an instance has encountered a fatal error and needs to be
34+
/// shut down.
3635
pub fn signal(&self, mut fatal_error: anyhow::Error) {
3736
report_error_sync(&mut fatal_error);
38-
if let Some(ref shutdown_tx_mutex) = self.shutdown_tx {
39-
let Some(shutdown_tx) = shutdown_tx_mutex.lock().take() else {
40-
// A shutdown message has already been sent for this instance. Do nothing.
41-
return;
42-
};
43-
_ = shutdown_tx.send(ShutdownMessage {
44-
error: fatal_error,
45-
instance_name: self.instance_name.clone(),
46-
generation_id: self.generation_id,
47-
});
48-
} else {
49-
// We don't anyone to shutdown signal configured. Just panic.
50-
panic!("Shutting down due to fatal error: {}", fatal_error);
37+
match &self.mode {
38+
Mode::Notify(shutdown_tx_mutex) => {
39+
let Some(shutdown_tx) = shutdown_tx_mutex.lock().take() else {
40+
// A shutdown message has already been sent for this instance. Do nothing.
41+
return;
42+
};
43+
_ = shutdown_tx.send(fatal_error);
44+
},
45+
Mode::Panic => {
46+
// We don't have the shutdown signal configured. Just panic.
47+
panic!("Shutting down due to fatal error: {}", fatal_error);
48+
},
5149
}
5250
}
5351

54-
// Creates a new ShutdownSignal that panics when signaled.
52+
/// Creates a new ShutdownSignal that panics when signaled.
5553
pub fn panic() -> Self {
56-
Self {
57-
shutdown_tx: None,
58-
instance_name: "".to_owned(),
59-
generation_id: 0,
60-
}
54+
Self { mode: Mode::Panic }
6155
}
6256

6357
#[cfg(any(test, feature = "testing"))]
6458
pub fn no_op() -> Self {
6559
Self {
66-
shutdown_tx: Some(Arc::new(Mutex::new(None))),
67-
instance_name: "".to_owned(),
68-
generation_id: 0,
60+
mode: Mode::Notify(Arc::new(Mutex::new(None))),
6961
}
7062
}
7163
}

crates/convex/Cargo.oss.lock

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -1617,9 +1617,9 @@ dependencies = [
16171617

16181618
[[package]]
16191619
name = "tokio-util"
1620-
version = "0.7.13"
1620+
version = "0.7.14"
16211621
source = "registry+https://github.com/rust-lang/crates.io-index"
1622-
checksum = "d7fcaa8d55a2bdd6b83ace262b016eca0d79ee02818c5c1bcdf0305114081078"
1622+
checksum = "6b9590b93e6fcc1739458317cccd391ad3955e2bde8913edf6f95f9e65a8f034"
16231623
dependencies = [
16241624
"bytes",
16251625
"futures-core",

crates/local_backend/src/main.rs

Lines changed: 4 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -32,7 +32,7 @@ use tokio::{
3232
signal::{
3333
self,
3434
},
35-
sync::mpsc,
35+
sync::oneshot,
3636
};
3737

3838
fn main() -> Result<(), MainError> {
@@ -99,8 +99,8 @@ async fn run_server(runtime: ProdRuntime, config: LocalConfig) -> anyhow::Result
9999

100100
async fn run_server_inner(runtime: ProdRuntime, config: LocalConfig) -> anyhow::Result<()> {
101101
// Used to receive fatal errors from the database or /preempt endpoint.
102-
let (preempt_tx, mut preempt_rx) = mpsc::unbounded_channel();
103-
let preempt_signal = ShutdownSignal::new(preempt_tx.clone(), config.name(), 0);
102+
let (preempt_tx, preempt_rx) = oneshot::channel();
103+
let preempt_signal = ShutdownSignal::new(preempt_tx);
104104
// Used to signal to the http service to stop.
105105
let (shutdown_tx, shutdown_rx) = async_broadcast::broadcast(1);
106106
let persistence = connect_persistence(
@@ -142,9 +142,6 @@ async fn run_server_inner(runtime: ProdRuntime, config: LocalConfig) -> anyhow::
142142
let serve_future = future::try_join(serve_http_future, proxy_future).fuse();
143143
futures::pin_mut!(serve_future);
144144

145-
let preempt_future = async move { preempt_rx.recv().await }.fuse();
146-
futures::pin_mut!(preempt_future);
147-
148145
// Start shutdown when we get a manual shutdown signal or with the first
149146
// ctrl-c.
150147
let mut force_exit_duration = None;
@@ -153,7 +150,7 @@ async fn run_server_inner(runtime: ProdRuntime, config: LocalConfig) -> anyhow::
153150
r?;
154151
panic!("Serve future stopped unexpectedly!")
155152
},
156-
_err = preempt_future => {
153+
_err = preempt_rx.fuse() => {
157154
// If we fail with a fatal error, we want to exit immediately.
158155
tracing::info!("Received a fatal error. Shutting down immediately");
159156
force_exit_duration = Some(Duration::from_secs(0));

0 commit comments

Comments
 (0)