Skip to content

Commit 223fd93

Browse files
committed
Fix: Trusted 3rd party IPFS server
IPFS Kubo is now present on all CRNs but unused. Downloading files from this service would be more reliable than using third party and provide content integrity checks. This bypasses the vm-connector when fetching resources from IPFS.
1 parent 73013a2 commit 223fd93

File tree

8 files changed

+87
-39
lines changed

8 files changed

+87
-39
lines changed

src/aleph/vm/conf.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ class Settings(BaseSettings):
126126
WATCH_FOR_UPDATES: bool = True
127127

128128
API_SERVER: str = "https://official.aleph.cloud"
129+
IPFS_SERVER: Url = Url("http://localhost:8080/ipfs")
129130
# Connect to the Quad9 VPN provider using their IPv4 and IPv6 addresses.
130131
CONNECTIVITY_IPV4_URL: str = "https://9.9.9.9/"
131132
CONNECTIVITY_IPV6_URL: str = "https://[2620:fe::fe]/"

src/aleph/vm/orchestrator/messages.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,13 @@
77
from aleph_message.status import MessageStatus
88

99
from aleph.vm.conf import settings
10-
from aleph.vm.storage import get_latest_amend, get_message
10+
from aleph.vm.storage import get_executable_message, get_latest_amend
1111

1212

1313
async def try_get_message(ref: str) -> ExecutableMessage:
1414
"""Get the message or raise an aiohttp HTTP error"""
1515
try:
16-
return await get_message(ref)
16+
return await get_executable_message(ref)
1717
except ClientConnectorError as error:
1818
raise HTTPServiceUnavailable(reason="Aleph Connector unavailable") from error
1919
except ClientResponseError as error:

src/aleph/vm/storage.py

Lines changed: 66 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -10,17 +10,19 @@
1010
import logging
1111
import re
1212
import sys
13-
import uuid
1413
from datetime import datetime, timezone
1514
from pathlib import Path
1615
from shutil import copy2, make_archive
1716
from subprocess import CalledProcessError
1817

1918
import aiohttp
2019
from aleph_message.models import (
20+
AlephMessage,
2121
InstanceMessage,
2222
ItemHash,
23+
ItemType,
2324
ProgramMessage,
25+
StoreMessage,
2426
parse_message,
2527
)
2628
from aleph_message.models.execution.instance import RootfsVolume
@@ -133,6 +135,32 @@ async def download_file(url: str, local_path: Path) -> None:
133135
tmp_path.unlink(missing_ok=True)
134136

135137

138+
async def download_file_from_ipfs_or_connector(ref: str, cache_path: Path, filetype: str) -> None:
139+
"""Download a file from the IPFS Gateway if possible, else from the vm-connector."""
140+
141+
if cache_path.is_file():
142+
logger.debug(f"File already exists: {cache_path}")
143+
return
144+
145+
message: StoreMessage = await get_store_message(ref)
146+
147+
if message.content.item_type == ItemType.ipfs:
148+
# Download IPFS files from the IPFS gateway directly
149+
cid = message.content.item_hash
150+
url = f"{settings.IPFS_SERVER}/{cid}"
151+
await download_file(url, cache_path)
152+
else:
153+
# Download via the vm-connector
154+
path_mapping = {
155+
"runtime": "/download/runtime",
156+
"code": "/download/code",
157+
"data": "/download/data",
158+
}
159+
path = path_mapping[filetype]
160+
url = f"{settings.CONNECTOR_URL}{path}/{ref}"
161+
await download_file(url, cache_path)
162+
163+
136164
async def get_latest_amend(item_hash: str) -> str:
137165
if settings.FAKE_DATA_PROGRAM:
138166
return item_hash
@@ -146,7 +174,26 @@ async def get_latest_amend(item_hash: str) -> str:
146174
return result or item_hash
147175

148176

