Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 56 additions & 1 deletion docs/reference/openapi.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,22 @@
components:
schemas:
BasicAuthentication:
additionalProperties: false
description: User credentials for basic authentication
properties:
password:
description: Password to verify user's identity
title: Password
type: string
username:
description: Unique identifier for user
title: Username
type: string
required:
- username
- password
title: BasicAuthentication
type: object
DeviceModel:
additionalProperties: false
description: Representation of a device
Expand Down Expand Up @@ -224,6 +241,28 @@ components:
- new_state
title: StateChangeRequest
type: object
StompConfig:
additionalProperties: false
description: Config for connecting to stomp broker
properties:
auth:
$ref: '#/components/schemas/BasicAuthentication'
description: Auth information for communicating with STOMP broker, if required
title: Auth
enabled:
default: false
description: True if blueapi should connect to stomp for asynchronous event
publishing
title: Enabled
type: boolean
url:
default: tcp://localhost:61613
format: uri
minLength: 1
title: Url
type: string
title: StompConfig
type: object
Task:
additionalProperties: false
description: Task that will run a plan
Expand Down Expand Up @@ -382,7 +421,7 @@ info:
name: Apache 2.0
url: https://www.apache.org/licenses/LICENSE-2.0.html
title: BlueAPI Control
version: 1.1.3
version: 1.2.0
openapi: 3.1.0
paths:
/config/oidc:
Expand All @@ -401,6 +440,22 @@ paths:
summary: Get Oidc Config
tags:
- Meta
/config/stomp:
get:
description: Retrieve the stomp configuration for the server.
operationId: get_stomp_config_config_stomp_get
responses:
'200':
content:
application/json:
schema:
$ref: '#/components/schemas/StompConfig'
description: Successful Response
'204':
description: No Stomp configured
summary: Get Stomp Config
tags:
- Meta
/devices:
get:
description: Retrieve information about all available devices.
Expand Down
12 changes: 3 additions & 9 deletions helm/blueapi/config_schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -453,16 +453,10 @@
"type": "string"
},
"auth": {
"anyOf": [
{
"$ref": "BasicAuthentication"
},
{
"type": "null"
}
],
"$ref": "BasicAuthentication",
"default": null,
"description": "Auth information for communicating with STOMP broker, if required"
"description": "Auth information for communicating with STOMP broker, if required",
"title": "Auth"
}
},
"title": "StompConfig",
Expand Down
10 changes: 2 additions & 8 deletions helm/blueapi/values.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -862,15 +862,9 @@
"type": "object",
"properties": {
"auth": {
"title": "Auth",
"description": "Auth information for communicating with STOMP broker, if required",
"anyOf": [
{
"$ref": "BasicAuthentication"
},
{
"type": "null"
}
]
"$ref": "BasicAuthentication"
},
"enabled": {
"title": "Enabled",
Expand Down
18 changes: 9 additions & 9 deletions src/blueapi/cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,30 +49,30 @@
invoke_without_command=True, context_settings={"auto_envvar_prefix": "BLUEAPI"}
)
@click.version_option(version=__version__, prog_name="blueapi")
@click.option("-H", "--host", type=str)
@click.option(
"-c", "--config", type=Path, help="Path to configuration YAML file", multiple=True
)
@click.pass_context
def main(ctx: click.Context, config: Path | None | tuple[Path, ...]) -> None:
def main(ctx: click.Context, config: tuple[Path, ...], host: str | None):
# if no command is supplied, run with the options passed

# Set umask to DLS standard
os.umask(stat.S_IWOTH)

config_loader = ConfigLoader(ApplicationConfig)
if config is not None:
configs = (config,) if isinstance(config, Path) else config
for path in configs:
if path.exists():
config_loader.use_values_from_yaml(path)
else:
raise FileNotFoundError(f"Cannot find file: {path}")
try:
config_loader.use_values_from_yaml(*config)
except FileNotFoundError as fnfe:
raise ClickException(f"Config file not found: {fnfe.filename}") from fnfe
if host:
config_loader.use_values({"api": {"url": host}})

ctx.ensure_object(dict)
loaded_config: ApplicationConfig = config_loader.load()

set_up_logging(loaded_config.logging)

ctx.ensure_object(dict)
ctx.obj["config"] = loaded_config

if ctx.invoked_subcommand is None:
Expand Down
41 changes: 16 additions & 25 deletions src/blueapi/client/client.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
import time
from concurrent.futures import Future

from bluesky_stomp.messaging import MessageContext, StompClient
from bluesky_stomp.models import Broker
from observability_utils.tracing import (
get_tracer,
start_as_current_span,
)
from bluesky_stomp.messaging import MessageContext
from observability_utils.tracing import get_tracer, start_as_current_span

from blueapi.config import ApplicationConfig, MissingStompConfigurationError
from blueapi.core.bluesky_types import DataEvent
Expand Down Expand Up @@ -38,15 +34,15 @@ class BlueapiClient:
"""Unified client for controlling blueapi"""

_rest: BlueapiRestClient
_events: EventBusClient | None
_event_bus_client: EventBusClient | None

def __init__(
self,
rest: BlueapiRestClient,
events: EventBusClient | None = None,
):
self._rest = rest
self._events = events
self._event_bus_client = events

@classmethod
def from_config(cls, config: ApplicationConfig) -> "BlueapiClient":
Expand All @@ -56,20 +52,8 @@ def from_config(cls, config: ApplicationConfig) -> "BlueapiClient":
except Exception:
... # Swallow exceptions
rest = BlueapiRestClient(config.api, session_manager=session_manager)
if config.stomp.enabled:
assert config.stomp.url.host is not None, "Stomp URL missing host"
assert config.stomp.url.port is not None, "Stomp URL missing port"
client = StompClient.for_broker(
broker=Broker(
host=config.stomp.url.host,
port=config.stomp.url.port,
auth=config.stomp.auth,
)
)
events = EventBusClient(client)
return cls(rest, events)
else:
return cls(rest)
event_bus = EventBusClient.from_stomp_config(config.stomp)
return cls(rest, event_bus)

@start_as_current_span(TRACER)
def get_plans(self) -> PlanResponse:
Expand Down Expand Up @@ -216,7 +200,7 @@ def run_task(
of task execution.
"""

if self._events is None:
if (event_bus := self._event_bus()) is None:
raise MissingStompConfigurationError(
"Stomp configuration required to run plans is missing or disabled"
)
Expand Down Expand Up @@ -253,8 +237,8 @@ def inner_on_event(event: AnyEvent, ctx: MessageContext) -> None:
else:
complete.set_result(event)

with self._events:
self._events.subscribe_to_all_events(inner_on_event)
with event_bus:
event_bus.subscribe_to_all_events(inner_on_event)
self.start_task(WorkerTask(task_id=task_id))
return complete.result(timeout=timeout)

Expand Down Expand Up @@ -457,3 +441,10 @@ def get_python_env(
"""

return self._rest.get_python_environment(name=name, source=source)

def _event_bus(self) -> EventBusClient | None:
if not self._event_bus_client:
if stomp_config := self._rest.get_stomp_config():
self._event_bus_client = EventBusClient.from_stomp_config(stomp_config)

return self._event_bus_client
16 changes: 15 additions & 1 deletion src/blueapi/client/event_bus.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from collections.abc import Callable
from typing import Self

from bluesky_stomp.messaging import MessageContext, StompClient
from bluesky_stomp.messaging import Broker, MessageContext, StompClient
from bluesky_stomp.models import MessageTopic

from blueapi.config import StompConfig
from blueapi.core import DataEvent
from blueapi.worker import ProgressEvent, WorkerEvent

Expand Down Expand Up @@ -45,3 +47,15 @@ def subscribe_to_all_events(
raise BlueskyStreamingError(
"Unable to subscribe to messages from blueapi"
) from err

@classmethod
def from_stomp_config(cls, config: StompConfig) -> Self | None:
if config.enabled:
assert config.url.host is not None, "Stomp URL missing host"
assert config.url.port is not None, "Stomp URL missing port"
client = StompClient.for_broker(
broker=Broker(
host=config.url.host, port=config.url.port, auth=config.auth
)
)
return cls(client)
10 changes: 9 additions & 1 deletion src/blueapi/client/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
)
from pydantic import BaseModel, TypeAdapter, ValidationError

from blueapi.config import RestConfig
from blueapi.config import RestConfig, StompConfig
from blueapi.service.authentication import JWTAuth, SessionManager
from blueapi.service.model import (
DeviceModel,
Expand Down Expand Up @@ -230,6 +230,14 @@ def get_oidc_config(self) -> OIDCConfig | None:
# Server is not using authentication
return None

def get_stomp_config(self) -> StompConfig | None:
try:
return self._request_and_deserialize("/config/stomp", StompConfig)
except (NoContentError, KeyError):
# Older versions of the server may not have the endpoint implemented so
# treat 404s as no configuration.
return None

def get_python_environment(
self, name: str | None = None, source: SourceInfo | None = None
) -> PythonEnvironmentResponse:
Expand Down
21 changes: 14 additions & 7 deletions src/blueapi/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
ValidationError,
field_validator,
)
from pydantic.json_schema import SkipJsonSchema

from blueapi.utils import BlueapiBaseModel, InvalidConfigError

Expand Down Expand Up @@ -100,7 +101,7 @@ class StompConfig(BlueapiBaseModel):
default=False,
)
url: TcpUrl = TcpUrl("tcp://localhost:61613")
auth: BasicAuthentication | None = Field(
auth: BasicAuthentication | SkipJsonSchema[None] = Field(
description="Auth information for communicating with STOMP broker, if required",
default=None,
)
Expand Down Expand Up @@ -275,7 +276,7 @@ class ApplicationConfig(BlueapiBaseModel):
"""

#: API version to publish in OpenAPI schema
REST_API_VERSION: ClassVar[str] = "1.1.3"
REST_API_VERSION: ClassVar[str] = "1.2.0"

LICENSE_INFO: ClassVar[dict[str, str]] = {
"name": "Apache 2.0",
Expand Down Expand Up @@ -358,19 +359,25 @@ def recursively_update_map(old: dict[str, Any], new: Mapping[str, Any]) -> None:

recursively_update_map(self._values, values)

def use_values_from_yaml(self, path: Path) -> None:
def use_values_from_yaml(self, *paths: Path) -> None:
"""
Use all values provided in a YAML/JSON file in the
Use all values provided in a YAML/JSON files in the
config, override any defaults and values set by
previous calls into this class.

Args:
path (Path): Path to YAML/JSON file
"""

with path.open("r") as stream:
values = yaml.load(stream, yaml.Loader)
self.use_values(values)
# Split reading and loading so that a missing file does not leave
# config in partially loaded state
configs = []
for path in paths:
with path.open("r") as stream:
configs.append(yaml.load(stream, yaml.Loader))

for values in configs:
self.use_values(values)

def load(self) -> C:
"""
Expand Down
4 changes: 4 additions & 0 deletions src/blueapi/service/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,10 @@ def get_oidc_config() -> OIDCConfig | None:
return config().oidc


def get_stomp_config() -> StompConfig | None:
return config().stomp


def get_python_env(
name: str | None = None, source: SourceInfo | None = None
) -> PythonEnvironmentResponse:
Expand Down
Loading
Loading