Error repro for tensor engine related errors (#56) #345

Open
wants to merge 1 commit into main
5 changes: 2 additions & 3 deletions monarch_extension/src/convert.rs
@@ -140,9 +140,8 @@ impl<'a> MessageParser<'a> {
.try_iter()?
.map(|x| {
let v = x?;
-let vr: PyResult<u64> = v.extract();
-if let Ok(v) = vr {
-Ok(v.into())
+if v.is_instance_of::<pyo3::types::PyInt>() {
+Ok(v.extract::<u64>()?.into())
} else {
create_ref(v)
}
10 changes: 10 additions & 0 deletions python/monarch/worker/_testing_function.py
@@ -479,3 +479,13 @@ def test_pdb_actor():
pdb_actor.send(DebuggerAction.Write(b"5678"))
assert isinstance(pdb_actor.receive(), DebuggerAction.Detach)
return torch.zeros(1)


def throw_python_exception():
d = {"a": torch.tensor(1)}
x = d["b"]
return x


def returns_a_string():
return "hello"
13 changes: 13 additions & 0 deletions python/monarch/worker/_testing_throws_on_import.py
@@ -0,0 +1,13 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

import torch

raise ImportError("could not import file")


def _an_unusable_function():
return torch.tensor([1, 2, 3])
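For context, the module above exists only to exercise the resolution failure path: resolving a dotted function path has to import the module first, so the top-level ImportError fires before _an_unusable_function can even be looked up. A minimal, illustrative sketch of that failure mode follows (the resolve helper is hypothetical, not monarch's actual resolver):

# Illustrative only: a minimal dotted-path resolver, not monarch's implementation.
import importlib


def resolve(path: str):
    module_name, func_name = path.rsplit(".", 1)
    # Importing the module executes its top-level code, which raises here.
    module = importlib.import_module(module_name)
    return getattr(module, func_name)


try:
    resolve("monarch.worker._testing_throws_on_import._an_unusable_function")
except ImportError as exc:
    print(f"failed to resolve function: {exc}")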
95 changes: 92 additions & 3 deletions python/tests/error_test_binary.py
@@ -7,13 +7,24 @@
import asyncio
import ctypes
import sys
import time

import click
import monarch
import torch
from monarch._rust_bindings.hyperactor_extension.alloc import (
AllocConstraints,
AllocSpec,
)

from monarch._rust_bindings.monarch_extension.panic import panicking_function

from monarch.actor_mesh import Actor, endpoint, send
from monarch.proc_mesh import proc_mesh
from monarch.common import messages
from monarch.common._device_utils import _local_device_count
from monarch.common.remote import remote
from monarch.mesh_controller import spawn_tensor_engine
from monarch.proc_mesh import proc_mesh, ProcMesh


class ErrorActor(Actor):
@@ -92,8 +103,6 @@ def _run_error_test_sync(num_procs, sync_endpoint, endpoint_name):


def _run_error_test(num_procs, sync_endpoint, endpoint_name):
-import asyncio
-
if sync_endpoint:
actor_class = ErrorActorSync
else:
@@ -124,6 +133,60 @@ async def run_test():
asyncio.run(run_test())


def python_throws_error_on_tensor_worker(_mesh):
throws = remote(
"monarch.worker._testing_function.throw_python_exception",
propagate=lambda: torch.tensor(1),
)
throws()
time.sleep(1)
torch.ones(())


def python_returns_unexpected_value_to_rust_on_tensor_worker(_mesh):
actually_returns_a_string = remote(
"monarch.worker._testing_function.returns_a_string",
propagate=lambda: torch.tensor(1),
)
client_thinks_im_a_tensor = actually_returns_a_string()
client_thinks_im_a_tensor += 1
time.sleep(2)
torch.ones(())


def rust_error_on_tensor_worker(mesh):
class _Recording:
def __init__(self):
self.ref = 0

# Trying to call a recording that doesn't exist will cause a rust error
# on the worker.
mesh._send(
messages.CallRecording(ident=0, recording=_Recording(), results=[], actuals=[])
)
time.sleep(1)
torch.rand(3, 4)


def rust_panic_on_tensor_worker(_mesh):
# __test_panic is a special invocation for testing inside StreamActor
# that forces a panic
panic = remote("__test_panic", propagate=lambda: torch.ones(()))
panic()
time.sleep(1)
torch.rand(3, 4)


def tensor_worker_remote_function_import_error(_mesh):
import_error = remote(
"monarch.worker._testing_throws_on_import._an_unusable_function",
propagate=lambda: torch.ones(()),
)
import_error()
time.sleep(1)
torch.ones(())


@click.group()
def main():
pass
@@ -176,5 +239,31 @@ def error_unmonitored():
asyncio.run(_error_unmonitored())


@main.command("error-client")
def error_client():
gpus = _local_device_count()
spec = AllocSpec(AllocConstraints(), gpus=gpus, hosts=1)
allocator = monarch.LocalAllocator()
alloc = allocator.allocate(spec).get()
ProcMesh.from_alloc(alloc).get()
# Attempting to reuse alloc for a new proc mesh will cause a rust error.
ProcMesh.from_alloc(alloc).get()


@main.command("error-tensor-engine")
@click.option("--num-procs", type=int, required=True)
@click.option("--test-name", type=str, required=True)
def error_tensor_engine(num_procs, test_name):
print(f"Running tensor engine test: {num_procs=} {test_name=}")
proc = proc_mesh(gpus=num_procs).get()
mesh = spawn_tensor_engine(proc)
with mesh.activate():
test_func = globals().get(test_name)
if not test_func:
raise ValueError(f"Function {test_name} not found in the current module.")
test_func(mesh)
mesh.exit()


if __name__ == "__main__":
main()
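For reference, the new error-tensor-engine subcommand can be driven directly with the same packaged binary the tests use; a minimal sketch, assuming the monarch.python.tests / test_bin resource referenced in test_actor_error.py is available:

# Sketch: invoke the new subcommand through the packaged test binary.
import importlib.resources
import subprocess

test_bin = importlib.resources.files("monarch.python.tests").joinpath("test_bin")
result = subprocess.run(
    [
        str(test_bin),
        "error-tensor-engine",
        "--num-procs=1",
        "--test-name=python_throws_error_on_tensor_worker",
    ],
    capture_output=True,
    timeout=180,
)
print(result.returncode)  # expected to be non-zero for these repro cases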
149 changes: 102 additions & 47 deletions python/tests/test_actor_error.py
@@ -4,12 +4,11 @@
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

-import asyncio
import importlib.resources
import subprocess

import pytest
-from monarch.actor_mesh import Actor, ActorError, endpoint, send
+from monarch.actor_mesh import Actor, ActorError, endpoint

from monarch.proc_mesh import proc_mesh

@@ -66,6 +65,24 @@ def __setstate__(self, state):
self.__dict__.update(state)


def _test_helper(cmd_args, timeout=180):
"""Helper function to run a subprocess test and check its output."""
test_bin = importlib.resources.files("monarch.python.tests").joinpath("test_bin")
cmd = [str(test_bin)] + cmd_args
try:
print("running cmd", " ".join(cmd))
process = subprocess.run(cmd, capture_output=True, timeout=timeout)
except subprocess.TimeoutExpired as e:
print("timeout expired")
if e.stdout is not None:
print(e.stdout.decode())
if e.stderr is not None:
print(e.stderr.decode())
raise

return process


@pytest.mark.parametrize(
"actor_class",
[ExceptionActor, ExceptionActorSync],
@@ -118,26 +135,14 @@ def test_actor_supervision(num_procs, sync_endpoint, sync_test_impl, endpoint_name
to exit with a non-zero code, so the only way we can test it is via a
subprocess harness.
"""
-# Run the segfault test in a subprocess
-test_bin = importlib.resources.files("monarch.python.tests").joinpath("test_bin")
-cmd = [
-str(test_bin),
+cmd_args = [
"error-endpoint",
f"--num-procs={num_procs}",
f"--sync-endpoint={sync_endpoint}",
f"--sync-test-impl={sync_test_impl}",
f"--endpoint-name={endpoint_name}",
]
-try:
-print("running cmd", " ".join(cmd))
-process = subprocess.run(cmd, capture_output=True, timeout=180)
-except subprocess.TimeoutExpired as e:
-print("timeout expired")
-if e.stdout is not None:
-print(e.stdout.decode())
-if e.stderr is not None:
-print(e.stderr.decode())
-raise
+process = _test_helper(cmd_args)

# Assert that the subprocess exited with a non-zero code
assert "I actually ran" in process.stdout.decode()
@@ -152,22 +157,7 @@ def test_proc_mesh_bootstrap_error():
"""
Test that attempts to spawn a ProcMesh with a failure during bootstrap.
"""
-# Run the segfault test in a subprocess
-test_bin = importlib.resources.files("monarch.python.tests").joinpath("test_bin")
-cmd = [
-str(test_bin),
-"error-bootstrap",
-]
-try:
-print("running cmd", " ".join(cmd))
-process = subprocess.run(cmd, capture_output=True, timeout=180)
-except subprocess.TimeoutExpired as e:
-print("timeout expired")
-if e.stdout is not None:
-print(e.stdout.decode())
-if e.stderr is not None:
-print(e.stderr.decode())
-raise
+process = _test_helper(["error-bootstrap"])

# Assert that the subprocess exited with a non-zero code
assert "I actually ran" in process.stdout.decode()
@@ -217,24 +207,89 @@ async def test_broken_pickle_class(raise_on_getstate, raise_on_setstate, num_pro
@pytest.mark.oss_skip
async def test_exception_after_wait_unmonitored():
# Run the test in a subprocess
-test_bin = importlib.resources.files("monarch.python.tests").joinpath("test_bin")
-cmd = [
-str(test_bin),
-"error-unmonitored",
-]
-try:
-print("running cmd", " ".join(cmd))
-process = subprocess.run(cmd, capture_output=True, timeout=180)
-except subprocess.TimeoutExpired as e:
-print("timeout expired")
-if e.stdout is not None:
-print(e.stdout.decode())
-if e.stderr is not None:
-print(e.stderr.decode())
-raise
+process = _test_helper(["error-unmonitored"])

# Assert that the subprocess exited with a non-zero code
assert "I actually ran" in process.stdout.decode()
assert (
process.returncode != 0
), f"Expected non-zero exit code, got {process.returncode}"


# oss_skip: importlib not pulling resource correctly in git CI, needs to be revisited
@pytest.mark.oss_skip
async def test_rust_error_on_client():
# Run the test in a subprocess
process = _test_helper(["error-client"])

# Assert that the subprocess exited with a non-zero code
assert "Exception: Alloc object already been used" in process.stderr.decode()
assert (
process.returncode != 0
), f"Expected non-zero exit code, got {process.returncode}"


# oss_skip: importlib not pulling resource correctly in git CI, needs to be revisited
@pytest.mark.oss_skip
@pytest.mark.parametrize("num_procs", [1, 2])
@pytest.mark.parametrize(
"test_name_and_output",
[
(
"python_throws_error_on_tensor_worker",
[
"RuntimeError: remote function failed: Traceback (most recent call last):",
'x = d["b"]',
"KeyError: 'b'",
],
),
(
"python_returns_unexpected_value_to_rust_on_tensor_worker",
[
"Traceback of where the remote function was issued on controller (most recent call last):",
"in python_returns_unexpected_value_to_rust_on_tensor_worker",
"Traceback of where the remote function failed on worker (most recent call last):",
"in torch operator failed: torch operator error aten::add_() Expected a value of type 'Tensor' for argument 'self' but instead found type 'str'",
],
),
(
"rust_error_on_tensor_worker",
["processing error: could not find recording: Ref {"],
),
(
"rust_panic_on_tensor_worker",
[
"panic: __test_panic called",
"panicked at fbcode/monarch/monarch_messages/src/worker.rs",
],
),
(
"tensor_worker_remote_function_import_error",
[
"Traceback of where the remote function was issued on controller (most recent call last):",
"Traceback of where the remote function failed on worker (most recent call last):",
'in invalid remote function: failed to resolve function <function "monarch.worker._testing_throws_on_import._an_unusable_function">: Traceback (most recent call last):',
],
),
],
)
def test_tensor_engine_errors(num_procs, test_name_and_output):
"""
Test that errors raised in the tensor engine (worker-side Python exceptions, Rust errors, and panics) are reported to the client.

Today, these errors are delivered to the client and cause the client process
to exit with a non-zero code, so the only way we can test them is via a
subprocess harness.
"""
cmd_args = [
"error-tensor-engine",
f"--num-procs={num_procs}",
f"--test-name={test_name_and_output[0]}",
]
process = _test_helper(cmd_args)

for output in test_name_and_output[1]:
assert output in process.stderr.decode()
assert (
process.returncode != 0
), f"Expected non-zero exit code, got {process.returncode}"