Skip to content

Switch to using the local IPFS Kubo daemon for IPFS files #779

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 0 additions & 17 deletions packaging/aleph-vm/etc/ipfs/kubo.json

This file was deleted.

11 changes: 10 additions & 1 deletion packaging/aleph-vm/etc/systemd/system/ipfs.service
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ ProtectHome=true
RemoveIPC=true
RestrictSUIDSGID=true
CapabilityBoundingSet=CAP_NET_BIND_SERVICE
# Set a memory limit so IPFS does not consume all of the CRN's resources and get OOM-killed
# https://github.com/ipfs/kubo/blob/master/docs/config.md#swarmresourcemgrmaxmemory
Environment=GOMEMLIMIT=1900m
MemoryHigh=2G
MemoryMax=4G

# enable for 1-1024 port listening
#AmbientCapabilities=CAP_NET_BIND_SERVICE
Expand All @@ -76,7 +81,11 @@ Type=notify
User=ipfs
Group=ipfs
Environment=IPFS_PATH="/var/lib/ipfs"
ExecStart=/opt/kubo/ipfs daemon --init --migrate --init-profile=server --config-file /etc/ipfs/kubo.json
ExecStartPre=/opt/kubo/ipfs init
ExecStartPre=/opt/kubo/ipfs config --json Gateway.PublicGateways '{"localhost": {"UseSubdomains": false, "Paths": ["/ipfs", "/ipns"]}}'
ExecStartPre=/opt/kubo/ipfs config --json Reprovider.Strategy '"roots"'
ExecStartPre=/opt/kubo/ipfs config --json Swarm.ResourceMgr '{"MaxMemory" : "1GB"}'
ExecStart=/opt/kubo/ipfs daemon --migrate=true --init-profile=server
Restart=on-failure
KillSignal=SIGINT

Expand Down
1 change: 1 addition & 0 deletions src/aleph/vm/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ class Settings(BaseSettings):
WATCH_FOR_UPDATES: bool = True

API_SERVER: str = "https://official.aleph.cloud"
IPFS_SERVER: Url = Url("http://localhost:8080/ipfs")
# Connect to the Quad9 public DNS provider using their IPv4 and IPv6 addresses.
CONNECTIVITY_IPV4_URL: str = "https://9.9.9.9/"
CONNECTIVITY_IPV6_URL: str = "https://[2620:fe::fe]/"
Expand Down
4 changes: 2 additions & 2 deletions src/aleph/vm/orchestrator/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@
from aleph_message.status import MessageStatus

from aleph.vm.conf import settings
from aleph.vm.storage import get_latest_amend, get_message
from aleph.vm.storage import get_executable_message, get_latest_amend


async def try_get_message(ref: str) -> ExecutableMessage:
"""Get the message or raise an aiohttp HTTP error"""
try:
return await get_message(ref)
return await get_executable_message(ref)
except ClientConnectorError as error:
raise HTTPServiceUnavailable(reason="Aleph Connector unavailable") from error
except ClientResponseError as error:
Expand Down
90 changes: 66 additions & 24 deletions src/aleph/vm/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,19 @@
import logging
import re
import sys
import uuid
from datetime import datetime, timezone
from pathlib import Path
from shutil import copy2, make_archive
from subprocess import CalledProcessError

import aiohttp
from aleph_message.models import (
AlephMessage,
InstanceMessage,
ItemHash,
ItemType,
ProgramMessage,
StoreMessage,
parse_message,
)
from aleph_message.models.execution.instance import RootfsVolume
Expand Down Expand Up @@ -133,6 +135,32 @@
tmp_path.unlink(missing_ok=True)


async def download_file_from_ipfs_or_connector(ref: str, cache_path: Path, filetype: str) -> None:
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
async def download_file_from_ipfs_or_connector(ref: str, cache_path: Path, filetype: str) -> None:
async def download_file_from_ipfs_or_connector(item_hash: ItemHash, cache_path: Path, filetype: str) -> None:

"""Download a file from the IPFS Gateway if possible, else from the vm-connector."""

if cache_path.is_file():
logger.debug(f"File already exists: {cache_path}")
return

message: StoreMessage = await get_store_message(ref)

if message.content.item_type == ItemType.ipfs:
# Download IPFS files from the IPFS gateway directly
cid = message.content.item_hash
url = f"{settings.IPFS_SERVER}/{cid}"
await download_file(url, cache_path)
else:
# Download via the vm-connector
path_mapping = {
"runtime": "/download/runtime",
"code": "/download/code",
"data": "/download/data",
}
path = path_mapping[filetype]
url = f"{settings.CONNECTOR_URL}{path}/{ref}"
await download_file(url, cache_path)


