Skip to content

Commit e9be98d

Browse files
vidhyav authored and facebook-github-bot committed
Added watchdog timeout when stop is issued for stuck actors. (#336)
Summary: Allocators have a timeout to kill unresponsive procs when stop() is issued. Differential Revision: D77303504
1 parent 0843625 commit e9be98d

File tree

11 files changed

+324
-16
lines changed

11 files changed

+324
-16
lines changed

hyperactor/src/config.rs

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -32,6 +32,12 @@ declare_attrs! {
3232
/// Message delivery timeout
3333
pub attr MESSAGE_DELIVERY_TIMEOUT: Duration = Duration::from_secs(30);
3434

35+
/// Process exit timeout
36+
pub attr PROCESS_EXIT_TIMEOUT: Duration = Duration::from_secs(10);
37+
38+
/// Actor exit timeout
39+
pub attr ACTOR_EXIT_TIMEOUT: Duration = Duration::from_millis(10);
40+
3541
/// Message acknowledgment interval
3642
pub attr MESSAGE_ACK_TIME_INTERVAL: Duration = Duration::from_millis(500);
3743

hyperactor/src/sync/monitor.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -103,6 +103,7 @@ impl IntoFuture for Handle {
103103
/// the group will be aborted if any task fails or if the group is aborted.
104104
///
105105
/// The group is also aborted if the group itself is dropped.
106+
#[derive(Clone)]
106107
pub struct Group(Arc<Mutex<State>>);
107108

108109
/// The status of a group. Groups start out in [`Status::Running`]

hyperactor_mesh/src/alloc/mod.rs

Lines changed: 200 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -228,11 +228,52 @@ pub trait Alloc {
228228
}
229229

230230
pub mod test_utils {
231+
use std::time::Duration;
232+
233+
use hyperactor::Actor;
234+
use hyperactor::Handler;
235+
use hyperactor::Instance;
236+
use hyperactor::Named;
231237
use tokio::sync::broadcast::Receiver;
232238
use tokio::sync::broadcast::Sender;
233239

234240
use super::*;
235241

242+
// This can't be defined under a `#[cfg(test)]` because there needs to
243+
// be an entry in the spawnable actor registry in the executable
244+
// 'hyperactor_mesh_test_bootstrap' for the `tests::process` actor
245+
// mesh test suite.
246+
#[derive(Debug)]
247+
#[hyperactor::export(
248+
spawn = true,
249+
handlers = [
250+
Wait
251+
],
252+
)]
253+
pub struct TestActor;
254+
255+
#[async_trait]
256+
impl Actor for TestActor {
257+
type Params = ();
258+
259+
async fn new(_params: Self::Params) -> Result<Self, anyhow::Error> {
260+
Ok(Self)
261+
}
262+
}
263+
264+
#[derive(Debug, Serialize, Deserialize, Named, Clone)]
265+
pub struct Wait();
266+
267+
#[async_trait]
268+
impl Handler<Wait> for TestActor {
269+
async fn handle(&mut self, _: &Instance<Self>, Wait(): Wait) -> Result<(), anyhow::Error> {
270+
loop {
271+
#[allow(clippy::disallowed_methods)]
272+
tokio::time::sleep(Duration::from_secs(60)).await;
273+
}
274+
}
275+
}
276+
236277
/// Test wrapper around MockAlloc to allow us to block next() calls since
237278
/// mockall doesn't support returning futures.
238279
pub struct MockAllocWrapper {
@@ -302,12 +343,29 @@ pub mod test_utils {
302343

303344
#[cfg(test)]
304345
pub(crate) mod testing {
346+
use core::panic;
305347
use std::collections::HashMap;
306348
use std::collections::HashSet;
307-
349+
use std::time::Duration;
350+
351+
use hyperactor::Mailbox;
352+
use hyperactor::actor::remote::Remote;
353+
use hyperactor::channel;
354+
use hyperactor::mailbox;
355+
use hyperactor::mailbox::BoxedMailboxSender;
356+
use hyperactor::mailbox::DialMailboxRouter;
357+
use hyperactor::mailbox::IntoBoxedMailboxSender;
358+
use hyperactor::mailbox::MailboxServer;
359+
use hyperactor::mailbox::UndeliverableMailboxSender;
360+
use hyperactor::proc::Proc;
361+
use hyperactor::reference::Reference;
308362
use ndslice::shape;
363+
use tokio::process::Command;
309364

310365
use super::*;
366+
use crate::alloc::test_utils::TestActor;
367+
use crate::alloc::test_utils::Wait;
368+
use crate::proc_mesh::mesh_agent::MeshAgentMessageClient;
311369

312370
#[macro_export]
313371
macro_rules! alloc_test_suite {
@@ -367,4 +425,145 @@ pub(crate) mod testing {
367425
assert!(alloc.next().await.is_none());
368426
assert_eq!(stopped, running);
369427
}
428+
429+
async fn spawn_proc(alloc: &ProcessAlloc) -> (DialMailboxRouter, Mailbox, Proc, ChannelAddr) {
430+
let (router_channel_addr, router_rx) = channel::serve(ChannelAddr::any(alloc.transport()))
431+
.await
432+
.map_err(|err| AllocatorError::Other(err.into()))
433+
.unwrap();
434+
let router =
435+
DialMailboxRouter::new_with_default((UndeliverableMailboxSender {}).into_boxed());
436+
router
437+
.clone()
438+
.serve(router_rx, mailbox::monitored_return_handle());
439+
440+
let client_proc_id = ProcId(WorldId(format!("test_{}", alloc.world_id().name())), 0);
441+
let (client_proc_addr, client_rx) = channel::serve(ChannelAddr::any(alloc.transport()))
442+
.await
443+
.map_err(|err| AllocatorError::Other(err.into()))
444+
.unwrap();
445+
let client_proc = Proc::new(
446+
client_proc_id.clone(),
447+
BoxedMailboxSender::new(router.clone()),
448+
);
449+
client_proc
450+
.clone()
451+
.serve(client_rx, mailbox::monitored_return_handle());
452+
router.bind(client_proc_id.clone().into(), client_proc_addr);
453+
(
454+
router,
455+
client_proc.attach("test_proc").unwrap(),
456+
client_proc,
457+
router_channel_addr,
458+
)
459+
}
460+
461+
async fn spawn_test_actor(
462+
rank: usize,
463+
client_proc: &Proc,
464+
client: &Mailbox,
465+
router_channel_addr: ChannelAddr,
466+
mesh_agent: ActorRef<MeshAgent>,
467+
) -> ActorRef<TestActor> {
468+
let supervisor = client_proc.attach("supervisor").unwrap();
469+
let (supervison_port, _) = supervisor.open_port();
470+
let (config_handle, _) = client.open_port();
471+
mesh_agent
472+
.configure(
473+
client,
474+
rank,
475+
router_channel_addr,
476+
supervison_port.bind(),
477+
HashMap::new(),
478+
config_handle.bind(),
479+
)
480+
.await
481+
.unwrap();
482+
let remote = Remote::collect();
483+
let actor_type = remote
484+
.name_of::<TestActor>()
485+
.ok_or(anyhow::anyhow!("actor not registered"))
486+
.unwrap()
487+
.to_string();
488+
let params = &();
489+
let (completed_handle, mut completed_receiver) = mailbox::open_port(client);
490+
// Ask the mesh agent to gspawn the test actor (named "Stuck").
491+
mesh_agent
492+
.gspawn(
493+
client,
494+
actor_type,
495+
"Stuck".to_string(),
496+
bincode::serialize(params).unwrap(),
497+
completed_handle.bind(),
498+
)
499+
.await
500+
.unwrap();
501+
let (_, actor_id) = completed_receiver.recv().await.unwrap();
502+
ActorRef::<TestActor>::attest(actor_id)
503+
}
504+
505+
#[timed_test::async_timed_test(timeout_secs = 120)]
506+
async fn test_allocator_stuck_task() {
507+
// Override config.
508+
// Use temporary config for this test
509+
let config = hyperactor::config::global::lock();
510+
let _guard = config.override_key(
511+
hyperactor::config::PROCESS_EXIT_TIMEOUT,
512+
Duration::from_secs(1),
513+
);
514+
515+
let mut command =
516+
Command::new(buck_resources::get("monarch/hyperactor_mesh/bootstrap").unwrap());
517+
command.arg("300000");
518+
519+
let mut allocator = ProcessAllocator::new(command);
520+
let mut alloc = allocator
521+
.allocate(AllocSpec {
522+
shape: shape! { replica = 1 },
523+
constraints: Default::default(),
524+
})
525+
.await
526+
.unwrap();
527+
528+
// Get everything up into running state. We require that we get a `Created` event followed by a `Running` event for the proc; any other event is a test failure.
529+
let mut procs = HashMap::new();
530+
let mut running = HashSet::new();
531+
let mut actor_ref = None;
532+
let (router, client, client_proc, router_addr) = spawn_proc(&alloc).await;
533+
while running.is_empty() {
534+
match alloc.next().await.unwrap() {
535+
ProcState::Created { proc_id, coords } => {
536+
procs.insert(proc_id, coords);
537+
}
538+
ProcState::Running {
539+
proc_id,
540+
mesh_agent,
541+
addr,
542+
} => {
543+
router.bind(Reference::Proc(proc_id.clone()), addr.clone());
544+
545+
assert!(procs.contains_key(&proc_id));
546+
assert!(!running.contains(&proc_id));
547+
548+
actor_ref = Some(
549+
spawn_test_actor(0, &client_proc, &client, router_addr, mesh_agent).await,
550+
);
551+
running.insert(proc_id);
552+
break;
553+
}
554+
event => panic!("unexpected event: {:?}", event),
555+
}
556+
}
557+
assert!(actor_ref.unwrap().send(&client, Wait()).is_ok());
558+
559+
// There is a stuck actor! We should get a watchdog failure.
560+
alloc.stop().await.unwrap();
561+
let mut stopped = HashSet::new();
562+
while let Some(ProcState::Stopped { proc_id, reason }) = alloc.next().await {
563+
assert_eq!(reason, ProcStopReason::Watchdog);
564+
stopped.insert(proc_id);
565+
}
566+
assert!(alloc.next().await.is_none());
567+
assert_eq!(stopped, running);
568+
}
370569
}

hyperactor_mesh/src/alloc/process.rs

Lines changed: 23 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -28,6 +28,7 @@ use hyperactor::channel::ChannelTx;
2828
use hyperactor::channel::Rx;
2929
use hyperactor::channel::Tx;
3030
use hyperactor::channel::TxStatus;
31+
use hyperactor::sync::flag;
3132
use hyperactor::sync::monitor;
3233
use ndslice::Shape;
3334
use tokio::io;
@@ -130,8 +131,9 @@ enum ChannelState {
130131
struct Child {
131132
channel: ChannelState,
132133
group: monitor::Group,
133-
stdout: LogTailer,
134+
exit_flag: Option<flag::Flag>,
134135
stderr: LogTailer,
136+
stdout: LogTailer,
135137
stop_reason: Arc<OnceLock<ProcStopReason>>,
136138
}
137139

@@ -140,6 +142,7 @@ impl Child {
140142
mut process: tokio::process::Child,
141143
) -> (Self, impl Future<Output = ProcStopReason>) {
142144
let (group, handle) = monitor::group();
145+
let (exit_flag, exit_guard) = flag::guarded();
143146

144147
let stdout = LogTailer::tee(
145148
MAX_TAIL_LOG_LINES,
@@ -156,6 +159,7 @@ impl Child {
156159
let child = Self {
157160
channel: ChannelState::NotConnected,
158161
group,
162+
exit_flag: Some(exit_flag),
159163
stdout,
160164
stderr,
161165
stop_reason: Arc::clone(&stop_reason),
@@ -178,6 +182,8 @@ impl Child {
178182
}
179183
result = process.wait() => Self::exit_status_to_reason(result),
180184
};
185+
exit_guard.signal();
186+
181187
stop_reason.get_or_init(|| reason).clone()
182188
};
183189

@@ -234,10 +240,23 @@ impl Child {
234240
true
235241
}
236242

243+
fn spawn_watchdog(&mut self) {
244+
if let Some(exit_flag) = self.exit_flag.take() {
245+
let group = self.group.clone();
246+
let stop_reason = self.stop_reason.clone();
247+
tokio::spawn(async move {
248+
let exit_timeout =
249+
hyperactor::config::global::get(hyperactor::config::PROCESS_EXIT_TIMEOUT);
250+
if tokio::time::timeout(exit_timeout, exit_flag).await.is_err() {
251+
let _ = stop_reason.set(ProcStopReason::Watchdog);
252+
group.fail();
253+
}
254+
});
255+
}
256+
}
257+
237258
#[hyperactor::instrument_infallible]
238259
fn post(&mut self, message: Allocator2Process) {
239-
// We're here simply assuming that if we're not connected, we're about to
240-
// be killed.
241260
if let ChannelState::Connected(channel) = &mut self.channel {
242261
channel.post(message);
243262
} else {
@@ -439,6 +458,7 @@ impl Alloc for ProcessAlloc {
439458
// for liveness.
440459
for (_index, child) in self.active.iter_mut() {
441460
child.post(Allocator2Process::StopAndExit(0));
461+
child.spawn_watchdog();
442462
}
443463

444464
self.running = false;

hyperactor_mesh/src/alloc/remoteprocess.rs

Lines changed: 6 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -1053,7 +1053,7 @@ mod test {
10531053
}
10541054
}
10551055

1056-
fn set_procstate_execptations(alloc: &mut MockAlloc, shape: Shape) {
1056+
fn set_procstate_expectations(alloc: &mut MockAlloc, shape: Shape) {
10571057
let alloc_len = shape.slice().len();
10581058
alloc.expect_shape().return_const(shape.clone());
10591059
for i in 0..alloc_len {
@@ -1108,7 +1108,7 @@ mod test {
11081108
alloc.expect_world_id().return_const(world_id.clone());
11091109
alloc.expect_shape().return_const(spec.shape.clone());
11101110

1111-
set_procstate_execptations(&mut alloc, spec.shape.clone());
1111+
set_procstate_expectations(&mut alloc, spec.shape.clone());
11121112

11131113
// final none
11141114
alloc.expect_next().return_const(None);
@@ -1247,7 +1247,7 @@ mod test {
12471247
alloc.alloc.expect_world_id().return_const(world_id.clone());
12481248
alloc.alloc.expect_shape().return_const(spec.shape.clone());
12491249

1250-
set_procstate_execptations(&mut alloc.alloc, spec.shape.clone());
1250+
set_procstate_expectations(&mut alloc.alloc, spec.shape.clone());
12511251

12521252
alloc.alloc.expect_next().return_const(None);
12531253
alloc.alloc.expect_stop().times(1).return_once(|| Ok(()));
@@ -1324,7 +1324,7 @@ mod test {
13241324
.return_const(world_id.clone());
13251325
alloc1.alloc.expect_shape().return_const(spec.shape.clone());
13261326

1327-
set_procstate_execptations(&mut alloc1.alloc, spec.shape.clone());
1327+
set_procstate_expectations(&mut alloc1.alloc, spec.shape.clone());
13281328
alloc1.alloc.expect_next().return_const(None);
13291329
alloc1.alloc.expect_stop().times(1).return_once(|| Ok(()));
13301330
// second allocation
@@ -1339,7 +1339,7 @@ mod test {
13391339
.expect_world_id()
13401340
.return_const(world_id.clone());
13411341
alloc2.alloc.expect_shape().return_const(spec.shape.clone());
1342-
set_procstate_execptations(&mut alloc2.alloc, spec.shape.clone());
1342+
set_procstate_expectations(&mut alloc2.alloc, spec.shape.clone());
13431343
alloc2.alloc.expect_next().return_const(None);
13441344
alloc2.alloc.expect_stop().times(1).return_once(|| Ok(()));
13451345

@@ -1444,7 +1444,7 @@ mod test {
14441444
alloc.alloc.expect_world_id().return_const(world_id.clone());
14451445
alloc.alloc.expect_shape().return_const(spec.shape.clone());
14461446

1447-
set_procstate_execptations(&mut alloc.alloc, spec.shape.clone());
1447+
set_procstate_expectations(&mut alloc.alloc, spec.shape.clone());
14481448

14491449
alloc.alloc.expect_next().return_const(None);
14501450
// we expect a stop due to the failure

0 commit comments

Comments
 (0)