Skip to content

Commit 2dd8505

Browse files
vidhyav authored and facebook-github-bot committed
Added watchdog timeout when stop is issued for stuck actors. (#336)
Summary: Pull Request resolved: #336. Allocators have a timeout to kill unresponsive procs when stop() is issued. Differential Revision: D77303504.
1 parent 6067347 commit 2dd8505

File tree

6 files changed

+254
-8
lines changed

6 files changed

+254
-8
lines changed

hyperactor/src/config.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@ declare_attrs! {
3232
/// Message delivery timeout
3333
pub attr MESSAGE_DELIVERY_TIMEOUT: Duration = Duration::from_secs(30);
3434

35+
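/// Process exit timeout: how long to wait for a process to exit after it has been asked to stop before the watchdog intervenes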
36+
pub attr PROCESS_EXIT_TIMEOUT: Duration = Duration::from_secs(10);
37+
3538
/// Message acknowledgment interval
3639
pub attr MESSAGE_ACK_TIME_INTERVAL: Duration = Duration::from_millis(500);
3740

hyperactor/src/sync/monitor.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ impl IntoFuture for Handle {
9999
}
100100
}
101101

102+
#[derive(Clone)]
102103
/// A group of tasks that share a common fate. Any tasks that are spawned onto
103104
/// the group will be aborted if any task fails or if the group is aborted.
104105
///

hyperactor_mesh/src/alloc/mod.rs

Lines changed: 197 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -228,11 +228,51 @@ pub trait Alloc {
228228
}
229229

230230
pub mod test_utils {
231+
use std::time::Duration;
232+
233+
use hyperactor::Actor;
234+
use hyperactor::Handler;
235+
use hyperactor::Instance;
236+
use hyperactor::Named;
231237
use tokio::sync::broadcast::Receiver;
232238
use tokio::sync::broadcast::Sender;
233239

234240
use super::*;
235241

242+
// This can't be defined under a `#[cfg(test)]` because there needs to
243+
// be an entry in the spawnable actor registry in the executable
244+
// 'hyperactor_mesh_test_bootstrap' for the `tests::process` actor
245+
// mesh test suite.
246+
#[derive(Debug)]
247+
#[hyperactor::export(
248+
spawn = true,
249+
handlers = [
250+
Wait
251+
],
252+
)]
253+
pub struct TestActor;
254+
255+
#[async_trait]
256+
impl Actor for TestActor {
257+
type Params = ();
258+
259+
async fn new(_params: Self::Params) -> Result<Self, anyhow::Error> {
260+
Ok(Self)
261+
}
262+
}
263+
264+
#[derive(Debug, Serialize, Deserialize, Named, Clone)]
265+
pub struct Wait();
266+
267+
#[async_trait]
268+
impl Handler<Wait> for TestActor {
269+
async fn handle(&mut self, _: &Instance<Self>, Wait(): Wait) -> Result<(), anyhow::Error> {
270+
#[allow(clippy::disallowed_methods)]
271+
tokio::time::sleep(Duration::from_secs(60)).await;
272+
Ok(())
273+
}
274+
}
275+
236276
/// Test wrapper around MockAlloc to allow us to block next() calls since
237277
/// mockall doesn't support returning futures.
238278
pub struct MockAllocWrapper {
@@ -302,12 +342,29 @@ pub mod test_utils {
302342

303343
#[cfg(test)]
304344
pub(crate) mod testing {
345+
use core::panic;
305346
use std::collections::HashMap;
306347
use std::collections::HashSet;
307-
348+
use std::time::Duration;
349+
350+
use hyperactor::Mailbox;
351+
use hyperactor::actor::remote::Remote;
352+
use hyperactor::channel;
353+
use hyperactor::mailbox;
354+
use hyperactor::mailbox::BoxedMailboxSender;
355+
use hyperactor::mailbox::DialMailboxRouter;
356+
use hyperactor::mailbox::IntoBoxedMailboxSender;
357+
use hyperactor::mailbox::MailboxServer;
358+
use hyperactor::mailbox::UndeliverableMailboxSender;
359+
use hyperactor::proc::Proc;
360+
use hyperactor::reference::Reference;
308361
use ndslice::shape;
362+
use tokio::process::Command;
309363

310364
use super::*;
365+
use crate::alloc::test_utils::TestActor;
366+
use crate::alloc::test_utils::Wait;
367+
use crate::proc_mesh::mesh_agent::MeshAgentMessageClient;
311368

312369
#[macro_export]
313370
macro_rules! alloc_test_suite {
@@ -367,4 +424,143 @@ pub(crate) mod testing {
367424
assert!(alloc.next().await.is_none());
368425
assert_eq!(stopped, running);
369426
}
427+
428+
async fn spawn_proc(alloc: &ProcessAlloc) -> (DialMailboxRouter, Mailbox, Proc, ChannelAddr) {
429+
let (router_channel_addr, router_rx) = channel::serve(ChannelAddr::any(alloc.transport()))
430+
.await
431+
.map_err(|err| AllocatorError::Other(err.into()))
432+
.unwrap();
433+
let router =
434+
DialMailboxRouter::new_with_default((UndeliverableMailboxSender {}).into_boxed());
435+
router
436+
.clone()
437+
.serve(router_rx, mailbox::monitored_return_handle());
438+
439+
let client_proc_id = ProcId(WorldId(format!("test_{}", alloc.world_id().name())), 0);
440+
let (client_proc_addr, client_rx) = channel::serve(ChannelAddr::any(alloc.transport()))
441+
.await
442+
.map_err(|err| AllocatorError::Other(err.into()))
443+
.unwrap();
444+
let client_proc = Proc::new(
445+
client_proc_id.clone(),
446+
BoxedMailboxSender::new(router.clone()),
447+
);
448+
client_proc
449+
.clone()
450+
.serve(client_rx, mailbox::monitored_return_handle());
451+
router.bind(client_proc_id.clone().into(), client_proc_addr);
452+
(
453+
router,
454+
client_proc.attach("test_proc").unwrap(),
455+
client_proc,
456+
router_channel_addr,
457+
)
458+
}
459+
460+
async fn spawn_test_actor(
461+
rank: usize,
462+
client_proc: &Proc,
463+
client: &Mailbox,
464+
router_channel_addr: ChannelAddr,
465+
mesh_agent: ActorRef<MeshAgent>,
466+
) -> ActorRef<TestActor> {
467+
let supervisor = client_proc.attach("supervisor").unwrap();
468+
let (supervison_port, _) = supervisor.open_port();
469+
let (config_handle, _) = client.open_port();
470+
mesh_agent
471+
.configure(
472+
client,
473+
rank,
474+
router_channel_addr,
475+
supervison_port.bind(),
476+
HashMap::new(),
477+
config_handle.bind(),
478+
)
479+
.await
480+
.unwrap();
481+
let remote = Remote::collect();
482+
let actor_type = remote
483+
.name_of::<TestActor>()
484+
.ok_or(anyhow::anyhow!("actor not registered"))
485+
.unwrap()
486+
.to_string();
487+
let params = &();
488+
let (completed_handle, mut completed_receiver) = mailbox::open_port(client);
489+
// gspawn actor
490+
mesh_agent
491+
.gspawn(
492+
client,
493+
actor_type,
494+
"Stuck".to_string(),
495+
bincode::serialize(params).unwrap(),
496+
completed_handle.bind(),
497+
)
498+
.await
499+
.unwrap();
500+
let (_, actor_id) = completed_receiver.recv().await.unwrap();
501+
ActorRef::<TestActor>::attest(actor_id)
502+
}
503+
504+
#[tokio::test]
505+
async fn test_allocator_stuck_task() {
506+
let mut allocator = ProcessAllocator::new(Command::new(
507+
buck_resources::get("monarch/hyperactor_mesh/bootstrap").unwrap(),
508+
));
509+
// Override config.
510+
// Use temporary config for this test
511+
let config = hyperactor::config::global::lock();
512+
let _guard = config.override_key(
513+
hyperactor::config::PROCESS_EXIT_TIMEOUT,
514+
Duration::from_secs(1),
515+
);
516+
517+
let mut alloc = allocator
518+
.allocate(AllocSpec {
519+
shape: shape! { replica = 1 },
520+
constraints: Default::default(),
521+
})
522+
.await
523+
.unwrap();
524+
525+
// Get everything up into running state. We require that we get
526+
let mut procs = HashMap::new();
527+
let mut running = HashSet::new();
528+
let mut actor_ref = None;
529+
let (router, client, client_proc, router_addr) = spawn_proc(&alloc).await;
530+
while running.is_empty() {
531+
match alloc.next().await.unwrap() {
532+
ProcState::Created { proc_id, coords } => {
533+
procs.insert(proc_id, coords);
534+
}
535+
ProcState::Running {
536+
proc_id,
537+
mesh_agent,
538+
addr,
539+
} => {
540+
router.bind(Reference::Proc(proc_id.clone()), addr.clone());
541+
542+
assert!(procs.contains_key(&proc_id));
543+
assert!(!running.contains(&proc_id));
544+
545+
actor_ref = Some(
546+
spawn_test_actor(0, &client_proc, &client, router_addr, mesh_agent).await,
547+
);
548+
running.insert(proc_id);
549+
break;
550+
}
551+
event => panic!("unexpected event: {:?}", event),
552+
}
553+
}
554+
assert!(actor_ref.unwrap().send(&client, Wait()).is_ok());
555+
556+
// There is a stuck actor! We should get a watchdog failure.
557+
alloc.stop().await.unwrap();
558+
let mut stopped = HashSet::new();
559+
while let Some(ProcState::Stopped { proc_id, reason }) = alloc.next().await {
560+
assert_eq!(reason, ProcStopReason::Watchdog);
561+
stopped.insert(proc_id);
562+
}
563+
assert!(alloc.next().await.is_none());
564+
assert_eq!(stopped, running);
565+
}
370566
}

hyperactor_mesh/src/alloc/process.rs

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use std::sync::OnceLock;
1818

1919
use async_trait::async_trait;
2020
use enum_as_inner::EnumAsInner;
21+
use futures::executor;
2122
use hyperactor::ProcId;
2223
use hyperactor::WorldId;
2324
use hyperactor::channel;
@@ -28,8 +29,10 @@ use hyperactor::channel::ChannelTx;
2829
use hyperactor::channel::Rx;
2930
use hyperactor::channel::Tx;
3031
use hyperactor::channel::TxStatus;
32+
use hyperactor::sync::flag;
3133
use hyperactor::sync::monitor;
3234
use ndslice::Shape;
35+
use nix::libc::exit;
3336
use tokio::io;
3437
use tokio::process::Command;
3538
use tokio::sync::Mutex;
@@ -130,8 +133,9 @@ enum ChannelState {
130133
struct Child {
131134
channel: ChannelState,
132135
group: monitor::Group,
133-
stdout: LogTailer,
136+
exit_flag: Option<flag::Flag>,
134137
stderr: LogTailer,
138+
stdout: LogTailer,
135139
stop_reason: Arc<OnceLock<ProcStopReason>>,
136140
}
137141

@@ -140,6 +144,7 @@ impl Child {
140144
mut process: tokio::process::Child,
141145
) -> (Self, impl Future<Output = ProcStopReason>) {
142146
let (group, handle) = monitor::group();
147+
let (exit_flag, exit_guard) = flag::guarded();
143148

144149
let stdout = LogTailer::tee(
145150
MAX_TAIL_LOG_LINES,
@@ -156,6 +161,7 @@ impl Child {
156161
let child = Self {
157162
channel: ChannelState::NotConnected,
158163
group,
164+
exit_flag: Some(exit_flag),
159165
stdout,
160166
stderr,
161167
stop_reason: Arc::clone(&stop_reason),
@@ -178,6 +184,7 @@ impl Child {
178184
}
179185
result = process.wait() => Self::exit_status_to_reason(result),
180186
};
187+
exit_guard.signal();
181188
stop_reason.get_or_init(|| reason).clone()
182189
};
183190

@@ -240,6 +247,18 @@ impl Child {
240247
// be killed.
241248
if let ChannelState::Connected(channel) = &mut self.channel {
242249
channel.post(message);
250+
if let Some(exit_flag) = self.exit_flag.take() {
251+
let group = self.group.clone();
252+
let stop_reason = self.stop_reason.clone();
253+
tokio::spawn(async move {
254+
let exit_timeout =
255+
hyperactor::config::global::get(hyperactor::config::PROCESS_EXIT_TIMEOUT);
256+
if tokio::time::timeout(exit_timeout, exit_flag).await.is_err() {
257+
let _ = stop_reason.set(ProcStopReason::Watchdog);
258+
group.fail();
259+
}
260+
});
261+
}
243262
} else {
244263
self.stop(ProcStopReason::Watchdog);
245264
}

hyperactor_mesh/src/alloc/remoteprocess.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1053,7 +1053,7 @@ mod test {
10531053
}
10541054
}
10551055

1056-
fn set_procstate_execptations(alloc: &mut MockAlloc, shape: Shape) {
1056+
fn set_procstate_expectations(alloc: &mut MockAlloc, shape: Shape) {
10571057
let alloc_len = shape.slice().len();
10581058
alloc.expect_shape().return_const(shape.clone());
10591059
for i in 0..alloc_len {
@@ -1108,7 +1108,7 @@ mod test {
11081108
alloc.expect_world_id().return_const(world_id.clone());
11091109
alloc.expect_shape().return_const(spec.shape.clone());
11101110

1111-
set_procstate_execptations(&mut alloc, spec.shape.clone());
1111+
set_procstate_expectations(&mut alloc, spec.shape.clone());
11121112

11131113
// final none
11141114
alloc.expect_next().return_const(None);
@@ -1247,7 +1247,7 @@ mod test {
12471247
alloc.alloc.expect_world_id().return_const(world_id.clone());
12481248
alloc.alloc.expect_shape().return_const(spec.shape.clone());
12491249

1250-
set_procstate_execptations(&mut alloc.alloc, spec.shape.clone());
1250+
set_procstate_expectations(&mut alloc.alloc, spec.shape.clone());
12511251

12521252
alloc.alloc.expect_next().return_const(None);
12531253
alloc.alloc.expect_stop().times(1).return_once(|| Ok(()));
@@ -1324,7 +1324,7 @@ mod test {
13241324
.return_const(world_id.clone());
13251325
alloc1.alloc.expect_shape().return_const(spec.shape.clone());
13261326

1327-
set_procstate_execptations(&mut alloc1.alloc, spec.shape.clone());
1327+
set_procstate_expectations(&mut alloc1.alloc, spec.shape.clone());
13281328
alloc1.alloc.expect_next().return_const(None);
13291329
alloc1.alloc.expect_stop().times(1).return_once(|| Ok(()));
13301330
// second allocation
@@ -1339,7 +1339,7 @@ mod test {
13391339
.expect_world_id()
13401340
.return_const(world_id.clone());
13411341
alloc2.alloc.expect_shape().return_const(spec.shape.clone());
1342-
set_procstate_execptations(&mut alloc2.alloc, spec.shape.clone());
1342+
set_procstate_expectations(&mut alloc2.alloc, spec.shape.clone());
13431343
alloc2.alloc.expect_next().return_const(None);
13441344
alloc2.alloc.expect_stop().times(1).return_once(|| Ok(()));
13451345

@@ -1444,7 +1444,7 @@ mod test {
14441444
alloc.alloc.expect_world_id().return_const(world_id.clone());
14451445
alloc.alloc.expect_shape().return_const(spec.shape.clone());
14461446

1447-
set_procstate_execptations(&mut alloc.alloc, spec.shape.clone());
1447+
set_procstate_expectations(&mut alloc.alloc, spec.shape.clone());
14481448

14491449
alloc.alloc.expect_next().return_const(None);
14501450
// we expect a stop due to the failure

0 commit comments

Comments
 (0)