Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion src/murfey/client/analyser.py
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,7 @@ def _analyse(self):
)
self.post_transfer(transferred_file)
self.queue.task_done()
logger.debug("Analyser thread has stopped analysing incoming files")
self.notify(final=True)

def _xml_file(self, data_file: Path) -> Path:
Expand Down Expand Up @@ -403,6 +404,12 @@ def request_stop(self):
self._stopping = True
self._halt_thread = True

def is_safe_to_stop(self):
    """
    Report whether the analyser thread can be halted without losing work.

    Stopping is safe only once a stop has been requested (both stop flags
    are set) and no queued files remain to be analysed.
    """
    if not self._stopping:
        return False
    if not self._halt_thread:
        return False
    return self.queue.qsize() == 0

def stop(self):
logger.debug("Analyser thread stop requested")
self._stopping = True
Expand All @@ -412,5 +419,8 @@ def stop(self):
self.queue.put(None)
self.thread.join()
except Exception as e:
logger.error(f"Exception encountered while stopping analyser: {e}")
logger.error(
f"Exception encountered while stopping Analyser: {e}",
exc_info=True,
)
logger.debug("Analyser thread stop completed")
118 changes: 89 additions & 29 deletions src/murfey/client/multigrid_control.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import logging
import subprocess
import threading
import time
from dataclasses import dataclass, field
from datetime import datetime
from functools import partial
Expand Down Expand Up @@ -36,6 +37,7 @@ class MultigridController:
rsync_url: str = ""
rsync_module: str = "data"
demo: bool = False
finalising: bool = False
dormant: bool = False
multigrid_watcher_active: bool = True
processing_enabled: bool = True
Expand Down Expand Up @@ -117,34 +119,70 @@ def __post_init__(self):

def _multigrid_watcher_finalised(self):
self.multigrid_watcher_active = False
self.dormancy_check()

def dormancy_check(self):
def is_ready_for_dormancy(self):
"""
When the multigrid watcher is no longer active, sends a request to safely stop
the analyser and file watcher threads, then checks to see that those threads
and the RSyncer processes associated with the current session have all been
safely stopped
"""
log.debug(
f"Starting dormancy check for MultigridController for session {self.session_id}"
)
if not self.multigrid_watcher_active:
if (
for a in self.analysers.values():
if a.is_safe_to_stop():
a.stop()
for w in self._environment.watchers.values():
if w.is_safe_to_stop():
w.stop()
return (
all(r._finalised for r in self.rsync_processes.values())
and not any(a.thread.is_alive() for a in self.analysers.values())
and not any(
w.thread.is_alive() for w in self._environment.watchers.values()
)
):
)
log.debug(f"Multigrid watcher for session {self.session_id} is still active")
return False

def clean_up_once_dormant(self, running_threads: list[threading.Thread]):
"""
A function run in a separate thread that runs the post-session cleanup logic
once all threads associated with this current session are halted, and marks
the controller as being fully dormant after doing so.
"""
for thread in running_threads:
thread.join()
log.debug(f"RSyncer cleanup thread {thread.ident} has stopped safely")
while not self.is_ready_for_dormancy():
time.sleep(10)

# Once all threads are stopped, remove session from the database
log.debug(
f"Submitting request to remove session {self.session_id} from database"
)
response = capture_delete(
f"{self._environment.url.geturl()}{url_path_for('session_control.router', 'remove_session', session_id=self.session_id)}",
)
success = response.status_code == 200 if response else False
if not success:
log.warning(f"Could not delete database data for {self.session_id}")

# Send message to frontend to trigger a refresh
self.ws.send(
json.dumps(
{
"message": "refresh",
"target": "sessions",
"instrument_name": self.instrument_name,
}
)
)

def call_remove_session():
response = capture_delete(
f"{self._environment.url.geturl()}{url_path_for('session_control.router', 'remove_session', session_id=self.session_id)}",
)
success = response.status_code == 200 if response else False
if not success:
log.warning(
f"Could not delete database data for {self.session_id}"
)

dormancy_thread = threading.Thread(
name=f"Session deletion thread {self.session_id}",
target=call_remove_session,
)
dormancy_thread.start()
self.dormant = True
# Mark as dormant
self.dormant = True

def abandon(self):
for a in self.analysers.values():
Expand All @@ -155,12 +193,26 @@ def abandon(self):
p.request_stop()

def finalise(self):
self.finalising = True
for a in self.analysers.values():
a.request_stop()
log.debug(f"Stop request sent to analyser {a}")
for w in self._environment.watchers.values():
w.request_stop()
log.debug(f"Stop request sent to watcher {w}")
rsync_finaliser_threads = []
for p in self.rsync_processes.keys():
self._finalise_rsyncer(p)
# Collect the running rsyncer finaliser threads to pass to the dormancy checker
rsync_finaliser_threads.append(self._finalise_rsyncer(p))
log.debug(f"Finalised rsyncer {p}")

# Run the session cleanup function in a separate thread
cleanup_upon_dormancy_thread = threading.Thread(
target=self.clean_up_once_dormant,
args=[rsync_finaliser_threads],
daemon=True,
)
cleanup_upon_dormancy_thread.start()

