Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions src/murfey/instrument_server/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,30 @@
return {"success": True}


class RSyncerInfo(BaseModel):
source: str
num_files_transferred: int
num_files_in_queue: int
alive: bool
stopping: bool

Check warning on line 227 in src/murfey/instrument_server/api.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/instrument_server/api.py#L222-L227

Added lines #L222 - L227 were not covered by tests


@router.get("/sessions/{session_id}/rsyncer_info")
def get_rsyncer_info(session_id: MurfeySessionID) -> list[RSyncerInfo]:
info = []

Check warning on line 232 in src/murfey/instrument_server/api.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/instrument_server/api.py#L230-L232

Added lines #L230 - L232 were not covered by tests
for k, v in controllers[session_id].rsync_processes.items():
info.append(

Check warning on line 234 in src/murfey/instrument_server/api.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/instrument_server/api.py#L234

Added line #L234 was not covered by tests
RSyncerInfo(
source=str(k),
num_files_transferred=v._files_transferred,
num_files_in_queue=v.queue.qsize(),
alive=v.thread.is_alive(),
stopping=v._stopping,
)
)
return info

Check warning on line 243 in src/murfey/instrument_server/api.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/instrument_server/api.py#L243

Added line #L243 was not covered by tests


class ProcessingParameters(BaseModel):
gain_ref: str
dose_per_frame: Optional[float] = None
Expand Down
66 changes: 65 additions & 1 deletion src/murfey/server/api/instrument.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from murfey.server.murfey_db import murfey_db
from murfey.util import secure_path
from murfey.util.config import get_machine_config
from murfey.util.db import Session, SessionProcessingParameters
from murfey.util.db import RsyncInstance, Session, SessionProcessingParameters
from murfey.util.models import File, MultigridWatcherSetup

# Create APIRouter class object
Expand Down Expand Up @@ -407,3 +407,67 @@
) as resp:
data = await resp.json()
return data


class RSyncerInfo(BaseModel):
source: str
num_files_transferred: int
num_files_in_queue: int
alive: bool
stopping: bool
destination: str
tag: str
files_transferred: int
files_counted: int
transferring: bool
session_id: int


@router.get("/instruments/{instrument_name}/sessions/{session_id}/rsyncer_info")
async def get_rsyncer_info(
instrument_name: str, session_id: MurfeySessionID, db=murfey_db
) -> List[RSyncerInfo]:
data = []
machine_config = get_machine_config(instrument_name=instrument_name)[

Check warning on line 431 in src/murfey/server/api/instrument.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/server/api/instrument.py#L430-L431

Added lines #L430 - L431 were not covered by tests
instrument_name
]
rsync_instances = db.exec(

Check warning on line 434 in src/murfey/server/api/instrument.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/server/api/instrument.py#L434

Added line #L434 was not covered by tests
select(RsyncInstance).where(RsyncInstance.session_id == session_id)
).all()
if machine_config.instrument_server_url:
try:
async with lock:
token = instrument_server_tokens[session_id]["access_token"]
async with aiohttp.ClientSession() as clientsession:
async with clientsession.get(

Check warning on line 442 in src/murfey/server/api/instrument.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/server/api/instrument.py#L438-L442

Added lines #L438 - L442 were not covered by tests
f"{machine_config.instrument_server_url}/sessions/{session_id}/rsyncer_info",
headers={"Authorization": f"Bearer {token}"},
) as resp:
data = await resp.json()
except KeyError:
data = []
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add a debug/warning log, in case it's not performing what we expected?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case this is to catch the case where the instrument server has disconnected. Then I want data to be [] so it can be handled below and lead to a display in the web UI that indicates the rsyncers aren't running. I can add one to catch other exceptions

except Exception:
log.warning(

Check warning on line 450 in src/murfey/server/api/instrument.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/server/api/instrument.py#L446-L450

Added lines #L446 - L450 were not covered by tests
"Exception encountered gathering rsyncer info from the instrument server",
exc_info=True,
)
combined_data = []
data_source_lookup = {d["source"]: d for d in data}

Check warning on line 455 in src/murfey/server/api/instrument.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/server/api/instrument.py#L454-L455

Added lines #L454 - L455 were not covered by tests
for ri in rsync_instances:
d = data_source_lookup.get(ri.source, {})
combined_data.append(

Check warning on line 458 in src/murfey/server/api/instrument.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/server/api/instrument.py#L457-L458

Added lines #L457 - L458 were not covered by tests
RSyncerInfo(
source=ri.source,
num_files_transferred=d.get("num_files_transferred", 0),
num_files_in_queue=d.get("num_files_in_queue", 0),
alive=d.get("alive", False),
stopping=d.get("stopping", True),
destination=ri.destination,
tag=ri.tag,
files_transferred=ri.files_transferred,
files_counted=ri.files_counted,
transferring=ri.transferring,
session_id=session_id,
)
)
return combined_data

Check warning on line 473 in src/murfey/server/api/instrument.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/server/api/instrument.py#L473

Added line #L473 was not covered by tests