async def get_latest_amend(item_hash: str) -> str:
if settings.FAKE_DATA_PROGRAM:
return item_hash
Expand All @@ -146,7 +174,26 @@
return result or item_hash


async def get_message(ref: str) -> ProgramMessage | InstanceMessage:
async def load_message(path: Path) -> AlephMessage:
    """Load a message from the cache on disk.

    Args:
        path: Location of the cached message, stored as a JSON document.

    Returns:
        The message as parsed by ``parse_message``.
    """
    # JSON documents are UTF-8 encoded; be explicit instead of relying on the
    # locale's default encoding, which differs between hosts.
    with open(path, encoding="utf-8") as cache_file:
        msg = json.load(cache_file)

    if path in (settings.FAKE_DATA_MESSAGE, settings.FAKE_INSTANCE_MESSAGE):
        # Ensure validation passes while tweaking message content
        msg = fix_message_validation(msg)

    return parse_message(message_dict=msg)


async def get_message(ref: str) -> AlephMessage:
    """Download the message `ref` from the vm-connector into the message cache and parse it."""
    message_cache_dir = Path(settings.MESSAGE_CACHE)
    cache_path = (message_cache_dir / ref).with_suffix(".json")
    await download_file(f"{settings.CONNECTOR_URL}/download/message/{ref}", cache_path)
    message = await load_message(cache_path)
    return message


async def get_executable_message(ref: str) -> ProgramMessage | InstanceMessage:
if ref == settings.FAKE_INSTANCE_ID:
logger.debug("Using the fake instance message since the ref matches")
cache_path = settings.FAKE_INSTANCE_MESSAGE
Expand All @@ -158,23 +205,22 @@
url = f"{settings.CONNECTOR_URL}/download/message/{ref}"
await download_file(url, cache_path)

with open(cache_path) as cache_file:
msg = json.load(cache_file)
return await load_message(cache_path)

if cache_path in (settings.FAKE_DATA_MESSAGE, settings.FAKE_INSTANCE_MESSAGE):
# Ensure validation passes while tweaking message content
msg = fix_message_validation(msg)

result = parse_message(message_dict=msg)
assert isinstance(result, InstanceMessage | ProgramMessage), "Parsed message is not executable"
return result
async def get_store_message(ref: str) -> StoreMessage:
message = await get_message(ref)
if not isinstance(message, StoreMessage):
msg = f"Expected a store message, got {message.type}"
raise ValueError(msg)

Check warning on line 215 in src/aleph/vm/storage.py

View check run for this annotation

Codecov / codecov/patch

src/aleph/vm/storage.py#L214-L215

Added lines #L214 - L215 were not covered by tests
return message


async def get_code_path(ref: str) -> Path:
if settings.FAKE_DATA_PROGRAM:
archive_path = Path(settings.FAKE_DATA_PROGRAM)

encoding: Encoding = (await get_message(ref="fake-message")).content.code.encoding
encoding: Encoding = (await get_executable_message(ref="fake-message")).content.code.encoding
if encoding == Encoding.squashfs:
squashfs_path = Path(archive_path.name + ".squashfs")
squashfs_path.unlink(missing_ok=True)
Expand All @@ -191,8 +237,7 @@
raise ValueError(msg)

cache_path = Path(settings.CODE_CACHE) / ref
url = f"{settings.CONNECTOR_URL}/download/code/{ref}"
await download_file(url, cache_path)
await download_file_from_ipfs_or_connector(ref, cache_path, "code")
return cache_path


Expand All @@ -203,8 +248,7 @@
return Path(f"{data_dir}.zip")

cache_path = Path(settings.DATA_CACHE) / ref
url = f"{settings.CONNECTOR_URL}/download/data/{ref}"
await download_file(url, cache_path)
await download_file_from_ipfs_or_connector(ref, cache_path, "data")

Check warning on line 251 in src/aleph/vm/storage.py

View check run for this annotation

Codecov / codecov/patch

src/aleph/vm/storage.py#L251

Added line #L251 was not covered by tests
return cache_path


Expand All @@ -224,11 +268,7 @@
return Path(settings.FAKE_DATA_RUNTIME)

cache_path = Path(settings.RUNTIME_CACHE) / ref
url = f"{settings.CONNECTOR_URL}/download/runtime/{ref}"

