From 4db8b453add4b9493c9d969dfffd912c0b8abb35 Mon Sep 17 00:00:00 2001
From: Miles Johnson
Date: Fri, 17 Jan 2025 10:24:47 -0800
Subject: [PATCH] Try and test things.

---
 .moon/workspace.yml                           |  1 +
 Cargo.lock                                    |  1 +
 crates/action-pipeline/src/action_pipeline.rs | 59 ++++++------
 crates/action-pipeline/src/job.rs             | 84 +++++++++-------
 crates/process/Cargo.toml                     |  1 +
 crates/process/src/exec_command.rs            | 17 ++--
 crates/process/src/process_registry.rs        | 27 +++---
 crates/process/src/shared_child.rs            | 95 ++++++++++---------
 crates/process/src/shell.rs                   |  4 +-
 package.json                                  |  1 +
 scenarios/signals/moon.yml                    | 18 ++++
 scenarios/signals/signals.mjs                 | 19 ++++
 tsconfig.json                                 |  3 +
 13 files changed, 203 insertions(+), 127 deletions(-)
 create mode 100644 scenarios/signals/moon.yml
 create mode 100644 scenarios/signals/signals.mjs

diff --git a/.moon/workspace.yml b/.moon/workspace.yml
index a463d2809ea..080f2424688 100644
--- a/.moon/workspace.yml
+++ b/.moon/workspace.yml
@@ -10,6 +10,7 @@ projects:
   - './packages/*'
   - '!packages/cli'
   - '!packages/core-*'
+  - 'scenarios/*'
   - 'website'
 
 generator:
diff --git a/Cargo.lock b/Cargo.lock
index 65fc9b1cf97..fb59fda35e1 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5310,6 +5310,7 @@ dependencies = [
  "indexmap 2.7.0",
  "nix 0.29.0",
  "tokio",
+ "tracing",
  "windows 0.59.0",
 ]
 
diff --git a/crates/action-pipeline/src/action_pipeline.rs b/crates/action-pipeline/src/action_pipeline.rs
index 42a96e20f30..98281c59e53 100644
--- a/crates/action-pipeline/src/action_pipeline.rs
+++ b/crates/action-pipeline/src/action_pipeline.rs
@@ -163,7 +163,7 @@ impl ActionPipeline {
         let signal_handle = self.monitor_signals(cancel_token.clone());
 
         // Dispatch jobs from the graph to run actions
-        let queue_handle = self.dispatch_jobs(action_graph, job_context)?;
+        let queue_handle = self.dispatch_jobs(action_graph, job_context.clone())?;
 
         // Wait and receive all results coming through
         debug!("Waiting for jobs to return results");
@@ -198,17 +198,20 @@
 
         drop(receiver);
 
-        // Wait for the queue to shutdown all running tasks
-        queue_handle.await.into_diagnostic()?;
-        process_registry.wait_for_running_to_shutdown().await;
-
-        // Force abort the signal handler
+        // Capture and handle any signals
         if cancel_token.is_cancelled() {
             self.status = signal_handle.await.into_diagnostic()?;
         } else {
             signal_handle.abort();
         }
 
+        // Wait for the queue to shutdown all running tasks
+        process_registry.wait_for_running_to_shutdown().await;
+
+        let mut _job_handles = queue_handle.await.into_diagnostic()?;
+
+        // exhaust_job_handles(&mut job_handles, &job_context).await;
+
         self.actions = actions;
         self.duration = Some(start.elapsed());
 
@@ -224,7 +227,7 @@
         &self,
         action_graph: ActionGraph,
         job_context: JobContext,
-    ) -> miette::Result<JoinHandle<()>> {
+    ) -> miette::Result<JoinHandle<Vec<JoinHandle<()>>>> {
         let node_indices = action_graph.sort_topological()?;
         let app_context = Arc::clone(&self.app_context);
         let action_context = Arc::clone(&self.action_context);
@@ -244,11 +247,9 @@
             // If the pipeline was aborted or cancelled (signal),
             // loop through and abort all currently running handles
             if job_context.is_aborted_or_cancelled() {
-                exhaust_job_handles(&mut job_handles, &job_context).await;
-
                 // Return instead of break, so that we avoid
                 // running persistent tasks below
-                return;
+                return job_handles;
             }
 
             // If none is returned, then we are waiting on other currently running
@@ -301,18 +302,18 @@
                 if node.is_interactive()
                     && exhaust_job_handles(&mut job_handles, &job_context).await
                 {
-                    return;
+                    return job_handles;
                 }
             }
 
             // Ensure all non-persistent actions have finished
             if exhaust_job_handles(&mut job_handles, &job_context).await {
-                return;
+                return job_handles;
             }
 
             // Then run all persistent actions in parallel
             if persistent_indices.is_empty() {
-                return;
+                return job_handles;
             }
 
             debug!(
@@ -346,24 +347,26 @@
                 ));
             });
 
-            // Since these tasks are persistent and never complete,
-            // we need to continually check if they've been aborted or
-            // cancelled, otherwise we will end up with zombie processes
-            loop {
-                sleep(Duration::from_millis(50)).await;
-
-                // No tasks running, so don't hang forever
-                if job_context.result_sender.is_closed() {
-                    break;
-                }
-
-                if job_context.is_aborted_or_cancelled() {
-                    debug!("Shutting down {} persistent jobs", job_handles.len());
-
-                    exhaust_job_handles(&mut job_handles, &job_context).await;
-                    break;
-                }
-            }
+            // // Since these tasks are persistent and never complete,
+            // // we need to continually check if they've been aborted or
+            // // cancelled, otherwise we will end up with zombie processes
+            // loop {
+            //     sleep(Duration::from_millis(50)).await;
+
+            //     // No tasks running, so don't hang forever
+            //     if job_context.result_sender.is_closed() {
+            //         break;
+            //     }
+
+            //     if job_context.is_aborted_or_cancelled() {
+            //         debug!("Shutting down {} persistent jobs", job_handles.len());
+
+            //         exhaust_job_handles(&mut job_handles, &job_context).await;
+            //         break;
+            //     }
+            // }
+
+            job_handles
         }))
     }
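For context on the dispatch_jobs change above: the spawned queue task now returns its Vec of job JoinHandles to the caller (hence the new JoinHandle<Vec<JoinHandle<()>>> signature) instead of draining the handles and polling for cancellation itself. A minimal sketch of that hand-off shape, with the graph, context, and error types stripped out (names here are illustrative, not moon's API):

    use tokio::task::JoinHandle;

    // The queue task spawns jobs and hands their handles back to the
    // caller, which decides when (and whether) to drain them.
    fn dispatch(job_count: usize) -> JoinHandle<Vec<JoinHandle<()>>> {
        tokio::spawn(async move {
            let mut job_handles = Vec::new();

            for index in 0..job_count {
                job_handles.push(tokio::spawn(async move {
                    // ... run the job for `index` ...
                    let _ = index;
                }));
            }

            // Returned rather than awaited here, mirroring the
            // `return job_handles` paths in the diff.
            job_handles
        })
    }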
diff --git a/crates/action-pipeline/src/job.rs b/crates/action-pipeline/src/job.rs
index 20536dd7e14..9deda7ac399 100644
--- a/crates/action-pipeline/src/job.rs
+++ b/crates/action-pipeline/src/job.rs
@@ -22,40 +22,56 @@ impl Job {
         let mut action = Action::new(self.node);
         action.node_index = self.node_index;
 
-        tokio::select! {
-            // Run conditions in order!
-            biased;
-
-            // Abort if a sibling job has failed
-            _ = self.context.abort_token.cancelled() => {
-                trace!(
-                    index = self.node_index,
-                    "Job aborted",
-                );
-
-                action.finish(ActionStatus::Aborted);
-            }
-
-            // Cancel if we receive a shutdown signal
-            _ = self.context.cancel_token.cancelled() => {
-                trace!(
-                    index = self.node_index,
-                    "Job cancelled (via signal)",
-                );
-
-                action.finish(ActionStatus::Skipped);
-            }
-
-            // Or run the job to completion
-            _ = run_action(
-                &mut action,
-                self.action_context,
-                self.app_context,
-                self.context.workspace_graph.clone(),
-                self.context.toolchain_registry.clone(),
-                self.context.emitter.clone(),
-            ) => {},
-        };
+        // Don't use `select!` here: if the abort or cancel token is
+        // triggered, the future driving the task's child process is
+        // dropped, which terminates the process immediately and ignores
+        // any signals we attempt to pass down!
+        run_action(
+            &mut action,
+            self.action_context,
+            self.app_context,
+            self.context.workspace_graph.clone(),
+            self.context.toolchain_registry.clone(),
+            self.context.emitter.clone(),
+        )
+        .await
+        .unwrap();
+
+        // tokio::select! {
+        //     // Run conditions in order!
+        //     biased;
+
+        //     // Abort if a sibling job has failed
+        //     _ = self.context.abort_token.cancelled() => {
+        //         trace!(
+        //             index = self.node_index,
+        //             "Job aborted",
+        //         );
+
+        //         action.finish(ActionStatus::Aborted);
+        //     }
+
+        //     // Cancel if we receive a shutdown signal
+        //     _ = self.context.cancel_token.cancelled() => {
+        //         trace!(
+        //             index = self.node_index,
+        //             "Job cancelled (via signal)",
+        //         );
+
+        //         action.finish(ActionStatus::Skipped);
+        //     }
+
+        //     // Or run the job to completion
+        //     _ = run_action(
+        //         &mut action,
+        //         self.action_context,
+        //         self.app_context,
+        //         self.context.workspace_graph.clone(),
+        //         self.context.toolchain_registry.clone(),
+        //         self.context.emitter.clone(),
+        //     ) => {},
+        // };
 
         // Send the result back to the pipeline
         self.context.send_result(action).await;
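The comment replacing `select!` above is the crux of this patch: `select!` drops every losing future, and a future that owns a child process spawned with kill-on-drop will terminate that process the moment it is dropped, before any graceful signal can be forwarded. A toy illustration of the hazard, assuming tokio and tokio-util (not code from this repo):

    use std::time::Duration;
    use tokio_util::sync::CancellationToken;

    // Stand-in for a future that owns a kill-on-drop child process.
    async fn run_child() {
        tokio::time::sleep(Duration::from_secs(60)).await;
    }

    async fn job(cancel: CancellationToken) {
        tokio::select! {
            _ = cancel.cancelled() => {
                // The `run_child()` future is dropped right here. With
                // kill-on-drop semantics the process dies immediately,
                // and any signal we meant to forward never arrives.
            }
            _ = run_child() => {}
        }
    }

Awaiting `run_action` directly, as the diff now does, keeps the child alive long enough for the process registry to deliver the signal.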
diff --git a/crates/process/Cargo.toml b/crates/process/Cargo.toml
index 417537a174b..bccd2f972fb 100644
--- a/crates/process/Cargo.toml
+++ b/crates/process/Cargo.toml
@@ -20,6 +20,7 @@ process-wrap = { version = "8.2.0", default-features = false, features = [
     "kill-on-drop",
     "process-group",
     "tokio1",
+    "tracing",
 ] }
 rustc-hash = { workspace = true }
 starbase_shell = { workspace = true }
diff --git a/crates/process/src/exec_command.rs b/crates/process/src/exec_command.rs
index a74c46ad376..0b78889fdac 100644
--- a/crates/process/src/exec_command.rs
+++ b/crates/process/src/exec_command.rs
@@ -22,15 +22,15 @@ fn wrap_command(command: TokioCommand) -> TokioCommandWrap {
     let mut command = TokioCommandWrap::from(command);
     command.wrap(KillOnDrop);
 
-    #[cfg(unix)]
-    {
-        command.wrap(ProcessGroup::leader());
-    }
+    // #[cfg(unix)]
+    // {
+    //     command.wrap(ProcessGroup::leader());
+    // }
 
-    #[cfg(windows)]
-    {
-        command.wrap(JobObject);
-    }
+    // #[cfg(windows)]
+    // {
+    //     command.wrap(JobObject);
+    // }
 
     command
 }
@@ -451,6 +451,7 @@ impl Command {
         debug!(
             pid = child.id(),
+            shell = self.shell.as_ref().map(|sh| &sh.bin_name),
             env_vars = ?env_vars,
             working_dir = ?working_dir,
             "Running command {}",
diff --git a/crates/process/src/process_registry.rs b/crates/process/src/process_registry.rs
index 4c31a5e70ff..44e69416d85 100644
--- a/crates/process/src/process_registry.rs
+++ b/crates/process/src/process_registry.rs
@@ -8,7 +8,7 @@ use tokio::sync::broadcast::{self, error::RecvError, Receiver, Sender};
 use tokio::sync::RwLock;
 use tokio::task::JoinHandle;
 use tokio::time::sleep;
-use tracing::debug;
+use tracing::{debug, trace, warn};
 
 static INSTANCE: OnceLock<Arc<ProcessRegistry>> = OnceLock::new();
 
@@ -145,18 +145,21 @@ async fn shutdown_processes_with_signal(
         children.len()
     );
 
-    // We are using process groups, so manually killing these
-    // child processes should not be necessary! Instead, just
-    // empty the map so that our wait method doesn't hang...
-    children.clear();
+    // children.clear();
 
-    // for (pid, child) in children.drain() {
-    //     trace!(pid, "Killing child process");
+    let mut futures = vec![];
 
-    //     if let Err(error) = child.kill_with_signal(signal).await {
-    //         warn!(pid, "Failed to kill child process: {error}");
-    //     }
+    for (pid, child) in children.drain() {
+        trace!(pid, "Killing child process");
 
-    //     drop(child);
-    // }
+        futures.push(tokio::spawn(async move {
+            if let Err(error) = child.kill_with_signal(signal).await {
+                warn!(pid, "Failed to kill child process: {error}");
+            }
+        }));
+    }
+
+    for future in futures {
+        let _ = future.await;
+    }
 }
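The registry now signals children concurrently: one spawned task per child, then a join loop, so a single unresponsive process cannot serialize shutdown. The same fan-out/join shape can be written with tokio's JoinSet; a self-contained sketch, with a hypothetical ChildHandle standing in for the real SharedChild:

    use tokio::task::JoinSet;

    // Hypothetical stand-in for SharedChild from this patch.
    struct ChildHandle;

    impl ChildHandle {
        async fn kill(&self) -> std::io::Result<()> {
            Ok(())
        }
    }

    async fn shutdown_all(children: Vec<(u32, ChildHandle)>) {
        // Fan out: every kill attempt runs concurrently.
        let mut killers = JoinSet::new();

        for (pid, child) in children {
            killers.spawn(async move {
                if let Err(error) = child.kill().await {
                    eprintln!("Failed to kill child {pid}: {error}");
                }
            });
        }

        // Join: wait until every kill attempt has completed.
        while killers.join_next().await.is_some() {}
    }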
diff --git a/crates/process/src/shared_child.rs b/crates/process/src/shared_child.rs
index 67a3fcbd87b..74f1d757606 100644
--- a/crates/process/src/shared_child.rs
+++ b/crates/process/src/shared_child.rs
@@ -8,7 +8,7 @@ use tokio::sync::Mutex;
 
 #[derive(Clone)]
 pub struct SharedChild {
-    inner: Arc<Mutex<Option<Box<dyn TokioChildWrapper>>>>,
+    inner: Arc<Mutex<Box<dyn TokioChildWrapper>>>,
     pid: u32,
 }
 
@@ -16,7 +16,7 @@ impl SharedChild {
     pub fn new(child: Box<dyn TokioChildWrapper>) -> Self {
         Self {
             pid: child.id().unwrap(),
-            inner: Arc::new(Mutex::new(Some(child))),
+            inner: Arc::new(Mutex::new(child)),
         }
     }
 
@@ -25,35 +25,21 @@ impl SharedChild {
     }
 
     pub async fn take_stdin(&self) -> Option<ChildStdin> {
-        self.inner
-            .lock()
-            .await
-            .as_mut()
-            .and_then(|child| child.stdin().take())
+        self.inner.lock().await.as_mut().stdin().take()
     }
 
     pub async fn take_stdout(&self) -> Option<ChildStdout> {
-        self.inner
-            .lock()
-            .await
-            .as_mut()
-            .and_then(|child| child.stdout().take())
+        self.inner.lock().await.as_mut().stdout().take()
     }
 
     pub async fn take_stderr(&self) -> Option<ChildStderr> {
-        self.inner
-            .lock()
-            .await
-            .as_mut()
-            .and_then(|child| child.stderr().take())
+        self.inner.lock().await.as_mut().stderr().take()
     }
 
     pub async fn kill(&self) -> io::Result<()> {
         let mut child = self.inner.lock().await;
 
-        if let Some(mut child) = child.take() {
-            Box::into_pin(child.kill()).await?;
-        }
+        Box::into_pin(child.kill()).await?;
 
         Ok(())
     }
@@ -61,46 +47,67 @@ impl SharedChild {
     pub async fn kill_with_signal(&self, signal: SignalType) -> io::Result<()> {
         let mut child = self.inner.lock().await;
 
-        if let Some(mut child) = child.take() {
-            // https://github.com/rust-lang/rust/blob/master/library/std/src/sys/pal/unix/process/process_unix.rs#L947
-            #[cfg(unix)]
-            {
-                child.signal(match signal {
-                    SignalType::Interrupt => 2,  // SIGINT
-                    SignalType::Quit => 3,       // SIGQUIT
-                    SignalType::Terminate => 15, // SIGTERM
-                })?;
-            }
+        dbg!("kill_with_signal", self.id(), signal);
 
-            // https://github.com/rust-lang/rust/blob/master/library/std/src/sys/pal/windows/process.rs#L658
-            #[cfg(windows)]
-            {
-                child.start_kill()?;
-            }
+        // https://github.com/rust-lang/rust/blob/master/library/std/src/sys/pal/unix/process/process_unix.rs#L947
+        #[cfg(unix)]
+        {
+            child.signal(match signal {
+                SignalType::Interrupt => 2,  // SIGINT
+                SignalType::Quit => 3,       // SIGQUIT
+                SignalType::Terminate => 15, // SIGTERM
+            })?;
+        }
 
-            Box::into_pin(child.wait()).await?;
+        // https://github.com/rust-lang/rust/blob/master/library/std/src/sys/pal/windows/process.rs#L658
+        #[cfg(windows)]
+        {
+            child.start_kill()?;
         }
 
+        Box::into_pin(child.wait()).await?;
+
         Ok(())
     }
 
     pub(crate) async fn wait(&self) -> io::Result<ExitStatus> {
         let mut child = self.inner.lock().await;
 
-        if let Some(child) = child.as_mut() {
-            return Box::into_pin(child.wait()).await;
-        }
-
-        unreachable!()
+        Box::into_pin(child.wait()).await
     }
 
+    // This method re-implements tokio's `wait_with_output`, but without
+    // taking ownership of self. This is required to be able to call
+    // `kill` afterwards; if ownership were taken, there would be no
+    // child left to signal.
     pub(crate) async fn wait_with_output(&self) -> io::Result<Output> {
+        use tokio::{io::AsyncReadExt, try_join};
+
         let mut child = self.inner.lock().await;
 
-        if let Some(child) = child.take() {
-            return Box::into_pin(child.wait_with_output()).await;
+        async fn read_to_end<T: AsyncReadExt + Unpin>(data: &mut Option<T>) -> io::Result<Vec<u8>> {
+            let mut vec = Vec::new();
+
+            if let Some(data) = data.as_mut() {
+                data.read_to_end(&mut vec).await?;
+            }
+
+            Ok(vec)
         }
 
-        unreachable!()
+        let mut stdout_pipe = child.stdout().take();
+        let mut stderr_pipe = child.stderr().take();
+
+        let stdout_fut = read_to_end(&mut stdout_pipe);
+        let stderr_fut = read_to_end(&mut stderr_pipe);
+
+        let (status, stdout, stderr) =
+            try_join!(Box::into_pin(child.wait()), stdout_fut, stderr_fut)?;
+
+        drop(stdout_pipe);
+        drop(stderr_pipe);
+
+        Ok(Output {
+            status,
+            stdout,
+            stderr,
+        })
     }
 }
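A note on why `wait_with_output` drains both pipes concurrently with `wait`: if the parent blocks on `wait` while the child fills stdout or stderr past the pipe buffer, both processes stall forever. Tokio's own `wait_with_output` uses the same `try_join!` shape internally. A reduced, runnable sketch against plain tokio::process ("ls" is just a convenient Unix-ish placeholder command):

    use std::process::Stdio;
    use tokio::io::AsyncReadExt;
    use tokio::process::Command;

    async fn wait_with_output_by_hand() -> std::io::Result<()> {
        let mut child = Command::new("ls")
            .stdout(Stdio::piped())
            .stderr(Stdio::piped())
            .spawn()?;

        // Take the pipes out of the child first, so borrowing them
        // does not conflict with borrowing the child for wait().
        let mut stdout_pipe = child.stdout.take();
        let mut stderr_pipe = child.stderr.take();

        async fn drain<R: AsyncReadExt + Unpin>(pipe: &mut Option<R>) -> std::io::Result<Vec<u8>> {
            let mut buf = Vec::new();
            if let Some(pipe) = pipe.as_mut() {
                pipe.read_to_end(&mut buf).await?;
            }
            Ok(buf)
        }

        // Drive all three futures together; reading only after wait()
        // returns could deadlock once a pipe buffer fills up.
        let (status, stdout, stderr) = tokio::try_join!(
            child.wait(),
            drain(&mut stdout_pipe),
            drain(&mut stderr_pipe),
        )?;

        println!("{status}: {} stdout / {} stderr bytes", stdout.len(), stderr.len());
        Ok(())
    }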
diff --git a/crates/process/src/shell.rs b/crates/process/src/shell.rs
index 7df7a9acc50..dbfe660d48e 100644
--- a/crates/process/src/shell.rs
+++ b/crates/process/src/shell.rs
@@ -33,6 +33,7 @@ pub fn is_windows_script<T: AsRef<OsStr>>(bin: T) -> bool {
 
 pub struct Shell {
     pub bin: PathBuf,
+    pub bin_name: String,
     pub command: ShellCommand,
 }
 
@@ -42,7 +43,8 @@ impl Shell {
         let command = type_of.build().get_exec_command();
 
         Self {
-            bin: find_command_on_path(bin_name.clone()).unwrap_or_else(|| bin_name.into()),
+            bin: find_command_on_path(bin_name.clone()).unwrap_or_else(|| bin_name.clone().into()),
+            bin_name,
             command,
         }
     }
diff --git a/package.json b/package.json
index 7e161b7d7a6..b4ecbc2f2e3 100644
--- a/package.json
+++ b/package.json
@@ -9,6 +9,7 @@
   },
   "workspaces": [
     "packages/*",
+    "scenarios/*",
     "website"
   ],
   "engines": {
diff --git a/scenarios/signals/moon.yml b/scenarios/signals/moon.yml
new file mode 100644
index 00000000000..007eaf6e096
--- /dev/null
+++ b/scenarios/signals/moon.yml
@@ -0,0 +1,18 @@
+language: 'javascript'
+
+tasks:
+  dev-1:
+    command: 'node signals.mjs'
+    preset: 'server'
+  dev-2:
+    command: 'node signals.mjs'
+    preset: 'server'
+  dev-3:
+    command: 'node signals.mjs'
+    preset: 'server'
+  dev:
+    deps: ['dev-1', 'dev-2', 'dev-3']
+
+toolchain:
+  typescript:
+    disabled: true
diff --git a/scenarios/signals/signals.mjs b/scenarios/signals/signals.mjs
new file mode 100644
index 00000000000..b609267bfc7
--- /dev/null
+++ b/scenarios/signals/signals.mjs
@@ -0,0 +1,19 @@
+import os from 'node:os';
+
+let target = process.env.MOON_TARGET;
+
+console.log(`[${target}] Running`);
+
+for (let event of ['SIGHUP', 'SIGINT', 'SIGQUIT', 'SIGTERM', 'SIGBREAK']) {
+  // Signal listeners receive only the signal name, so map it to a
+  // number for the conventional 128 + N exit code
+  process.on(event, (signal) => {
+    console.log(`[${target}] Received ${signal}!`);
+    process.exit(128 + os.constants.signals[signal]);
+  });
+}
+
+// Cause the process to take a while!
+await new Promise((resolve) => {
+  setTimeout(resolve, 30000);
+});
diff --git a/tsconfig.json b/tsconfig.json
index 8a469614b6b..528eab34904 100644
--- a/tsconfig.json
+++ b/tsconfig.json
@@ -17,6 +17,9 @@
     {
       "path": "packages/visualizer"
     },
+    {
+      "path": "scenarios/signals"
+    },
     {
       "path": "website"
     }
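The signals.mjs scenario above exits with the conventional 128 + N code once a signal is caught (130 for SIGINT, 143 for SIGTERM). On the Rust side, a test could distinguish "handler ran" from "kernel killed the process" roughly like this; a unix-only sketch using std, not code from this patch:

    #[cfg(unix)]
    fn describe_exit(status: std::process::ExitStatus) {
        use std::os::unix::process::ExitStatusExt;

        match (status.code(), status.signal()) {
            // The scenario's handler caught the signal and exited
            // itself with 128 + N (e.g. 130 for SIGINT).
            (Some(code), _) if code > 128 => println!("caught signal {}", code - 128),
            // No handler ran: the kernel terminated the process, so
            // there is no exit code, only the raw signal number.
            (None, Some(signal)) => println!("killed by signal {signal}"),
            (code, _) => println!("exited normally: {code:?}"),
        }
    }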