Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 18 additions & 12 deletions src/murfey/client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@
import webbrowser
from datetime import datetime
from pathlib import Path
from pprint import pprint
from queue import Queue
from typing import List, Literal
from typing import Literal
from urllib.parse import ParseResult, urlparse

import requests
Expand Down Expand Up @@ -40,7 +41,7 @@


def main_loop(
source_watchers: List[murfey.client.watchdir.DirWatcher],
source_watchers: list[murfey.client.watchdir.DirWatcher],
appearance_time: float,
transfer_all: bool,
):
Expand Down Expand Up @@ -91,6 +92,7 @@


def run():
# Load client config and server information
config = read_config()
instrument_name = config["Murfey"]["instrument_name"]
try:
Expand All @@ -109,6 +111,7 @@
else:
known_server = config["Murfey"].get("server")

# Set up argument parser with dynamic defaults based on client config
parser = argparse.ArgumentParser(description="Start the Murfey client")
parser.add_argument(
"--server",
Expand Down Expand Up @@ -194,23 +197,23 @@
default=False,
help="Do not trigger processing for any data directories currently on disk (you may have started processing for them in a previous murfey run)",
)

args = parser.parse_args()

# Logic to exit early based on parsed args
if not args.server:
exit("Murfey server not set. Please run with --server host:port")
if not args.server.startswith(("http://", "https://")):
if "://" in args.server:
exit("Unknown server protocol. Only http:// and https:// are allowed")
args.server = f"http://{args.server}"

if args.remove_files:
remove_prompt = Confirm.ask(
f"Are you sure you want to remove files from {args.source or Path('.').absolute()}?"
)
if not remove_prompt:
exit("Exiting")

# If a new server URL is provided, save info to config file
murfey_url = urlparse(args.server, allow_fragments=False)
if args.server != known_server:
# New server specified. Verify that it is real
Expand All @@ -232,8 +235,7 @@
if args.no_transfer:
log.info("No files will be transferred as --no-transfer flag was specified")

from pprint import pprint

# Check ISPyB (if set up) for ongoing visits
ongoing_visits = []
if args.visit:
ongoing_visits = [args.visit]
Expand All @@ -250,35 +252,38 @@

_enable_webbrowser_in_cygwin()

# Set up additional log handlers
log.setLevel(logging.DEBUG)
log_queue = Queue()
input_queue = Queue()

# rich_handler = DirectableRichHandler(log_queue, enable_link_path=False)
# Rich-based console handler
rich_handler = DirectableRichHandler(enable_link_path=False)
rich_handler.setLevel(logging.DEBUG if args.debug else logging.INFO)

# Set up websocket app and handler
client_id = requests.get(f"{murfey_url.geturl()}/new_client_id/").json()
ws = murfey.client.websocket.WSApp(
server=args.server,
id=client_id["new_id"],
)
ws_handler = CustomHandler(ws.send)

Check warning on line 270 in src/murfey/client/__init__.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/client/__init__.py#L270

Added line #L270 was not covered by tests

# Add additional handlers and set logging levels
logging.getLogger().addHandler(rich_handler)
handler = CustomHandler(ws.send)
logging.getLogger().addHandler(handler)
logging.getLogger().addHandler(ws_handler)

Check warning on line 274 in src/murfey/client/__init__.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/client/__init__.py#L274

Added line #L274 was not covered by tests
logging.getLogger("murfey").setLevel(logging.INFO)
logging.getLogger("websocket").setLevel(logging.WARNING)

log.info("Starting Websocket connection")

status_bar = StatusBar()

# Load machine data for subsequent sections
machine_data = requests.get(
f"{murfey_url.geturl()}/instruments/{instrument_name}/machine"
).json()
gain_ref: Path | None = None

# Set up Murfey environment instance and map it to websocket app
instance_environment = MurfeyInstanceEnvironment(
url=murfey_url,
client_id=ws.id,
Expand All @@ -295,9 +300,10 @@
else ""
),
)

ws.environment = instance_environment

# Set up and run Murfey TUI app
status_bar = StatusBar()

Check warning on line 306 in src/murfey/client/__init__.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/client/__init__.py#L306

