Skip to content

Retry instances not running add new Execution list endpoint #793

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 17 commits into from
May 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 1 addition & 31 deletions src/aleph/vm/controllers/firecracker/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,7 @@
from aleph.vm.hypervisors.firecracker.microvm import setfacl
from aleph.vm.network.interfaces import TapInterface
from aleph.vm.storage import create_devmapper, create_volume_file
from aleph.vm.utils import (
HostNotFoundError,
NotEnoughDiskSpaceError,
check_disk_space,
ping,
run_in_subprocess,
)
from aleph.vm.utils import NotEnoughDiskSpaceError, check_disk_space, run_in_subprocess

from .executable import (
AlephFirecrackerExecutable,
Expand Down Expand Up @@ -120,30 +114,6 @@ async def setup(self):
),
)

async def wait_for_init(self) -> None:
    """Wait for the init process of the instance to be ready.

    Polls the VM's IP with single-packet pings until one succeeds,
    re-raising HostNotFoundError after the final failed attempt.
    """
    assert self.enable_networking and self.tap_interface, f"Network not enabled for VM {self.vm_id}"

    host_ip = self.get_ip()
    if not host_ip:
        msg = "Host IP not available"
        raise ValueError(msg)

    # Strip a possible CIDR suffix (e.g. "10.0.0.2/24" -> "10.0.0.2").
    address = host_ip.split("/", 1)[0]

    max_attempts = 30
    ping_timeout = 2

    last_index = max_attempts - 1
    for attempt_index in range(max_attempts):
        try:
            await ping(address, packets=1, timeout=ping_timeout)
        except HostNotFoundError:
            if attempt_index == last_index:
                raise
        else:
            return

async def create_snapshot(self) -> CompressedDiskVolumeSnapshot:
"""Create a VM snapshot"""
volume_path = await create_volume_file(self.resources.message_content.rootfs, self.resources.namespace)
Expand Down
31 changes: 1 addition & 30 deletions src/aleph/vm/controllers/qemu/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import json
import logging
import shutil
from asyncio import Task
from asyncio.subprocess import Process
from pathlib import Path
from typing import Generic, TypeVar
Expand Down Expand Up @@ -32,7 +31,7 @@
from aleph.vm.network.interfaces import TapInterface
from aleph.vm.resources import HostGPU
from aleph.vm.storage import get_rootfs_base_path
from aleph.vm.utils import HostNotFoundError, ping, run_in_subprocess
from aleph.vm.utils import run_in_subprocess

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -232,41 +231,13 @@ async def start(self):
# Start via systemd not here
raise NotImplementedError()

async def wait_for_init(self) -> None:
    """Wait for the init process of the instance to be ready.

    Repeatedly pings the guest IP; gives up (re-raising the last
    HostNotFoundError) once all attempts are exhausted.
    """
    assert self.enable_networking and self.tap_interface, f"Network not enabled for VM {self.vm_id}"

    raw_ip = self.get_ip()
    if not raw_ip:
        msg = "Host IP not available"
        raise ValueError(msg)
    # Drop any "/prefix" CIDR part before pinging.
    guest_ip = raw_ip.split("/", 1)[0]

    total_attempts = 30
    per_ping_timeout = 2

    remaining = total_attempts
    while remaining:
        remaining -= 1
        try:
            await ping(guest_ip, packets=1, timeout=per_ping_timeout)
            return
        except HostNotFoundError:
            if not remaining:
                raise

async def start_guest_api(self):
    # No-op: QEMU instances do not start a guest API process here;
    # presumably kept to satisfy the executable interface — confirm.
    pass

async def stop_guest_api(self):
    # No-op counterpart of start_guest_api: nothing was started,
    # so there is nothing to stop.
    pass

print_task: Task | None = None

async def teardown(self):
if self.print_task:
self.print_task.cancel()

