Skip to content

Commit 9418c5e

Browse files
committed
kraken: add support for zenoh queryables
1 parent cf15a2b commit 9418c5e

File tree

6 files changed

+74
-11
lines changed

6 files changed

+74
-11
lines changed

core/services/kraken/api/app.py

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
from os import path
22

3+
from contextlib import asynccontextmanager
4+
from typing import AsyncGenerator
35
from commonwealth.utils.apis import GenericErrorHandlingRoute
6+
from commonwealth.utils.zenoh_helper import ZenohRouter, zenoh_session
47
from fastapi import FastAPI
58
from fastapi.responses import RedirectResponse
69
from fastapi.staticfiles import StaticFiles
@@ -14,8 +17,19 @@
1417
index_router_v2,
1518
jobs_router_v2,
1619
manifest_router_v2,
20+
zenoh_container_router,
21+
zenoh_extension_router,
22+
zenoh_jobs_router,
23+
zenoh_manifest_router,
1724
)
1825

26+
27+
@asynccontextmanager
28+
async def lifespan(fastapi_app: FastAPI) -> AsyncGenerator[None, None]: # pylint: disable=unused-argument
29+
yield
30+
zenoh_session.close()
31+
32+
1933
application = FastAPI(
2034
title="Kraken API",
2135
description="Kraken is the BlueOS service responsible for installing and managing extensions.",
@@ -33,7 +47,16 @@
3347
application.include_router(jobs_router_v2)
3448
application.include_router(manifest_router_v2)
3549

36-
application = VersionedFastAPI(application, prefix_format="/v{major}.{minor}", enable_latest=True)
50+
application = VersionedFastAPI(application, prefix_format="/v{major}.{minor}", enable_latest=True, lifespan=lifespan)
51+
52+
# Zenoh
53+
zenoh_router = ZenohRouter("kraken")
54+
zenoh_router.include_router(zenoh_container_router)
55+
zenoh_router.include_router(zenoh_extension_router)
56+
zenoh_router.include_router(zenoh_jobs_router)
57+
zenoh_router.include_router(zenoh_manifest_router)
58+
59+
zenoh_router.declare()
3760

3861

3962
@application.get("/", status_code=200)
Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,18 @@
11
# pylint: disable=W0406
2-
from .container import container_router_v2
3-
from .extension import extension_router_v2
2+
from .container import container_router_v2, zenoh_container_router
3+
from .extension import extension_router_v2, zenoh_extension_router
44
from .index import index_router_v2
5-
from .jobs import jobs_router_v2
6-
from .manifest import manifest_router_v2
5+
from .jobs import jobs_router_v2, zenoh_jobs_router
6+
from .manifest import manifest_router_v2, zenoh_manifest_router
77

8-
__all__ = ["container_router_v2", "extension_router_v2", "index_router_v2", "jobs_router_v2", "manifest_router_v2"]
8+
__all__ = [
9+
"container_router_v2",
10+
"extension_router_v2",
11+
"index_router_v2",
12+
"jobs_router_v2",
13+
"manifest_router_v2",
14+
"zenoh_container_router",
15+
"zenoh_extension_router",
16+
"zenoh_jobs_router",
17+
"zenoh_manifest_router",
18+
]

core/services/kraken/api/v2/routers/container.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
from typing import Any, Callable, Optional, Tuple
33

44
from commonwealth.utils.streaming import streamer, timeout_streamer
5+
from commonwealth.utils.zenoh_helper import ZenohRouter, apply_route_decorator
56
from fastapi import APIRouter, HTTPException, status
67
from fastapi.responses import StreamingResponse
78
from fastapi_versioning import versioned_api_route
@@ -10,12 +11,15 @@
1011
from harbor.exceptions import ContainerNotFound
1112
from harbor.models import ContainerModel, ContainerUsageModel
1213

13-
container_router_v2 = APIRouter(
14+
original_container_router_v2 = APIRouter(
1415
prefix="/container",
1516
tags=["container_v2"],
1617
route_class=versioned_api_route(2, 0),
1718
responses={status.HTTP_404_NOT_FOUND: {"description": "Not found"}},
1819
)
20+
container_router_v2 = apply_route_decorator(original_container_router_v2)
21+
22+
zenoh_container_router = ZenohRouter("container")
1923

2024

2125
def container_to_http_exception(endpoint: Callable[..., Any]) -> Callable[..., Any]:
@@ -31,6 +35,7 @@ async def wrapper(*args: Tuple[Any], **kwargs: dict[str, Any]) -> Any:
3135
return wrapper
3236

3337

38+
@zenoh_container_router.queryable()
3439
@container_router_v2.get("/", status_code=status.HTTP_200_OK)
3540
@container_to_http_exception
3641
async def list_container() -> list[ContainerModel]:
@@ -40,6 +45,7 @@ async def list_container() -> list[ContainerModel]:
4045
return await ContainerManager.get_running_containers()
4146

4247

48+
@zenoh_container_router.queryable()
4349
@container_router_v2.get("/{container_name}/details", status_code=status.HTTP_200_OK)
4450
@container_to_http_exception
4551
async def fetch_container(container_name: str) -> ContainerModel:
@@ -49,6 +55,7 @@ async def fetch_container(container_name: str) -> ContainerModel:
4955
return await ContainerManager.get_running_container_by_name(container_name)
5056

5157

58+
@zenoh_container_router.queryable()
5259
@container_router_v2.get("/{container_name}/log", status_code=status.HTTP_200_OK)
5360
@container_to_http_exception
5461
async def fetch_log_by_container_name(container_name: str, timeout: Optional[int] = None) -> StreamingResponse:
@@ -64,6 +71,7 @@ async def fetch_log_by_container_name(container_name: str, timeout: Optional[int
6471
return StreamingResponse(streamer(stream, heartbeats=0.1), media_type="text/plain")
6572

6673

74+
@zenoh_container_router.queryable()
6775
@container_router_v2.get("/stats", status_code=status.HTTP_200_OK)
6876
@container_to_http_exception
6977
async def list_stats() -> dict[str, ContainerUsageModel]:
@@ -73,6 +81,7 @@ async def list_stats() -> dict[str, ContainerUsageModel]:
7381
return await ContainerManager.get_containers_stats()
7482

7583

84+
@zenoh_container_router.queryable()
7685
@container_router_v2.get("/{container_name}/stats", status_code=status.HTTP_200_OK)
7786
@container_to_http_exception
7887
async def fetch_stats_by_container_name(container_name: str) -> ContainerUsageModel:

core/services/kraken/api/v2/routers/extension.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from typing import Any, Callable, List, Tuple, cast
44

55
from commonwealth.utils.streaming import streamer
6+
from commonwealth.utils.zenoh_helper import ZenohRouter, apply_route_decorator
67
from fastapi import APIRouter, HTTPException, status
78
from fastapi.responses import Response, StreamingResponse
89
from fastapi_versioning import versioned_api_route
@@ -15,12 +16,15 @@
1516
from extension.extension import Extension
1617
from extension.models import ExtensionSource
1718

18-
extension_router_v2 = APIRouter(
19+
original_extension_router_v2 = APIRouter(
1920
prefix="/extension",
2021
tags=["extension_v2"],
2122
route_class=versioned_api_route(2, 0),
2223
responses={status.HTTP_404_NOT_FOUND: {"description": "Not found"}},
2324
)
25+
extension_router_v2 = apply_route_decorator(original_extension_router_v2)
26+
27+
zenoh_extension_router = ZenohRouter("extension")
2428

2529

2630
def extension_to_http_exception(endpoint: Callable[..., Any]) -> Callable[..., Any]:
@@ -42,6 +46,7 @@ async def wrapper(*args: Tuple[Any], **kwargs: dict[str, Any]) -> Any:
4246
return wrapper
4347

4448

49+
@zenoh_extension_router.queryable()
4550
@extension_router_v2.get("/", status_code=status.HTTP_200_OK)
4651
@extension_to_http_exception
4752
async def fetch() -> list[ExtensionSource]:
@@ -52,6 +57,7 @@ async def fetch() -> list[ExtensionSource]:
5257
return [ext.source for ext in extensions]
5358

5459

60+
@zenoh_extension_router.queryable()
5561
@extension_router_v2.get("/{identifier}/details", status_code=status.HTTP_200_OK)
5662
@extension_to_http_exception
5763
async def fetch_by_identifier(identifier: str) -> list[ExtensionSource]:
@@ -62,6 +68,7 @@ async def fetch_by_identifier(identifier: str) -> list[ExtensionSource]:
6268
return [ext.source for ext in extensions]
6369

6470

71+
@zenoh_extension_router.queryable()
6572
@extension_router_v2.get("/{identifier}/{tag}/details", status_code=status.HTTP_200_OK)
6673
@extension_to_http_exception
6774
async def fetch_by_identifier_and_tag(identifier: str, tag: str) -> ExtensionSource:

core/services/kraken/api/v2/routers/jobs.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,21 @@
44

55
from fastapi import APIRouter, Body, HTTPException, status
66
from fastapi_versioning import versioned_api_route
7+
from commonwealth.utils.zenoh_helper import ZenohRouter, apply_route_decorator
78

89
from jobs import JobsManager
910
from jobs.exceptions import JobNotFound
1011
from jobs.models import Job, JobMethod
1112

12-
jobs_router_v2 = APIRouter(
13+
original_jobs_router_v2 = APIRouter(
1314
prefix="/jobs",
1415
tags=["jobs_v2"],
1516
route_class=versioned_api_route(2, 0),
1617
responses={status.HTTP_404_NOT_FOUND: {"description": "Not found"}},
1718
)
19+
jobs_router_v2 = apply_route_decorator(original_jobs_router_v2)
20+
21+
zenoh_jobs_router = ZenohRouter("jobs")
1822

1923

2024
def jobs_to_http_exception(endpoint: Callable[..., Any]) -> Callable[..., Any]:
@@ -40,12 +44,14 @@ async def create(
4044
return job
4145

4246

47+
@zenoh_jobs_router.queryable()
4348
@jobs_router_v2.get("/", status_code=status.HTTP_200_OK)
4449
@jobs_to_http_exception
4550
async def fetch() -> List[Job]:
4651
return JobsManager.get()
4752

4853

54+
@zenoh_jobs_router.queryable()
4955
@jobs_router_v2.get("/{identifier}", status_code=status.HTTP_200_OK)
5056
@jobs_to_http_exception
5157
async def fetch_by_identifier(identifier: str) -> Job:

core/services/kraken/api/v2/routers/manifest.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
from fastapi import APIRouter, HTTPException, status
55
from fastapi_versioning import versioned_api_route
6+
from commonwealth.utils.zenoh_helper import ZenohRouter, apply_route_decorator
67

78
from manifest import ManifestManager
89
from manifest.exceptions import (
@@ -19,15 +20,17 @@
1920
UpdateManifestSource,
2021
)
2122

22-
manifest_router_v2 = APIRouter(
23+
original_manifest_router_v2 = APIRouter(
2324
prefix="/manifest",
2425
tags=["manifest_v2"],
2526
route_class=versioned_api_route(2, 0),
2627
responses={status.HTTP_404_NOT_FOUND: {"description": "Not found"}},
2728
)
28-
29+
manifest_router_v2 = apply_route_decorator(original_manifest_router_v2)
2930
manifest_manager = ManifestManager.instance()
3031

32+
zenoh_manifest_router = ZenohRouter("manifest")
33+
3134

3235
def manifest_to_http_exception(endpoint: Callable[..., Any]) -> Callable[..., Any]:
3336
@wraps(endpoint)
@@ -46,6 +49,7 @@ async def wrapper(*args: Tuple[Any], **kwargs: dict[str, Any]) -> Any:
4649
return wrapper
4750

4851

52+
@zenoh_manifest_router.queryable()
4953
@manifest_router_v2.get("/", status_code=status.HTTP_200_OK)
5054
@manifest_to_http_exception
5155
async def fetch(data: bool = True, enabled: bool = False) -> list[Manifest]:
@@ -56,6 +60,7 @@ async def fetch(data: bool = True, enabled: bool = False) -> list[Manifest]:
5660
return await manifest_manager.fetch(data, enabled)
5761

5862

63+
@zenoh_manifest_router.queryable()
5964
@manifest_router_v2.get("/{identifier}/details", status_code=status.HTTP_200_OK)
6065
@manifest_to_http_exception
6166
async def fetch_by_identifier(identifier: str, data: bool = True) -> Manifest:
@@ -65,6 +70,7 @@ async def fetch_by_identifier(identifier: str, data: bool = True) -> Manifest:
6570
return await manifest_manager.fetch_by_identifier(identifier, data)
6671

6772

73+
@zenoh_manifest_router.queryable()
6874
@manifest_router_v2.get("/consolidated", status_code=status.HTTP_200_OK)
6975
@manifest_to_http_exception
7076
async def fetch_consolidated() -> list[RepositoryEntry]:
@@ -75,6 +81,7 @@ async def fetch_consolidated() -> list[RepositoryEntry]:
7581
return await manifest_manager.fetch_consolidated()
7682

7783

84+
@zenoh_manifest_router.queryable()
7885
@manifest_router_v2.get("/tags/{manifest_identifier}/{extension_identifier}/", status_code=status.HTTP_200_OK)
7986
@manifest_to_http_exception
8087
async def fetch_ext_tags_from_manifest(
@@ -88,6 +95,7 @@ async def fetch_ext_tags_from_manifest(
8895
return [str(version) for version in versions]
8996

9097

98+
@zenoh_manifest_router.queryable()
9199
@manifest_router_v2.get("/tags/{extension_identifier}", status_code=status.HTTP_200_OK)
92100
@manifest_to_http_exception
93101
async def fetch_ext_tags_from_consolidated(extension_identifier: str, stable: bool = False) -> list[str]:

0 commit comments

Comments
 (0)