Skip to content

Added watchdog timeout when stop is issued for stuck actors. #336

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions hyperactor/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ declare_attrs! {
/// Message delivery timeout
pub attr MESSAGE_DELIVERY_TIMEOUT: Duration = Duration::from_secs(30);

/// Timeout used by allocator for stopping a proc.
pub attr PROCESS_EXIT_TIMEOUT: Duration = Duration::from_secs(10);

/// Message acknowledgment interval
pub attr MESSAGE_ACK_TIME_INTERVAL: Duration = Duration::from_millis(500);

Expand Down
1 change: 1 addition & 0 deletions hyperactor/src/sync/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ impl IntoFuture for Handle {
/// the group will be aborted if any task fails or if the group is aborted.
///
/// The group is also aborted if the group itself is dropped.
#[derive(Clone)]
pub struct Group(Arc<Mutex<State>>);

/// The status of a group. Groups start out in [`Status::Running`]
Expand Down
1 change: 1 addition & 0 deletions hyperactor_mesh/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ futures = { version = "0.3.30", features = ["async-await", "compat"] }
hyperactor = { version = "0.0.0", path = "../hyperactor" }
hyperactor_mesh_macros = { version = "0.0.0", path = "../hyperactor_mesh_macros" }
hyperactor_telemetry = { version = "0.0.0", path = "../hyperactor_telemetry" }
libc = "0.2.139"
mockall = "0.13.1"
ndslice = { version = "0.0.0", path = "../ndslice" }
nix = { version = "0.29.0", features = ["dir", "event", "hostname", "inotify", "ioctl", "mman", "mount", "net", "poll", "ptrace", "reboot", "resource", "sched", "signal", "term", "time", "user", "zerocopy"] }
Expand Down
214 changes: 213 additions & 1 deletion hyperactor_mesh/src/alloc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,11 +235,62 @@ pub trait Alloc {
}

pub mod test_utils {
use std::time::Duration;

use hyperactor::Actor;
use hyperactor::Context;
use hyperactor::Handler;
use hyperactor::Named;
use libc::atexit;
use tokio::sync::broadcast::Receiver;
use tokio::sync::broadcast::Sender;

use super::*;

/// `atexit` callback that never returns.
///
/// Once registered, process exit blocks in this function forever, sleeping
/// in one-minute intervals.
extern "C" fn exit_handler() {
    let nap = Duration::from_secs(60);
    loop {
        // A blocking sleep is used deliberately: this runs as a C exit
        // handler, not inside an async task.
        #[allow(clippy::disallowed_methods)]
        std::thread::sleep(nap);
    }
}

// This can't be defined under a `#[cfg(test)]` because there needs to
// be an entry in the spawnable actor registry in the executable
// 'hyperactor_mesh_test_bootstrap' for the `tests::process` actor
// mesh test suite.
/// Stateless, spawnable actor whose only exported handler is for [`Wait`].
#[derive(Debug)]
#[hyperactor::export(
spawn = true,
handlers = [
Wait
],
)]
pub struct TestActor;

#[async_trait]
impl Actor for TestActor {
/// `TestActor` takes no construction parameters.
type Params = ();

/// Builds the actor. This never fails: `TestActor` is a unit struct with
/// no state to initialize.
async fn new(_params: Self::Params) -> Result<Self, anyhow::Error> {
Ok(Self)
}
}

/// Payload-free message for `TestActor`. Handling it registers an `atexit`
/// callback that loops forever, which the tests use to simulate a process
/// that never exits.
#[derive(Debug, Serialize, Deserialize, Named, Clone)]
pub struct Wait;

#[async_trait]
impl Handler<Wait> for TestActor {
    /// Registers `exit_handler` as an `atexit` callback so that the hosting
    /// process blocks forever when it tries to exit. Tests use this to
    /// simulate a process that never exits.
    ///
    /// # Errors
    ///
    /// Returns an error if `atexit` reports that the callback could not be
    /// registered; without the callback the process would exit normally and
    /// the simulation would silently not take place.
    async fn handle(&mut self, _: &Context<Self>, _: Wait) -> Result<(), anyhow::Error> {
        // SAFETY: `atexit` only requires a valid `extern "C" fn()` pointer
        // that stays valid until process exit. `exit_handler` is a plain
        // function item with exactly that signature, so the pointer is
        // valid for the whole lifetime of the program.
        let status = unsafe { atexit(exit_handler) };
        // `atexit` returns zero on success and non-zero on failure.
        anyhow::ensure!(
            status == 0,
            "failed to register atexit handler (status {})",
            status
        );
        Ok(())
    }
}

/// Test wrapper around MockAlloc to allow us to block next() calls since
/// mockall doesn't support returning futures.
pub struct MockAllocWrapper {
Expand Down Expand Up @@ -309,12 +360,29 @@ pub mod test_utils {

#[cfg(test)]
pub(crate) mod testing {
use core::panic;
use std::collections::HashMap;
use std::collections::HashSet;

use std::time::Duration;

use hyperactor::Mailbox;
use hyperactor::actor::remote::Remote;
use hyperactor::channel;
use hyperactor::mailbox;
use hyperactor::mailbox::BoxedMailboxSender;
use hyperactor::mailbox::DialMailboxRouter;
use hyperactor::mailbox::IntoBoxedMailboxSender;
use hyperactor::mailbox::MailboxServer;
use hyperactor::mailbox::UndeliverableMailboxSender;
use hyperactor::proc::Proc;
use hyperactor::reference::Reference;
use ndslice::shape;
use tokio::process::Command;

use super::*;
use crate::alloc::test_utils::TestActor;
use crate::alloc::test_utils::Wait;
use crate::proc_mesh::mesh_agent::MeshAgentMessageClient;

#[macro_export]
macro_rules! alloc_test_suite {
Expand Down Expand Up @@ -376,4 +444,148 @@ pub(crate) mod testing {
assert!(alloc.next().await.is_none());
assert_eq!(stopped, running);
}

/// Sets up the client side of the test: a dial router served on its own
/// channel, and a client proc (`test_stuck[0]`) that forwards through that
/// router and is bound in it.
///
/// Returns `(router, client mailbox, client proc, router address)`. The
/// client mailbox is attached to the client proc under the name
/// `"test_proc"`.
///
/// Panics if either channel cannot be served or the mailbox cannot be
/// attached.
async fn spawn_proc(
    transport: ChannelTransport,
) -> (DialMailboxRouter, Mailbox, Proc, ChannelAddr) {
    // Router: served on a fresh address of the requested transport.
    let router_listen_addr = ChannelAddr::any(transport.clone());
    let (router_channel_addr, router_inbox) = channel::serve(router_listen_addr).await.unwrap();
    let fallback_sender = (UndeliverableMailboxSender {}).into_boxed();
    let router = DialMailboxRouter::new_with_default(fallback_sender);
    router
        .clone()
        .serve(router_inbox, mailbox::monitored_return_handle());

    // Client proc: served on a second address and forwarding via the router.
    let proc_id = ProcId(WorldId("test_stuck".to_string()), 0);
    let (proc_addr, proc_inbox) = channel::serve(ChannelAddr::any(transport))
        .await
        .unwrap();
    let forwarder = BoxedMailboxSender::new(router.clone());
    let client_proc = Proc::new(proc_id.clone(), forwarder);
    client_proc
        .clone()
        .serve(proc_inbox, mailbox::monitored_return_handle());
    router.bind(proc_id.clone().into(), proc_addr);

    let client = client_proc.attach("test_proc").unwrap();
    (router, client, client_proc, router_channel_addr)
}

/// Configures `mesh_agent` and asks it to spawn a [`TestActor`] named
/// `"Stuck"`.
///
/// `rank` and `router_channel_addr` are forwarded to the agent's `configure`
/// call, together with freshly opened supervision and config ports. The
/// returned reference is attested from the actor id received on the
/// spawn-completion port.
///
/// # Panics
///
/// Panics if attaching the supervisor mailbox, configuring the agent,
/// looking up the registered actor type, serializing the parameters,
/// spawning, or receiving the completion message fails.
async fn spawn_test_actor(
    rank: usize,
    client_proc: &Proc,
    client: &Mailbox,
    router_channel_addr: ChannelAddr,
    mesh_agent: ActorRef<MeshAgent>,
) -> ActorRef<TestActor> {
    let supervisor = client_proc.attach("supervisor").unwrap();
    // The receiving halves of both ports are discarded; this helper never
    // reads from them.
    let (supervision_port, _) = supervisor.open_port();
    let (config_handle, _) = client.open_port();
    mesh_agent
        .configure(
            client,
            rank,
            router_channel_addr,
            supervision_port.bind(),
            HashMap::new(),
            config_handle.bind(),
        )
        .await
        .unwrap();

    // Look up the name under which `TestActor` is registered; a missing
    // entry is a test-setup bug, so panic with a direct message.
    let remote = Remote::collect();
    let actor_type = remote
        .name_of::<TestActor>()
        .expect("actor not registered")
        .to_string();

    let (completed_handle, mut completed_receiver) = mailbox::open_port(client);
    // `TestActor::Params` is `()`, so the serialized parameters are those
    // of the unit value.
    mesh_agent
        .gspawn(
            client,
            actor_type,
            "Stuck".to_string(),
            bincode::serialize(&()).unwrap(),
            completed_handle.bind(),
        )
        .await
        .unwrap();

    // The completion message carries the id of the newly spawned actor.
    let (_, actor_id) = completed_receiver.recv().await.unwrap();
    ActorRef::attest(actor_id)
}

/// Checks that stopping an allocation whose process refuses to exit is
/// reported as a watchdog stop.
///
/// The test shortens `PROCESS_EXIT_TIMEOUT` to one second, allocates a
/// single proc, spawns a [`TestActor`] on it and sends it [`Wait`] (which
/// registers an exit handler that never returns). It then stops the
/// allocation and expects every `Stopped` event to carry
/// `ProcStopReason::Watchdog`.
#[tokio::test]
async fn test_allocator_stuck_task() {
    // Temporarily override the exit timeout; the guard must stay alive for
    // the whole test.
    let config_lock = hyperactor::config::global::lock();
    let _guard = config_lock.override_key(
        hyperactor::config::PROCESS_EXIT_TIMEOUT,
        Duration::from_secs(1),
    );

    let bootstrap = buck_resources::get("monarch/hyperactor_mesh/bootstrap").unwrap();
    let mut allocator = ProcessAllocator::new(Command::new(bootstrap));
    let spec = AllocSpec {
        shape: shape! { replica = 1 },
        constraints: Default::default(),
    };
    let mut alloc = allocator.allocate(spec).await.unwrap();

    // Drive the allocation until its proc is running, then spawn the test
    // actor on it. Only `Created` and `Running` events are acceptable here.
    let mut procs = HashMap::new();
    let mut running = HashSet::new();
    let (router, client, client_proc, router_addr) = spawn_proc(alloc.transport()).await;
    let stuck_actor = loop {
        match alloc.next().await.unwrap() {
            ProcState::Created {
                proc_id, coords, ..
            } => {
                procs.insert(proc_id, coords);
            }
            ProcState::Running {
                proc_id,
                mesh_agent,
                addr,
            } => {
                router.bind(Reference::Proc(proc_id.clone()), addr.clone());

                assert!(procs.contains_key(&proc_id));
                assert!(!running.contains(&proc_id));

                let spawned =
                    spawn_test_actor(0, &client_proc, &client, router_addr, mesh_agent).await;
                running.insert(proc_id);
                break spawned;
            }
            event => panic!("unexpected event: {:?}", event),
        }
    };
    assert!(stuck_actor.send(&client, Wait).is_ok());

    // The process now hangs on exit, so stopping must end with the watchdog
    // firing.
    alloc.stop().await.unwrap();
    let mut stopped = HashSet::new();
    while let Some(event) = alloc.next().await {
        // Any event other than `Stopped` ends the drain; the assertions
        // below then require that nothing else follows.
        let ProcState::Stopped { proc_id, reason } = event else {
            break;
        };
        assert_eq!(reason, ProcStopReason::Watchdog);
        stopped.insert(proc_id);
    }
    assert!(alloc.next().await.is_none());
    assert_eq!(stopped, running);
}
}
25 changes: 23 additions & 2 deletions hyperactor_mesh/src/alloc/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use hyperactor::channel::ChannelTx;
use hyperactor::channel::Rx;
use hyperactor::channel::Tx;
use hyperactor::channel::TxStatus;
use hyperactor::sync::flag;
use hyperactor::sync::monitor;
use ndslice::Shape;
use tokio::io;
Expand Down Expand Up @@ -130,6 +131,7 @@ enum ChannelState {
struct Child {
channel: ChannelState,
group: monitor::Group,
exit_flag: Option<flag::Flag>,
stdout: LogTailer,
stderr: LogTailer,
stop_reason: Arc<OnceLock<ProcStopReason>>,
Expand All @@ -140,6 +142,7 @@ impl Child {
mut process: tokio::process::Child,
) -> (Self, impl Future<Output = ProcStopReason>) {
let (group, handle) = monitor::group();
let (exit_flag, exit_guard) = flag::guarded();

let stdout = LogTailer::tee(
MAX_TAIL_LOG_LINES,
Expand All @@ -156,6 +159,7 @@ impl Child {
let child = Self {
channel: ChannelState::NotConnected,
group,
exit_flag: Some(exit_flag),
stdout,
stderr,
stop_reason: Arc::clone(&stop_reason),
Expand All @@ -178,6 +182,8 @@ impl Child {
}
result = process.wait() => Self::exit_status_to_reason(result),
};
exit_guard.signal();

stop_reason.get_or_init(|| reason).clone()
};

Expand Down Expand Up @@ -234,10 +240,24 @@ impl Child {
true
}

fn spawn_watchdog(&mut self) {
let Some(exit_flag) = self.exit_flag.take() else {
return;
};
let group = self.group.clone();
let stop_reason = self.stop_reason.clone();
tokio::spawn(async move {
let exit_timeout =
hyperactor::config::global::get(hyperactor::config::PROCESS_EXIT_TIMEOUT);
if tokio::time::timeout(exit_timeout, exit_flag).await.is_err() {
let _ = stop_reason.set(ProcStopReason::Watchdog);
group.fail();
}
});
}

#[hyperactor::instrument_infallible]
fn post(&mut self, message: Allocator2Process) {
// We simply assume here that if we are not connected, we are about to
// be killed.
if let ChannelState::Connected(channel) = &mut self.channel {
channel.post(message);
} else {
Expand Down Expand Up @@ -446,6 +466,7 @@ impl Alloc for ProcessAlloc {
// for liveness.
for (_index, child) in self.active.iter_mut() {
child.post(Allocator2Process::StopAndExit(0));
child.spawn_watchdog();
}

self.running = false;
Expand Down
12 changes: 6 additions & 6 deletions hyperactor_mesh/src/alloc/remoteprocess.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1048,7 +1048,7 @@ mod test {
}
}

fn set_procstate_execptations(alloc: &mut MockAlloc, shape: Shape) {
fn set_procstate_expectations(alloc: &mut MockAlloc, shape: Shape) {
let alloc_len = shape.slice().len();
alloc.expect_shape().return_const(shape.clone());
for i in 0..alloc_len {
Expand Down Expand Up @@ -1106,7 +1106,7 @@ mod test {
alloc.expect_world_id().return_const(world_id.clone());
alloc.expect_shape().return_const(spec.shape.clone());

set_procstate_execptations(&mut alloc, spec.shape.clone());
set_procstate_expectations(&mut alloc, spec.shape.clone());

// final none
alloc.expect_next().return_const(None);
Expand Down Expand Up @@ -1249,7 +1249,7 @@ mod test {
alloc.alloc.expect_world_id().return_const(world_id.clone());
alloc.alloc.expect_shape().return_const(spec.shape.clone());

set_procstate_execptations(&mut alloc.alloc, spec.shape.clone());
set_procstate_expectations(&mut alloc.alloc, spec.shape.clone());

alloc.alloc.expect_next().return_const(None);
alloc.alloc.expect_stop().times(1).return_once(|| Ok(()));
Expand Down Expand Up @@ -1326,7 +1326,7 @@ mod test {
.return_const(world_id.clone());
alloc1.alloc.expect_shape().return_const(spec.shape.clone());

set_procstate_execptations(&mut alloc1.alloc, spec.shape.clone());
set_procstate_expectations(&mut alloc1.alloc, spec.shape.clone());
alloc1.alloc.expect_next().return_const(None);
alloc1.alloc.expect_stop().times(1).return_once(|| Ok(()));
// second allocation
Expand All @@ -1341,7 +1341,7 @@ mod test {
.expect_world_id()
.return_const(world_id.clone());
alloc2.alloc.expect_shape().return_const(spec.shape.clone());
set_procstate_execptations(&mut alloc2.alloc, spec.shape.clone());
set_procstate_expectations(&mut alloc2.alloc, spec.shape.clone());
alloc2.alloc.expect_next().return_const(None);
alloc2.alloc.expect_stop().times(1).return_once(|| Ok(()));

Expand Down Expand Up @@ -1446,7 +1446,7 @@ mod test {
alloc.alloc.expect_world_id().return_const(world_id.clone());
alloc.alloc.expect_shape().return_const(spec.shape.clone());

set_procstate_execptations(&mut alloc.alloc, spec.shape.clone());
set_procstate_expectations(&mut alloc.alloc, spec.shape.clone());

alloc.alloc.expect_next().return_const(None);
// we expect a stop due to the failure
Expand Down
Loading