def update_visit_time(self, new_end_time: datetime):
# Convert the received server timestamp into the local equivalent
Expand Down Expand Up @@ -224,7 +276,15 @@ def _start_rsyncer_multigrid(
transfer=machine_data.get("data_transfer_enabled", True),
restarted=str(source) in self.rsync_restarts,
)
self.ws.send(json.dumps({"message": "refresh"}))
self.ws.send(
json.dumps(
{
"message": "refresh",
"target": "rsyncer",
"session_id": self.session_id,
}
)
)

def _rsyncer_stopped(self, source: Path, explicit_stop: bool = False):
if explicit_stop:
Expand All @@ -235,15 +295,19 @@ def _rsyncer_stopped(self, source: Path, explicit_stop: bool = False):
capture_post(stop_url, json={"source": str(source)})

def _finalise_rsyncer(self, source: Path):
"""
Starts a new Rsyncer thread that cleans up the directories, and returns that
thread to be managed by a central thread.
"""
finalise_thread = threading.Thread(
name=f"Controller finaliser thread ({source})",
target=partial(
self.rsync_processes[source].finalise, callback=self.dormancy_check
),
target=self.rsync_processes[source].finalise,
kwargs={"thread": False},
daemon=True,
)
finalise_thread.start()
log.debug(f"Started RSync cleanup for {str(source)}")
return finalise_thread

def _restart_rsyncer(self, source: Path):
self.rsync_processes[source].restart()
Expand Down Expand Up @@ -368,7 +432,6 @@ def rsync_result(update: RSyncerUpdate):
)
else:
self.analysers[source].subscribe(self._data_collection_form)
self.analysers[source].subscribe(self.dormancy_check, final=True)
self.analysers[source].start()
if transfer:
self.rsync_processes[source].subscribe(self.analysers[source].enqueue)
Expand Down Expand Up @@ -408,9 +471,6 @@ def _rsync_update_converter(p: Path) -> None:
),
secondary=True,
)
self._environment.watchers[source].subscribe(
self.dormancy_check, final=True
)
self._environment.watchers[source].start()

def _data_collection_form(self, response: dict):
Expand Down
20 changes: 16 additions & 4 deletions src/murfey/client/watchdir.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,16 +67,27 @@ def request_stop(self):
self._stopping = True
self._halt_thread = True

def is_safe_to_stop(self):
    """
    Report whether the directory watcher thread can be halted safely.

    Stopping is safe only once a stop has been requested (both stop flags
    are set) and the work queue holds no unprocessed entries.
    """
    stop_requested = self._stopping and self._halt_thread
    return bool(stop_requested) and self.queue.qsize() == 0

def stop(self):
log.debug("DirWatcher thread stop requested")
self._stopping = True
if self.thread.is_alive():
self.queue.join()

self._halt_thread = True
if self.thread.is_alive():
self.queue.put(None)
self.thread.join()
try:
if self.thread.is_alive():
self.queue.put(None)
self.thread.join()
except Exception as e:
log.error(
f"Exception encountered while stopping DirWatcher: {e}",
exc_info=True,
)
log.debug("DirWatcher thread stop completed")

def _process(self):
Expand All @@ -94,6 +105,7 @@ def _process(self):
modification_time=modification_time, transfer_all=self._transfer_all
)
time.sleep(15)
log.debug(f"DirWatcher {self} has stopped scanning")
self.notify(final=True)

def scan(self, modification_time: float | None = None, transfer_all: bool = False):
Expand Down
21 changes: 16 additions & 5 deletions src/murfey/instrument_server/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,21 +45,32 @@ def start_instrument_server():

LogFilter.install()

# Log everything from Murfey by default
logging.getLogger("murfey").setLevel(logging.DEBUG)

# Show only logs at INFO level and above in the console
rich_handler = RichHandler(enable_link_path=False)
logging.getLogger("murfey").setLevel(logging.INFO)
rich_handler.setLevel(logging.INFO)
logging.getLogger("murfey").addHandler(rich_handler)
logging.getLogger("fastapi").addHandler(rich_handler)
logging.getLogger("uvicorn").addHandler(rich_handler)

# Create a websocket app to connect to the backend
ws = murfey.client.websocket.WSApp(
server=read_config().get("Murfey", "server", fallback=""),
register_client=False,
)

handler = CustomHandler(ws.send)
logging.getLogger("murfey").addHandler(handler)
logging.getLogger("fastapi").addHandler(handler)
logging.getLogger("uvicorn").addHandler(handler)
# Forward DEBUG levels logs and above from Murfey to the backend
murfey_ws_handler = CustomHandler(ws.send)
murfey_ws_handler.setLevel(logging.DEBUG)
logging.getLogger("murfey").addHandler(murfey_ws_handler)

# Forward only INFO level logs and above for other packages
other_ws_handler = CustomHandler(ws.send)
other_ws_handler.setLevel(logging.INFO)
logging.getLogger("fastapi").addHandler(other_ws_handler)
logging.getLogger("uvicorn").addHandler(other_ws_handler)

logger.info(
f"Starting Murfey server version {murfey.__version__}, listening on {args.host}:{args.port}"
Expand Down
Loading