Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion src/murfey/client/multigrid_control.py
Original file line number Diff line number Diff line change
Expand Up @@ -667,8 +667,22 @@ def _increment_transferred_files_prometheus(
requests.post(url, json=data)

def _increment_transferred_files(
self, updates: List[RSyncerUpdate], source: str, destination: str
self,
updates: List[RSyncerUpdate],
num_skipped_files: int,
source: str,
destination: str,
):
skip_url = f"{str(self._environment.url.geturl())}{url_path_for('prometheus.router', 'increment_rsync_skipped_files_prometheus', visit_name=self._environment.visit)}"
requests.post(
skip_url,
json={
"source": source,
"session_id": self.session_id,
"increment_count": num_skipped_files,
},
)

checked_updates = [
update for update in updates if update.outcome is TransferResult.SUCCESS
]
Expand Down
8 changes: 6 additions & 2 deletions src/murfey/client/rsync.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ def _fake_transfer(self, files: list[Path]) -> bool:
self.notify(update)
updates.append(update)
time.sleep(0.01)
self.notify([update], secondary=True)
self.notify([update], num_skipped_files=0, secondary=True)
# self.notify(updates, secondary=True)

return True
Expand All @@ -328,8 +328,10 @@ def _transfer(self, infiles: list[Path]) -> bool:
if f.is_file() and f.stat().st_ctime < self._end_time.timestamp()
]
self._skipped_files.extend(set(infiles).difference(set(files)))
num_skipped_files = len(set(infiles).difference(set(files)))
else:
files = [f for f in infiles if f.is_file()]
num_skipped_files = 0

previously_transferred = self._files_transferred
transfer_success: set[Path] = set()
Expand Down Expand Up @@ -528,7 +530,9 @@ def parse_stderr(line: str):
if success:
success = result.returncode == 0

self.notify(successful_updates, secondary=True)
self.notify(
successful_updates, num_skipped_files=num_skipped_files, secondary=True
)

# Print out a summary message for each file transfer batch instead of individual messages
# List out file paths as stored in memory to see if issue is due to file path mismatch
Expand Down
13 changes: 10 additions & 3 deletions src/murfey/server/api/instrument.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from sqlmodel import select
from werkzeug.utils import secure_filename

import murfey.server.prometheus as prom
from murfey.server.api.auth import MurfeyInstrumentNameFrontend as MurfeyInstrumentName
from murfey.server.api.auth import MurfeySessionIDFrontend as MurfeySessionID
from murfey.server.api.auth import (
Expand Down Expand Up @@ -555,8 +556,8 @@ async def flush_skipped_rsyncer(
db.commit()

# Send request to flush rsyncer
data: dict = {}
update_result: dict = {}
flush_result: dict = {}
machine_config = get_machine_config(instrument_name=instrument_name)[
instrument_name
]
Expand All @@ -583,8 +584,14 @@ async def flush_skipped_rsyncer(
"Authorization": f"Bearer {instrument_server_tokens[session_id]['access_token']}"
},
) as resp:
data = await resp.json()
return data
flush_result = await resp.json()
if not flush_result.get("success", False):
return {"success": False}
# Reset the skipped file count for the specific Prometheus gauge to 0
prom.skipped_files.labels(
rsync_source=rsyncer_source.source, visit=session_entry.visit
).set(0)
return flush_result


class RSyncerInfo(BaseModel):
Expand Down
11 changes: 10 additions & 1 deletion src/murfey/server/api/prometheus.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from murfey.server.murfey_db import murfey_db
from murfey.util import sanitise
from murfey.util.db import RsyncInstance
from murfey.util.models import RsyncerInfo
from murfey.util.models import RsyncerInfo, RsyncerSkippedFiles

logger = getLogger("murfey.server.api.prometheus")

Expand Down Expand Up @@ -90,6 +90,15 @@ def increment_rsync_transferred_files_prometheus(
).inc(rsyncer_info.data_bytes)


@router.post("/visits/{visit_name}/increment_rsync_skipped_files_prometheus")
def increment_rsync_skipped_files_prometheus(
visit_name: str, rsyncer_skipped_files: RsyncerSkippedFiles, db=murfey_db
):
prom.skipped_files.labels(
rsync_source=rsyncer_skipped_files.source, visit=visit_name
).inc(rsyncer_skipped_files.increment_count)


@router.post("/visits/{visit_name}/monitoring/{on}")
def change_monitoring_status(visit_name: str, on: int):
prom.monitoring_switch.labels(visit=visit_name)
Expand Down
13 changes: 1 addition & 12 deletions src/murfey/server/api/session_control.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,18 +224,6 @@ def register_rsyncer(session_id: int, rsyncer_info: RsyncerInfo, db=murfey_db):
db.add(rsync_instance)
db.commit()
db.close()
prom.seen_files.labels(rsync_source=rsyncer_info.source, visit=visit_name)
prom.seen_data_files.labels(rsync_source=rsyncer_info.source, visit=visit_name)
prom.transferred_files.labels(rsync_source=rsyncer_info.source, visit=visit_name)
prom.transferred_files_bytes.labels(
rsync_source=rsyncer_info.source, visit=visit_name
)
prom.transferred_data_files.labels(
rsync_source=rsyncer_info.source, visit=visit_name
)
prom.transferred_data_files_bytes.labels(
rsync_source=rsyncer_info.source, visit=visit_name
)
prom.seen_files.labels(rsync_source=rsyncer_info.source, visit=visit_name).set(0)
prom.transferred_files.labels(
rsync_source=rsyncer_info.source, visit=visit_name
Expand All @@ -252,6 +240,7 @@ def register_rsyncer(session_id: int, rsyncer_info: RsyncerInfo, db=murfey_db):
prom.transferred_data_files_bytes.labels(
rsync_source=rsyncer_info.source, visit=visit_name
).set(0)
prom.skipped_files.labels(rsync_source=rsyncer_info.source, visit=visit_name).set(0)
return rsyncer_info


Expand Down
5 changes: 5 additions & 0 deletions src/murfey/server/api/shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ def remove_session_by_id(session_id: int, db):
args=(ri.source, session.visit),
label="transferred_data_file_bytes",
)
safe_run(
prom.skipped_files.remove,
args=(ri.source, session.visit),
label="skipped_files",
)
collected_ids = db.exec(
select(DataCollectionGroup, DataCollection, ProcessingJob)
.where(DataCollectionGroup.session_id == session_id)
Expand Down
6 changes: 6 additions & 0 deletions src/murfey/server/prometheus.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@
["rsync_source", "visit"],
)

skipped_files = Gauge(
"skipped_files",
"Number of files not transferred due to end time",
["rsync_source", "visit"],
)

preprocessed_movies = Counter(
"preprocessed_movies",
"Number of movies that have been preprocessed",
Expand Down
6 changes: 6 additions & 0 deletions src/murfey/util/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,12 @@ class RsyncerInfo(BaseModel):
tag: str = ""


class RsyncerSkippedFiles(BaseModel):
source: str
session_id: int
increment_count: int = 1


"""
Single Particle Analysis
========================
Expand Down
7 changes: 7 additions & 0 deletions src/murfey/util/route_manifest.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -642,6 +642,13 @@ murfey.server.api.prometheus.router:
type: str
methods:
- POST
- path: /prometheus/visits/{visit_name}/increment_rsync_skipped_files_prometheus
function: increment_rsync_skipped_files_prometheus
path_params:
- name: visit_name
type: str
methods:
- POST
- path: /prometheus/visits/{visit_name}/monitoring/{on}
function: change_monitoring_status
path_params:
Expand Down