Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 14 additions & 9 deletions src/murfey/client/multigrid_control.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,14 +104,18 @@
register_client=False,
)

# Calculate the time offset between the client and the server
current_time = datetime.now()
server_timestamp = requests.get(

Check warning on line 109 in src/murfey/client/multigrid_control.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/client/multigrid_control.py#L108-L109

Added lines #L108 - L109 were not covered by tests
f"{self.murfey_url}{url_path_for('session_control.router', 'get_current_timestamp')}"
).json()["timestamp"]
self.server_time_offset = current_time - datetime.fromtimestamp(

Check warning on line 112 in src/murfey/client/multigrid_control.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/client/multigrid_control.py#L112

Added line #L112 was not covered by tests
server_timestamp
)

# Store the visit end time in the current device's equivalent time
if self.visit_end_time:
current_time = datetime.now()
server_timestamp = requests.get(
f"{self.murfey_url}{url_path_for('session_control.router', 'get_current_timestamp')}"
).json()["timestamp"]
self.visit_end_time += current_time - datetime.fromtimestamp(
server_timestamp
)
self.visit_end_time += self.server_time_offset

Check warning on line 118 in src/murfey/client/multigrid_control.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/client/multigrid_control.py#L118

Added line #L118 was not covered by tests

def _multigrid_watcher_finalised(self):
self.multigrid_watcher_active = False
Expand Down Expand Up @@ -153,9 +157,10 @@
self._finalise_rsyncer(p)

def update_visit_time(self, new_end_time: datetime):
self.visit_end_time = new_end_time
# Convert the received server timestamp into the local equivalent
self.visit_end_time = new_end_time + self.server_time_offset

Check warning on line 161 in src/murfey/client/multigrid_control.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/client/multigrid_control.py#L161

Added line #L161 was not covered by tests
for rp in self.rsync_processes.values():
rp._end_time = new_end_time
rp._end_time = self.visit_end_time

Check warning on line 163 in src/murfey/client/multigrid_control.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/client/multigrid_control.py#L163

Added line #L163 was not covered by tests

