-
Notifications
You must be signed in to change notification settings - Fork 114
Add zenoh queryables to the kraken service #3657
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
Reviewer's GuideIntroduce a Zenoh integration layer for the Kraken service by adding a reusable ZenohRouter helper, wiring it into the FastAPI app lifecycle, and exposing existing v2 HTTP endpoints (container, extension, jobs, manifest) as Zenoh queryables without changing their core business logic. Sequence diagram for handling a Zenoh query to a Kraken HTTP-backed endpointsequenceDiagram
actor QC as ZenohClient
participant ZN as ZenohNetwork
participant ZS as ZenohSession
participant ZR as ZenohRouter_manifest
participant EP as fetch_consolidated
QC->>ZN: send query
ZN->>ZS: deliver query for key_expr kraken/manifest/consolidated
ZS->>ZR: invoke registered queryable wrapper
activate ZR
ZR->>ZR: extract params from query
ZR->>EP: call async endpoint(**params)
activate EP
EP-->>ZR: list[RepositoryEntry]
deactivate EP
ZR->>ZS: query.reply(key_expr, json.dumps(response))
deactivate ZR
ZS->>ZN: send reply sample
ZN-->>QC: deliver response payload
Class diagram for Zenoh helper utilities and updated routersclassDiagram
class ZenohSession {
- zenoh.Session session
- zenoh.Config config
- ThreadPoolExecutor _executor
+ __init__() void
+ zenoh_config() void
+ close() void
}
class ZenohRouter {
- str prefix
- List~Tuple~str, Callable~~~ routes
+ __init__(prefix str)
+ queryable() Callable
+ declare() void
+ include_router(router ZenohRouter) void
}
class FastAPI {
}
class APIRouter {
}
class route_info_decorator {
<<function>>
}
class apply_route_decorator {
<<function>>
+ apply_route_decorator(app T) T
}
class original_container_router_v2 {
<<APIRouter>>
}
class container_router_v2 {
<<APIRouter (decorated)>>
}
class original_extension_router_v2 {
<<APIRouter>>
}
class extension_router_v2 {
<<APIRouter (decorated)>>
}
class original_jobs_router_v2 {
<<APIRouter>>
}
class jobs_router_v2 {
<<APIRouter (decorated)>>
}
class original_manifest_router_v2 {
<<APIRouter>>
}
class manifest_router_v2 {
<<APIRouter (decorated)>>
}
class zenoh_container_router {
<<ZenohRouter>>
}
class zenoh_extension_router {
<<ZenohRouter>>
}
class zenoh_jobs_router {
<<ZenohRouter>>
}
class zenoh_manifest_router {
<<ZenohRouter>>
}
class zenoh_router {
<<ZenohRouter kraken root>>
}
ZenohSession <.. ZenohRouter : uses zenoh_session
route_info_decorator --> APIRouter : wraps HTTP verb methods
apply_route_decorator --> route_info_decorator
apply_route_decorator --> original_container_router_v2 : decorates
original_container_router_v2 <|-- container_router_v2
apply_route_decorator --> original_extension_router_v2 : decorates
original_extension_router_v2 <|-- extension_router_v2
apply_route_decorator --> original_jobs_router_v2 : decorates
original_jobs_router_v2 <|-- jobs_router_v2
apply_route_decorator --> original_manifest_router_v2 : decorates
original_manifest_router_v2 <|-- manifest_router_v2
zenoh_container_router --> container_router_v2 : queryable() on endpoints
zenoh_extension_router --> extension_router_v2 : queryable() on endpoints
zenoh_jobs_router --> jobs_router_v2 : queryable() on endpoints
zenoh_manifest_router --> manifest_router_v2 : queryable() on endpoints
zenoh_router --> zenoh_container_router : include_router
zenoh_router --> zenoh_extension_router : include_router
zenoh_router --> zenoh_jobs_router : include_router
zenoh_router --> zenoh_manifest_router : include_router
FastAPI --> zenoh_router : declares queryables in lifespan
File-Level Changes
Possibly linked issues
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey there - I've reviewed your changes - here's some feedback:
- The interaction between
apply_route_decoratorand@zenoh_*_router.queryable()looks off:route_info_decoratorsets_route_pathon the original function but returns the FastAPI-decorated wrapper without that attribute, and since@zenoh_router.queryable()is applied last, it sees a function with no_route_pathso all Zenoh queryables end up with an empty path; consider propagating_route_pathonto the wrapper returned bydeco(...)or flipping the decorator order so the Zenoh decorator wraps the original function before FastAPI. - In
ZenohRouter.queryable, each query usesasyncio.runinside a thread, which creates a new event loop per call and assumes the handler is async; you may want to support both sync and async handlers and reuse an event loop (e.g., withasyncio.run_coroutine_threadsafeon a long-lived loop) to avoid the overhead and potential issues of repeatedasyncio.run. - The Zenoh configuration in
ZenohSession.zenoh_configis currently hardcoded (e.g.tcp/127.0.0.1:7447,mode: client); if this is expected to run in different environments, consider wiring these values to existing configuration mechanisms so the service can be pointed at different Zenoh deployments without code changes.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- The interaction between `apply_route_decorator` and `@zenoh_*_router.queryable()` looks off: `route_info_decorator` sets `_route_path` on the original function but returns the FastAPI-decorated wrapper without that attribute, and since `@zenoh_router.queryable()` is applied last, it sees a function with no `_route_path` so all Zenoh queryables end up with an empty path; consider propagating `_route_path` onto the wrapper returned by `deco(...)` or flipping the decorator order so the Zenoh decorator wraps the original function before FastAPI.
- In `ZenohRouter.queryable`, each query uses `asyncio.run` inside a thread, which creates a new event loop per call and assumes the handler is async; you may want to support both sync and async handlers and reuse an event loop (e.g., with `asyncio.run_coroutine_threadsafe` on a long-lived loop) to avoid the overhead and potential issues of repeated `asyncio.run`.
- The Zenoh configuration in `ZenohSession.zenoh_config` is currently hardcoded (e.g. `tcp/127.0.0.1:7447`, `mode: client`); if this is expected to run in different environments, consider wiring these values to existing configuration mechanisms so the service can be pointed at different Zenoh deployments without code changes.
## Individual Comments
### Comment 1
<location> `core/libs/commonwealth/src/commonwealth/utils/zenoh_helper.py:19-21` </location>
<code_context>
+ config: zenoh.Config
+ _executor: concurrent.futures.ThreadPoolExecutor | None = None
+
+ def __init__(self) -> None:
+ self.zenoh_config()
+ self.session = zenoh.open(self.config)
+
+ self._executor = concurrent.futures.ThreadPoolExecutor(
</code_context>
<issue_to_address>
**issue (bug_risk):** Guard against failures when establishing the Zenoh session at import time.
Calling `zenoh.open(self.config)` in `__init__` means an unavailable broker or invalid config will raise immediately and can prevent the service from starting cleanly. Please wrap this in try/except, log the failure, and leave `self.session` as `None` so that downstream code (e.g. `declare()`) can detect the missing session and skip registration instead of crashing the process.
</issue_to_address>
### Comment 2
<location> `core/libs/commonwealth/src/commonwealth/utils/zenoh_helper.py:71-75` </location>
<code_context>
+ zenoh_path = ""
+
+ def wrapper(query: zenoh.Query) -> None:
+ params = dict(query.parameters) # type: ignore
+
+ async def _handle_async() -> None:
+ try:
+ response = await func(**params)
+ if response is not None:
+ query.reply(query.selector.key_expr, json.dumps(response, default=str))
</code_context>
<issue_to_address>
**question (bug_risk):** Passing raw Zenoh parameters directly to FastAPI endpoints may cause type/shape mismatches.
`query.parameters` will typically be strings/a flat mapping, but here they’re unpacked directly into the endpoint callable, skipping FastAPI’s normal parsing and validation. This risks runtime errors or incorrect types for parameters that should be ints/bools or have required/optional semantics. Consider adding an adapter that performs basic coercion/validation before calling the endpoint, or restricting which endpoints are exposed over Zenoh and validating their parameters explicitly.
</issue_to_address>
### Comment 3
<location> `core/libs/commonwealth/src/commonwealth/utils/zenoh_helper.py:65-69` </location>
<code_context>
def queryable(self) -> Callable[[Callable[..., Any]], Callable[[zenoh.Query], None]]:
def decorator(func: Callable[..., Any]) -> Callable[[zenoh.Query], None]:
route_path = getattr(func, "_route_path", None)
if route_path is not None:
zenoh_path = sanitize_route_path(route_path)
else:
zenoh_path = ""
def wrapper(query: zenoh.Query) -> None:
params = dict(query.parameters) # type: ignore
async def _handle_async() -> None:
try:
response = await func(**params)
if response is not None:
query.reply(query.selector.key_expr, json.dumps(response, default=str))
except Exception as e:
error_response = {"error": str(e)}
query.reply(query.selector.key_expr, json.dumps(error_response))
def run_async() -> None:
asyncio.run(_handle_async())
if zenoh_session._executor:
zenoh_session._executor.submit(run_async)
self.routes.append((zenoh_path, wrapper)) # type: ignore[arg-type]
return wrapper
return decorator
</code_context>
<issue_to_address>
**suggestion (code-quality):** Replace if statement with if expression ([`assign-if-exp`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/assign-if-exp/))
```suggestion
zenoh_path = sanitize_route_path(route_path) if route_path is not None else ""
```
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
adf1cba to
9418c5e
Compare
| "mode": "client", | ||
| "connect/endpoints": ["tcp/127.0.0.1:7447"], | ||
| "adminspace": {"enabled": True}, | ||
| "metadata": {"name": "zenoh-queryables"}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this should be the name of the service, otherwise all services will show as zenoh-queryables when fetching the metadata of each client, no ? (Check BlueOS zenoh network page)
also, can we somehow use the same client of the log ? or share the clients ?
We already have on in commonwealth. I believe that's better to organize it first and share a single client for both, not sure if it's the best idea, but it sounds better than having multiple clients per service.
| zenoh_router.include_router(zenoh_container_router) | ||
| zenoh_router.include_router(zenoh_extension_router) | ||
| zenoh_router.include_router(zenoh_jobs_router) | ||
| zenoh_router.include_router(zenoh_manifest_router) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This include_router sounds a bit unnecessary, you could extract the router via something like...
for route in app.router.routes:
if isinstance(route, APIRoute):
print(route.path)There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But those are ZenohRouters, they are not included in the fastapi routers. I had to declare one ZenohRouter in each file that had an APIRoute.
Part of #3417
test image: nicoschmdt/blueos-core:queryables-kraken
Summary by Sourcery
Integrate zenoh queryables into the Kraken service and expose key v2 API endpoints over zenoh alongside existing HTTP routes.
New Features:
Enhancements: