
Commit fe0a267

vidhyav authored and facebook-github-bot committed
Added watchdog timeout when stop is issued for stuck actors. (#336)
Summary: Pull Request resolved: #336

Allocators have a timeout to kill unresponsive procs when stop() is issued.

Reviewed By: suo

Differential Revision: D77303504
1 parent 3ad9b5d commit fe0a267
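
The mechanism, in brief: once stop() is issued, the allocator no longer waits indefinitely for each child to acknowledge and exit; a watchdog races the child's exit signal against PROCESS_EXIT_TIMEOUT and force-fails the child's task group if it loses. Below is a minimal standalone sketch of that pattern, using a tokio oneshot channel in place of hyperactor's flag::guarded(); the names ExitReason and watchdog are illustrative only, not part of the codebase.

```rust
use std::time::Duration;
use tokio::sync::oneshot;

#[derive(Debug, PartialEq)]
enum ExitReason {
    Exited,   // the child acknowledged stop and exited in time
    Watchdog, // the child never exited; the watchdog fired
}

// Race the child's exit signal against a deadline. On timeout the caller
// can record a Watchdog stop reason and tear the child down forcibly.
async fn watchdog(exited: oneshot::Receiver<()>, deadline: Duration) -> ExitReason {
    match tokio::time::timeout(deadline, exited).await {
        Ok(_) => ExitReason::Exited,
        Err(_) => ExitReason::Watchdog,
    }
}

#[tokio::main]
async fn main() {
    // Simulate a stuck child: the sender stays alive but never signals.
    let (_tx, rx) = oneshot::channel::<()>();
    assert_eq!(
        watchdog(rx, Duration::from_millis(100)).await,
        ExitReason::Watchdog
    );
}
```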

File tree

10 files changed: +324 -16 lines changed


hyperactor/src/config.rs

Lines changed: 17 additions & 0 deletions

@@ -32,6 +32,12 @@ declare_attrs! {
     /// Message delivery timeout
     pub attr MESSAGE_DELIVERY_TIMEOUT: Duration = Duration::from_secs(30);
 
+    /// Process exit timeout
+    pub attr PROCESS_EXIT_TIMEOUT: Duration = Duration::from_secs(10);
+
+    /// Actor exit timeout
+    pub attr ACTOR_EXIT_TIMEOUT: Duration = Duration::from_millis(10);
+
     /// Message acknowledgment interval
     pub attr MESSAGE_ACK_TIME_INTERVAL: Duration = Duration::from_millis(500);

@@ -81,6 +87,14 @@ pub fn from_env() -> Attrs {
         }
     }
 
+    // Load actor exit timeout
+    if let Ok(val) = env::var("ACTOR_EXIT_TIMEOUT_MS") {
+        if let Ok(parsed) = val.parse::<u64>() {
+            config[ACTOR_EXIT_TIMEOUT] = Duration::from_millis(parsed);
+        }
+    }
+
     config
 }

@@ -107,6 +121,9 @@ pub fn merge(config: &mut Attrs, other: &Attrs) {
     if other.contains_key(MESSAGE_DELIVERY_TIMEOUT) {
         config[MESSAGE_DELIVERY_TIMEOUT] = other[MESSAGE_DELIVERY_TIMEOUT];
     }
+    if other.contains_key(ACTOR_EXIT_TIMEOUT) {
+        config[ACTOR_EXIT_TIMEOUT] = other[ACTOR_EXIT_TIMEOUT];
+    }
     if other.contains_key(MESSAGE_ACK_TIME_INTERVAL) {
         config[MESSAGE_ACK_TIME_INTERVAL] = other[MESSAGE_ACK_TIME_INTERVAL];
     }
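
As a usage note, a minimal sketch assuming only the from_env() shown above: ACTOR_EXIT_TIMEOUT_MS is read as a millisecond count, and unparsable values are silently ignored, leaving the 10ms default. (The test below sets the variable on the child process via Command::env instead.)

```rust
// Sketch: pick a 500ms actor-exit timeout through the environment.
std::env::set_var("ACTOR_EXIT_TIMEOUT_MS", "500");
let config = hyperactor::config::from_env();
assert_eq!(config[ACTOR_EXIT_TIMEOUT], Duration::from_millis(500));
```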

hyperactor/src/sync/monitor.rs

Lines changed: 1 addition & 0 deletions

@@ -103,6 +103,7 @@ impl IntoFuture for Handle {
 /// the group will be aborted if any task fails or if the group is aborted.
 ///
 /// The group is also aborted if the group itself is dropped.
+#[derive(Clone)]
 pub struct Group(Arc<Mutex<State>>);
 
 /// The status of a group. Groups start out in [`Status::Running`]
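
This one-line derive is what the watchdog relies on: Group wraps an Arc, so cloning hands a spawned task its own handle to the same underlying state, letting it call fail() independently of the owning Child (see process.rs below). A rough stand-in sketch; Group here is not hyperactor's type:

```rust
use std::sync::{Arc, Mutex};

// Stand-in for hyperactor::sync::monitor::Group: Clone copies the Arc,
// so every handle observes and mutates the same shared state.
#[derive(Clone)]
struct Group(Arc<Mutex<bool /* failed */>>);

impl Group {
    fn fail(&self) {
        *self.0.lock().unwrap() = true;
    }
}

fn main() {
    let group = Group(Arc::new(Mutex::new(false)));
    let for_watchdog = group.clone(); // cheap handle for the spawned task
    std::thread::spawn(move || for_watchdog.fail())
        .join()
        .unwrap();
    assert!(*group.0.lock().unwrap()); // the owner sees the failure
}
```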

hyperactor_mesh/src/alloc/mod.rs

Lines changed: 204 additions & 1 deletion

@@ -235,11 +235,51 @@ pub trait Alloc {
 }
 
 pub mod test_utils {
+    use std::time::Duration;
+
+    use hyperactor::Actor;
+    use hyperactor::Handler;
+    use hyperactor::Instance;
+    use hyperactor::Named;
     use tokio::sync::broadcast::Receiver;
     use tokio::sync::broadcast::Sender;
 
     use super::*;
 
+    // This can't be defined under a `#[cfg(test)]` because there needs to
+    // be an entry in the spawnable actor registry in the executable
+    // 'hyperactor_mesh_test_bootstrap' for the `tests::process` actor
+    // mesh test suite.
+    #[derive(Debug)]
+    #[hyperactor::export(
+        spawn = true,
+        handlers = [
+            Wait
+        ],
+    )]
+    pub struct TestActor;
+
+    #[async_trait]
+    impl Actor for TestActor {
+        type Params = ();
+
+        async fn new(_params: Self::Params) -> Result<Self, anyhow::Error> {
+            Ok(Self)
+        }
+    }
+
+    #[derive(Debug, Serialize, Deserialize, Named, Clone)]
+    pub struct Wait();
+
+    #[async_trait]
+    impl Handler<Wait> for TestActor {
+        async fn handle(&mut self, _: &Instance<Self>, Wait(): Wait) -> Result<(), anyhow::Error> {
+            // Block the handler thread forever to simulate a stuck actor.
+            loop {
+                std::thread::sleep(Duration::from_secs(60));
+            }
+        }
+    }
+
     /// Test wrapper around MockAlloc to allow us to block next() calls since
     /// mockall doesn't support returning futures.
     pub struct MockAllocWrapper {

@@ -309,12 +349,29 @@ pub mod test_utils {
 
 #[cfg(test)]
 pub(crate) mod testing {
+    use core::panic;
     use std::collections::HashMap;
     use std::collections::HashSet;
-
+    use std::time::Duration;
+
+    use hyperactor::Mailbox;
+    use hyperactor::actor::remote::Remote;
+    use hyperactor::channel;
+    use hyperactor::mailbox;
+    use hyperactor::mailbox::BoxedMailboxSender;
+    use hyperactor::mailbox::DialMailboxRouter;
+    use hyperactor::mailbox::IntoBoxedMailboxSender;
+    use hyperactor::mailbox::MailboxServer;
+    use hyperactor::mailbox::UndeliverableMailboxSender;
+    use hyperactor::proc::Proc;
+    use hyperactor::reference::Reference;
     use ndslice::shape;
+    use tokio::process::Command;
 
     use super::*;
+    use crate::alloc::test_utils::TestActor;
+    use crate::alloc::test_utils::Wait;
+    use crate::proc_mesh::mesh_agent::MeshAgentMessageClient;
 
     #[macro_export]
     macro_rules! alloc_test_suite {

@@ -376,4 +433,150 @@ pub(crate) mod testing {
         assert!(alloc.next().await.is_none());
         assert_eq!(stopped, running);
     }
+
+    async fn spawn_proc(alloc: &ProcessAlloc) -> (DialMailboxRouter, Mailbox, Proc, ChannelAddr) {
+        let (router_channel_addr, router_rx) = channel::serve(ChannelAddr::any(alloc.transport()))
+            .await
+            .map_err(|err| AllocatorError::Other(err.into()))
+            .unwrap();
+        let router =
+            DialMailboxRouter::new_with_default((UndeliverableMailboxSender {}).into_boxed());
+        router
+            .clone()
+            .serve(router_rx, mailbox::monitored_return_handle());
+
+        let client_proc_id = ProcId(WorldId(format!("test_{}", alloc.world_id().name())), 0);
+        let (client_proc_addr, client_rx) = channel::serve(ChannelAddr::any(alloc.transport()))
+            .await
+            .map_err(|err| AllocatorError::Other(err.into()))
+            .unwrap();
+        let client_proc = Proc::new(
+            client_proc_id.clone(),
+            BoxedMailboxSender::new(router.clone()),
+        );
+        client_proc
+            .clone()
+            .serve(client_rx, mailbox::monitored_return_handle());
+        router.bind(client_proc_id.clone().into(), client_proc_addr);
+        (
+            router,
+            client_proc.attach("test_proc").unwrap(),
+            client_proc,
+            router_channel_addr,
+        )
+    }
+
+    async fn spawn_test_actor(
+        rank: usize,
+        client_proc: &Proc,
+        client: &Mailbox,
+        router_channel_addr: ChannelAddr,
+        mesh_agent: ActorRef<MeshAgent>,
+    ) -> ActorRef<TestActor> {
+        let supervisor = client_proc.attach("supervisor").unwrap();
+        let (supervision_port, _) = supervisor.open_port();
+        let (config_handle, _) = client.open_port();
+        mesh_agent
+            .configure(
+                client,
+                rank,
+                router_channel_addr,
+                supervision_port.bind(),
+                HashMap::new(),
+                config_handle.bind(),
+            )
+            .await
+            .unwrap();
+        let remote = Remote::collect();
+        let actor_type = remote
+            .name_of::<TestActor>()
+            .ok_or(anyhow::anyhow!("actor not registered"))
+            .unwrap()
+            .to_string();
+        let params = &();
+        let (completed_handle, mut completed_receiver) = mailbox::open_port(client);
+        // gspawn the actor via the mesh agent.
+        mesh_agent
+            .gspawn(
+                client,
+                actor_type,
+                "Stuck".to_string(),
+                bincode::serialize(params).unwrap(),
+                completed_handle.bind(),
+            )
+            .await
+            .unwrap();
+        let (_, actor_id) = completed_receiver.recv().await.unwrap();
+        ActorRef::<TestActor>::attest(actor_id)
+    }
+
+    /// To simulate stuckness, we need two things: an actor that is
+    /// blocked forever, and a proc that does not time out when it is
+    /// asked to wait for a stuck actor.
+    #[tokio::test]
+    async fn test_allocator_stuck_task() {
+        // Use a temporary config override for this test.
+        let config = hyperactor::config::global::lock();
+        let _guard = config.override_key(
+            hyperactor::config::PROCESS_EXIT_TIMEOUT,
+            Duration::from_secs(1),
+        );
+
+        let mut command =
+            Command::new(buck_resources::get("monarch/hyperactor_mesh/bootstrap").unwrap());
+        command.env("ACTOR_EXIT_TIMEOUT_MS", "300000");
+        let mut allocator = ProcessAllocator::new(command);
+        let mut alloc = allocator
+            .allocate(AllocSpec {
+                shape: shape! { replica = 1 },
+                constraints: Default::default(),
+            })
+            .await
+            .unwrap();
+
+        // Get everything up into the running state. We require at least
+        // one running proc before issuing the stop.
+        let mut procs = HashMap::new();
+        let mut running = HashSet::new();
+        let mut actor_ref = None;
+        let (router, client, client_proc, router_addr) = spawn_proc(&alloc).await;
+        while running.is_empty() {
+            match alloc.next().await.unwrap() {
+                ProcState::Created {
+                    proc_id, coords, ..
+                } => {
+                    procs.insert(proc_id, coords);
+                }
+                ProcState::Running {
+                    proc_id,
+                    mesh_agent,
+                    addr,
+                } => {
+                    router.bind(Reference::Proc(proc_id.clone()), addr.clone());
+
+                    assert!(procs.contains_key(&proc_id));
+                    assert!(!running.contains(&proc_id));
+
+                    actor_ref = Some(
+                        spawn_test_actor(0, &client_proc, &client, router_addr, mesh_agent).await,
+                    );
+                    running.insert(proc_id);
+                    break;
+                }
+                event => panic!("unexpected event: {:?}", event),
+            }
+        }
+        assert!(actor_ref.unwrap().send(&client, Wait()).is_ok());
+
+        // There is a stuck actor! We should get a watchdog failure.
+        alloc.stop().await.unwrap();
+        let mut stopped = HashSet::new();
+        while let Some(ProcState::Stopped { proc_id, reason }) = alloc.next().await {
+            assert_eq!(reason, ProcStopReason::Watchdog);
+            stopped.insert(proc_id);
+        }
+        assert!(alloc.next().await.is_none());
+        assert_eq!(stopped, running);
+    }
 }
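
A detail worth calling out in TestActor's Wait handler above: it blocks with std::thread::sleep rather than tokio::time::sleep, so the handler never yields to the runtime and the actor genuinely cannot react to a stop request. The standalone sketch below, independent of hyperactor, shows the difference a watchdog has to contend with:

```rust
use std::time::Duration;

#[tokio::main(flavor = "current_thread")]
async fn main() {
    // Cooperative sleep yields to the runtime, so a timeout can win.
    let yielding = tokio::time::sleep(Duration::from_secs(60));
    assert!(
        tokio::time::timeout(Duration::from_millis(50), yielding)
            .await
            .is_err()
    );

    // A blocking sleep pins the worker thread: the timeout cannot fire
    // until the blocked future finishes, so this takes ~100ms, not 10ms.
    // This is the kind of stuckness the Wait handler simulates.
    let blocking = async { std::thread::sleep(Duration::from_millis(100)) };
    let _ = tokio::time::timeout(Duration::from_millis(10), blocking).await;
}
```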

hyperactor_mesh/src/alloc/process.rs

Lines changed: 23 additions & 3 deletions

@@ -28,6 +28,7 @@ use hyperactor::channel::ChannelTx;
 use hyperactor::channel::Rx;
 use hyperactor::channel::Tx;
 use hyperactor::channel::TxStatus;
+use hyperactor::sync::flag;
 use hyperactor::sync::monitor;
 use ndslice::Shape;
 use tokio::io;

@@ -130,8 +131,9 @@ enum ChannelState {
 struct Child {
     channel: ChannelState,
     group: monitor::Group,
-    stdout: LogTailer,
+    exit_flag: Option<flag::Flag>,
     stderr: LogTailer,
+    stdout: LogTailer,
     stop_reason: Arc<OnceLock<ProcStopReason>>,
 }

@@ -140,6 +142,7 @@
         mut process: tokio::process::Child,
     ) -> (Self, impl Future<Output = ProcStopReason>) {
         let (group, handle) = monitor::group();
+        let (exit_flag, exit_guard) = flag::guarded();
 
         let stdout = LogTailer::tee(
             MAX_TAIL_LOG_LINES,

@@ -156,6 +159,7 @@
         let child = Self {
             channel: ChannelState::NotConnected,
             group,
+            exit_flag: Some(exit_flag),
             stdout,
             stderr,
             stop_reason: Arc::clone(&stop_reason),

@@ -178,6 +182,8 @@
             }
             result = process.wait() => Self::exit_status_to_reason(result),
         };
+        exit_guard.signal();
+
         stop_reason.get_or_init(|| reason).clone()
     };

@@ -234,10 +240,23 @@
         true
     }
 
+    fn spawn_watchdog(&mut self) {
+        if let Some(exit_flag) = self.exit_flag.take() {
+            let group = self.group.clone();
+            let stop_reason = self.stop_reason.clone();
+            tokio::spawn(async move {
+                let exit_timeout =
+                    hyperactor::config::global::get(hyperactor::config::PROCESS_EXIT_TIMEOUT);
+                if tokio::time::timeout(exit_timeout, exit_flag).await.is_err() {
+                    let _ = stop_reason.set(ProcStopReason::Watchdog);
+                    group.fail();
+                }
+            });
+        }
+    }
+
     #[hyperactor::instrument_infallible]
     fn post(&mut self, message: Allocator2Process) {
-        // We're here simply assuming that if we're not connected, we're about to
-        // be killed.
         if let ChannelState::Connected(channel) = &mut self.channel {
             channel.post(message);
         } else {

@@ -446,6 +465,7 @@ impl Alloc for ProcessAlloc {
         // for liveness.
         for (_index, child) in self.active.iter_mut() {
            child.post(Allocator2Process::StopAndExit(0));
+           child.spawn_watchdog();
         }
 
         self.running = false;
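
Note how the OnceLock in stop_reason keeps the watchdog race benign: whichever side finishes first, the child's real exit path via get_or_init or the watchdog via set, records the reason, and the loser is ignored. A standalone sketch of that idiom, with ProcStopReason as a stand-in for the real enum:

```rust
use std::sync::{Arc, OnceLock};

// Stand-in for the real ProcStopReason enum in process.rs.
#[derive(Debug, PartialEq)]
enum ProcStopReason {
    Exited(i32),
    Watchdog,
}

fn main() {
    let stop_reason: Arc<OnceLock<ProcStopReason>> = Arc::new(OnceLock::new());

    // The exit path wins the race and records the real exit status...
    stop_reason.get_or_init(|| ProcStopReason::Exited(0));

    // ...so the watchdog's later attempt is a no-op: set() returns Err
    // when a value is already present.
    assert!(stop_reason.set(ProcStopReason::Watchdog).is_err());
    assert_eq!(stop_reason.get(), Some(&ProcStopReason::Exited(0)));
}
```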