def _start_rsyncer_multigrid(
self,
Expand Down
5 changes: 3 additions & 2 deletions src/murfey/client/rsync.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,6 @@
self.queue.put(absolute_path)

def flush_skipped(self):
self._end_time = datetime.now()
for f in self._skipped_files:
self.queue.put(f)
self._skipped_files = []
Expand Down Expand Up @@ -561,7 +560,9 @@
success = False

if result is None:
logger.error(f"No rsync process ran for files: {files}")
# Only log this as an error if files were scheduled for transfer
if files:
logger.error(f"No rsync process ran for files: {files}")

Check warning on line 565 in src/murfey/client/rsync.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/client/rsync.py#L565

Added line #L565 was not covered by tests
else:
logger.log(
logging.WARNING if result.returncode else logging.DEBUG,
Expand Down
3 changes: 2 additions & 1 deletion src/murfey/instrument_server/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,18 +212,19 @@
@router.delete("/sessions/{session_id}/multigrid_watcher/{label}")
def stop_multigrid_watcher(session_id: MurfeySessionID, label: str):
watchers[label].request_stop()
return {"success": True}

Check warning on line 215 in src/murfey/instrument_server/api.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/instrument_server/api.py#L215

Added line #L215 was not covered by tests


@router.post("/sessions/{session_id}/multigrid_controller/visit_end_time")
def update_multigrid_controller_visit_end_time(
session_id: MurfeySessionID, end_time: datetime
):
controllers[session_id].update_visit_time(end_time)
return {"success": True}

Check warning on line 223 in src/murfey/instrument_server/api.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/instrument_server/api.py#L223

Added line #L223 was not covered by tests


class RsyncerSource(BaseModel):
source: Path
label: str


@router.post("/sessions/{session_id}/stop_rsyncer")
Expand Down
42 changes: 31 additions & 11 deletions src/murfey/server/api/instrument.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
import asyncio
import datetime
import logging
import urllib
from pathlib import Path
from typing import Annotated, List, Optional
from urllib.parse import quote

import aiohttp
from fastapi import APIRouter, Depends
Expand Down Expand Up @@ -340,7 +340,6 @@
async with clientsession.post(
f"{machine_config.instrument_server_url}{url_path_for('api.router', 'stop_rsyncer', session_id=session_id)}",
json={
"label": session_id,
"source": str(secure_path(Path(rsyncer_source.source))),
},
headers={
Expand All @@ -367,7 +366,6 @@
async with clientsession.post(
f"{machine_config.instrument_server_url}{url_path_for('api.router', 'finalise_rsyncer', session_id=session_id)}",
json={
"label": session_id,
"source": str(secure_path(Path(rsyncer_source.source))),
},
headers={
Expand Down Expand Up @@ -420,7 +418,7 @@
if machine_config.instrument_server_url:
async with aiohttp.ClientSession() as clientsession:
async with clientsession.post(
f"{machine_config.instrument_server_url}{url_path_for('api.router', 'update_multigrid_controller_visit_end_time', session_id=session_id)}?end_time={urllib.parse.quote(end_time.isoformat())}",
f"{machine_config.instrument_server_url}{url_path_for('api.router', 'update_multigrid_controller_visit_end_time', session_id=session_id)}?end_time={quote(end_time.isoformat())}",
headers={
"Authorization": f"Bearer {instrument_server_tokens[session_id]['access_token']}"
},
Expand Down Expand Up @@ -467,7 +465,6 @@
async with clientsession.post(
f"{machine_config.instrument_server_url}{url_path_for('api.router', 'remove_rsyncer', session_id=session_id)}",
json={
"label": session_id,
"source": str(secure_path(Path(rsyncer_source.source))),
},
headers={
Expand Down Expand Up @@ -495,7 +492,6 @@
async with clientsession.post(
f"{machine_config.instrument_server_url}{url_path_for('api.router', 'restart_rsyncer', session_id=session_id)}",
json={
"label": session_id,
"source": str(secure_path(Path(rsyncer_source.source))),
},
headers={
Expand All @@ -510,20 +506,44 @@
async def flush_skipped_rsyncer(
session_id: MurfeySessionID, rsyncer_source: RsyncerSource, db=murfey_db
):
data = {}
instrument_name = (
db.exec(select(Session).where(Session.id == session_id)).one().instrument_name
)
# Load data for session
session_entry = db.exec(select(Session).where(Session.id == session_id)).one()
instrument_name = session_entry.instrument_name

Check warning on line 511 in src/murfey/server/api/instrument.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/server/api/instrument.py#L510-L511

Added lines #L510 - L511 were not covered by tests

# Define a new visit end time that's slightly ahead of current time
new_end_time = datetime.datetime.now().replace(

Check warning on line 514 in src/murfey/server/api/instrument.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/server/api/instrument.py#L514

Added line #L514 was not covered by tests
second=0, microsecond=0
) + datetime.timedelta(minutes=5)
# Update the stored visit end time if the new one exceeds it
if session_entry.visit_end_time:
if new_end_time > session_entry.visit_end_time:
session_entry.visit_end_time = new_end_time
db.add(session_entry)
db.commit()

Check warning on line 522 in src/murfey/server/api/instrument.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/server/api/instrument.py#L520-L522

Added lines #L520 - L522 were not covered by tests

# Send request to flush rsyncer
data: dict = {}
update_result: dict = {}

Check warning on line 526 in src/murfey/server/api/instrument.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/server/api/instrument.py#L525-L526

Added lines #L525 - L526 were not covered by tests
machine_config = get_machine_config(instrument_name=instrument_name)[
instrument_name
]
if isinstance(session_id, int):
if machine_config.instrument_server_url:
async with aiohttp.ClientSession() as clientsession:
# Send request to instrument server to update multigrid controller
async with clientsession.post(

Check warning on line 534 in src/murfey/server/api/instrument.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/server/api/instrument.py#L534

Added line #L534 was not covered by tests
f"{machine_config.instrument_server_url}{url_path_for('api.router', 'update_multigrid_controller_visit_end_time', session_id=session_id)}?end_time={quote(session_entry.visit_end_time.isoformat())}",
headers={
"Authorization": f"Bearer {instrument_server_tokens[session_id]['access_token']}"
},
) as resp:
update_result = await resp.json()

Check warning on line 540 in src/murfey/server/api/instrument.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/server/api/instrument.py#L540

Added line #L540 was not covered by tests
if not update_result.get("success", False):
return {"success": False}

Check warning on line 542 in src/murfey/server/api/instrument.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/server/api/instrument.py#L542

Added line #L542 was not covered by tests
# Send request to flush the rsyncer
async with clientsession.post(
f"{machine_config.instrument_server_url}{url_path_for('api.router', 'flush_skipped_rsyncer', session_id=session_id)}",
json={
"label": session_id,
"source": str(secure_path(Path(rsyncer_source.source))),
},
headers={
Expand Down