Skip to content

Commit 09288f6

Browse files
committed
add win interruption using tokio::windows
Signed-off-by: Philippe Llerena <[email protected]>
1 parent 19ff372 commit 09288f6

File tree

4 files changed

+100
-31
lines changed

4 files changed

+100
-31
lines changed

Cargo.lock

+19-18
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/spfs-cli/cmd-monitor/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -28,3 +28,4 @@ spfs-cli-common = { workspace = true }
2828
tokio = { version = "1.20", features = ["rt", "rt-multi-thread"] }
2929
tracing = { workspace = true }
3030
url = "2.2"
31+
futures = "0.3.30"

crates/spfs-cli/cmd-monitor/src/cmd_monitor.rs

+10-13
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,18 @@ use clap::Parser;
88
#[cfg(feature = "sentry")]
99
use cli::configure_sentry;
1010
use miette::{Context, IntoDiagnostic, Result};
11-
use spfs::Error;
1211
use spfs_cli_common as cli;
1312
use spfs_cli_common::CommandName;
1413
use tokio::io::AsyncReadExt;
15-
use tokio::signal::unix::{signal, SignalKind};
1614
use tokio::time::timeout;
1715

16+
mod signal;
17+
#[cfg(unix)]
18+
use signal::unix_signal_handler::UnixSignalHandler as SignalHandlerImpl;
19+
use signal::SignalHandler;
20+
#[cfg(windows)]
21+
use windows_signal_handler::WindowsSignalHandler as SignalHandlerImpl;
22+
1823
fn main() -> Result<()> {
1924
// because this function exits right away it does not
2025
// properly handle destruction of data, so we put the actual
@@ -86,11 +91,10 @@ impl CmdMonitor {
8691
rt.block_on(self.wait_for_ready());
8792
// clean up this runtime and all other threads before detaching
8893
drop(rt);
89-
94+
#[cfg(unix)]
9095
nix::unistd::daemon(self.no_chdir, self.no_close)
9196
.into_diagnostic()
9297
.wrap_err("Failed to daemonize the monitor process")?;
93-
9498
#[cfg(feature = "sentry")]
9599
{
96100
// Initialize sentry after the call to `daemon` so it is safe for
@@ -142,12 +146,7 @@ impl CmdMonitor {
142146
}
143147

144148
pub async fn run_async(&mut self, config: &spfs::Config) -> Result<i32> {
145-
let mut interrupt = signal(SignalKind::interrupt())
146-
.map_err(|err| Error::process_spawn_error("signal()", err, None))?;
147-
let mut quit = signal(SignalKind::quit())
148-
.map_err(|err| Error::process_spawn_error("signal()", err, None))?;
149-
let mut terminate = signal(SignalKind::terminate())
150-
.map_err(|err| Error::process_spawn_error("signal()", err, None))?;
149+
let signal_future = SignalHandlerImpl::build_signal_future();
151150

152151
let repo = spfs::open_repository(&self.runtime_storage).await?;
153152
let storage = spfs::runtime::Storage::new(repo)?;
@@ -165,9 +164,7 @@ impl CmdMonitor {
165164
}
166165
// we explicitly catch any signal related to interruption
167166
// and will act by cleaning up the runtime early
168-
_ = terminate.recv() => Err(spfs::Error::String("Terminate signal received, cleaning up runtime early".to_string())),
169-
_ = interrupt.recv() => Err(spfs::Error::String("Interrupt signal received, cleaning up runtime early".to_string())),
170-
_ = quit.recv() => Err(spfs::Error::String("Quit signal received, cleaning up runtime early".to_string())),
167+
_ = signal_future => Err(spfs::Error::String("Signal received, cleaning up runtime early".to_string())),
171168
};
172169
tracing::trace!("runtime empty of processes ");
173170

+70
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
use std::pin::Pin;
2+
3+
use futures::future::Future;
4+
use spfs::Error;
5+
6+
pub trait SignalHandler {
7+
fn build_signal_future() -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>;
8+
}
9+
10+
#[cfg(unix)]
11+
pub mod unix_signal_handler {
12+
use tokio::signal::unix::{signal, SignalKind};
13+
14+
use super::*;
15+
16+
pub struct UnixSignalHandler;
17+
18+
impl SignalHandler for UnixSignalHandler {
19+
fn build_signal_future() -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send>> {
20+
Box::pin(async move {
21+
let mut interrupt = signal(SignalKind::interrupt())
22+
.map_err(|err| Error::process_spawn_error("signal()", err, None))?;
23+
let mut quit = signal(SignalKind::quit())
24+
.map_err(|err| Error::process_spawn_error("signal()", err, None))?;
25+
let mut terminate = signal(SignalKind::terminate())
26+
.map_err(|err| Error::process_spawn_error("signal()", err, None))?;
27+
28+
futures::future::select_all(vec![
29+
Box::pin(interrupt.recv()),
30+
Box::pin(quit.recv()),
31+
Box::pin(terminate.recv()),
32+
])
33+
.await;
34+
35+
Ok(())
36+
})
37+
}
38+
}
39+
}
40+
41+
#[cfg(windows)]
42+
pub mod windows_signal_handler {
43+
use tokio::signal::ctrl_c;
44+
45+
use super::*;
46+
47+
pub struct WindowsSignalHandler;
48+
49+
impl SignalHandler for WindowsSignalHandler {
50+
fn build_signal_future() -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send>> {
51+
Box::pin(async move {
52+
let mut interrupt =
53+
ctrl_c().map_err(|err| Error::process_spawn_error("ctrl_c()", err, None))?;
54+
let mut quit =
55+
ctrl_c().map_err(|err| Error::process_spawn_error("ctrl_c()", err, None))?;
56+
let mut terminate =
57+
ctrl_c().map_err(|err| Error::process_spawn_error("ctrl_c()", err, None))?;
58+
59+
futures::future::select_all(vec![
60+
Box::pin(interrupt),
61+
Box::pin(quit),
62+
Box::pin(terminate),
63+
])
64+
.await;
65+
66+
Ok(())
67+
})
68+
}
69+
}
70+
}

0 commit comments

Comments
 (0)