Skip to content

[monarch] delete extra runtime #363

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: gh/suo/42/base
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion books/hyperactor-book/src/mailboxes/mailbox_client.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl<T: Message> Buffer<T> {
{
let (queue, mut next) = mpsc::unbounded_channel();
let (last_processed, processed) = watch::channel(0);
crate::init::RUNTIME.spawn(async move {
tokio::spawn(async move {
let mut seq = 0;
while let Some((msg, return_handle)) = next.recv().await {
process(msg, return_handle).await;
Expand Down
2 changes: 1 addition & 1 deletion hyperactor/src/channel/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ impl<M: RemoteMessage> NetTx<M> {
dest,
status,
};
crate::init::RUNTIME.spawn(Self::run(link, receiver, notify));
tokio::spawn(Self::run(link, receiver, notify));
tx
}

Expand Down
6 changes: 0 additions & 6 deletions hyperactor/src/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,11 @@

//! Utilities for launching hyperactor processes.

use std::sync::LazyLock;
use std::sync::OnceLock;

use crate::clock::ClockKind;
use crate::panic_handler;

/// A global runtime used in binding async and sync code. Do not use for executing long-running or
/// compute-intensive tasks.
///
/// The runtime is built lazily, the first time this static is dereferenced. If the runtime cannot
/// be constructed, that first access panics with "failed to create global runtime".
pub(crate) static RUNTIME: LazyLock<tokio::runtime::Runtime> =
LazyLock::new(|| tokio::runtime::Runtime::new().expect("failed to create global runtime"));

/// Initialize the Hyperactor runtime. Specifically:
/// - Set up panic handling, so that we get consistent panic stack traces in Actors.
/// - Initialize logging defaults.
Expand Down
4 changes: 2 additions & 2 deletions hyperactor/src/mailbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -678,7 +678,7 @@ impl<T: Message> Buffer<T> {
{
let (queue, mut next) = mpsc::unbounded_channel();
let (last_processed, processed) = watch::channel(0);
crate::init::RUNTIME.spawn(async move {
tokio::spawn(async move {
let mut seq = 0;
while let Some((msg, return_handle)) = next.recv().await {
process(msg, return_handle).await;
Expand Down Expand Up @@ -925,7 +925,7 @@ impl MailboxClient {
cancel_token: CancellationToken,
addr: ChannelAddr,
) {
crate::init::RUNTIME.spawn(async move {
tokio::spawn(async move {
loop {
tokio::select! {
changed = rx.changed() => {
Expand Down
4 changes: 2 additions & 2 deletions hyperactor/src/mailbox/undeliverable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ pub fn monitored_return_handle() -> PortHandle<Undeliverable<MessageEnvelope>> {
// Don't reuse `return_handle` for `h`: else it will never get
// dropped and the task will never return.
let (h, _) = new_undeliverable_port();
crate::init::RUNTIME.spawn(async move {
tokio::spawn(async move {
while let Ok(Undeliverable(mut envelope)) = rx.recv().await {
envelope.try_set_error(DeliveryError::BrokenLink(
"message returned to undeliverable port".to_string(),
Expand All @@ -80,7 +80,7 @@ pub(crate) fn return_undeliverable(
return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
envelope: MessageEnvelope,
) {
crate::init::RUNTIME.spawn(async move {
tokio::spawn(async move {
let envelope_copy = envelope.clone();
if (return_handle.send(Undeliverable(envelope))).is_err() {
UndeliverableMailboxSender.post(envelope_copy, /*unused*/ return_handle)
Expand Down
43 changes: 29 additions & 14 deletions monarch_hyperactor/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use pyo3::types::PyType;
use serde::Deserialize;
use serde::Serialize;
use serde_bytes::ByteBuf;
use tokio::runtime::Handle;
use tokio::sync::Mutex;
use tokio::sync::oneshot;

Expand Down Expand Up @@ -284,6 +285,8 @@ impl Actor for PythonActor {
type Params = PickledPyObject;

async fn new(actor_type: PickledPyObject) -> Result<Self, anyhow::Error> {
let runtime_handle = Handle::current();

Ok(Python::with_gil(|py| -> Result<Self, SerializablePyErr> {
let unpickled = actor_type.unpickle(py)?;
let class_type: &Bound<'_, PyType> = unpickled.downcast()?;
Expand All @@ -292,21 +295,33 @@ impl Actor for PythonActor {
// Release the GIL so that the thread spawned below can acquire it.
let task_locals = Python::allow_threads(py, || {
let (tx, rx) = std::sync::mpsc::channel();
let _ = std::thread::spawn(move || {
Python::with_gil(|py| {
let asyncio = Python::import(py, "asyncio").unwrap();
let event_loop = asyncio.call_method0("new_event_loop").unwrap();
asyncio
.call_method1("set_event_loop", (event_loop.clone(),))
.unwrap();

let task_locals = pyo3_async_runtimes::TaskLocals::new(event_loop.clone())
.copy_context(py)
.unwrap();
tx.send(task_locals).unwrap();
event_loop.call_method0("run_forever").unwrap();

// We spawn a new thread per PythonActor to drive an isolated
// asyncio event loop for it.
let _ = std::thread::Builder::new()
.name("PythonActor".to_string())
.spawn(move || {
Python::with_gil(|py| {
// Enter the tokio runtime context. This is necessary so
// that things like `tokio::spawn` will work if called
// from PythonActor message handlers.
let _guard = Handle::enter(&runtime_handle);

// Set up the event loop.
let asyncio = Python::import(py, "asyncio").unwrap();
let event_loop = asyncio.call_method0("new_event_loop").unwrap();
asyncio
.call_method1("set_event_loop", (event_loop.clone(),))
.unwrap();

let task_locals =
pyo3_async_runtimes::TaskLocals::new(event_loop.clone())
.copy_context(py)
.unwrap();
tx.send(task_locals).unwrap();
event_loop.call_method0("run_forever").unwrap();
});
});
});
rx.recv().unwrap()
});

Expand Down
7 changes: 6 additions & 1 deletion monarch_hyperactor/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ thread_local! {
}

pub fn initialize(py: Python) -> Result<()> {
pyo3_async_runtimes::tokio::init_with_runtime(get_tokio_runtime())
let runtime = get_tokio_runtime();
pyo3_async_runtimes::tokio::init_with_runtime(runtime)
.map_err(|_| anyhow!("failed to initialize py3 async runtime"))?;

// Initialize thread local state to identify the main Python thread.
Expand All @@ -48,6 +49,10 @@ pub fn initialize(py: Python) -> Result<()> {
);
IS_MAIN_THREAD.set(true);

// Enter into the tokio runtime context. This makes functionality like
// `tokio::spawn` available from this thread.
// Leak the context guard so that we just stay in the context forever.
let _ = Box::leak(Box::new(runtime.handle().enter()));
Ok(())
}

Expand Down
2 changes: 1 addition & 1 deletion monarch_tensor_worker/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,7 @@ impl Actor for StreamActor {
// use `block_in_place` for nested async-to-sync-to-async flows.
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(1)
.enable_io()
.enable_all()
.build()
.unwrap();
let result = rt.block_on(async {
Expand Down