149-
async def get_message(ref: str) -> ProgramMessage | InstanceMessage:
177+
async def load_message(path: Path) -> AlephMessage:
178+
"""Load a message from the cache on disk."""
179+
with open(path) as cache_file:
180+
msg = json.load(cache_file)
181+
182+
if path in (settings.FAKE_DATA_MESSAGE, settings.FAKE_INSTANCE_MESSAGE):
183+
# Ensure validation passes while tweaking message content
184+
msg = fix_message_validation(msg)
185+
186+
return parse_message(message_dict=msg)
187+
188+
189+
async def get_message(ref: str) -> AlephMessage:
190+
cache_path = (Path(settings.MESSAGE_CACHE) / ref).with_suffix(".json")
191+
url = f"{settings.CONNECTOR_URL}/download/message/{ref}"
192+
await download_file(url, cache_path)
193+
return await load_message(cache_path)
194+
195+
196+
async def get_executable_message(ref: str) -> ProgramMessage | InstanceMessage:
150197
if ref == settings.FAKE_INSTANCE_ID:
151198
logger.debug("Using the fake instance message since the ref matches")
152199
cache_path = settings.FAKE_INSTANCE_MESSAGE
@@ -158,23 +205,22 @@ async def get_message(ref: str) -> ProgramMessage | InstanceMessage:
158205
url = f"{settings.CONNECTOR_URL}/download/message/{ref}"
159206
await download_file(url, cache_path)
160207

161-
with open(cache_path) as cache_file:
162-
msg = json.load(cache_file)
208+
return await load_message(cache_path)
163209

164-
if cache_path in (settings.FAKE_DATA_MESSAGE, settings.FAKE_INSTANCE_MESSAGE):
165-
# Ensure validation passes while tweaking message content
166-
msg = fix_message_validation(msg)
167210

168-
result = parse_message(message_dict=msg)
169-
assert isinstance(result, InstanceMessage | ProgramMessage), "Parsed message is not executable"
170-
return result
211+
async def get_store_message(ref: str) -> StoreMessage:
212+
message = await get_message(ref)
213+
if not isinstance(message, StoreMessage):
214+
msg = f"Expected a store message, got {message.type}"
215+
raise ValueError(msg)
216+
return message
171217

172218

173219
async def get_code_path(ref: str) -> Path:
174220
if settings.FAKE_DATA_PROGRAM:
175221
archive_path = Path(settings.FAKE_DATA_PROGRAM)
176222

177-
encoding: Encoding = (await get_message(ref="fake-message")).content.code.encoding
223+
encoding: Encoding = (await get_executable_message(ref="fake-message")).content.code.encoding
178224
if encoding == Encoding.squashfs:
179225
squashfs_path = Path(archive_path.name + ".squashfs")
180226
squashfs_path.unlink(missing_ok=True)
@@ -191,8 +237,7 @@ async def get_code_path(ref: str) -> Path:
191237
raise ValueError(msg)
192238

193239
cache_path = Path(settings.CODE_CACHE) / ref
194-
url = f"{settings.CONNECTOR_URL}/download/code/{ref}"
195-
await download_file(url, cache_path)
240+
await download_file_from_ipfs_or_connector(ref, cache_path, "code")
196241
return cache_path
197242

198243

@@ -203,8 +248,7 @@ async def get_data_path(ref: str) -> Path:
203248
return Path(f"{data_dir}.zip")
204249

205250
cache_path = Path(settings.DATA_CACHE) / ref
206-
url = f"{settings.CONNECTOR_URL}/download/data/{ref}"
207-
await download_file(url, cache_path)
251+
await download_file_from_ipfs_or_connector(ref, cache_path, "data")
208252
return cache_path
209253

210254

@@ -224,11 +268,7 @@ async def get_runtime_path(ref: str) -> Path:
224268
return Path(settings.FAKE_DATA_RUNTIME)
225269

226270
cache_path = Path(settings.RUNTIME_CACHE) / ref
227-
url = f"{settings.CONNECTOR_URL}/download/runtime/{ref}"
228-
229-
if not cache_path.is_file():
230-
# File does not exist, download it
231-
await download_file(url, cache_path)
271+
await download_file_from_ipfs_or_connector(ref, cache_path, "runtime")
232272

233273
await check_squashfs_integrity(cache_path)
234274
await chown_to_jailman(cache_path)
@@ -242,8 +282,10 @@ async def get_rootfs_base_path(ref: ItemHash) -> Path:
242282
return Path(settings.FAKE_INSTANCE_BASE)
243283

244284
cache_path = Path(settings.RUNTIME_CACHE) / ref
245-
url = f"{settings.CONNECTOR_URL}/download/runtime/{ref}"
246-
await download_file(url, cache_path)
285+
286+
# if not cache_path.is_file():
287+
await download_file_from_ipfs_or_connector(ref, cache_path, "runtime")
288+
247289
await chown_to_jailman(cache_path)
248290
return cache_path
249291

