Skip to content

Commit 875e191

Browse files
vidhyavfacebook-github-bot
authored andcommitted
Added watchdog timeout when stop is issued for stuck actors. (#336)
Summary: Pull Request resolved: #336 Allocators have a timeout to kill unresponsive procs when stop() is issued. Here is the reason why we needed in the first place: While running an xlformers test, we encountered a problem wherein the process was getting stuck upon issuing an exit. The root cause for the blocking was the cuda unregistry routine getting stuck blocking the exit() call and all other calls. In order to simulate the same, we added here an exit handler that loops forever. Reviewed By: suo, technicianted Differential Revision: D77303504
1 parent 25316ff commit 875e191

File tree

11 files changed

+329
-10
lines changed

11 files changed

+329
-10
lines changed

hyperactor/src/config.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@ declare_attrs! {
3232
/// Message delivery timeout
3333
pub attr MESSAGE_DELIVERY_TIMEOUT: Duration = Duration::from_secs(30);
3434

35+
/// Process exit timeout
36+
pub attr PROCESS_EXIT_TIMEOUT: Duration = Duration::from_secs(10);
37+
3538
/// Message acknowledgment interval
3639
pub attr MESSAGE_ACK_TIME_INTERVAL: Duration = Duration::from_millis(500);
3740

hyperactor/src/sync/monitor.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ impl IntoFuture for Handle {
103103
/// the group will be aborted if any task fails or if the group is aborted.
104104
///
105105
/// The group is also aborted if the group itself is dropped.
106+
#[derive(Clone)]
106107
pub struct Group(Arc<Mutex<State>>);
107108

108109
/// The status of a group. Groups start out in [`Status::Running`]

hyperactor_mesh/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ futures = { version = "0.3.30", features = ["async-await", "compat"] }
3636
hyperactor = { version = "0.0.0", path = "../hyperactor" }
3737
hyperactor_mesh_macros = { version = "0.0.0", path = "../hyperactor_mesh_macros" }
3838
hyperactor_telemetry = { version = "0.0.0", path = "../hyperactor_telemetry" }
39+
libc = "0.2.139"
3940
mockall = "0.13.1"
4041
ndslice = { version = "0.0.0", path = "../ndslice" }
4142
nix = { version = "0.29.0", features = ["dir", "event", "hostname", "inotify", "ioctl", "mman", "mount", "net", "poll", "ptrace", "reboot", "resource", "sched", "signal", "term", "time", "user", "zerocopy"] }

hyperactor_mesh/src/alloc.rs

Lines changed: 214 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -235,11 +235,62 @@ pub trait Alloc {
235235
}
236236

237237
pub mod test_utils {
238+
use std::time::Duration;
239+
240+
use hyperactor::Actor;
241+
use hyperactor::Handler;
242+
use hyperactor::Instance;
243+
use hyperactor::Named;
244+
use libc::atexit;
238245
use tokio::sync::broadcast::Receiver;
239246
use tokio::sync::broadcast::Sender;
240247

241248
use super::*;
242249

250+
extern "C" fn exit_handler() {
251+
loop {
252+
#[allow(clippy::disallowed_methods)]
253+
std::thread::sleep(Duration::from_secs(60));
254+
}
255+
}
256+
257+
// This can't be defined under a `#[cfg(test)]` because there needs to
258+
// be an entry in the spawnable actor registry in the executable
259+
// 'hyperactor_mesh_test_bootstrap' for the `tests::process` actor
260+
// mesh test suite.
261+
#[derive(Debug)]
262+
#[hyperactor::export(
263+
spawn = true,
264+
handlers = [
265+
Wait
266+
],
267+
)]
268+
pub struct TestActor;
269+
270+
#[async_trait]
271+
impl Actor for TestActor {
272+
type Params = ();
273+
274+
async fn new(_params: Self::Params) -> Result<Self, anyhow::Error> {
275+
Ok(Self)
276+
}
277+
}
278+
279+
#[derive(Debug, Serialize, Deserialize, Named, Clone)]
280+
pub struct Wait();
281+
282+
#[async_trait]
283+
impl Handler<Wait> for TestActor {
284+
async fn handle(&mut self, _: &Instance<Self>, Wait(): Wait) -> Result<(), anyhow::Error> {
285+
// SAFETY:
286+
// This is in order to simulate a process in tests that never exits.
287+
unsafe {
288+
atexit(exit_handler);
289+
}
290+
Ok(())
291+
}
292+
}
293+
243294
/// Test wrapper around MockAlloc to allow us to block next() calls since
244295
/// mockall doesn't support returning futures.
245296
pub struct MockAllocWrapper {
@@ -309,12 +360,29 @@ pub mod test_utils {
309360

310361
#[cfg(test)]
311362
pub(crate) mod testing {
363+
use core::panic;
312364
use std::collections::HashMap;
313365
use std::collections::HashSet;
314-
366+
use std::time::Duration;
367+
368+
use hyperactor::Mailbox;
369+
use hyperactor::actor::remote::Remote;
370+
use hyperactor::channel;
371+
use hyperactor::mailbox;
372+
use hyperactor::mailbox::BoxedMailboxSender;
373+
use hyperactor::mailbox::DialMailboxRouter;
374+
use hyperactor::mailbox::IntoBoxedMailboxSender;
375+
use hyperactor::mailbox::MailboxServer;
376+
use hyperactor::mailbox::UndeliverableMailboxSender;
377+
use hyperactor::proc::Proc;
378+
use hyperactor::reference::Reference;
315379
use ndslice::shape;
380+
use tokio::process::Command;
316381

317382
use super::*;
383+
use crate::alloc::test_utils::TestActor;
384+
use crate::alloc::test_utils::Wait;
385+
use crate::proc_mesh::mesh_agent::MeshAgentMessageClient;
318386

319387
#[macro_export]
320388
macro_rules! alloc_test_suite {
@@ -376,4 +444,149 @@ pub(crate) mod testing {
376444
assert!(alloc.next().await.is_none());
377445
assert_eq!(stopped, running);
378446
}
447+
448+
async fn spawn_proc(alloc: &ProcessAlloc) -> (DialMailboxRouter, Mailbox, Proc, ChannelAddr) {
449+
let (router_channel_addr, router_rx) = channel::serve(ChannelAddr::any(alloc.transport()))
450+
.await
451+
.map_err(|err| AllocatorError::Other(err.into()))
452+
.unwrap();
453+
let router =
454+
DialMailboxRouter::new_with_default((UndeliverableMailboxSender {}).into_boxed());
455+
router
456+
.clone()
457+
.serve(router_rx, mailbox::monitored_return_handle());
458+
459+
let client_proc_id = ProcId(WorldId(format!("test_{}", alloc.world_id().name())), 0);
460+
let (client_proc_addr, client_rx) = channel::serve(ChannelAddr::any(alloc.transport()))
461+
.await
462+
.map_err(|err| AllocatorError::Other(err.into()))
463+
.unwrap();
464+
let client_proc = Proc::new(
465+
client_proc_id.clone(),
466+
BoxedMailboxSender::new(router.clone()),
467+
);
468+
client_proc
469+
.clone()
470+
.serve(client_rx, mailbox::monitored_return_handle());
471+
router.bind(client_proc_id.clone().into(), client_proc_addr);
472+
(
473+
router,
474+
client_proc.attach("test_proc").unwrap(),
475+
client_proc,
476+
router_channel_addr,
477+
)
478+
}
479+
480+
async fn spawn_test_actor(
481+
rank: usize,
482+
client_proc: &Proc,
483+
client: &Mailbox,
484+
router_channel_addr: ChannelAddr,
485+
mesh_agent: ActorRef<MeshAgent>,
486+
) -> ActorRef<TestActor> {
487+
let supervisor = client_proc.attach("supervisor").unwrap();
488+
let (supervison_port, _) = supervisor.open_port();
489+
let (config_handle, _) = client.open_port();
490+
mesh_agent
491+
.configure(
492+
client,
493+
rank,
494+
router_channel_addr,
495+
supervison_port.bind(),
496+
HashMap::new(),
497+
config_handle.bind(),
498+
)
499+
.await
500+
.unwrap();
501+
let remote = Remote::collect();
502+
let actor_type = remote
503+
.name_of::<TestActor>()
504+
.ok_or(anyhow::anyhow!("actor not registered"))
505+
.unwrap()
506+
.to_string();
507+
let params = &();
508+
let (completed_handle, mut completed_receiver) = mailbox::open_port(client);
509+
// gspawn actor
510+
mesh_agent
511+
.gspawn(
512+
client,
513+
actor_type,
514+
"Stuck".to_string(),
515+
bincode::serialize(params).unwrap(),
516+
completed_handle.bind(),
517+
)
518+
.await
519+
.unwrap();
520+
let (_, actor_id) = completed_receiver.recv().await.unwrap();
521+
ActorRef::<TestActor>::attest(actor_id)
522+
}
523+
524+
/// In order to simulate stuckness, we have to do two things:
525+
/// An actor that is blocked forever AND
526+
/// a proc that does not time out when it is asked to wait for
527+
/// a stuck actor.
528+
#[tokio::test]
529+
async fn test_allocator_stuck_task() {
530+
// Override config.
531+
// Use temporary config for this test
532+
let config = hyperactor::config::global::lock();
533+
let _guard = config.override_key(
534+
hyperactor::config::PROCESS_EXIT_TIMEOUT,
535+
Duration::from_secs(1),
536+
);
537+
538+
let mut command =
539+
Command::new(buck_resources::get("monarch/hyperactor_mesh/bootstrap").unwrap());
540+
let mut allocator = ProcessAllocator::new(command);
541+
let mut alloc = allocator
542+
.allocate(AllocSpec {
543+
shape: shape! { replica = 1 },
544+
constraints: Default::default(),
545+
})
546+
.await
547+
.unwrap();
548+
549+
// Get everything up into running state. We require that we get
550+
let mut procs = HashMap::new();
551+
let mut running = HashSet::new();
552+
let mut actor_ref = None;
553+
let (router, client, client_proc, router_addr) = spawn_proc(&alloc).await;
554+
while running.is_empty() {
555+
match alloc.next().await.unwrap() {
556+
ProcState::Created {
557+
proc_id, coords, ..
558+
} => {
559+
procs.insert(proc_id, coords);
560+
}
561+
ProcState::Running {
562+
proc_id,
563+
mesh_agent,
564+
addr,
565+
} => {
566+
router.bind(Reference::Proc(proc_id.clone()), addr.clone());
567+
568+
assert!(procs.contains_key(&proc_id));
569+
assert!(!running.contains(&proc_id));
570+
571+
actor_ref = Some(
572+
spawn_test_actor(0, &client_proc, &client, router_addr, mesh_agent).await,
573+
);
574+
running.insert(proc_id);
575+
break;
576+
}
577+
event => panic!("unexpected event: {:?}", event),
578+
}
579+
}
580+
assert!(actor_ref.unwrap().send(&client, Wait()).is_ok());
581+
582+
// There is a stuck actor! We should get a watchdog failure.
583+
alloc.stop().await.unwrap();
584+
let mut stopped = HashSet::new();
585+
while let Some(ProcState::Stopped { proc_id, reason }) = alloc.next().await {
586+
assert_eq!(reason, ProcStopReason::Watchdog);
587+
stopped.insert(proc_id);
588+
}
589+
assert!(alloc.next().await.is_none());
590+
assert_eq!(stopped, running);
591+
}
379592
}

hyperactor_mesh/src/alloc/process.rs

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use hyperactor::channel::ChannelTx;
2828
use hyperactor::channel::Rx;
2929
use hyperactor::channel::Tx;
3030
use hyperactor::channel::TxStatus;
31+
use hyperactor::sync::flag;
3132
use hyperactor::sync::monitor;
3233
use ndslice::Shape;
3334
use tokio::io;
@@ -130,8 +131,9 @@ enum ChannelState {
130131
struct Child {
131132
channel: ChannelState,
132133
group: monitor::Group,
133-
stdout: LogTailer,
134+
exit_flag: Option<flag::Flag>,
134135
stderr: LogTailer,
136+
stdout: LogTailer,
135137
stop_reason: Arc<OnceLock<ProcStopReason>>,
136138
}
137139

@@ -140,6 +142,7 @@ impl Child {
140142
mut process: tokio::process::Child,
141143
) -> (Self, impl Future<Output = ProcStopReason>) {
142144
let (group, handle) = monitor::group();
145+
let (exit_flag, exit_guard) = flag::guarded();
143146

144147
let stdout = LogTailer::tee(
145148
MAX_TAIL_LOG_LINES,
@@ -156,6 +159,7 @@ impl Child {
156159
let child = Self {
157160
channel: ChannelState::NotConnected,
158161
group,
162+
exit_flag: Some(exit_flag),
159163
stdout,
160164
stderr,
161165
stop_reason: Arc::clone(&stop_reason),
@@ -178,6 +182,8 @@ impl Child {
178182
}
179183
result = process.wait() => Self::exit_status_to_reason(result),
180184
};
185+
exit_guard.signal();
186+
181187
stop_reason.get_or_init(|| reason).clone()
182188
};
183189

@@ -234,10 +240,23 @@ impl Child {
234240
true
235241
}
236242

243+
fn spawn_watchdog(&mut self) {
244+
if let Some(exit_flag) = self.exit_flag.take() {
245+
let group = self.group.clone();
246+
let stop_reason = self.stop_reason.clone();
247+
tokio::spawn(async move {
248+
let exit_timeout =
249+
hyperactor::config::global::get(hyperactor::config::PROCESS_EXIT_TIMEOUT);
250+
if tokio::time::timeout(exit_timeout, exit_flag).await.is_err() {
251+
let _ = stop_reason.set(ProcStopReason::Watchdog);
252+
group.fail();
253+
}
254+
});
255+
}
256+
}
257+
237258
#[hyperactor::instrument_infallible]
238259
fn post(&mut self, message: Allocator2Process) {
239-
// We're here simply assuming that if we're not connected, we're about to
240-
// be killed.
241260
if let ChannelState::Connected(channel) = &mut self.channel {
242261
channel.post(message);
243262
} else {
@@ -446,6 +465,7 @@ impl Alloc for ProcessAlloc {
446465
// for liveness.
447466
for (_index, child) in self.active.iter_mut() {
448467
child.post(Allocator2Process::StopAndExit(0));
468+
child.spawn_watchdog();
449469
}
450470

451471
self.running = false;

0 commit comments

Comments
 (0)