Skip to content

WIP: Add support for IPv4 port publishing #238

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 6 commits into
base: hoh-persistent-vm
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docker/vm_supervisor-dev.dockerfile
Original file line number Diff line number Diff line change
@@ -19,7 +19,7 @@ RUN curl -fsSL -o /opt/firecracker/vmlinux.bin https://s3.amazonaws.com/spec.ccf
RUN ln /opt/firecracker/release-*/firecracker-v* /opt/firecracker/firecracker
RUN ln /opt/firecracker/release-*/jailer-v* /opt/firecracker/jailer

RUN pip3 install typing-extensions 'aleph-message>=0.1.19'
RUN pip3 install typing-extensions 'aleph-message>=0.2.0'

RUN mkdir -p /var/lib/aleph/vm/jailer

2 changes: 1 addition & 1 deletion examples/volumes/Dockerfile
Original file line number Diff line number Diff line change
@@ -6,6 +6,6 @@ RUN apt-get update && apt-get -y upgrade && apt-get install -y \
&& rm -rf /var/lib/apt/lists/*

RUN python3 -m venv /opt/venv
RUN /opt/venv/bin/pip install 'aleph-message>=0.1.19'
RUN /opt/venv/bin/pip install 'aleph-message>=0.2.0'

CMD mksquashfs /opt/venv /mnt/volume-venv.squashfs
17 changes: 17 additions & 0 deletions firecracker/microvm.py
Original file line number Diff line number Diff line change
@@ -11,6 +11,8 @@
from tempfile import NamedTemporaryFile
from typing import Optional, Tuple, List

from aleph_message.models.program import PortMapping

from .config import FirecrackerConfig
from .config import Drive

@@ -327,6 +329,17 @@ async def create_network_interface(self, interface: str = "eth0") -> str:

return host_dev_name

def publish_ports(self, port_mappings: List[PortMapping]):
    """Forward published host ports to the guest using iptables DNAT rules.

    For every mapping, one rule is appended to the ``PREROUTING`` chain of
    the ``nat`` table. The rule matches traffic that enters through
    ``self.network_interface`` for the mapping's protocol and public port,
    and rewrites its destination to ``self.guest_ip`` on the mapping's
    ``port``.

    The exit status of each ``iptables`` invocation is not inspected.

    :param port_mappings: Mappings to publish; an empty list adds no rule.
    """
    host_interface = self.network_interface

    for mapping in port_mappings:
        # Assemble the rule piece by piece; joined with single spaces this
        # yields exactly one iptables command line per mapping.
        rule = " ".join(
            (
                "iptables -A PREROUTING -t nat",
                f"-i {host_interface}",
                f"-p {mapping.protocol}",
                f"--dport {mapping.public_port}",
                "-j DNAT",
                f"--to {self.guest_ip}:{mapping.port}",
            )
        )
        system(rule)

async def print_logs(self):
while not self.proc:
await asyncio.sleep(0.01) # Todo: Use signal here
@@ -444,6 +457,10 @@ async def teardown(self):
f"iptables -D FORWARD -i {self.network_tap} -o {self.network_interface} -j ACCEPT"
)

# system(
# ... # TODO remove nat rules for the VM
# )

if self._unix_socket:
logger.debug("Closing unix socket")
self._unix_socket.close()
2 changes: 1 addition & 1 deletion packaging/Makefile
Original file line number Diff line number Diff line change
@@ -16,7 +16,7 @@ debian-package-code:
cp ../examples/message_from_aleph.json ./aleph-vm/opt/aleph-vm/examples/message_from_aleph.json
cp -r ../examples/data ./aleph-vm/opt/aleph-vm/examples/data
mkdir -p ./aleph-vm/opt/aleph-vm/examples/volumes
pip3 install --target ./aleph-vm/opt/aleph-vm/ 'aleph-message>=0.1.19'
pip3 install --target ./aleph-vm/opt/aleph-vm/ 'aleph-message==0.2.0'
python3 -m compileall ./aleph-vm/opt/aleph-vm/

debian-package-resources: firecracker-bins vmlinux
2 changes: 1 addition & 1 deletion vm_supervisor/README.md
Original file line number Diff line number Diff line change
@@ -87,7 +87,7 @@ is used to parse and validate Aleph messages.
```shell
apt install -y --no-install-recommends --no-install-suggests python3-pip
pip3 install pydantic[dotenv]
pip3 install aleph-message
pip3 install 'aleph-message==0.2.0'
```

### 2.f. Create the jailer working directory:
3 changes: 3 additions & 0 deletions vm_supervisor/conf.py
Original file line number Diff line number Diff line change
@@ -105,6 +105,9 @@ class Settings(BaseSettings):
MAX_PROGRAM_ARCHIVE_SIZE = 10_000_000 # 10 MB
MAX_DATA_ARCHIVE_SIZE = 10_000_000 # 10 MB

# Default value is hashlib.sha256(b"secret-token").hexdigest(). This placeholder token is publicly known: override it in any real deployment.
ALLOCATION_TOKEN_HASH = "151ba92f2eb90bce67e912af2f7a5c17d8654b3d29895b042107ea312a7eebda"

FAKE_DATA_PROGRAM: Optional[Path] = None
BENCHMARK_FAKE_DATA_PROGRAM = Path(
abspath(join(__file__, "../../examples/example_fastapi"))
19 changes: 15 additions & 4 deletions vm_supervisor/models.py
Original file line number Diff line number Diff line change
@@ -5,16 +5,17 @@
from asyncio import Task
from dataclasses import dataclass
from datetime import datetime
from typing import NewType, Optional, Dict
from typing import NewType, Optional, Dict, List

from aleph_message.models import ProgramContent
from aleph_message.models.program import PortMapping

from .conf import settings
from .metrics import save_record, save_execution_data, ExecutionRecord
from .pubsub import PubSub
from .utils import dumps_for_json
from .vm import AlephFirecrackerVM
from .vm.firecracker_microvm import AlephFirecrackerResources
from .conf import settings

logger = logging.getLogger(__name__)

@@ -57,6 +58,9 @@ class VmExecution:
expire_task: Optional[asyncio.Task] = None
update_task: Optional[asyncio.Task] = None

marked_as_long_running: bool = False
port_mappings: List[PortMapping]

@property
def is_running(self):
return self.times.starting_at and not self.times.stopping_at
@@ -66,7 +70,8 @@ def becomes_ready(self):
return self.ready_event.wait

def __init__(
self, vm_hash: VmHash, program: ProgramContent, original: ProgramContent
self, vm_hash: VmHash, program: ProgramContent, original: ProgramContent,
port_mappings: Optional[List[PortMapping]] = None
):
self.uuid = uuid.uuid1() # uuid1() includes the hardware address and timestamp
self.vm_hash = vm_hash
@@ -76,6 +81,7 @@ def __init__(
self.ready_event = asyncio.Event()
self.concurrent_runs = 0
self.runs_done_event = asyncio.Event()
self.port_mappings = port_mappings or []

def to_dict(self) -> Dict:
return {
@@ -107,6 +113,7 @@ async def create(self, vm_id: int) -> AlephFirecrackerVM:
)
try:
await vm.setup()
await vm.publish_ports(self.port_mappings)
await vm.start()
await vm.configure()
await vm.start_guest_api()
@@ -117,7 +124,11 @@ async def create(self, vm_id: int) -> AlephFirecrackerVM:
await vm.teardown()
raise

def stop_after_timeout(self, timeout: float = 5.0) -> Task:
def stop_after_timeout(self, timeout: float = 5.0) -> Optional[Task]:
if self.marked_as_long_running:
logger.debug("VM marked as long running. Ignoring timeout.")
return

if self.expire_task:
logger.debug("VM already has a timeout. Extending it.")
self.expire_task.cancel()
13 changes: 10 additions & 3 deletions vm_supervisor/pool.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import asyncio
import logging
from typing import Dict, Optional, Iterable
from typing import Dict, Optional, Iterable, List

from aleph_message.models import ProgramContent, ProgramMessage
from aleph_message.models.program import PortMapping

from .conf import settings
from .models import VmHash, VmExecution
@@ -28,10 +29,11 @@ def __init__(self):
self.executions = {}

async def create_a_vm(
self, vm_hash: VmHash, program: ProgramContent, original: ProgramContent
self, vm_hash: VmHash, program: ProgramContent, original: ProgramContent,
port_mappings: Optional[List[PortMapping]] = None,
) -> VmExecution:
"""Create a new Aleph Firecracker VM from an Aleph function message."""
execution = VmExecution(vm_hash=vm_hash, program=program, original=original)
execution = VmExecution(vm_hash=vm_hash, program=program, original=original, port_mappings=port_mappings)
self.executions[vm_hash] = execution
await execution.prepare()
vm_id = self.get_unique_vm_id()
@@ -90,3 +92,8 @@ async def stop(self):
await asyncio.gather(
*(execution.stop() for vm_hash, execution in self.executions.items())
)

def get_long_running_executions(self) -> Iterable[VmExecution]:
    """Lazily yield the executions that are long-running and still running.

    An execution is yielded when both its ``marked_as_long_running`` flag
    and its ``is_running`` property are truthy. The pool's ``executions``
    mapping is walked while the generator is being consumed.
    """
    yield from (
        candidate
        for candidate in self.executions.values()
        if candidate.marked_as_long_running and candidate.is_running
    )
16 changes: 15 additions & 1 deletion vm_supervisor/resources.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@

from datetime import datetime, timezone
from functools import lru_cache
from typing import Set, Optional, List
from typing import Tuple

import cpuinfo
import psutil
from aiohttp import web
from aleph_message.models.program import CpuProperties
from aleph_message.models.program import CpuProperties, PortMapping
from pydantic import BaseModel

from .conf import settings
from .models import VmHash


class Period(BaseModel):
@@ -117,3 +120,14 @@ async def about_system_usage(request: web.Request):
return web.json_response(
text=usage.json(exclude_none=True),
)


class ProgramAllocation(BaseModel):
    """A program to run, together with the ports to publish for it."""

    # Hash identifying the program (VM) to allocate.
    program_id: VmHash
    # Port mappings requested for the program; None when none were given.
    port_mappings: Optional[List[PortMapping]] = None


class Allocation(BaseModel):
    """Set of workloads allocated to this node, grouped by kind."""

    # Identifiers of the VMs that must be kept running (required field).
    long_running_vms: Set[str]
    # Identifiers of on-demand VMs; optional.
    on_demand_vms: Optional[Set[str]] = None
    # Jobs; optional, element type left unspecified.
    jobs: Optional[Set] = None
50 changes: 47 additions & 3 deletions vm_supervisor/run.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
import asyncio
import logging
from typing import Dict, Any, Optional
from typing import Dict, Any, Optional, List

import msgpack
from aiohttp import web
from aiohttp.web_exceptions import HTTPBadRequest, HTTPInternalServerError
from aiohttp.web_exceptions import (
HTTPBadRequest,
HTTPInternalServerError,
)
from aleph_message.models.program import PortMapping
from msgpack import UnpackValueError

from firecracker.microvm import MicroVMFailedInit
@@ -13,6 +17,7 @@
from .models import VmHash, VmExecution
from .pool import VmPool
from .pubsub import PubSub
from .resources import ProgramAllocation
from .vm.firecracker_microvm import (
ResourceDownloadError,
VmSetupError,
@@ -44,15 +49,25 @@ async def build_event_scope(event) -> Dict[str, Any]:
}


async def create_vm_execution(vm_hash: VmHash) -> VmExecution:
async def create_vm_execution(vm_hash: VmHash, port_mappings: Optional[List[PortMapping]] = None) -> VmExecution:

# try:
# message, original_message = try_load_updated_message(vm_hash, attempts=5)
# except HTTPNotFound as error:
# raise HTTPServiceUnavailable(text=f"Message could not be loaded {error.args}")

message, original_message = await load_updated_message(vm_hash)

# TODO: Verify that port mappings match ports published

pool.message_cache[vm_hash] = message

try:
execution = await pool.create_a_vm(
vm_hash=vm_hash,
program=message.content,
original=original_message.content,
port_mappings=port_mappings,
)
except ResourceDownloadError as error:
logger.exception(error)
@@ -213,3 +228,32 @@ async def run_code_on_event(vm_hash: VmHash, event, pubsub: PubSub):
execution.stop_after_timeout(timeout=settings.REUSE_TIMEOUT)
else:
await execution.stop()


async def start_long_running(program_allocation: ProgramAllocation, pubsub: PubSub) -> VmExecution:
    """Ensure the allocated program runs as a long-running VM and is ready.

    The pool is asked for a running execution of the program; when there is
    none, a new execution is created with the allocation's port mappings.
    The execution is then flagged as long-running, its pending expiration is
    cancelled, and the call waits until the execution reports it is ready.

    :param program_allocation: Program hash and port mappings to start.
    :param pubsub: Currently unused by this function.
    :return: The reused or newly created execution.
    """
    vm_hash = program_allocation.program_id

    execution: Optional[VmExecution] = await pool.get_running_vm(vm_hash=vm_hash)
    if not execution:
        execution = await create_vm_execution(
            vm_hash=vm_hash,
            port_mappings=program_allocation.port_mappings,
        )

    # A VM that was already serving on-demand ("lambda") requests must not
    # expire while it is also scheduled as long-running.
    execution.marked_as_long_running = True
    execution.cancel_expiration()

    await execution.becomes_ready()

    # TODO: watching for updates (settings.WATCH_FOR_UPDATES) is not wired in
    # here yet; it is unclear whether it would be registered on every request.

    return execution


async def stop_long_running(vm_hash: VmHash) -> Optional[VmExecution]:
    """Stop the running execution of ``vm_hash``, if the pool has one.

    :param vm_hash: Hash of the VM to stop.
    :return: The execution that was stopped, or the falsy value returned by
        the pool when no execution is running for this hash.
    """
    logger.info(f"Stopping long running {vm_hash}")

    running = await pool.get_running_vm(vm_hash)
    if not running:
        # Nothing to stop for this hash.
        return running

    await running.stop()
    return running
5 changes: 4 additions & 1 deletion vm_supervisor/supervisor.py
Original file line number Diff line number Diff line change
@@ -24,7 +24,9 @@
about_executions,
about_config,
status_check_fastapi,
about_execution_records, status_check_version,
about_execution_records,
status_check_version,
update_allocations,
)

logger = logging.getLogger(__name__)
@@ -52,6 +54,7 @@ async def server_version_middleware(
web.get("/about/executions/records", about_execution_records),
web.get("/about/usage/system", about_system_usage),
web.get("/about/config", about_config),
web.post("/control/allocations", update_allocations),
web.get("/status/check/fastapi", status_check_fastapi),
web.get("/status/check/version", status_check_version),
web.route("*", "/vm/{ref}{suffix:.*}", run_code_from_path),
56 changes: 54 additions & 2 deletions vm_supervisor/views.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,23 @@
import binascii
import logging
import os.path
from hashlib import sha256
from string import Template
from typing import Awaitable, Optional
from packaging.version import Version, InvalidVersion

import aiodns
import aiohttp
from aiohttp import web
from aiohttp.web_exceptions import HTTPNotFound
from packaging.version import Version, InvalidVersion
from pydantic import ValidationError

from . import status, __version__
from .conf import settings
from .metrics import get_execution_records
from .models import VmHash
from .run import run_code_on_request, pool
from .resources import Allocation
from .run import run_code_on_request, pool, start_long_running
from .utils import b32_to_b16, get_ref_from_dns, dumps_for_json

logger = logging.getLogger(__name__)
@@ -166,3 +169,52 @@ async def status_check_version(request: web.Request):
)
else:
return web.HTTPForbidden(text=f"Outdated: version {current} < {reference}")


def authenticate_api_request(request: web.Request) -> bool:
    """Authenticate an API request to update the VM allocations.

    The token is read from the ``X-Auth-Signature`` header. The request is
    accepted when the SHA-256 hex digest of that token equals
    ``settings.ALLOCATION_TOKEN_HASH``.

    :param request: Incoming request carrying the authentication header.
    :return: ``True`` when the token is valid.
    :raises web.HTTPUnauthorized: When the header is missing or empty, or
        when the token does not match the configured hash.
    """
    from hmac import compare_digest

    # Default to "" so that a missing header is reported as 401 below rather
    # than failing with AttributeError on `None.encode()`.
    signature: bytes = request.headers.get("X-Auth-Signature", "").encode()
    if not signature:
        raise web.HTTPUnauthorized(text="Authentication token is missing")

    # Simple authentication scheme: the hash of the received token must match
    # the value in the settings. The comparison is constant-time so response
    # timing does not reveal how many leading characters matched.
    received_hash = sha256(signature).hexdigest()
    if not compare_digest(received_hash, settings.ALLOCATION_TOKEN_HASH):
        raise web.HTTPUnauthorized(text="Authentication token received is invalid")

    return True



async def update_allocations(request: web.Request):
    """Apply the VM allocation sent in the request body.

    Steps:

    1. Authenticate the request with ``authenticate_api_request``.
    2. Parse the JSON body into an ``Allocation``; malformed JSON or a body
       that fails validation yields a 400 response.
    3. Start every VM listed in ``long_running_vms``.
    4. Stop every long-running execution of the pool that is no longer
       listed, and clear its long-running flag.
    5. ``on_demand_vms`` and ``jobs`` are only logged as unsupported.

    :param request: Incoming aiohttp request.
    :return: JSON response ``{"success": true}`` on success, or an error
        response (400 / 401).
    """
    # Imported here because only this handler needs it.
    from .resources import ProgramAllocation

    if not authenticate_api_request(request):
        return web.HTTPUnauthorized(text="Invalid authentication")

    try:
        data = await request.json()
    except ValueError:
        # json.JSONDecodeError is a ValueError: report a client error instead
        # of letting the handler fail with a server error.
        return web.json_response(
            data={"error": "Request body is not valid JSON"},
            status=web.HTTPBadRequest.status_code,
        )

    try:
        allocation = Allocation(**data)
    except ValidationError as error:
        # `error.json()` is already a JSON document: send it as raw text so it
        # is not serialized a second time into a JSON string.
        return web.json_response(
            text=error.json(), status=web.HTTPBadRequest.status_code
        )

    pubsub = request.app["pubsub"]

    for vm_hash in allocation.long_running_vms:
        vm_hash = VmHash(vm_hash)
        logger.info(f"Starting long running VM {vm_hash}")
        # `start_long_running` reads `.program_id` and `.port_mappings`, so the
        # bare hash is wrapped. The allocation carries no port mappings, hence
        # none are passed.
        await start_long_running(ProgramAllocation(program_id=vm_hash), pubsub)

    # Take a snapshot first: the generator walks the pool's executions dict,
    # which other coroutines may modify while this loop awaits `stop()`.
    for execution in list(pool.get_long_running_executions()):
        if execution.vm_hash not in allocation.long_running_vms:
            logger.info(f"Stopping long running VM {execution.vm_hash}")
            await execution.stop()
            execution.marked_as_long_running = False

    if allocation.on_demand_vms:
        logger.info("Not supported yet: 'allocation.on_demand_vms'")
    if allocation.jobs:
        logger.info("Not supported yet: 'allocation.jobs'")

    return web.json_response(data={"success": True})
7 changes: 6 additions & 1 deletion vm_supervisor/vm/firecracker_microvm.py
Original file line number Diff line number Diff line change
@@ -19,7 +19,7 @@
from aiohttp import ClientResponseError

from aleph_message.models import ProgramContent
from aleph_message.models.program import MachineResources, Encoding
from aleph_message.models.program import MachineResources, Encoding, PortMapping
from firecracker.config import (
BootSource,
Drive,
@@ -320,6 +320,11 @@ async def setup(self):
await fvm.teardown()
raise

async def publish_ports(self, port_mappings: List[PortMapping]):
    """Publish the given port mappings on the underlying MicroVM.

    :param port_mappings: Mappings handed over to ``self.fvm.publish_ports``.
    :return: Whatever the MicroVM's ``publish_ports`` returns.
    :raises ValueError: When ``self.fvm`` is not set.
    """
    microvm = self.fvm
    if microvm:
        return microvm.publish_ports(port_mappings=port_mappings)

    raise ValueError("MicroVM is not defined yet")

async def start(self):
logger.debug(f"starting vm {self.vm_id}")
if not self.fvm: