Skip to content

Commit 79511d0

Browse files
vidhyavfacebook-github-bot
authored andcommitted
Added watchdog timeout when stop is issued for stuck actors. (#336)
Summary: Allocators have a timeout to kill unresponsive procs when stop() is issued. Differential Revision: D77303504
1 parent 88fae60 commit 79511d0

File tree

11 files changed

+325
-16
lines changed

11 files changed

+325
-16
lines changed

hyperactor/src/config.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,12 @@ declare_attrs! {
3232
/// Message delivery timeout
3333
pub attr MESSAGE_DELIVERY_TIMEOUT: Duration = Duration::from_secs(30);
3434

35+
/// Process exit timeout
36+
pub attr PROCESS_EXIT_TIMEOUT: Duration = Duration::from_secs(10);
37+
38+
/// Process exit timeout
39+
pub attr ACTOR_EXIT_TIMEOUT: Duration = Duration::from_millis(10);
40+
3541
/// Message acknowledgment interval
3642
pub attr MESSAGE_ACK_TIME_INTERVAL: Duration = Duration::from_millis(500);
3743

@@ -86,6 +92,13 @@ pub fn from_env() -> Attrs {
8692

8793
config[IS_MANAGED_SUBPROCESS] = env::var("HYPERACTOR_MANAGED_SUBPROCESS").is_ok();
8894

95+
// Load message ack time interval
96+
if let Ok(val) = env::var("ACTOR_EXIT_TIMEOUT_MS") {
97+
if let Ok(parsed) = val.parse::<u64>() {
98+
config[ACTOR_EXIT_TIMEOUT] = Duration::from_millis(parsed);
99+
}
100+
}
101+
89102
config
90103
}
91104

@@ -112,6 +125,9 @@ pub fn merge(config: &mut Attrs, other: &Attrs) {
112125
if other.contains_key(MESSAGE_DELIVERY_TIMEOUT) {
113126
config[MESSAGE_DELIVERY_TIMEOUT] = other[MESSAGE_DELIVERY_TIMEOUT];
114127
}
128+
if other.contains_key(ACTOR_EXIT_TIMEOUT) {
129+
config[ACTOR_EXIT_TIMEOUT] = other[ACTOR_EXIT_TIMEOUT];
130+
}
115131
if other.contains_key(MESSAGE_ACK_TIME_INTERVAL) {
116132
config[MESSAGE_ACK_TIME_INTERVAL] = other[MESSAGE_ACK_TIME_INTERVAL];
117133
}

hyperactor/src/sync/monitor.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ impl IntoFuture for Handle {
103103
/// the group will be aborted if any task fails or if the group is aborted.
104104
///
105105
/// The group is also aborted if the group itself is dropped.
106+
#[derive(Clone)]
106107
pub struct Group(Arc<Mutex<State>>);
107108

108109
/// The status of a group. Groups start out in [`Status::Running`]

hyperactor_mesh/src/alloc/mod.rs

Lines changed: 203 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -228,11 +228,52 @@ pub trait Alloc {
228228
}
229229

230230
pub mod test_utils {
231+
use std::time::Duration;
232+
233+
use hyperactor::Actor;
234+
use hyperactor::Handler;
235+
use hyperactor::Instance;
236+
use hyperactor::Named;
231237
use tokio::sync::broadcast::Receiver;
232238
use tokio::sync::broadcast::Sender;
233239

234240
use super::*;
235241

242+
// This can't be defined under a `#[cfg(test)]` because there needs to
243+
// be an entry in the spawnable actor registry in the executable
244+
// 'hyperactor_mesh_test_bootstrap' for the `tests::process` actor
245+
// mesh test suite.
246+
#[derive(Debug)]
247+
#[hyperactor::export(
248+
spawn = true,
249+
handlers = [
250+
Wait
251+
],
252+
)]
253+
pub struct TestActor;
254+
255+
#[async_trait]
256+
impl Actor for TestActor {
257+
type Params = ();
258+
259+
async fn new(_params: Self::Params) -> Result<Self, anyhow::Error> {
260+
Ok(Self)
261+
}
262+
}
263+
264+
#[derive(Debug, Serialize, Deserialize, Named, Clone)]
265+
pub struct Wait();
266+
267+
#[async_trait]
268+
impl Handler<Wait> for TestActor {
269+
async fn handle(&mut self, _: &Instance<Self>, Wait(): Wait) -> Result<(), anyhow::Error> {
270+
loop {
271+
#[allow(clippy::disallowed_methods)]
272+
tokio::time::sleep(Duration::from_secs(60)).await;
273+
}
274+
}
275+
}
276+
236277
/// Test wrapper around MockAlloc to allow us to block next() calls since
237278
/// mockall doesn't support returning futures.
238279
pub struct MockAllocWrapper {
@@ -302,12 +343,29 @@ pub mod test_utils {
302343

303344
#[cfg(test)]
304345
pub(crate) mod testing {
346+
use core::panic;
305347
use std::collections::HashMap;
306348
use std::collections::HashSet;
307-
349+
use std::time::Duration;
350+
351+
use hyperactor::Mailbox;
352+
use hyperactor::actor::remote::Remote;
353+
use hyperactor::channel;
354+
use hyperactor::mailbox;
355+
use hyperactor::mailbox::BoxedMailboxSender;
356+
use hyperactor::mailbox::DialMailboxRouter;
357+
use hyperactor::mailbox::IntoBoxedMailboxSender;
358+
use hyperactor::mailbox::MailboxServer;
359+
use hyperactor::mailbox::UndeliverableMailboxSender;
360+
use hyperactor::proc::Proc;
361+
use hyperactor::reference::Reference;
308362
use ndslice::shape;
363+
use tokio::process::Command;
309364

310365
use super::*;
366+
use crate::alloc::test_utils::TestActor;
367+
use crate::alloc::test_utils::Wait;
368+
use crate::proc_mesh::mesh_agent::MeshAgentMessageClient;
311369

312370
#[macro_export]
313371
macro_rules! alloc_test_suite {
@@ -367,4 +425,148 @@ pub(crate) mod testing {
367425
assert!(alloc.next().await.is_none());
368426
assert_eq!(stopped, running);
369427
}
428+
429+
async fn spawn_proc(alloc: &ProcessAlloc) -> (DialMailboxRouter, Mailbox, Proc, ChannelAddr) {
430+
let (router_channel_addr, router_rx) = channel::serve(ChannelAddr::any(alloc.transport()))
431+
.await
432+
.map_err(|err| AllocatorError::Other(err.into()))
433+
.unwrap();
434+
let router =
435+
DialMailboxRouter::new_with_default((UndeliverableMailboxSender {}).into_boxed());
436+
router
437+
.clone()
438+
.serve(router_rx, mailbox::monitored_return_handle());
439+
440+
let client_proc_id = ProcId(WorldId(format!("test_{}", alloc.world_id().name())), 0);
441+
let (client_proc_addr, client_rx) = channel::serve(ChannelAddr::any(alloc.transport()))
442+
.await
443+
.map_err(|err| AllocatorError::Other(err.into()))
444+
.unwrap();
445+
let client_proc = Proc::new(
446+
client_proc_id.clone(),
447+
BoxedMailboxSender::new(router.clone()),
448+
);
449+
client_proc
450+
.clone()
451+
.serve(client_rx, mailbox::monitored_return_handle());
452+
router.bind(client_proc_id.clone().into(), client_proc_addr);
453+
(
454+
router,
455+
client_proc.attach("test_proc").unwrap(),
456+
client_proc,
457+
router_channel_addr,
458+
)
459+
}
460+
461+
async fn spawn_test_actor(
462+
rank: usize,
463+
client_proc: &Proc,
464+
client: &Mailbox,
465+
router_channel_addr: ChannelAddr,
466+
mesh_agent: ActorRef<MeshAgent>,
467+
) -> ActorRef<TestActor> {
468+
let supervisor = client_proc.attach("supervisor").unwrap();
469+
let (supervison_port, _) = supervisor.open_port();
470+
let (config_handle, _) = client.open_port();
471+
mesh_agent
472+
.configure(
473+
client,
474+
rank,
475+
router_channel_addr,
476+
supervison_port.bind(),
477+
HashMap::new(),
478+
config_handle.bind(),
479+
)
480+
.await
481+
.unwrap();
482+
let remote = Remote::collect();
483+
let actor_type = remote
484+
.name_of::<TestActor>()
485+
.ok_or(anyhow::anyhow!("actor not registered"))
486+
.unwrap()
487+
.to_string();
488+
let params = &();
489+
let (completed_handle, mut completed_receiver) = mailbox::open_port(client);
490+
// gspawn actor
491+
mesh_agent
492+
.gspawn(
493+
client,
494+
actor_type,
495+
"Stuck".to_string(),
496+
bincode::serialize(params).unwrap(),
497+
completed_handle.bind(),
498+
)
499+
.await
500+
.unwrap();
501+
let (_, actor_id) = completed_receiver.recv().await.unwrap();
502+
ActorRef::<TestActor>::attest(actor_id)
503+
}
504+
505+
/// In order to simulate stuckness, we have to do two things:
506+
/// An actor that is blocked forever AND
507+
/// a proc that does not time out when it is asked to wait for
508+
/// a stuck actor.
509+
#[timed_test::async_timed_test(timeout_secs = 120)]
510+
async fn test_allocator_stuck_task() {
511+
// Override config.
512+
// Use temporary config for this test
513+
let config = hyperactor::config::global::lock();
514+
let _guard = config.override_key(
515+
hyperactor::config::PROCESS_EXIT_TIMEOUT,
516+
Duration::from_secs(1),
517+
);
518+
519+
let mut command =
520+
Command::new(buck_resources::get("monarch/hyperactor_mesh/bootstrap").unwrap());
521+
command.env("ACTOR_EXIT_TIMEOUT_MS", "300000");
522+
let mut allocator = ProcessAllocator::new(command);
523+
let mut alloc = allocator
524+
.allocate(AllocSpec {
525+
shape: shape! { replica = 1 },
526+
constraints: Default::default(),
527+
})
528+
.await
529+
.unwrap();
530+
531+
// Get everything up into running state. We require that we get
532+
let mut procs = HashMap::new();
533+
let mut running = HashSet::new();
534+
let mut actor_ref = None;
535+
let (router, client, client_proc, router_addr) = spawn_proc(&alloc).await;
536+
while running.is_empty() {
537+
match alloc.next().await.unwrap() {
538+
ProcState::Created { proc_id, coords } => {
539+
procs.insert(proc_id, coords);
540+
}
541+
ProcState::Running {
542+
proc_id,
543+
mesh_agent,
544+
addr,
545+
} => {
546+
router.bind(Reference::Proc(proc_id.clone()), addr.clone());
547+
548+
assert!(procs.contains_key(&proc_id));
549+
assert!(!running.contains(&proc_id));
550+
551+
actor_ref = Some(
552+
spawn_test_actor(0, &client_proc, &client, router_addr, mesh_agent).await,
553+
);
554+
running.insert(proc_id);
555+
break;
556+
}
557+
event => panic!("unexpected event: {:?}", event),
558+
}
559+
}
560+
assert!(actor_ref.unwrap().send(&client, Wait()).is_ok());
561+
562+
// There is a stuck actor! We should get a watchdog failure.
563+
alloc.stop().await.unwrap();
564+
let mut stopped = HashSet::new();
565+
while let Some(ProcState::Stopped { proc_id, reason }) = alloc.next().await {
566+
assert_eq!(reason, ProcStopReason::Watchdog);
567+
stopped.insert(proc_id);
568+
}
569+
assert!(alloc.next().await.is_none());
570+
assert_eq!(stopped, running);
571+
}
370572
}

hyperactor_mesh/src/alloc/process.rs

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use hyperactor::channel::ChannelTx;
2828
use hyperactor::channel::Rx;
2929
use hyperactor::channel::Tx;
3030
use hyperactor::channel::TxStatus;
31+
use hyperactor::sync::flag;
3132
use hyperactor::sync::monitor;
3233
use ndslice::Shape;
3334
use tokio::io;
@@ -130,8 +131,9 @@ enum ChannelState {
130131
struct Child {
131132
channel: ChannelState,
132133
group: monitor::Group,
133-
stdout: LogTailer,
134+
exit_flag: Option<flag::Flag>,
134135
stderr: LogTailer,
136+
stdout: LogTailer,
135137
stop_reason: Arc<OnceLock<ProcStopReason>>,
136138
}
137139

@@ -140,6 +142,7 @@ impl Child {
140142
mut process: tokio::process::Child,
141143
) -> (Self, impl Future<Output = ProcStopReason>) {
142144
let (group, handle) = monitor::group();
145+
let (exit_flag, exit_guard) = flag::guarded();
143146

144147
let stdout = LogTailer::tee(
145148
MAX_TAIL_LOG_LINES,
@@ -156,6 +159,7 @@ impl Child {
156159
let child = Self {
157160
channel: ChannelState::NotConnected,
158161
group,
162+
exit_flag: Some(exit_flag),
159163
stdout,
160164
stderr,
161165
stop_reason: Arc::clone(&stop_reason),
@@ -178,6 +182,8 @@ impl Child {
178182
}
179183
result = process.wait() => Self::exit_status_to_reason(result),
180184
};
185+
exit_guard.signal();
186+
181187
stop_reason.get_or_init(|| reason).clone()
182188
};
183189

@@ -234,10 +240,23 @@ impl Child {
234240
true
235241
}
236242

243+
fn spawn_watchdog(&mut self) {
244+
if let Some(exit_flag) = self.exit_flag.take() {
245+
let group = self.group.clone();
246+
let stop_reason = self.stop_reason.clone();
247+
tokio::spawn(async move {
248+
let exit_timeout =
249+
hyperactor::config::global::get(hyperactor::config::PROCESS_EXIT_TIMEOUT);
250+
if tokio::time::timeout(exit_timeout, exit_flag).await.is_err() {
251+
let _ = stop_reason.set(ProcStopReason::Watchdog);
252+
group.fail();
253+
}
254+
});
255+
}
256+
}
257+
237258
#[hyperactor::instrument_infallible]
238259
fn post(&mut self, message: Allocator2Process) {
239-
// We're here simply assuming that if we're not connected, we're about to
240-
// be killed.
241260
if let ChannelState::Connected(channel) = &mut self.channel {
242261
channel.post(message);
243262
} else {
@@ -439,6 +458,7 @@ impl Alloc for ProcessAlloc {
439458
// for liveness.
440459
for (_index, child) in self.active.iter_mut() {
441460
child.post(Allocator2Process::StopAndExit(0));
461+
child.spawn_watchdog();
442462
}
443463

444464
self.running = false;

0 commit comments

Comments
 (0)