if not cache_path.is_file():
# File does not exist, download it
await download_file(url, cache_path)
await download_file_from_ipfs_or_connector(ref, cache_path, "runtime")

await check_squashfs_integrity(cache_path)
await chown_to_jailman(cache_path)
Expand All @@ -242,8 +282,10 @@
return Path(settings.FAKE_INSTANCE_BASE)

cache_path = Path(settings.RUNTIME_CACHE) / ref
url = f"{settings.CONNECTOR_URL}/download/runtime/{ref}"
await download_file(url, cache_path)

# if not cache_path.is_file():
await download_file_from_ipfs_or_connector(ref, cache_path, "runtime")

Check warning on line 287 in src/aleph/vm/storage.py

View check run for this annotation

Codecov / codecov/patch

src/aleph/vm/storage.py#L287

Added line #L287 was not covered by tests

await chown_to_jailman(cache_path)
return cache_path

Expand Down Expand Up @@ -364,8 +406,8 @@
return Path(settings.FAKE_DATA_VOLUME)

cache_path = Path(settings.DATA_CACHE) / ref
url = f"{settings.CONNECTOR_URL}/download/data/{ref}"
await download_file(url, cache_path)
await download_file_from_ipfs_or_connector(ref, cache_path, "data")

Check warning on line 409 in src/aleph/vm/storage.py

View check run for this annotation

Codecov / codecov/patch

src/aleph/vm/storage.py#L409

Added line #L409 was not covered by tests

await chown_to_jailman(cache_path)
return cache_path

Expand Down
1 change: 1 addition & 0 deletions tests/supervisor/test_checkpayment.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ async def compute_required_flow(executions):
async def test_not_enough_flow(mocker, fake_instance_content):
mocker.patch.object(settings, "ALLOW_VM_NETWORKING", False)
mocker.patch.object(settings, "PAYMENT_RECEIVER_ADDRESS", "0xD39C335404a78E0BDCf6D50F29B86EFd57924288")
mocker.patch.object(settings, "IPFS_SERVER", "https://ipfs.io/ipfs")
mock_community_wallet_address = "0x23C7A99d7AbebeD245d044685F1893aeA4b5Da90"
mocker.patch("aleph.vm.orchestrator.tasks.get_community_wallet_address", return_value=mock_community_wallet_address)

Expand Down
6 changes: 4 additions & 2 deletions tests/supervisor/test_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from aleph.vm.models import VmExecution
from aleph.vm.orchestrator import metrics
from aleph.vm.orchestrator.messages import load_updated_message
from aleph.vm.storage import get_message
from aleph.vm.storage import get_executable_message
from aleph.vm.utils import fix_message_validation


Expand All @@ -33,6 +33,7 @@ async def test_create_execution(mocker):
mock_settings.FAKE_DATA_PROGRAM = mock_settings.BENCHMARK_FAKE_DATA_PROGRAM
mock_settings.ALLOW_VM_NETWORKING = False
mock_settings.USE_JAILER = False
mock_settings.IPFS_SERVER = "https://ipfs.io/ipfs"

logging.basicConfig(level=logging.DEBUG)
mock_settings.PRINT_SYSTEM_LOGS = True
Expand All @@ -46,7 +47,7 @@ async def test_create_execution(mocker):
await metrics.create_tables(engine)

vm_hash = ItemHash("cafecafecafecafecafecafecafecafecafecafecafecafecafecafecafecafe")
message = await get_message(ref=vm_hash)
message = await get_executable_message(ref=vm_hash)