Added line #L306 was not covered by tests
rich_handler.redirect = True
app = MurfeyTUI(
environment=instance_environment,
Expand Down
12 changes: 9 additions & 3 deletions src/murfey/client/websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@
backoff = 0
while True:
attempt_start = time.perf_counter()
connection_failure = self._ws.run_forever()
connection_failure = self._ws.run_forever(ping_interval=30, ping_timeout=10)

Check warning on line 90 in src/murfey/client/websocket.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/client/websocket.py#L90

Added line #L90 was not covered by tests
if not connection_failure:
break
if (time.perf_counter() - attempt_start) < 5:
Expand All @@ -108,7 +108,10 @@
continue
while not self._ready:
time.sleep(0.3)
self._ws.send(element)
try:
self._ws.send(element)
except Exception:
log.error("Error sending message through websocket", exc_info=True)

Check warning on line 114 in src/murfey/client/websocket.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/client/websocket.py#L111-L114

Added lines #L111 - L114 were not covered by tests
self._send_queue.task_done()
log.debug("Websocket send-queue-feeder thread stopped")

Expand All @@ -135,7 +138,10 @@
self._send_queue.put(None)
self._feeder_thread.join()
self._receiver_thread.join()
self._ws.close()
try:
self._ws.close()
except Exception:
log.error("Error closing websocket connection", exc_info=True)

Check warning on line 144 in src/murfey/client/websocket.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/client/websocket.py#L141-L144

Added lines #L141 - L144 were not covered by tests

def on_message(self, ws: websocket.WebSocketApp, message: str):
self._receive_queue.put(message)
Expand Down
31 changes: 18 additions & 13 deletions src/murfey/server/websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@
import json
import logging
from datetime import datetime
from typing import Any, Dict, TypeVar, Union
from typing import Any, TypeVar, Union

from fastapi import APIRouter, WebSocket, WebSocketDisconnect
from sqlmodel import select
from sqlmodel import Session, select

import murfey.server.prometheus as prom
from murfey.server.murfey_db import get_murfey_db_session
Expand All @@ -22,10 +22,13 @@

class ConnectionManager:
def __init__(self):
self.active_connections: Dict[int | str, WebSocket] = {}
self.active_connections: dict[int | str, WebSocket] = {}

async def connect(
self, websocket: WebSocket, client_id: int | str, register_client: bool = True
self,
websocket: WebSocket,
client_id: Union[int, str],
register_client: bool = True,
):
await websocket.accept()
self.active_connections[client_id] = websocket
Expand All @@ -38,16 +41,17 @@

@staticmethod
def _register_new_client(client_id: int):
log.debug(f"Registering new client with ID {client_id}")

Check warning on line 44 in src/murfey/server/websocket.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/server/websocket.py#L44

Added line #L44 was not covered by tests
new_client = ClientEnvironment(client_id=client_id, connected=True)
murfey_db = next(get_murfey_db_session())
murfey_db: Session = next(get_murfey_db_session())

Check warning on line 46 in src/murfey/server/websocket.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/server/websocket.py#L46

Added line #L46 was not covered by tests
murfey_db.add(new_client)
murfey_db.commit()
murfey_db.close()

def disconnect(self, client_id: int | str, unregister_client: bool = True):
def disconnect(self, client_id: Union[int, str], unregister_client: bool = True):
self.active_connections.pop(client_id)
if unregister_client:
murfey_db = next(get_murfey_db_session())
murfey_db: Session = next(get_murfey_db_session())

Check warning on line 54 in src/murfey/server/websocket.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/server/websocket.py#L54

Added line #L54 was not covered by tests
client_env = murfey_db.exec(
select(ClientEnvironment).where(
ClientEnvironment.client_id == client_id
Expand All @@ -73,7 +77,7 @@
while True:
data = await websocket.receive_text()
try:
json_data = json.loads(data)
json_data: dict = json.loads(data)

Check warning on line 80 in src/murfey/server/websocket.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/server/websocket.py#L80

Added line #L80 was not covered by tests
if json_data["type"] == "log": # and isinstance(json_data, dict)
json_data.pop("type")
await forward_log(json_data, websocket)
Expand All @@ -92,15 +96,16 @@

@ws.websocket("/connect/{client_id}")
async def websocket_connection_endpoint(
websocket: WebSocket, client_id: Union[int, str]
websocket: WebSocket,
client_id: Union[int, str],
):
await manager.connect(websocket, client_id, register_client=False)
await manager.broadcast(f"Client {client_id} joined")
try:
while True:
data = await websocket.receive_text()
try:
json_data = json.loads(data)
json_data: dict = json.loads(data)

Check warning on line 108 in src/murfey/server/websocket.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/server/websocket.py#L108

Added line #L108 was not covered by tests
if json_data.get("type") == "log": # and isinstance(json_data, dict)
json_data.pop("type")
await forward_log(json_data, websocket)
Expand All @@ -115,12 +120,12 @@
await manager.broadcast(f"Client #{client_id} disconnected")


async def check_connections(active_connections):
async def check_connections(active_connections: list[WebSocket]):
log.info("Checking connections")
for connection in active_connections:
log.info("Checking response")
try:
await asyncio.wait_for(connection.receive(), timeout=6)
await asyncio.wait_for(connection.receive(), timeout=10)

Check warning on line 128 in src/murfey/server/websocket.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/server/websocket.py#L128

Added line #L128 was not covered by tests
except asyncio.TimeoutError:
log.info(f"Disconnecting Client {connection[0]}")
manager.disconnect(connection[0], connection[1])
Expand All @@ -139,7 +144,7 @@

@ws.delete("/test/{client_id}")
async def close_ws_connection(client_id: int):
murfey_db = next(get_murfey_db_session())
murfey_db: Session = next(get_murfey_db_session())

Check warning on line 147 in src/murfey/server/websocket.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/server/websocket.py#L147

Added line #L147 was not covered by tests
client_env = murfey_db.exec(
select(ClientEnvironment).where(ClientEnvironment.client_id == client_id)
).one()
Expand Down