Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 34 additions & 30 deletions src/murfey/client/analyser.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from typing import Type

from murfey.client.context import Context
from murfey.client.contexts.atlas import AtlasContext
from murfey.client.contexts.clem import CLEMContext
from murfey.client.contexts.spa import SPAModularContext
from murfey.client.contexts.spa_metadata import SPAMetadataContext
Expand Down Expand Up @@ -135,7 +136,7 @@ def _find_context(self, file_path: Path) -> bool:

# Tomography and SPA workflow checks
if "atlas" in file_path.parts:
self._context = SPAMetadataContext("epu", self._basepath)
self._context = AtlasContext("epu", self._basepath)
return True

if "Metadata" in file_path.parts or file_path.name == "EpuSession.dm":
Expand Down Expand Up @@ -266,7 +267,7 @@ def _analyse(self):
)
except Exception as e:
logger.error(f"Exception encountered: {e}")
if "atlas" not in transferred_file.parts:
if not isinstance(self._context, AtlasContext):
if not dc_metadata:
try:
dc_metadata = self._context.gather_metadata(
Expand Down Expand Up @@ -308,6 +309,10 @@ def _analyse(self):
)
self.post_transfer(transferred_file)

elif isinstance(self._context, AtlasContext):
logger.debug(f"File {transferred_file.name!r} is part of the atlas")
self.post_transfer(transferred_file)

