Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,16 @@ classifiers = [
dependencies = [
"backports.entry_points_selectable",
"defusedxml", # For safely parsing XML files
"pydantic<2", # Locked to <2 by zocalo
"pydantic<2", # Locked to <2 by cygwin terminal
"requests",
"rich",
"werkzeug",
]
[project.optional-dependencies]
cicd = [
"pytest-cov", # Used by Azure Pipelines for PyTest coverage reports
"pytest-cov", # Used for generating PyTest coverage reports
]
client = [
"procrunner",
"textual==0.42.0",
"websocket-client",
"xmltodict",
Expand All @@ -53,8 +52,12 @@ developer = [
"pre-commit", # Formatting, linting, type checking, etc.
"pytest", # Test code functionality
]
instrument-server = [
"fastapi[standard]",
"python-jose[cryptography]",
"uvicorn[standard]",
]
server = [
# "matplotlib", # For visual statistical analysis of images
"aiohttp",
"cryptography",
"fastapi[standard]",
Expand Down
4 changes: 2 additions & 2 deletions src/murfey/cli/transfer.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from __future__ import annotations

import argparse
import subprocess

Check warning on line 4 in src/murfey/cli/transfer.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/cli/transfer.py#L4

Added line #L4 was not covered by tests
from pathlib import Path
from urllib.parse import urlparse

import procrunner
import requests
from rich.console import Console
from rich.prompt import Confirm
Expand Down Expand Up @@ -76,6 +76,6 @@
cmd.extend(list(Path(args.source or ".").glob("*")))
cmd.append(f"{murfey_url.hostname}::{args.destination}")

result = procrunner.run(cmd)
result = subprocess.run(cmd)

Check warning on line 79 in src/murfey/cli/transfer.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/cli/transfer.py#L79

Added line #L79 was not covered by tests
if result.returncode:
console.print(f"[red]rsync failed returning code {result.returncode}")
17 changes: 2 additions & 15 deletions src/murfey/client/contexts/tomo.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
)
from murfey.util import authorised_requests, capture_post, get_machine_config_client
from murfey.util.mdoc import get_block, get_global_data, get_num_blocks
from murfey.util.tomo import midpoint

logger = logging.getLogger("murfey.client.contexts.tomo")

Expand Down Expand Up @@ -64,20 +65,6 @@
return "_".join(split_name[:-5])


def _midpoint(angles: List[float]) -> int:
if not angles:
return 0
if len(angles) <= 2:
return round(angles[0])
sorted_angles = sorted(angles)
return round(
sorted_angles[len(sorted_angles) // 2]
if sorted_angles[len(sorted_angles) // 2]
and sorted_angles[len(sorted_angles) // 2 + 1]
else 0
)


class ProcessFileIncomplete(BaseModel):
dest: Path
source: Path
Expand Down Expand Up @@ -738,7 +725,7 @@
if environment
else None
)
mdoc_metadata["manual_tilt_offset"] = -_midpoint(
mdoc_metadata["manual_tilt_offset"] = -midpoint(

Check warning on line 728 in src/murfey/client/contexts/tomo.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/client/contexts/tomo.py#L728

Added line #L728 was not covered by tests
[float(b["TiltAngle"]) for b in blocks]
)
mdoc_metadata["source"] = str(self._basepath)
Expand Down
19 changes: 8 additions & 11 deletions src/murfey/instrument_server/api.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import secrets
import subprocess

Check warning on line 4 in src/murfey/instrument_server/api.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/instrument_server/api.py#L4

Added line #L4 was not covered by tests
import time
from datetime import datetime
from functools import partial
Expand All @@ -9,7 +10,6 @@
from typing import Annotated, Dict, List, Optional, Union
from urllib.parse import urlparse

import procrunner
import requests
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
Expand All @@ -21,7 +21,7 @@
from murfey.client.multigrid_control import MultigridController
from murfey.client.rsync import RSyncer
from murfey.client.watchdir_multigrid import MultigridDirWatcher
from murfey.util import sanitise_nonpath, secure_path
from murfey.util import sanitise, sanitise_nonpath, secure_path

Check warning on line 24 in src/murfey/instrument_server/api.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/instrument_server/api.py#L24

Added line #L24 was not covered by tests
from murfey.util.instrument_models import MultigridWatcherSpec
from murfey.util.models import File, Token

Expand Down Expand Up @@ -278,19 +278,16 @@

@router.post("/sessions/{session_id}/upload_gain_reference")
def upload_gain_reference(session_id: MurfeySessionID, gain_reference: GainReference):
safe_gain_path = sanitise(str(gain_reference.gain_path))
safe_visit_path = sanitise(gain_reference.visit_path)
safe_destination_dir = sanitise(gain_reference.gain_destination_dir)

Check warning on line 283 in src/murfey/instrument_server/api.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/instrument_server/api.py#L281-L283

Added lines #L281 - L283 were not covered by tests
cmd = [
"rsync",
str(gain_reference.gain_path),
f"{urlparse(_get_murfey_url(), allow_fragments=False).hostname}::{gain_reference.visit_path}/{gain_reference.gain_destination_dir}/{secure_filename(gain_reference.gain_path.name)}",
safe_gain_path,
f"{urlparse(_get_murfey_url(), allow_fragments=False).hostname}::{safe_visit_path}/{safe_destination_dir}/{secure_filename(gain_reference.gain_path.name)}",
]
gain_rsync = procrunner.run(cmd)
gain_rsync = subprocess.run(cmd)

Check warning on line 289 in src/murfey/instrument_server/api.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/instrument_server/api.py#L289

Added line #L289 was not covered by tests
if gain_rsync.returncode:
safe_gain_path = (
str(gain_reference.gain_path).replace("\r\n", "").replace("\n", "")
)
safe_visit_path = gain_reference.visit_path.replace("\r\n", "").replace(
"\n", ""
)
logger.warning(
f"Gain reference file {safe_gain_path} was not successfully transferred to {safe_visit_path}/processing"
)
Expand Down
4 changes: 2 additions & 2 deletions src/murfey/server/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import murfey.server.prometheus as prom
import murfey.server.websocket
import murfey.util.db as db
from murfey.client.contexts.tomo import _midpoint
from murfey.server.murfey_db import url # murfey_db
from murfey.util import LogFilter
from murfey.util.config import (
Expand All @@ -60,6 +59,7 @@
)
from murfey.util.processing_params import default_spa_parameters
from murfey.util.state import global_state
from murfey.util.tomo import midpoint

try:
from murfey.server.ispyb import TransportManager # Session
Expand Down Expand Up @@ -2576,7 +2576,7 @@
)
if not stack_file.parent.exists():
stack_file.parent.mkdir(parents=True)
tilt_offset = _midpoint([float(get_angle(t)) for t in tilts])
tilt_offset = midpoint([float(get_angle(t)) for t in tilts])

Check warning on line 2579 in src/murfey/server/__init__.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/server/__init__.py#L2579

Added line #L2579 was not covered by tests
zocalo_message = {
"recipes": ["em-tomo-align"],
"parameters": {
Expand Down
4 changes: 2 additions & 2 deletions src/murfey/server/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import murfey.server.websocket as ws
import murfey.util.eer
from murfey.server import (
_midpoint,
_murfey_id,
_transport_object,
check_tilt_series_mc,
Expand Down Expand Up @@ -108,6 +107,7 @@
)
from murfey.util.processing_params import default_spa_parameters
from murfey.util.state import global_state
from murfey.util.tomo import midpoint

log = logging.getLogger("murfey.server.api")

Expand Down Expand Up @@ -840,7 +840,7 @@
)
if not stack_file.parent.exists():
stack_file.parent.mkdir(parents=True)
tilt_offset = _midpoint([float(get_angle(t)) for t in tilts])
tilt_offset = midpoint([float(get_angle(t)) for t in tilts])

Check warning on line 843 in src/murfey/server/api/__init__.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/server/api/__init__.py#L843

Added line #L843 was not covered by tests
zocalo_message = {
"recipes": ["em-tomo-align"],
"parameters": {
Expand Down
90 changes: 42 additions & 48 deletions src/murfey/util/rsync.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
from __future__ import annotations

import logging
import subprocess
from pathlib import Path
from typing import Callable, Dict, List, Optional, Tuple, Union

import procrunner

from murfey.util import Processor
from murfey.util.file_monitor import Monitor

Expand All @@ -32,7 +31,7 @@
self.received_bytes = 0
self.byte_rate: float = 0
self.total_size = 0
self.runner_return: List[procrunner.ReturnObject] = []
self.runner_return: List[subprocess.CompletedProcess] = []
self._root = root
self._sub_structure: Optional[Path] = None
self._notify = notify or (lambda f: None)
Expand All @@ -53,7 +52,7 @@
retry: bool = True,
):
"""
Run rsync -v on a list of files using procrunner.
Run rsync -v on a list of files using subprocess.

:param root: root path of files for transferring; structure below the root is preserved
:type root: pathlib.Path object
Expand Down Expand Up @@ -109,69 +108,64 @@
else:
cmd.append(str(self._finaldir / sub_struct) + "/")
self._transferring = True
runner = procrunner.run(
runner = subprocess.run(
cmd,
callback_stdout=self._parse_rsync_stdout,
callback_stderr=self._parse_rsync_stderr,
capture_output=True,
)
for line in runner.stdout.decode("utf-8", "replace").split("\n"):
self._parse_rsync_stdout(line)
for line in runner.stderr.decode("utf-8", "replace").split("\n"):
self._parse_rsync_stderr(line)
self.runner_return.append(runner)
self.failed.extend(root / sub_struct / f for f in self._failed_tmp)
if retry:
self._in.put(root / sub_struct / f for f in self._failed_tmp)

def _parse_rsync_stdout(self, stdout: bytes):
def _parse_rsync_stdout(self, line: str):
"""
Parse rsync stdout to collect information such as the paths of transferred
files and the amount of data transferred.

:param stdout: stdout of rsync process
:type stdout: bytes
"""
stringy_stdout = str(stdout)
if stringy_stdout:
if self._transferring:
if stringy_stdout.startswith("sent"):
self._transferring = False
byte_info = stringy_stdout.split()
self.sent_bytes = int(
byte_info[byte_info.index("sent") + 1].replace(",", "")
)
self.received_bytes = int(
byte_info[byte_info.index("received") + 1].replace(",", "")
)
self.byte_rate = float(
byte_info[byte_info.index("bytes/sec") - 1].replace(",", "")
)
elif len(stringy_stdout.split()) == 1:
if self._root and self._sub_structure:
self._notify(
self._finaldir / self._sub_structure / stringy_stdout
)
self._out.put(self._root / self._sub_structure / stringy_stdout)
else:
logger.warning(
f"root or substructure not set for transfer of {stringy_stdout}"
)
else:
if "total size" in stringy_stdout:
self.total_size = int(
stringy_stdout.replace("total size", "").split()[1]
if self._transferring:
if line.startswith("sent"):
self._transferring = False
byte_info = line.split()
self.sent_bytes = int(
byte_info[byte_info.index("sent") + 1].replace(",", "")
)
self.received_bytes = int(
byte_info[byte_info.index("received") + 1].replace(",", "")
)
self.byte_rate = float(
byte_info[byte_info.index("bytes/sec") - 1].replace(",", "")
)
elif len(line.split()) == 1:
if self._root and self._sub_structure:
self._notify(self._finaldir / self._sub_structure / line)
self._out.put(self._root / self._sub_structure / line)
else:
logger.warning(

Check warning on line 150 in src/murfey/util/rsync.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/util/rsync.py#L150

Added line #L150 was not covered by tests
f"root or substructure not set for transfer of {line}"
)
else:
if "total size" in line:
self.total_size = int(line.replace("total size", "").split()[1])

def _parse_rsync_stderr(self, stderr: bytes):
def _parse_rsync_stderr(self, line: str):
"""
Parse rsync stderr to collect information on any files that failed to transfer.

:param stderr: stderr of rsync process
:type stderr: bytes
"""
stringy_stderr = str(stderr)
if stringy_stderr:
if (
stringy_stderr.startswith("rsync: link_stat")
or stringy_stderr.startswith("rsync: [sender] link_stat")
) and "failed" in stringy_stderr:
failed_msg = stringy_stderr.split()
self._failed_tmp.append(
failed_msg[failed_msg.index("failed:") - 1].replace('"', "")
)
if (
line.startswith("rsync: link_stat")
or line.startswith("rsync: [sender] link_stat")
) and "failed" in line:
failed_msg = line.split()
self._failed_tmp.append(
failed_msg[failed_msg.index("failed:") - 1].replace('"', "")
)
16 changes: 16 additions & 0 deletions src/murfey/util/tomo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
def midpoint(angles: list[float]) -> int:
"""
Utility function to calculate the midpoint of the angles used in a tilt series.
Used primarily in the tomography workflow.
"""
if not angles:
return 0

Check warning on line 7 in src/murfey/util/tomo.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/util/tomo.py#L7

Added line #L7 was not covered by tests
if len(angles) <= 2:
return round(angles[0])
sorted_angles = sorted(angles)
return round(

Check warning on line 11 in src/murfey/util/tomo.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/util/tomo.py#L9-L11

Added lines #L9 - L11 were not covered by tests
sorted_angles[len(sorted_angles) // 2]
if sorted_angles[len(sorted_angles) // 2]
and sorted_angles[len(sorted_angles) // 2 + 1]
else 0
)
Loading