execution = VmExecution(
vm_hash=vm_hash,
Expand Down Expand Up @@ -78,6 +79,7 @@ async def test_create_execution_online(vm_hash: ItemHash = None):
"""

vm_hash = vm_hash or settings.CHECK_FASTAPI_VM_ID
settings.IPFS_SERVER = "https://ipfs.io/ipfs"

# Ensure that the settings are correct and required files present.
settings.setup()
Expand Down
5 changes: 3 additions & 2 deletions tests/supervisor/test_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from aleph.vm.models import VmExecution
from aleph.vm.network.hostnetwork import Network, make_ipv6_allocator
from aleph.vm.orchestrator import metrics
from aleph.vm.storage import get_message
from aleph.vm.storage import get_executable_message
from aleph.vm.systemd import SystemDManager
from aleph.vm.vm_type import VmType

Expand Down Expand Up @@ -55,6 +55,7 @@ async def test_create_instance():
# settings.FAKE_INSTANCE_MESSAGE
settings.ALLOW_VM_NETWORKING = True
settings.USE_JAILER = True
settings.IPFS_SERVER = "https://ipfs.io/ipfs"

logging.basicConfig(level=logging.DEBUG)
settings.PRINT_SYSTEM_LOGS = True
Expand All @@ -70,7 +71,7 @@ async def test_create_instance():
await metrics.create_tables(engine)

vm_hash = ItemHash(settings.FAKE_INSTANCE_ID)
message = await get_message(ref=vm_hash)
message = await get_executable_message(ref=vm_hash)

mock_systemd_manager = MockSystemDManager()

Expand Down
7 changes: 4 additions & 3 deletions tests/supervisor/test_qemu_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from aleph.vm.models import VmExecution
from aleph.vm.network.hostnetwork import Network, make_ipv6_allocator
from aleph.vm.orchestrator import metrics
from aleph.vm.storage import get_message
from aleph.vm.storage import get_executable_message
from aleph.vm.systemd import SystemDManager
from aleph.vm.vm_type import VmType

Expand Down Expand Up @@ -69,7 +69,7 @@ async def test_create_qemu_instance():
await metrics.create_tables(engine)

vm_hash = ItemHash(settings.FAKE_INSTANCE_ID)
message = await get_message(ref=vm_hash)
message = await get_executable_message(ref=vm_hash)

mock_systemd_manager = MockSystemDManager()

Expand Down Expand Up @@ -112,6 +112,7 @@ async def test_create_qemu_instance_online():
settings.ENABLE_CONFIDENTIAL_COMPUTING = False
settings.ALLOW_VM_NETWORKING = True
settings.USE_JAILER = False
settings.IPFS_SERVER = "https://ipfs.io/ipfs"

logging.basicConfig(level=logging.DEBUG)

Expand All @@ -126,7 +127,7 @@ async def test_create_qemu_instance_online():
await metrics.create_tables(engine)

vm_hash = ItemHash(settings.FAKE_INSTANCE_ID)
message = await get_message(ref=vm_hash)
message = await get_executable_message(ref=vm_hash)

mock_systemd_manager = MockSystemDManager()

Expand Down
12 changes: 6 additions & 6 deletions tests/supervisor/views/test_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from aleph.vm.conf import settings
from aleph.vm.orchestrator.metrics import ExecutionRecord
from aleph.vm.orchestrator.supervisor import setup_webapp
from aleph.vm.storage import get_message
from aleph.vm.storage import get_executable_message
from aleph.vm.utils.logs import EntryDict
from aleph.vm.utils.test_helpers import (
generate_signer_and_signed_headers_for_operation,
Expand Down Expand Up @@ -72,7 +72,7 @@
settings.setup()

vm_hash = ItemHash(settings.FAKE_INSTANCE_ID)
instance_message = await get_message(ref=vm_hash)
instance_message = await get_executable_message(ref=vm_hash)

fake_vm_pool = mocker.Mock(
executions={
Expand Down Expand Up @@ -115,7 +115,7 @@
settings.setup()

vm_hash = ItemHash(settings.FAKE_INSTANCE_ID)
instance_message = await get_message(ref=vm_hash)
instance_message = await get_executable_message(ref=vm_hash)

Check warning on line 118 in tests/supervisor/views/test_operator.py

View check run for this annotation

Codecov / codecov/patch

tests/supervisor/views/test_operator.py#L118

Added line #L118 was not covered by tests

fake_vm_pool = mocker.Mock(
executions={
Expand Down Expand Up @@ -154,7 +154,7 @@
settings.setup()

vm_hash = ItemHash(settings.FAKE_INSTANCE_ID)
instance_message = await get_message(ref=vm_hash)
instance_message = await get_executable_message(ref=vm_hash)

fake_vm_pool = mocker.AsyncMock(
executions={
Expand Down Expand Up @@ -190,7 +190,7 @@
settings.setup()

vm_hash = ItemHash(settings.FAKE_INSTANCE_ID)
instance_message = await get_message(ref=vm_hash)
instance_message = await get_executable_message(ref=vm_hash)

fake_vm_pool = mocker.Mock(
executions={
Expand Down Expand Up @@ -232,7 +232,7 @@
settings.setup()

vm_hash = ItemHash(settings.FAKE_INSTANCE_ID)
instance_message = await get_message(ref=vm_hash)
instance_message = await get_executable_message(ref=vm_hash)

class FakeExecution:
message = instance_message.content
Expand Down
Loading