# Handle files with tomography and SPA context differently
elif not self._extension or self._unseen_xml:
valid_extension = self._find_extension(transferred_file)
Expand All @@ -325,36 +330,35 @@ def _analyse(self):
)
except Exception as e:
logger.error(f"Exception encountered: {e}")
if "atlas" not in transferred_file.parts:
if not dc_metadata:
try:
dc_metadata = self._context.gather_metadata(
mdoc_for_reading
or self._xml_file(transferred_file),
environment=self._environment,
)
except KeyError as e:
logger.error(
f"Metadata gathering failed with a key error for key: {e.args[0]}"
)
raise e
if not dc_metadata or not self._force_mdoc_metadata:
mdoc_for_reading = None
self._unseen_xml.append(transferred_file)
if dc_metadata:
self._unseen_xml = []
if dc_metadata.get("file_extension"):
self._extension = dc_metadata["file_extension"]
else:
dc_metadata["file_extension"] = self._extension
dc_metadata["acquisition_software"] = (
self._context._acquisition_software
if not dc_metadata:
try:
dc_metadata = self._context.gather_metadata(
mdoc_for_reading
or self._xml_file(transferred_file),
environment=self._environment,
)
self.notify(
{
"form": dc_metadata,
}
except KeyError as e:
logger.error(
f"Metadata gathering failed with a key error for key: {e.args[0]}"
)
raise e
if not dc_metadata or not self._force_mdoc_metadata:
mdoc_for_reading = None
self._unseen_xml.append(transferred_file)
if dc_metadata:
self._unseen_xml = []
if dc_metadata.get("file_extension"):
self._extension = dc_metadata["file_extension"]
else:
dc_metadata["file_extension"] = self._extension
dc_metadata["acquisition_software"] = (
self._context._acquisition_software
)
self.notify(
{
"form": dc_metadata,
}
)
elif isinstance(
self._context,
(
Expand Down
52 changes: 52 additions & 0 deletions src/murfey/client/contexts/atlas.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import logging
from pathlib import Path
from typing import Optional

import requests

from murfey.client.context import Context
from murfey.client.contexts.spa import _get_source
from murfey.client.contexts.spa_metadata import _atlas_destination
from murfey.client.instance_environment import MurfeyInstanceEnvironment
from murfey.util.api import url_path_for
from murfey.util.client import authorised_requests, capture_post

logger = logging.getLogger("murfey.client.contexts.atlas")

requests.get, requests.post, requests.put, requests.delete = authorised_requests()


class AtlasContext(Context):
def __init__(self, acquisition_software: str, basepath: Path):
super().__init__("Atlas", acquisition_software)
self._basepath = basepath

def post_transfer(
self,
transferred_file: Path,
environment: Optional[MurfeyInstanceEnvironment] = None,
**kwargs,
):
super().post_transfer(
transferred_file=transferred_file,
environment=environment,
**kwargs,
)

if (
environment
and "Atlas_" in transferred_file.stem
and transferred_file.suffix == ".mrc"
):
source = _get_source(transferred_file, environment)
if source:
transferred_atlas_name = _atlas_destination(
environment, source, transferred_file
) / transferred_file.relative_to(source.parent)
capture_post(
f"{str(environment.url.geturl())}{url_path_for('session_control.spa_router', 'make_atlas_jpg', session_id=environment.murfey_session)}",
json={"path": str(transferred_atlas_name)},
)
logger.info(
f"Submitted request to create JPG image of atlas {str(transferred_atlas_name)!r}"
)
4 changes: 2 additions & 2 deletions src/murfey/client/multigrid_control.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ def _rsyncer_stopped(self, source: Path, explicit_stop: bool = False):
requests.delete(remove_url)
else:
stop_url = f"{self.murfey_url}{url_path_for('session_control.router', 'register_stopped_rsyncer', session_id=self.session_id)}"
capture_post(stop_url, json={"source": str(source)})
capture_post(stop_url, json={"path": str(source)})

def _finalise_rsyncer(self, source: Path):
"""
Expand All @@ -312,7 +312,7 @@ def _finalise_rsyncer(self, source: Path):
def _restart_rsyncer(self, source: Path):
self.rsync_processes[source].restart()
restarted_url = f"{self.murfey_url}{url_path_for('session_control.router', 'register_restarted_rsyncer', session_id=self.session_id)}"
capture_post(restarted_url, json={"source": str(source)})
capture_post(restarted_url, json={"path": str(source)})

def _request_watcher_stop(self, source: Path):
self._environment.watchers[source]._stopping = True
Expand Down
26 changes: 20 additions & 6 deletions src/murfey/server/api/session_control.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
SearchMapParameters,
Visit,
)
from murfey.workflows.spa.atlas import atlas_jpg_from_mrc
from murfey.workflows.spa.flush_spa_preprocess import (
register_foil_hole as _register_foil_hole,
)
Expand Down Expand Up @@ -262,18 +263,18 @@ def get_rsyncers_for_session(session_id: MurfeySessionID, db=murfey_db):
return rsync_instances.all()


class RsyncerSource(BaseModel):
source: str
class StringOfPathModel(BaseModel):
path: str


@router.post("/sessions/{session_id}/rsyncer_stopped")
def register_stopped_rsyncer(
session_id: int, rsyncer_source: RsyncerSource, db=murfey_db
session_id: int, rsyncer_source: StringOfPathModel, db=murfey_db
):
rsyncer = db.exec(
select(RsyncInstance)
.where(RsyncInstance.session_id == session_id)
.where(RsyncInstance.source == rsyncer_source.source)
.where(RsyncInstance.source == rsyncer_source.path)
).one()
rsyncer.transferring = False
db.add(rsyncer)
Expand All @@ -282,12 +283,12 @@ def register_stopped_rsyncer(

@router.post("/sessions/{session_id}/rsyncer_started")
def register_restarted_rsyncer(
session_id: int, rsyncer_source: RsyncerSource, db=murfey_db
session_id: int, rsyncer_source: StringOfPathModel, db=murfey_db
):
rsyncer = db.exec(
select(RsyncInstance)
.where(RsyncInstance.session_id == session_id)
.where(RsyncInstance.source == rsyncer_source.source)
.where(RsyncInstance.source == rsyncer_source.path)
).one()
rsyncer.transferring = True
db.add(rsyncer)
Expand Down Expand Up @@ -347,6 +348,19 @@ def get_foil_hole(
return _get_foil_hole(session_id, fh_name, db)


@spa_router.post("/sessions/{session_id}/make_atlas_jpg")
def make_atlas_jpg(
session_id: MurfeySessionID, atlas_mrc: StringOfPathModel, db=murfey_db
):
logger.debug(
f"Received request to create JPG image of atlas {sanitise(atlas_mrc.path)!r}"
)
session = db.exec(select(Session).where(Session.id == session_id)).one()
return atlas_jpg_from_mrc(
session.instrument_name, session.visit, Path(atlas_mrc.path)
)


@spa_router.post("/sessions/{session_id}/grid_square/{gsid}")
def register_grid_square(
session_id: MurfeySessionID,
Expand Down
5 changes: 5 additions & 0 deletions src/murfey/util/route_manifest.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -814,6 +814,11 @@ murfey.server.api.session_control.spa_router:
type: int
methods:
- GET
- path: /session_control/spa/sessions/{session_id}/make_atlas_jpg
function: make_atlas_jpg
path_params: []
methods:
- POST
- path: /session_control/spa/sessions/{session_id}/grid_square/{gsid}
function: register_grid_square
path_params:
Expand Down
43 changes: 43 additions & 0 deletions src/murfey/workflows/spa/atlas.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import logging
from pathlib import Path

import mrcfile
import PIL.Image
from werkzeug.utils import secure_filename

from murfey.util import sanitise
from murfey.util.config import get_machine_config

logger = logging.getLogger("murfey.workflows.spa.atlas")


def atlas_jpg_from_mrc(instrument_name: str, visit_name: str, atlas_mrc: Path):
logger.debug(
f"Starting workflow to create JPG image of atlas {sanitise(str(atlas_mrc))!r}"
)
with mrcfile.open(atlas_mrc) as mrc:
data = mrc.data

machine_config = get_machine_config(instrument_name=instrument_name)[
instrument_name
]

parts = [secure_filename(p) for p in atlas_mrc.parts]
visit_idx = parts.index(visit_name)
core = Path("/".join(parts[: visit_idx + 1]))
sample_id = "Sample"
for p in parts:
if "Sample" in p:
sample_id = p
break
atlas_jpg_file = (
core
/ machine_config.processed_directory_name
/ "atlas"
/ secure_filename(f"{sample_id}_{atlas_mrc.stem}_fullres.jpg")
)
atlas_jpg_file.parent.mkdir(parents=True, exist_ok=True)

im = PIL.Image.fromarray(data)
im.convert(mode="L").save(atlas_jpg_file)
logger.debug(f"JPG image of atlas saved as {str(atlas_jpg_file)!r}")
3 changes: 2 additions & 1 deletion tests/client/test_analyser.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import pytest

from murfey.client.analyser import Analyser
from murfey.client.contexts.atlas import AtlasContext
from murfey.client.contexts.clem import CLEMContext
from murfey.client.contexts.spa import SPAModularContext
from murfey.client.contexts.spa_metadata import SPAMetadataContext
Expand All @@ -28,7 +29,7 @@
["visit/FoilHole_01234_fractions.tiff", SPAModularContext],
["visit/FoilHole_01234_EER.eer", SPAModularContext],
# SPA metadata
["atlas/atlas.mrc", SPAMetadataContext],
["atlas/atlas.mrc", AtlasContext],
["visit/EpuSession.dm", SPAMetadataContext],
["visit/Metadata/GridSquare.dm", SPAMetadataContext],
# CLEM LIF file
Expand Down
76 changes: 76 additions & 0 deletions tests/server/api/test_session_control.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
from pathlib import Path
from unittest import mock
from unittest.mock import MagicMock

from fastapi import FastAPI
from fastapi.testclient import TestClient
from pytest_mock import MockerFixture

from murfey.server.api.auth import (
validate_instrument_server_session_access,
validate_instrument_token,
)
from murfey.server.api.session_control import spa_router
from murfey.server.murfey_db import murfey_db_session
from murfey.util.api import url_path_for


def test_make_atlas_jpg(mocker: MockerFixture, tmp_path: Path):
# Set up the objects to mock
instrument_name = "test"
visit_name = "test_visit"
session_id = 1

# Override the database session generator
mock_session = MagicMock()
mock_session.instrument_name = instrument_name
mock_session.visit = visit_name
mock_query_result = MagicMock()
mock_query_result.one.return_value = mock_session
mock_db_session = MagicMock()
mock_db_session.exec.return_value = mock_query_result

def mock_get_db_session():
yield mock_db_session

# Mock the instrument server tokens dictionary
mock_tokens = mocker.patch(
"murfey.server.api.instrument.instrument_server_tokens",
{session_id: {"access_token": mock.sentinel}},
)

# Mock the called workflow function
mock_atlas_jpg = mocker.patch(
"murfey.server.api.session_control.atlas_jpg_from_mrc",
return_value=None,
)

# Set up the test file
image_dir = tmp_path / instrument_name / "data" / visit_name / "Atlas"
image_dir.mkdir(parents=True, exist_ok=True)
test_file = image_dir / "Atlas1.mrc"

# Set up the backend server
backend_app = FastAPI()

# Override validation and database dependencies
backend_app.dependency_overrides[validate_instrument_token] = lambda: None
backend_app.dependency_overrides[validate_instrument_server_session_access] = (
lambda: session_id
)
backend_app.dependency_overrides[murfey_db_session] = mock_get_db_session
backend_app.include_router(spa_router)
backend_server = TestClient(backend_app)

atlas_jpg_url = url_path_for(
"api.session_control.spa_router", "make_atlas_jpg", session_id=session_id
)
response = backend_server.post(
atlas_jpg_url,
json={"path": str(test_file)},
headers={"Authorization": f"Bearer {mock_tokens[session_id]['access_token']}"},
)

# Check that the expected calls were made
mock_atlas_jpg.assert_called_once_with(instrument_name, visit_name, test_file)
assert response.status_code == 200
Loading