18 changes: 10 additions & 8 deletions src/murfey/client/contexts/clem.py
@@ -4,7 +4,6 @@
"""

import logging
from datetime import datetime
from pathlib import Path
from typing import Dict, Generator, List, Optional
from urllib.parse import quote
@@ -31,13 +30,16 @@
instrument_name=environment.instrument_name,
demo=environment.demo,
)
# rsync basepath and modules are set in the microscope's configuration YAML file
return (
Path(machine_config.get("rsync_basepath", ""))
/ str(datetime.now().year)
/ source.name
/ file_path.relative_to(source)

# Construct destination path
base_destination = Path(machine_config.get("rsync_basepath", "")) / Path(
environment.default_destinations[source]
)
# Add the visit to the path if it's not already part of the default destination
if environment.visit not in environment.default_destinations[source]:
base_destination = base_destination / environment.visit
destination = base_destination / file_path.relative_to(source)
return destination


def _get_source(
@@ -291,7 +293,7 @@
post_result = self.process_tiff_series(tiff_dataset, environment)
if post_result is False:
return False

logger.info(f"Started preprocessing of TIFF series {series_name}")
else:
logger.debug(f"TIFF series {series_name!r} is still being processed")

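
The net effect of this hunk is that destination paths are no longer built as rsync_basepath / year / source name; they are rooted at the session's default destination, with the visit inserted only when it is missing. A minimal sketch of the resulting logic, with hypothetical stand-in parameters in place of the 'environment' object used above:

from pathlib import Path


def build_destination(
    machine_config: dict,
    default_destination: str,  # stands in for environment.default_destinations[source]
    visit: str,                # stands in for environment.visit
    source: Path,
    file_path: Path,
) -> Path:
    # Root the destination at the configured rsync base path
    base_destination = Path(machine_config.get("rsync_basepath", "")) / Path(
        default_destination
    )
    # Append the visit only when the default destination doesn't already
    # contain it, so the visit directory is never doubled up
    if visit not in default_destination:
        base_destination = base_destination / visit
    return base_destination / file_path.relative_to(source)
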
17 changes: 11 additions & 6 deletions src/murfey/instrument_server/api.py
@@ -141,8 +141,16 @@
def setup_multigrid_watcher(
session_id: MurfeySessionID, watcher_spec: MultigridWatcherSpec
):
# Return early if a controller has already been set up for this session
if controllers.get(session_id) is not None:
return {"success": True}

# Load machine config as dictionary
machine_config: dict[str, Any] = requests.get(
f"{_get_murfey_url()}/instruments/{sanitise_nonpath(watcher_spec.instrument_name)}/machine",
headers={"Authorization": f"Bearer {tokens[session_id]}"},
).json()

label = watcher_spec.label
for sid, controller in controllers.items():
if controller.dormant:
@@ -156,22 +164,19 @@
demo=True,
do_transfer=True,
processing_enabled=not watcher_spec.skip_existing_processing,
_machine_config=watcher_spec.configuration.dict(),
_machine_config=machine_config,
token=tokens.get(session_id, "token"),
data_collection_parameters=data_collection_parameters.get(label, {}),
rsync_restarts=watcher_spec.rsync_restarts,
visit_end_time=watcher_spec.visit_end_time,
)
watcher_spec.source.mkdir(exist_ok=True)
machine_config = requests.get(
f"{_get_murfey_url()}/instruments/{sanitise_nonpath(watcher_spec.instrument_name)}/machine",
headers={"Authorization": f"Bearer {tokens[session_id]}"},
).json()

for d in machine_config.get("create_directories", []):
(watcher_spec.source / d).mkdir(exist_ok=True)
watchers[session_id] = MultigridDirWatcher(
watcher_spec.source,
watcher_spec.configuration.dict(),
machine_config,
skip_existing_processing=watcher_spec.skip_existing_processing,
)
watchers[session_id].subscribe(
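
The endpoint now fetches the machine configuration over HTTP before any controller is built, rather than trusting the copy previously embedded in watcher_spec.configuration. A sketch of that lookup as a standalone helper, assuming hypothetical murfey_url and token arguments:

import requests


def fetch_machine_config(murfey_url: str, instrument_name: str, token: str) -> dict:
    # Ask the Murfey server for this instrument's machine configuration
    response = requests.get(
        f"{murfey_url}/instruments/{instrument_name}/machine",
        headers={"Authorization": f"Bearer {token}"},
    )
    # Fail loudly rather than building a watcher from a bad response
    response.raise_for_status()
    return response.json()
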
11 changes: 1 addition & 10 deletions src/murfey/server/api/instrument.py
@@ -106,22 +106,13 @@ async def setup_multigrid_watcher(
if machine_config.instrument_server_url:
session = db.exec(select(Session).where(Session.id == session_id)).one()
visit = session.visit
_config = {
"acquisition_software": machine_config.acquisition_software,
"calibrations": machine_config.calibrations,
"data_directories": [str(k) for k in machine_config.data_directories],
"create_directories": [str(k) for k in machine_config.create_directories],
"rsync_basepath": str(machine_config.rsync_basepath),
"visit": visit,
"default_model": str(machine_config.default_model),
}

async with aiohttp.ClientSession() as clientsession:
async with clientsession.post(
f"{machine_config.instrument_server_url}/sessions/{session_id}/multigrid_watcher",
json={
"source": str(secure_path(watcher_spec.source / visit)),
"visit": visit,
"configuration": _config,
"label": visit,
"instrument_name": instrument_name,
"skip_existing_processing": watcher_spec.skip_existing_processing,
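
With the _config dictionary gone, the JSON body posted to the instrument server carries only identifying fields, and the instrument server resolves the machine configuration itself. A sketch of the slimmed-down payload; every value here is illustrative:

payload = {
    "source": "/dls/m01/data/2025/cm12345-1",  # secure_path(watcher_spec.source / visit)
    "visit": "cm12345-1",
    "label": "cm12345-1",
    "instrument_name": "m01",
    "skip_existing_processing": False,
}
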
131 changes: 88 additions & 43 deletions src/murfey/util/config.py
@@ -4,46 +4,62 @@
import socket
from functools import lru_cache
from pathlib import Path
from typing import Dict, List, Literal, Optional, Union
from typing import Literal, Optional, Union

import yaml
from backports.entry_points_selectable import entry_points
from pydantic import BaseModel, BaseSettings, Extra, validator


class MachineConfig(BaseModel, extra=Extra.allow): # type: ignore
acquisition_software: List[str]
calibrations: Dict[str, Dict[str, Union[dict, float]]]
data_directories: List[Path]
rsync_basepath: Path
default_model: Path
class MachineConfig(BaseModel): # type: ignore
"""
Configuration keys that describe the type of workflow conducted on the client
side, and how Murfey will handle its data transfer and processing
"""

# General info --------------------------------------------------------------------
display_name: str = ""
instrument_name: str = ""
image_path: Optional[Path] = None
software_versions: Dict[str, str] = {}
external_executables: Dict[str, str] = {}
external_executables_eer: Dict[str, str] = {}
external_environment: Dict[str, str] = {}
rsync_module: str = ""
machine_override: str = ""

# Hardware and software -----------------------------------------------------------
camera: str = "FALCON"
superres: bool = False
calibrations: dict[str, dict[str, Union[dict, float]]]
acquisition_software: list[str]
software_versions: dict[str, str] = {}
software_settings_output_directories: dict[str, list[str]] = {}
data_required_substrings: dict[str, dict[str, list[str]]] = {}

# Client side directory setup -----------------------------------------------------
data_directories: list[Path]
create_directories: list[str] = ["atlas"]
analyse_created_directories: List[str] = []
analyse_created_directories: list[str] = []
gain_reference_directory: Optional[Path] = None
eer_fractionation_file_template: str = ""
processed_directory_name: str = "processed"
gain_directory_name: str = "processing"
node_creator_queue: str = "node_creator"
superres: bool = False
camera: str = "FALCON"
data_required_substrings: Dict[str, Dict[str, List[str]]] = {}
allow_removal: bool = False

# Data transfer setup -------------------------------------------------------------
# Rsync setup
data_transfer_enabled: bool = True
rsync_url: str = ""
rsync_module: str = ""
rsync_basepath: Path
allow_removal: bool = False

# Upstream data download setup
upstream_data_directories: list[Path] = [] # Previous sessions
upstream_data_download_directory: Optional[Path] = None # Set by microscope config
upstream_data_tiff_locations: list[str] = ["processed"] # Location of CLEM TIFFs

# Data processing setup -----------------------------------------------------------
# General processing setup
processing_enabled: bool = True
machine_override: str = ""
processed_extra_directory: str = ""
plugin_packages: Dict[str, Path] = {}
software_settings_output_directories: Dict[str, List[str]] = {}
process_by_default: bool = True
recipes: Dict[str, str] = {
gain_directory_name: str = "processing"
processed_directory_name: str = "processed"
processed_extra_directory: str = ""
recipes: dict[str, str] = {
"em-spa-bfactor": "em-spa-bfactor",
"em-spa-class2d": "em-spa-class2d",
"em-spa-class3d": "em-spa-class3d",
@@ -53,26 +69,41 @@ class MachineConfig(BaseModel, extra=Extra.allow):  # type: ignore
"em-tomo-align": "em-tomo-align",
}

# Find and download upstream directories
upstream_data_directories: List[Path] = [] # Previous sessions
upstream_data_download_directory: Optional[Path] = None # Set by microscope config
upstream_data_tiff_locations: List[str] = ["processed"] # Location of CLEM TIFFs

# Particle picking setup
default_model: Path
model_search_directory: str = "processing"
initial_model_search_directory: str = "processing/initial_model"

failure_queue: str = ""
instrument_server_url: str = "http://localhost:8001"
frontend_url: str = "http://localhost:3000"
murfey_url: str = "http://localhost:8000"
rsync_url: str = ""
# Data analysis plugins
external_executables: dict[str, str] = {}
external_executables_eer: dict[str, str] = {}
external_environment: dict[str, str] = {}
plugin_packages: dict[str, Path] = {}

# Server and network setup --------------------------------------------------------
# Configurations and URLs
security_configuration_path: Optional[Path] = None
murfey_url: str = "http://localhost:8000"
frontend_url: str = "http://localhost:3000"
instrument_server_url: str = "http://localhost:8001"

# Messaging queues
failure_queue: str = ""
node_creator_queue: str = "node_creator"
notifications_queue: str = "pato_notification"

class Config:
"""
Inner class that defines this model's parsing and serialising behaviour
"""

extra = Extra.allow
json_encoders = {
Path: str,
}


def from_file(config_file_path: Path, instrument: str = "") -> dict[str, MachineConfig]:
with open(config_file_path, "r") as config_stream:
config = yaml.safe_load(config_stream)
return {
@@ -83,22 +114,36 @@ def from_file(config_file_path: Path, instrument: str = "") -> Dict[str, Machine


class Security(BaseModel):
# Murfey database settings
murfey_db_credentials: Path
crypto_key: str
auth_key: str = ""
sqlalchemy_pooling: bool = True

# ISPyB settings
ispyb_credentials: Optional[Path] = None

# Murfey server connection settings
auth_algorithm: str = ""
auth_key: str = ""
auth_type: Literal["password", "cookie"] = "password"
auth_url: str = ""
sqlalchemy_pooling: bool = True
allow_origins: List[str] = ["*"]
cookie_key: str = ""
session_validation: str = ""
session_token_timeout: Optional[int] = None
auth_type: Literal["password", "cookie"] = "password"
cookie_key: str = ""
allow_origins: list[str] = ["*"]

# RabbitMQ settings
rabbitmq_credentials: Path
feedback_queue: str = "murfey_feedback"

# Graylog settings
graylog_host: str = ""
graylog_port: Optional[int] = None
ispyb_credentials: Optional[Path] = None

class Config:
json_encoders = {
Path: str,
}

@validator("graylog_port")
def check_port_present_if_host_is(
@@ -158,7 +203,7 @@ def get_security_config() -> Security:


@lru_cache(maxsize=1)
def get_machine_config(instrument_name: str = "") -> Dict[str, MachineConfig]:
def get_machine_config(instrument_name: str = "") -> dict[str, MachineConfig]:
machine_config = {
"": MachineConfig(
acquisition_software=[],
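
A hypothetical use of the updated from_file helper, assuming the returned dictionary is keyed by instrument name; the YAML path and instrument name below are illustrative:

from pathlib import Path

# Parse the microscope configuration YAML into MachineConfig objects
configs = from_file(Path("/etc/murfey/machine_config.yaml"), instrument="m01")
machine_config = configs["m01"]
print(machine_config.rsync_basepath, machine_config.acquisition_software)
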
3 changes: 0 additions & 3 deletions src/murfey/util/instrument_models.py
@@ -4,12 +4,9 @@

from pydantic import BaseModel

from murfey.util.config import MachineConfig


class MultigridWatcherSpec(BaseModel):
source: Path
configuration: MachineConfig
label: str
visit: str
instrument_name: str
41 changes: 24 additions & 17 deletions src/murfey/workflows/clem/process_raw_lifs.py
@@ -3,6 +3,7 @@
The recipe referred to here is stored on GitLab.
"""

from logging import getLogger
from pathlib import Path
from typing import Optional

@@ -11,6 +12,8 @@
except AttributeError:
pass # Ignore if ISPyB credentials environment variable not set

logger = getLogger("murfey.workflows.clem.process_raw_lifs")


def zocalo_cluster_request(
file: Path,
@@ -43,24 +46,28 @@ def zocalo_cluster_request(
# Get the feedback queue from the messenger
feedback_queue: str = messenger.feedback_queue

# Send the message
# The keys under "parameters" will populate all the matching fields in {}
# in the processing recipe
messenger.send(
"processing_recipe",
{
"recipes": ["clem-lif-to-stack"],
"parameters": {
# Job parameters
"lif_file": f"{str(file)}",
"root_folder": root_folder,
# Other recipe parameters
"session_dir": f"{str(session_dir)}",
"session_id": session_id,
"job_name": job_name,
"feedback_queue": feedback_queue,
},
# Construct recipe and submit it for processing
recipe = {
"recipes": ["clem-lif-to-stack"],
"parameters": {
# Job parameters
"lif_file": f"{str(file)}",
"root_folder": root_folder,
# Other recipe parameters
"session_dir": f"{str(session_dir)}",
"session_id": session_id,
"job_name": job_name,
"feedback_queue": feedback_queue,
},
}
logger.debug(
f"Submitting LIF processing request to {messenger.feedback_queue!r} "
"with the following recipe: \n"
f"{recipe}"
)
messenger.send(
queue="processing_recipe",
message=recipe,
new_connection=True,
)
else:
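
The refactor in this last file builds the recipe as a named value so it can be logged before submission, instead of constructing it inline inside messenger.send. A sketch of the resulting message; all parameter values here are made up:

recipe = {
    "recipes": ["clem-lif-to-stack"],
    "parameters": {
        "lif_file": "/data/cm12345-1/images/grid1.lif",
        "root_folder": "images",
        "session_dir": "/data/cm12345-1",
        "session_id": 42,
        "job_name": "grid1",
        "feedback_queue": "murfey_feedback",
    },
}
# Submitted with: messenger.send(queue="processing_recipe", message=recipe, new_connection=True)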