if self.enable_networking:
teardown_nftables_for_vm(self.vm_id)
if self.tap_interface:
Expand Down
2 changes: 1 addition & 1 deletion src/aleph/vm/hypervisors/firecracker/microvm.py
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,7 @@
except asyncio.TimeoutError:
# In Python < 3.11 wait_closed() was broken and returned immediatly
# It is supposedly fixed in Python 3.12.1, but it hangs indefinitely during tests.
logger.info("f{self} unix socket closing timeout")
logger.info("%s unix socket closing timeout", self)

Check warning on line 511 in src/aleph/vm/hypervisors/firecracker/microvm.py

View check run for this annotation

Codecov / codecov/patch

src/aleph/vm/hypervisors/firecracker/microvm.py#L511

Added line #L511 was not covered by tests

logger.debug("Removing files")
if self.config_file_path:
Expand Down
79 changes: 72 additions & 7 deletions src/aleph/vm/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
from aleph.vm.orchestrator.vm import AlephFirecrackerInstance
from aleph.vm.resources import GpuDevice, HostGPU
from aleph.vm.systemd import SystemDManager
from aleph.vm.utils import create_task_log_exceptions, dumps_for_json
from aleph.vm.utils import create_task_log_exceptions, dumps_for_json, is_pinging

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -86,12 +86,25 @@
stop_event: asyncio.Event
expire_task: asyncio.Task | None = None
update_task: asyncio.Task | None = None
init_task: asyncio.Task | None

snapshot_manager: SnapshotManager | None
systemd_manager: SystemDManager | None

persistent: bool = False

@property
def is_starting(self) -> bool:
    """True while the VM is booting: a start was initiated but neither
    a successful start nor a stop has been recorded yet."""
    times = self.times
    boot_initiated = times.starting_at
    boot_finished_or_stopping = times.started_at or times.stopping_at
    return bool(boot_initiated and not boot_finished_or_stopping)

Check warning on line 98 in src/aleph/vm/models.py

View check run for this annotation

Codecov / codecov/patch

src/aleph/vm/models.py#L98

Added line #L98 was not covered by tests

@property
def is_controller_running(self):
    """Whether the systemd controller service is active.

    Returns None when the execution is not persistent or no systemd
    manager is attached, i.e. when the question does not apply.
    """
    if not (self.persistent and self.systemd_manager):
        return None
    return self.systemd_manager.is_service_active(self.controller_service)

@property
def is_running(self) -> bool:
return (
Expand Down Expand Up @@ -162,6 +175,7 @@
systemd_manager: SystemDManager | None,
persistent: bool,
):
self.init_task = None
self.uuid = uuid.uuid1() # uuid1() includes the hardware address and timestamp
self.vm_hash = vm_hash
self.message = message
Expand Down Expand Up @@ -322,25 +336,71 @@
await self.vm.start_guest_api()

# Start VM and snapshots automatically
# If the execution is confidential, don't start it because we need to wait for the session certificate
# files, use the endpoint /control/machine/{ref}/confidential/initialize to get session files and start the VM
# If the execution is a confidential instance, it is start later in the process when the session certificate
# files are received from the client via the endpoint /control/machine/{ref}/confidential/initialize endpoint
if self.persistent and not self.is_confidential and self.systemd_manager:
self.systemd_manager.enable_and_start(self.controller_service)
await self.wait_for_init()
if self.is_program and self.vm:
await self.systemd_manager.enable_and_start(self.controller_service)

if self.is_program:
await self.wait_for_init()

Check warning on line 345 in src/aleph/vm/models.py

View check run for this annotation

Codecov / codecov/patch

src/aleph/vm/models.py#L345

Added line #L345 was not covered by tests
await self.vm.load_configuration()
self.times.started_at = datetime.now(tz=timezone.utc)

Check warning on line 347 in src/aleph/vm/models.py

View check run for this annotation

Codecov / codecov/patch

src/aleph/vm/models.py#L347

Added line #L347 was not covered by tests
else:
self.init_task = asyncio.create_task(self.non_blocking_wait_for_boot())

if self.vm and self.vm.support_snapshot and self.snapshot_manager:
await self.snapshot_manager.start_for(vm=self.vm)

self.times.started_at = datetime.now(tz=timezone.utc)
self.ready_event.set()
await self.save()
except Exception:
logger.exception("%s error during start, tearing down", self)

Check warning on line 357 in src/aleph/vm/models.py

View check run for this annotation

Codecov / codecov/patch

src/aleph/vm/models.py#L357

Added line #L357 was not covered by tests
await self.vm.teardown()
await self.vm.stop_guest_api()
raise

async def wait_for_persistent_boot(self):
    """Wait until the VM has booted.

    Boot is detected by the guest answering a ping; before each ping
    attempt we also check that the controller service is still running,
    so a crashed VM fails fast instead of waiting out all attempts.

    Raises:
        ValueError: if the host IP is not available.
        RuntimeError: if the controller process stops while waiting.
        TimeoutError: if the guest never answers after all attempts.
    """
    assert self.vm
    assert self.vm.enable_networking and self.vm.tap_interface, f"Network not enabled for VM {self.vm.vm_id}"
    ip = self.vm.get_ip()
    if not ip:
        msg = "Host IP not available"
        raise ValueError(msg)

    # Strip a possible CIDR suffix (e.g. "10.0.0.2/24" -> "10.0.0.2").
    ip = ip.split("/", 1)[0]
    max_attempts = 30
    timeout_seconds = 2
    for _attempt in range(max_attempts):
        if not self.is_controller_running:
            # Fix: use the module logger, not the root `logging` module.
            logger.error("%s process stopped running while waiting for boot", self)
            raise RuntimeError("Process is not running")

        if await is_pinging(ip, packets=1, timeout=timeout_seconds):
            return

    logger.error("%s has not responded to ping after %s attempts", self, max_attempts)
    # TimeoutError is a subclass of Exception, so existing callers that
    # catch Exception (e.g. non_blocking_wait_for_boot) still work.
    raise TimeoutError(f"VM did not answer ping after {max_attempts} attempts")

async def non_blocking_wait_for_boot(self) -> bool:
    """Wait until the VM responds to ping and mark it as started, or
    stop it (releasing its resources) if it fails to boot.

    Runs as a background task for instances (programs wait inline).

    Returns:
        True if the VM booted and was marked as started, False otherwise.
    """
    assert self.vm
    try:
        await self.wait_for_persistent_boot()
    except Exception as e:
        # Broad catch on purpose: this runs as a detached task, and any
        # boot failure must result in a clean stop, not a lost exception.
        logger.warning("%s failed to respond to ping or is not running, stopping it: %s", self, e)
        await self.stop()
        return False
    logger.info("%s responded to ping. Marking it as started.", self)
    self.times.started_at = datetime.now(tz=timezone.utc)
    # TODO(review): the original left `await self.save()` commented out
    # (and unreachable after `return`) — confirm whether the new state
    # should be persisted here.
    return True

Check warning on line 402 in src/aleph/vm/models.py

View check run for this annotation

Codecov / codecov/patch

src/aleph/vm/models.py#L398-L402

Added lines #L398 - L402 were not covered by tests

async def wait_for_init(self):
assert self.vm, "The VM attribute has to be set before calling wait_for_init()"
await self.vm.wait_for_init()
Expand Down Expand Up @@ -383,12 +443,15 @@
async def stop(self) -> None:
"""Stop the VM and release resources"""
assert self.vm, "The VM attribute has to be set before calling stop()"
logger.info("%s stopping", self)