@@ -364,8 +406,8 @@ async def get_existing_file(ref: str) -> Path:
364406
return Path(settings.FAKE_DATA_VOLUME)
365407

366408
cache_path = Path(settings.DATA_CACHE) / ref
367-
url = f"{settings.CONNECTOR_URL}/download/data/{ref}"
368-
await download_file(url, cache_path)
409+
await download_file_from_ipfs_or_connector(ref, cache_path, "data")
410+
369411
await chown_to_jailman(cache_path)
370412
return cache_path
371413

tests/supervisor/test_checkpayment.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@ async def compute_required_flow(executions):
149149
async def test_not_enough_flow(mocker, fake_instance_content):
150150
mocker.patch.object(settings, "ALLOW_VM_NETWORKING", False)
151151
mocker.patch.object(settings, "PAYMENT_RECEIVER_ADDRESS", "0xD39C335404a78E0BDCf6D50F29B86EFd57924288")
152+
mocker.patch.object(settings, "IPFS_SERVER", "https://ipfs.io/ipfs")
152153
mock_community_wallet_address = "0x23C7A99d7AbebeD245d044685F1893aeA4b5Da90"
153154
mocker.patch("aleph.vm.orchestrator.tasks.get_community_wallet_address", return_value=mock_community_wallet_address)
154155

tests/supervisor/test_execution.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
from aleph.vm.models import VmExecution
1313
from aleph.vm.orchestrator import metrics
1414
from aleph.vm.orchestrator.messages import load_updated_message
15-
from aleph.vm.storage import get_message
15+
from aleph.vm.storage import get_executable_message
1616
from aleph.vm.utils import fix_message_validation
1717

1818

@@ -33,6 +33,7 @@ async def test_create_execution(mocker):
3333
mock_settings.FAKE_DATA_PROGRAM = mock_settings.BENCHMARK_FAKE_DATA_PROGRAM
3434
mock_settings.ALLOW_VM_NETWORKING = False
3535
mock_settings.USE_JAILER = False
36+
mock_settings.IPFS_SERVER = "https://ipfs.io/ipfs"
3637

3738
logging.basicConfig(level=logging.DEBUG)
3839
mock_settings.PRINT_SYSTEM_LOGS = True
@@ -46,7 +47,7 @@ async def test_create_execution(mocker):
4647
await metrics.create_tables(engine)
4748

4849
vm_hash = ItemHash("cafecafecafecafecafecafecafecafecafecafecafecafecafecafecafecafe")
49-
message = await get_message(ref=vm_hash)
50+
message = await get_executable_message(ref=vm_hash)
5051

