Add len and count rate methods to flash loader #564

Open · wants to merge 2 commits into base: main
1 change: 1 addition & 0 deletions .cspell/custom-dictionary.txt
@@ -63,6 +63,7 @@ cryo
 cstart
 cstep
 csvfile
+cumsum
 custom-dictionary
 cval
 cvdist
78 changes: 70 additions & 8 deletions src/sed/loader/flash/loader.py
@@ -15,6 +15,7 @@
 from pathlib import Path
 
 import dask.dataframe as dd
+import numpy as np
 from natsort import natsorted
 
 from sed.core.logging import set_verbosity
@@ -79,6 +80,21 @@ def verbose(self, verbose: bool):
         self._verbose = verbose
         set_verbosity(logger, self._verbose)
 
+    def __len__(self) -> int:
+        """
+        Returns the total number of rows in the electron-resolved dataframe.
+
+        Returns:
+            int: Total number of rows.
+        """
+        try:
+            file_statistics = self.metadata["file_statistics"]["electron"]
+        except KeyError as exc:
+            raise KeyError("File statistics missing. Use 'read_dataframe' first.") from exc
+
+        total_rows = sum(stats["num_rows"] for stats in file_statistics.values())
+        return total_rows
+
     def _initialize_dirs(self) -> None:
         """
         Initializes the directories on Maxwell based on configuration. If paths is provided in
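
Reviewer note: a minimal sketch of the aggregation the new `__len__` performs, using a hypothetical `metadata` layout. The `file_statistics`/`electron`/`num_rows` keys follow the diff; the row counts are made up:

```python
# Hypothetical file statistics, shaped like self.metadata in the diff.
metadata = {
    "file_statistics": {
        "electron": {
            "0": {"num_rows": 1_000_000},
            "1": {"num_rows": 750_000},
        },
    },
}

# Same aggregation as FlashLoader.__len__.
file_statistics = metadata["file_statistics"]["electron"]
total_rows = sum(stats["num_rows"] for stats in file_statistics.values())
print(total_rows)  # 1750000
```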
@@ -223,12 +239,58 @@ def parse_metadata(self, token: str = None) -> dict:
 
         return metadata
 
-    def get_count_rate(
-        self,
-        fids: Sequence[int] = None,  # noqa: ARG002
-        **kwds,  # noqa: ARG002
-    ):
-        return None, None
+    def get_count_rate(self, fids: Sequence[int] = None, **kwds) -> tuple[np.ndarray, np.ndarray]:
+        """
+        Calculates the count rate using the number of rows and the elapsed time for each file.
+        The resolution is therefore limited to one value per file, but the method is very fast.
+
+        Args:
+            fids (Sequence[int]): A sequence of file IDs. Defaults to all files.
+
+        Keyword Args:
+            runs: A sequence of run IDs.
+
+        Returns:
+            tuple[np.ndarray, np.ndarray]: The count rate per file and the cumulative elapsed time in seconds.
+
+        Raises:
+            KeyError: If the file statistics are missing.
+        """
+
+        def counts_per_file(fid):
+            try:
+                file_statistics = self.metadata["file_statistics"]["electron"]
+            except KeyError as exc:
+                raise KeyError("File statistics missing. Use 'read_dataframe' first.") from exc
+
+            counts = file_statistics[str(fid)]["num_rows"]
+            return counts
+
+        runs = kwds.pop("runs", None)
+        if len(kwds) > 0:
+            raise TypeError(f"get_count_rate() got unexpected keyword arguments {kwds.keys()}.")
+
+        all_counts = []
+        elapsed_times = []
+        if runs is not None:
+            fids = []
+            for run_id in runs:
+                if self.raw_dir is None:
+                    self._initialize_dirs()
+                files = self.get_files_from_run_id(run_id=run_id, folders=self.raw_dir)
+                for file in files:
+                    fids.append(self.files.index(file))
+        else:
+            if fids is None:
+                fids = range(len(self.files))
+
+        for fid in fids:
+            all_counts.append(counts_per_file(fid))
+            elapsed_times.append(self.get_elapsed_time(fids=[fid]))
+
+        count_rate = np.array(all_counts) / np.array(elapsed_times)
+        seconds = np.cumsum(elapsed_times)
+        return count_rate, seconds
 
     def get_elapsed_time(self, fids: Sequence[int] = None, **kwds) -> float | list[float]:  # type: ignore[override]
         """
@@ -254,7 +316,7 @@ def get_elapsed_time(self, fids: Sequence[int] = None, **kwds) -> float | list[float]:
             raise KeyError(
                 "File statistics missing. Use 'read_dataframe' first.",
             ) from exc
-        time_stamp_alias = self._config["dataframe"].get("time_stamp_alias", "timeStamp")
+        time_stamp_alias = self._config["dataframe"]["columns"].get("timestamp", "timeStamp")
 
         def get_elapsed_time_from_fid(fid):
             try:
@@ -407,7 +469,7 @@ def read_dataframe(
         self.metadata.update(self.parse_metadata(token) if collect_metadata else {})
         self.metadata.update(bh.metadata)
 
-        print(f"loading complete in {time.time() - t0: .2f} s")
+        logger.info(f"Loading complete in {time.time() - t0: .2f} s")
 
         return df, df_timed, self.metadata
 
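Reviewer note: putting the two additions together, usage might look like the sketch below. The config contents, run ID, and the constructor/`read_dataframe` arguments are placeholders, not taken from this diff:

```python
# Hypothetical end-to-end usage of the new methods on this branch.
from sed.loader.flash.loader import FlashLoader

config = {}  # placeholder; a real FlashLoader configuration is required here

loader = FlashLoader(config=config)
df, df_timed, metadata = loader.read_dataframe(runs=[12345])  # placeholder run ID

print(len(loader))  # total electron rows, via the new __len__

count_rate, seconds = loader.get_count_rate()  # one rate value per file
print(count_rate.mean())  # average per-file count rate in Hz
```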