# Prevent concurrent calls to stop() using a Lock
async with self.stop_pending_lock:
if self.times.stopped_at is not None:
logger.debug(f"VM={self.vm.vm_id} already stopped")
return
if self.persistent and self.systemd_manager:
self.systemd_manager.stop_and_disable(self.controller_service)
self.times.stopping_at = datetime.now(tz=timezone.utc)
await self.all_runs_complete()
await self.record_usage()
Expand All @@ -400,6 +463,7 @@
if self.vm.support_snapshot and self.snapshot_manager:
await self.snapshot_manager.stop_for(self.vm_hash)
self.stop_event.set()
logger.info("%s stopped", self)

def start_watching_for_updates(self, pubsub: PubSub):
if not self.update_task:
Expand Down Expand Up @@ -430,6 +494,7 @@
await self.runs_done_event.wait()

async def save(self):
"""Save to DB"""
assert self.vm, "The VM attribute has to be set before calling save()"

pid_info = self.vm.to_dict() if self.vm else None
Expand Down
2 changes: 1 addition & 1 deletion src/aleph/vm/network/firewall.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def execute_json_nft_commands(commands: list[dict]) -> int:
logger.debug("Inserting nftables rules")
return_code, output, error = nft.json_cmd(commands_dict)
if return_code != 0:
logger.error(f"Failed to add nftables rules: {error}")
logger.error("Failed to add nftables rules: %s -- %s", error, json.dumps(commands, indent=4))

return return_code

Expand Down
4 changes: 2 additions & 2 deletions src/aleph/vm/orchestrator/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@
bench: list[float] = []

loop = asyncio.get_event_loop()
pool = VmPool(loop)
pool = VmPool()

Check warning on line 191 in src/aleph/vm/orchestrator/cli.py

View check run for this annotation

Codecov / codecov/patch

src/aleph/vm/orchestrator/cli.py#L191

Added line #L191 was not covered by tests
await pool.setup()

# Does not make sense in benchmarks
Expand Down Expand Up @@ -246,7 +246,7 @@
"""Run instances from a list of message identifiers."""
logger.info(f"Instances to run: {instances}")
loop = asyncio.get_event_loop()
pool = VmPool(loop)
pool = VmPool()

Check warning on line 249 in src/aleph/vm/orchestrator/cli.py

View check run for this annotation

Codecov / codecov/patch

src/aleph/vm/orchestrator/cli.py#L249

Added line #L249 was not covered by tests
# The main program uses a singleton pubsub instance in order to watch for updates.
# We create another instance here since that singleton is not initialized yet.
# Watching for updates on this instance will therefore not work.
Expand Down
27 changes: 14 additions & 13 deletions src/aleph/vm/orchestrator/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,13 +258,24 @@


async def start_persistent_vm(vm_hash: ItemHash, pubsub: PubSub | None, pool: VmPool) -> VmExecution:
execution: VmExecution | None = pool.get_running_vm(vm_hash=vm_hash)
execution: VmExecution | None = pool.executions.get(vm_hash)

Check warning on line 261 in src/aleph/vm/orchestrator/run.py

View check run for this annotation

Codecov / codecov/patch

src/aleph/vm/orchestrator/run.py#L261

Added line #L261 was not covered by tests
if execution:
if execution.is_running:
logger.info(f"{vm_hash} is already running")

Check warning on line 264 in src/aleph/vm/orchestrator/run.py

View check run for this annotation

Codecov / codecov/patch

src/aleph/vm/orchestrator/run.py#L264

Added line #L264 was not covered by tests
elif execution.is_starting:
logger.info(f"{vm_hash} is already starting")

Check warning on line 266 in src/aleph/vm/orchestrator/run.py

View check run for this annotation

Codecov / codecov/patch

src/aleph/vm/orchestrator/run.py#L266

Added line #L266 was not covered by tests
elif execution.is_stopping:
logger.info(f"{vm_hash} is stopping, waiting for complete stop before restarting")
await execution.stop_event.wait()
execution = None

Check warning on line 270 in src/aleph/vm/orchestrator/run.py

