Commit b96f22e

vidhyav authored and facebook-github-bot committed

Added watchdog timeout when stop is issued for stuck actors. (#336)

Summary: Allocators have a timeout to kill unresponsive procs when stop() is issued.
Pull Request resolved: #336
Reviewed By: suo
Differential Revision: D77303504

1 parent acd4b25, commit b96f22e
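The shape of the change, condensed into a standalone sketch: when stop() is issued, each child is asked to exit gracefully and a watchdog races the child's exit signal against a process-exit timeout; if the timeout wins, the stop reason is forced to Watchdog and the child's task group is failed. The sketch is illustrative only: it uses tokio::sync::oneshot as a stand-in for hyperactor's exit flag and a plain string for the stop reason, and none of the names below are the crate's API.

    use std::time::Duration;
    use tokio::sync::oneshot;

    #[tokio::main]
    async fn main() {
        // Completed by the child-monitoring task once the OS process has exited.
        let (exited_tx, exited_rx) = oneshot::channel::<()>();

        // Simulated child monitor: this "process" never exits, like a proc
        // hosting a stuck actor. A real monitor would send on exited_tx
        // after process.wait() returns.
        tokio::spawn(async move {
            let _keep_sender_alive = exited_tx;
            std::future::pending::<()>().await;
        });

        // Watchdog: race the exit signal against the configured timeout.
        let process_exit_timeout = Duration::from_secs(1); // stands in for PROCESS_EXIT_TIMEOUT
        let reason = match tokio::time::timeout(process_exit_timeout, exited_rx).await {
            Ok(_) => "Stopped",   // child exited in time
            Err(_) => "Watchdog", // child is stuck: record Watchdog and kill it
        };
        println!("stop reason: {reason}");
    }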

File tree: 10 files changed (+321, -16 lines)

hyperactor/src/config.rs

Lines changed: 16 additions & 0 deletions
@@ -32,6 +32,12 @@ declare_attrs! {
     /// Message delivery timeout
     pub attr MESSAGE_DELIVERY_TIMEOUT: Duration = Duration::from_secs(30);

+    /// Process exit timeout
+    pub attr PROCESS_EXIT_TIMEOUT: Duration = Duration::from_secs(10);
+
+    /// Actor exit timeout
+    pub attr ACTOR_EXIT_TIMEOUT: Duration = Duration::from_millis(10);
+
     /// Message acknowledgment interval
     pub attr MESSAGE_ACK_TIME_INTERVAL: Duration = Duration::from_millis(500);

@@ -86,6 +92,13 @@ pub fn from_env() -> Attrs {

     config[IS_MANAGED_SUBPROCESS] = env::var("HYPERACTOR_MANAGED_SUBPROCESS").is_ok();

+    // Load the actor exit timeout (in milliseconds)
+    if let Ok(val) = env::var("ACTOR_EXIT_TIMEOUT_MS") {
+        if let Ok(parsed) = val.parse::<u64>() {
+            config[ACTOR_EXIT_TIMEOUT] = Duration::from_millis(parsed);
+        }
+    }
+
     config
 }

@@ -112,6 +125,9 @@ pub fn merge(config: &mut Attrs, other: &Attrs) {
     if other.contains_key(MESSAGE_DELIVERY_TIMEOUT) {
         config[MESSAGE_DELIVERY_TIMEOUT] = other[MESSAGE_DELIVERY_TIMEOUT];
     }
+    if other.contains_key(ACTOR_EXIT_TIMEOUT) {
+        config[ACTOR_EXIT_TIMEOUT] = other[ACTOR_EXIT_TIMEOUT];
+    }
     if other.contains_key(MESSAGE_ACK_TIME_INTERVAL) {
         config[MESSAGE_ACK_TIME_INTERVAL] = other[MESSAGE_ACK_TIME_INTERVAL];
     }
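The from_env addition above reads ACTOR_EXIT_TIMEOUT_MS as whole milliseconds and silently keeps the default when the variable is unset or unparsable. Below is a small standalone sketch of that pattern; the helper name is illustrative and not a hyperactor API. The test further down passes this variable to the bootstrap child with Command::env.

    use std::time::Duration;

    // Parse a millisecond timeout from the environment, keeping `default`
    // when the variable is unset or not a valid u64 (mirrors from_env above).
    fn timeout_from_env(var: &str, default: Duration) -> Duration {
        std::env::var(var)
            .ok()
            .and_then(|val| val.parse::<u64>().ok())
            .map(Duration::from_millis)
            .unwrap_or(default)
    }

    fn main() {
        let actor_exit = timeout_from_env("ACTOR_EXIT_TIMEOUT_MS", Duration::from_millis(10));
        println!("actor exit timeout: {:?}", actor_exit);
    }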

hyperactor/src/sync/monitor.rs

Lines changed: 1 addition & 0 deletions
@@ -103,6 +103,7 @@ impl IntoFuture for Handle {
 /// the group will be aborted if any task fails or if the group is aborted.
 ///
 /// The group is also aborted if the group itself is dropped.
+#[derive(Clone)]
 pub struct Group(Arc<Mutex<State>>);

 /// The status of a group. Groups start out in [`Status::Running`]
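The single added line makes Group cloneable so that the watchdog introduced in process.rs below can move its own handle into a spawned task and call group.fail() from there. Since Group wraps an Arc<Mutex<State>>, a clone only bumps a reference count. A minimal sketch of that pattern with hypothetical names (not the monitor module's real types):

    use std::sync::{Arc, Mutex};

    // Hypothetical stand-in for a cloneable group handle: clones share state.
    #[derive(Clone)]
    struct Group(Arc<Mutex<bool /* failed */>>);

    impl Group {
        fn fail(&self) {
            *self.0.lock().unwrap() = true;
        }
    }

    #[tokio::main]
    async fn main() {
        let group = Group(Arc::new(Mutex::new(false)));
        let watchdog_handle = group.clone(); // cheap: Arc refcount bump
        tokio::spawn(async move {
            // The watchdog can fail the group without exclusive ownership.
            watchdog_handle.fail();
        })
        .await
        .unwrap();
        assert!(*group.0.lock().unwrap());
    }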

hyperactor_mesh/src/alloc/mod.rs

Lines changed: 202 additions & 1 deletion
@@ -228,11 +228,51 @@ pub trait Alloc {
 }

 pub mod test_utils {
+    use std::time::Duration;
+
+    use hyperactor::Actor;
+    use hyperactor::Handler;
+    use hyperactor::Instance;
+    use hyperactor::Named;
     use tokio::sync::broadcast::Receiver;
     use tokio::sync::broadcast::Sender;

     use super::*;

+    // This can't be defined under a `#[cfg(test)]` because there needs to
+    // be an entry in the spawnable actor registry in the executable
+    // 'hyperactor_mesh_test_bootstrap' for the `tests::process` actor
+    // mesh test suite.
+    #[derive(Debug)]
+    #[hyperactor::export(
+        spawn = true,
+        handlers = [
+            Wait
+        ],
+    )]
+    pub struct TestActor;
+
+    #[async_trait]
+    impl Actor for TestActor {
+        type Params = ();
+
+        async fn new(_params: Self::Params) -> Result<Self, anyhow::Error> {
+            Ok(Self)
+        }
+    }
+
+    #[derive(Debug, Serialize, Deserialize, Named, Clone)]
+    pub struct Wait();
+
+    #[async_trait]
+    impl Handler<Wait> for TestActor {
+        async fn handle(&mut self, _: &Instance<Self>, Wait(): Wait) -> Result<(), anyhow::Error> {
+            loop {
+                std::thread::sleep(Duration::from_secs(60));
+            }
+        }
+    }
+
     /// Test wrapper around MockAlloc to allow us to block next() calls since
     /// mockall doesn't support returning futures.
     pub struct MockAllocWrapper {

@@ -302,12 +342,29 @@ pub mod test_utils {

 #[cfg(test)]
 pub(crate) mod testing {
+    use core::panic;
     use std::collections::HashMap;
     use std::collections::HashSet;
-
+    use std::time::Duration;
+
+    use hyperactor::Mailbox;
+    use hyperactor::actor::remote::Remote;
+    use hyperactor::channel;
+    use hyperactor::mailbox;
+    use hyperactor::mailbox::BoxedMailboxSender;
+    use hyperactor::mailbox::DialMailboxRouter;
+    use hyperactor::mailbox::IntoBoxedMailboxSender;
+    use hyperactor::mailbox::MailboxServer;
+    use hyperactor::mailbox::UndeliverableMailboxSender;
+    use hyperactor::proc::Proc;
+    use hyperactor::reference::Reference;
     use ndslice::shape;
+    use tokio::process::Command;

     use super::*;
+    use crate::alloc::test_utils::TestActor;
+    use crate::alloc::test_utils::Wait;
+    use crate::proc_mesh::mesh_agent::MeshAgentMessageClient;

     #[macro_export]
     macro_rules! alloc_test_suite {

@@ -367,4 +424,148 @@ pub(crate) mod testing {
         assert!(alloc.next().await.is_none());
         assert_eq!(stopped, running);
     }
+
+    async fn spawn_proc(alloc: &ProcessAlloc) -> (DialMailboxRouter, Mailbox, Proc, ChannelAddr) {
+        let (router_channel_addr, router_rx) = channel::serve(ChannelAddr::any(alloc.transport()))
+            .await
+            .map_err(|err| AllocatorError::Other(err.into()))
+            .unwrap();
+        let router =
+            DialMailboxRouter::new_with_default((UndeliverableMailboxSender {}).into_boxed());
+        router
+            .clone()
+            .serve(router_rx, mailbox::monitored_return_handle());
+
+        let client_proc_id = ProcId(WorldId(format!("test_{}", alloc.world_id().name())), 0);
+        let (client_proc_addr, client_rx) = channel::serve(ChannelAddr::any(alloc.transport()))
+            .await
+            .map_err(|err| AllocatorError::Other(err.into()))
+            .unwrap();
+        let client_proc = Proc::new(
+            client_proc_id.clone(),
+            BoxedMailboxSender::new(router.clone()),
+        );
+        client_proc
+            .clone()
+            .serve(client_rx, mailbox::monitored_return_handle());
+        router.bind(client_proc_id.clone().into(), client_proc_addr);
+        (
+            router,
+            client_proc.attach("test_proc").unwrap(),
+            client_proc,
+            router_channel_addr,
+        )
+    }
+
+    async fn spawn_test_actor(
+        rank: usize,
+        client_proc: &Proc,
+        client: &Mailbox,
+        router_channel_addr: ChannelAddr,
+        mesh_agent: ActorRef<MeshAgent>,
+    ) -> ActorRef<TestActor> {
+        let supervisor = client_proc.attach("supervisor").unwrap();
+        let (supervison_port, _) = supervisor.open_port();
+        let (config_handle, _) = client.open_port();
+        mesh_agent
+            .configure(
+                client,
+                rank,
+                router_channel_addr,
+                supervison_port.bind(),
+                HashMap::new(),
+                config_handle.bind(),
+            )
+            .await
+            .unwrap();
+        let remote = Remote::collect();
+        let actor_type = remote
+            .name_of::<TestActor>()
+            .ok_or(anyhow::anyhow!("actor not registered"))
+            .unwrap()
+            .to_string();
+        let params = &();
+        let (completed_handle, mut completed_receiver) = mailbox::open_port(client);
+        // gspawn actor
+        mesh_agent
+            .gspawn(
+                client,
+                actor_type,
+                "Stuck".to_string(),
+                bincode::serialize(params).unwrap(),
+                completed_handle.bind(),
+            )
+            .await
+            .unwrap();
+        let (_, actor_id) = completed_receiver.recv().await.unwrap();
+        ActorRef::<TestActor>::attest(actor_id)
+    }
+
+    /// In order to simulate stuckness, we need two things:
+    /// an actor that is blocked forever, AND
+    /// a proc that does not time out when it is asked to wait for
+    /// a stuck actor.
+    #[tokio::test]
+    async fn test_allocator_stuck_task() {
+        // Override config.
+        // Use a temporary config for this test.
+        let config = hyperactor::config::global::lock();
+        let _guard = config.override_key(
+            hyperactor::config::PROCESS_EXIT_TIMEOUT,
+            Duration::from_secs(1),
+        );
+
+        let mut command =
+            Command::new(buck_resources::get("monarch/hyperactor_mesh/bootstrap").unwrap());
+        command.env("ACTOR_EXIT_TIMEOUT_MS", "300000");
+        let mut allocator = ProcessAllocator::new(command);
+        let mut alloc = allocator
+            .allocate(AllocSpec {
+                shape: shape! { replica = 1 },
+                constraints: Default::default(),
+            })
+            .await
+            .unwrap();
+
+        // Get everything up into a running state.
+        let mut procs = HashMap::new();
+        let mut running = HashSet::new();
+        let mut actor_ref = None;
+        let (router, client, client_proc, router_addr) = spawn_proc(&alloc).await;
+        while running.is_empty() {
+            match alloc.next().await.unwrap() {
+                ProcState::Created { proc_id, coords } => {
+                    procs.insert(proc_id, coords);
+                }
+                ProcState::Running {
+                    proc_id,
+                    mesh_agent,
+                    addr,
+                } => {
+                    router.bind(Reference::Proc(proc_id.clone()), addr.clone());

+                    assert!(procs.contains_key(&proc_id));
+                    assert!(!running.contains(&proc_id));
+
+                    actor_ref = Some(
+                        spawn_test_actor(0, &client_proc, &client, router_addr, mesh_agent).await,
+                    );
+                    running.insert(proc_id);
+                    break;
+                }
+                event => panic!("unexpected event: {:?}", event),
+            }
+        }
+        assert!(actor_ref.unwrap().send(&client, Wait()).is_ok());
+
+        // There is a stuck actor! We should get a watchdog failure.
+        alloc.stop().await.unwrap();
+        let mut stopped = HashSet::new();
+        while let Some(ProcState::Stopped { proc_id, reason }) = alloc.next().await {
+            assert_eq!(reason, ProcStopReason::Watchdog);
+            stopped.insert(proc_id);
+        }
+        assert!(alloc.next().await.is_none());
+        assert_eq!(stopped, running);
+    }
 }
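The doc comment on test_allocator_stuck_task spells out the simulation: the Wait handler blocks its thread with std::thread::sleep, so the task never reaches an await point and can neither make progress nor be cancelled, while ACTOR_EXIT_TIMEOUT_MS is set very high (300 s) so the child proc keeps waiting for the stuck actor instead of giving up. A small illustration of why the blocking sleep matters, in generic tokio code rather than hyperactor's handler machinery:

    use std::time::Duration;

    // Blocks its worker thread: never yields to the runtime, so it cannot be
    // cancelled and a graceful-stop message behind it is never processed.
    async fn stuck() {
        loop {
            std::thread::sleep(Duration::from_secs(60));
        }
    }

    // Yields at every await: stays cancellable, so a stop can proceed.
    async fn well_behaved() {
        loop {
            tokio::time::sleep(Duration::from_secs(60)).await;
        }
    }

    #[tokio::main]
    async fn main() {
        // The well-behaved task is cancelled promptly by the timeout; running
        // `stuck()` under the same timeout would pin the thread for 60s first.
        let _ = tokio::time::timeout(Duration::from_millis(100), well_behaved()).await;
        println!("well-behaved task cancelled");
        let _ = &stuck; // referenced only to avoid an unused-function warning
    }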

hyperactor_mesh/src/alloc/process.rs

Lines changed: 23 additions & 3 deletions
@@ -28,6 +28,7 @@ use hyperactor::channel::ChannelTx;
 use hyperactor::channel::Rx;
 use hyperactor::channel::Tx;
 use hyperactor::channel::TxStatus;
+use hyperactor::sync::flag;
 use hyperactor::sync::monitor;
 use ndslice::Shape;
 use tokio::io;

@@ -130,8 +131,9 @@ enum ChannelState {
 struct Child {
     channel: ChannelState,
     group: monitor::Group,
-    stdout: LogTailer,
+    exit_flag: Option<flag::Flag>,
     stderr: LogTailer,
+    stdout: LogTailer,
     stop_reason: Arc<OnceLock<ProcStopReason>>,
 }

@@ -140,6 +142,7 @@ impl Child {
         mut process: tokio::process::Child,
     ) -> (Self, impl Future<Output = ProcStopReason>) {
         let (group, handle) = monitor::group();
+        let (exit_flag, exit_guard) = flag::guarded();

         let stdout = LogTailer::tee(
             MAX_TAIL_LOG_LINES,

@@ -156,6 +159,7 @@ impl Child {
         let child = Self {
             channel: ChannelState::NotConnected,
             group,
+            exit_flag: Some(exit_flag),
             stdout,
             stderr,
             stop_reason: Arc::clone(&stop_reason),

@@ -178,6 +182,8 @@ impl Child {
                 }
                 result = process.wait() => Self::exit_status_to_reason(result),
             };
+            exit_guard.signal();
+
             stop_reason.get_or_init(|| reason).clone()
         };

@@ -234,10 +240,23 @@ impl Child {
         true
     }

+    fn spawn_watchdog(&mut self) {
+        if let Some(exit_flag) = self.exit_flag.take() {
+            let group = self.group.clone();
+            let stop_reason = self.stop_reason.clone();
+            tokio::spawn(async move {
+                let exit_timeout =
+                    hyperactor::config::global::get(hyperactor::config::PROCESS_EXIT_TIMEOUT);
+                if tokio::time::timeout(exit_timeout, exit_flag).await.is_err() {
+                    let _ = stop_reason.set(ProcStopReason::Watchdog);
+                    group.fail();
+                }
+            });
+        }
+    }
+
     #[hyperactor::instrument_infallible]
     fn post(&mut self, message: Allocator2Process) {
-        // We're here simply assuming that if we're not connected, we're about to
-        // be killed.
         if let ChannelState::Connected(channel) = &mut self.channel {
             channel.post(message);
         } else {

@@ -439,6 +458,7 @@ impl Alloc for ProcessAlloc {
         // for liveness.
         for (_index, child) in self.active.iter_mut() {
             child.post(Allocator2Process::StopAndExit(0));
+            child.spawn_watchdog();
         }

         self.running = false;
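The watchdog reads PROCESS_EXIT_TIMEOUT from the global config when it runs, so a caller can tighten the timeout before issuing stop(). The stuck-task test above does exactly that with a scoped override; the condensed sketch below copies the global::lock / override_key calls from that test, the surrounding function is illustrative, and the guard presumably restores the previous value when dropped.

    use std::time::Duration;

    // Tighten the process-exit watchdog for the duration of a scope, as the
    // stuck-task test does before calling alloc.stop().
    fn with_short_watchdog() {
        let config = hyperactor::config::global::lock();
        let _guard = config.override_key(
            hyperactor::config::PROCESS_EXIT_TIMEOUT,
            Duration::from_secs(1),
        );
        // ... allocate, stop(), and expect ProcStopReason::Watchdog for stuck procs ...
    }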