5152
execution = VmExecution(
5253
vm_hash=vm_hash,
@@ -78,6 +79,7 @@ async def test_create_execution_online(vm_hash: ItemHash = None):
7879
"""
7980

8081
vm_hash = vm_hash or settings.CHECK_FASTAPI_VM_ID
82+
settings.IPFS_SERVER = "https://ipfs.io/ipfs"
8183

8284
# Ensure that the settings are correct and required files present.
8385
settings.setup()

tests/supervisor/test_instance.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
from aleph.vm.models import VmExecution
1414
from aleph.vm.network.hostnetwork import Network, make_ipv6_allocator
1515
from aleph.vm.orchestrator import metrics
16-
from aleph.vm.storage import get_message
16+
from aleph.vm.storage import get_executable_message
1717
from aleph.vm.systemd import SystemDManager
1818
from aleph.vm.vm_type import VmType
1919

@@ -55,6 +55,7 @@ async def test_create_instance():
5555
# settings.FAKE_INSTANCE_MESSAGE
5656
settings.ALLOW_VM_NETWORKING = True
5757
settings.USE_JAILER = True
58+
settings.IPFS_SERVER = "https://ipfs.io/ipfs"
5859

5960
logging.basicConfig(level=logging.DEBUG)
6061
settings.PRINT_SYSTEM_LOGS = True
@@ -70,7 +71,7 @@ async def test_create_instance():
7071
await metrics.create_tables(engine)
7172

7273
vm_hash = ItemHash(settings.FAKE_INSTANCE_ID)
73-
message = await get_message(ref=vm_hash)
74+
message = await get_executable_message(ref=vm_hash)
7475

7576
mock_systemd_manager = MockSystemDManager()
7677

tests/supervisor/test_qemu_instance.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
from aleph.vm.models import VmExecution
1414
from aleph.vm.network.hostnetwork import Network, make_ipv6_allocator
1515
from aleph.vm.orchestrator import metrics
16-
from aleph.vm.storage import get_message
16+
from aleph.vm.storage import get_executable_message
1717
from aleph.vm.systemd import SystemDManager
1818
from aleph.vm.vm_type import VmType
1919

@@ -69,7 +69,7 @@ async def test_create_qemu_instance():
6969
await metrics.create_tables(engine)
7070

7171
vm_hash = ItemHash(settings.FAKE_INSTANCE_ID)
72-
message = await get_message(ref=vm_hash)
72+
message = await get_executable_message(ref=vm_hash)
7373

7474
mock_systemd_manager = MockSystemDManager()
7575

@@ -112,6 +112,7 @@ async def test_create_qemu_instance_online():
112112
settings.ENABLE_CONFIDENTIAL_COMPUTING = False
113113
settings.ALLOW_VM_NETWORKING = True
114114
settings.USE_JAILER = False
115+
settings.IPFS_SERVER = "https://ipfs.io/ipfs"
115116

116117
logging.basicConfig(level=logging.DEBUG)
117118

@@ -126,7 +127,7 @@ async def test_create_qemu_instance_online():
126127
await metrics.create_tables(engine)
127128

128129
vm_hash = ItemHash(settings.FAKE_INSTANCE_ID)
129-
message = await get_message(ref=vm_hash)
130+
message = await get_executable_message(ref=vm_hash)
130131

131132
mock_systemd_manager = MockSystemDManager()
132133

tests/supervisor/views/test_operator.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
from aleph.vm.conf import settings
1515
from aleph.vm.orchestrator.metrics import ExecutionRecord
1616
from aleph.vm.orchestrator.supervisor import setup_webapp
17-
from aleph.vm.storage import get_message
17+
from aleph.vm.storage import get_executable_message
1818
from aleph.vm.utils.logs import EntryDict
1919
from aleph.vm.utils.test_helpers import (
2020
generate_signer_and_signed_headers_for_operation,
@@ -72,7 +72,7 @@ async def test_operator_confidential_initialize_already_running(aiohttp_client,
7272
settings.setup()
7373

7474
vm_hash = ItemHash(settings.FAKE_INSTANCE_ID)
75-
instance_message = await get_message(ref=vm_hash)
75+
instance_message = await get_executable_message(ref=vm_hash)
7676

7777
fake_vm_pool = mocker.Mock(
7878
executions={
@@ -115,7 +115,7 @@ async def test_operator_expire(aiohttp_client, mocker):
115115
settings.setup()
116116

117117
vm_hash = ItemHash(settings.FAKE_INSTANCE_ID)
118-
instance_message = await get_message(ref=vm_hash)
118+
instance_message = await get_executable_message(ref=vm_hash)
119119

120120
fake_vm_pool = mocker.Mock(
121121
executions={
@@ -154,7 +154,7 @@ async def test_operator_stop(aiohttp_client, mocker):
154154
settings.setup()
155155

156156
vm_hash = ItemHash(settings.FAKE_INSTANCE_ID)
157-
instance_message = await get_message(ref=vm_hash)
157+
instance_message = await get_executable_message(ref=vm_hash)
158158

159159
fake_vm_pool = mocker.AsyncMock(
160160
executions={
@@ -190,7 +190,7 @@ async def test_operator_confidential_initialize_not_confidential(aiohttp_client,
190190
settings.setup()
191191

192192
vm_hash = ItemHash(settings.FAKE_INSTANCE_ID)
193-
instance_message = await get_message(ref=vm_hash)
193+
instance_message = await get_executable_message(ref=vm_hash)
194194

195195
fake_vm_pool = mocker.Mock(
196196
executions={
@@ -232,7 +232,7 @@ async def test_operator_confidential_initialize(aiohttp_client):
232232
settings.setup()
233233

234234
vm_hash = ItemHash(settings.FAKE_INSTANCE_ID)
235-
instance_message = await get_message(ref=vm_hash)
235+
instance_message = await get_executable_message(ref=vm_hash)
236236

237237
class FakeExecution:
238238
message = instance_message.content

0 commit comments

Comments
 (0)