View check run for this annotation

Codecov / codecov/patch

src/aleph/vm/orchestrator/run.py#L268-L270

Added lines #L268 - L270 were not covered by tests
else:
logger.info(f"{vm_hash} unknown execution state, stopping the vm")
await execution.stop()
execution = None

Check warning on line 274 in src/aleph/vm/orchestrator/run.py

View check run for this annotation

Codecov / codecov/patch

src/aleph/vm/orchestrator/run.py#L272-L274

Added lines #L272 - L274 were not covered by tests

if not execution:
logger.info(f"Starting persistent virtual machine with id: {vm_hash}")
execution = await create_vm_execution(vm_hash=vm_hash, pool=pool, persistent=True)
else:
logger.info(f"{vm_hash} is already running")

await execution.becomes_ready()

Expand All @@ -276,13 +287,3 @@
execution.start_watching_for_updates(pubsub=pubsub)

return execution


async def stop_persistent_vm(vm_hash: ItemHash, pool: VmPool) -> VmExecution | None:
logger.info(f"Stopping persistent VM {vm_hash}")
execution = pool.get_running_vm(vm_hash)

if execution:
await execution.stop()

return execution
18 changes: 13 additions & 5 deletions src/aleph/vm/orchestrator/supervisor.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
about_executions,
about_login,
list_executions,
list_executions_v2,
notify_allocation,
operate_reserve_resources,
run_code_from_hostname,
Expand Down Expand Up @@ -94,14 +95,19 @@
response.headers["Server"] = f"aleph-vm/{__version__}"


async def http_not_found(request: web.Request):  # noqa: ARG001
    """Catch-all handler: return a 404 response for any unknown URL."""
    response = web.HTTPNotFound()
    return response


def setup_webapp():
def setup_webapp(pool: VmPool | None):
"""Create the webapp and set the VmPool

Only case where VmPool is None is in some tests that won't use it.
"""
app = web.Application(middlewares=[error_middleware])
app.on_response_prepare.append(on_prepare_server_version)
app["vm_pool"] = pool
cors = setup(
app,
defaults={
Expand All @@ -118,6 +124,7 @@
# /about APIs return information about the VM Orchestrator
web.get("/about/login", about_login),
web.get("/about/executions/list", list_executions),
web.get("/v2/about/executions/list", list_executions_v2),
web.get("/about/executions/details", about_executions),
web.get("/about/executions/records", about_execution_records),
web.get("/about/usage/system", about_system_usage),
Expand Down Expand Up @@ -175,18 +182,19 @@
settings.check()

loop = asyncio.new_event_loop()
pool = VmPool(loop)
pool = VmPool()

Check warning on line 185 in src/aleph/vm/orchestrator/supervisor.py

View check run for this annotation

Codecov / codecov/patch

src/aleph/vm/orchestrator/supervisor.py#L185

Added line #L185 was not covered by tests
asyncio.run(pool.setup())

hostname = settings.DOMAIN_NAME
protocol = "http" if hostname == "localhost" else "https"

# Require a random token to access /about APIs
secret_token = token_urlsafe(nbytes=32)
app = setup_webapp()
(settings.EXECUTION_ROOT / "login_token").write_text(secret_token)
(settings.EXECUTION_ROOT / "login_token").chmod(0o400)
app = setup_webapp(pool=pool)

Check warning on line 195 in src/aleph/vm/orchestrator/supervisor.py

View check run for this annotation

Codecov / codecov/patch

src/aleph/vm/orchestrator/supervisor.py#L193-L195

Added lines #L193 - L195 were not covered by tests
# Store app singletons. Note that app["pubsub"] will also be created.
app["secret_token"] = secret_token
app["vm_pool"] = pool

# Store sevctl app singleton only if confidential feature is enabled
if settings.ENABLE_CONFIDENTIAL_COMPUTING:
Expand Down
Loading