Merge pull request #541 from OpenCOMPES/flash_normalization_fixes
remove empty pulses from timed dataframe, and bring back old behavior
zain-sohail authored Jan 12, 2025
2 parents 178c6ea + ccd5a8c commit 2dca036
Showing 6 changed files with 64 additions and 13 deletions.
41 changes: 34 additions & 7 deletions src/sed/loader/flash/buffer_handler.py
@@ -130,6 +130,7 @@ def __init__(
extend_aux=True,
)
self.metadata: dict = {}
self.filter_timed_by_electron: bool = None

def _schema_check(self, files: list[Path], expected_schema_set: set) -> None:
"""
@@ -159,6 +160,30 @@ def _schema_check(self, files: list[Path], expected_schema_set: set) -> None:
"Please check the configuration file or set force_recreate to True.",
)

def _create_timed_dataframe(self, df: dd.DataFrame) -> dd.DataFrame:
"""Creates the timed dataframe, optionally filtering by electron events.
Args:
df (dd.DataFrame): The input dataframe containing all data
Returns:
dd.DataFrame: The timed dataframe
"""
# Get channels that should be in timed dataframe
timed_channels = self.fill_channels

if self.filter_timed_by_electron:
# Get electron channels to use for filtering
electron_channels = get_channels(self._config, "per_electron")
# Filter rows where electron data exists
df_timed = df.dropna(subset=electron_channels)[timed_channels]
else:
# Take all timed data rows without filtering
df_timed = df[timed_channels]

# Take only first electron per event
return df_timed.loc[:, :, 0]
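A quick note on the final selection: the loader's dataframes carry a (trainId, pulseId, electronId) MultiIndex, so `.loc[:, :, 0]` keeps one row per (train, pulse). A minimal pandas sketch with made-up values:

    import pandas as pd

    # Toy (trainId, pulseId, electronId) index: two electrons in the first pulse
    index = pd.MultiIndex.from_tuples(
        [(1, 0, 0), (1, 0, 1), (1, 1, 0)],
        names=["trainId", "pulseId", "electronId"],
    )
    df = pd.DataFrame({"bam": [0.1, 0.1, 0.2]}, index=index)
    df_timed = df.loc[:, :, 0]  # one row per (train, pulse): only electronId == 0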

def _save_buffer_file(self, paths: dict[str, Path]) -> None:
"""
Creates the electron and timed buffer files from the raw H5 file.
@@ -170,26 +195,24 @@ def _save_buffer_file(self, paths: dict[str, Path]) -> None:
Args:
paths (dict[str, Path]): Dictionary containing the paths to the H5 and buffer files.
"""

- # Create a DataFrameCreator instance and the h5 file
+ # Create a DataFrameCreator instance and get the h5 file
df = DataFrameCreator(config_dataframe=self._config, h5_path=paths["raw"]).df

# forward fill all the non-electron channels
df[self.fill_channels] = df[self.fill_channels].ffill()

- # Reset the index of the DataFrame and save both the electron and timed dataframes
- # electron resolved dataframe
+ # Save electron resolved dataframe
electron_channels = get_channels(self._config, "per_electron")
dtypes = get_dtypes(self._config, df.columns.values)
df.dropna(subset=electron_channels).astype(dtypes).reset_index().to_parquet(
paths["electron"],
)

- # timed dataframe
- # drop the electron channels and only take rows with the first electronId
- df_timed = df[self.fill_channels].loc[:, :, 0]
+ # Create and save timed dataframe
+ df_timed = self._create_timed_dataframe(df)
dtypes = get_dtypes(self._config, df_timed.columns.values)
df_timed.astype(dtypes).reset_index().to_parquet(paths["timed"])

logger.debug(f"Processed {paths['raw'].stem}")
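For intuition on the forward fill above: the non-electron channels are only present on some rows, and `ffill` propagates the last valid value down the NaN gaps. A toy sketch (channel names are illustrative):

    import pandas as pd

    # "bam" arrives once per pulse; the following electron rows start as NaN
    df = pd.DataFrame({"bam": [0.3, None, None], "dldPosX": [10, 11, 12]})
    df[["bam"]] = df[["bam"]].ffill()  # "bam" becomes [0.3, 0.3, 0.3]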

def _save_buffer_files(self, force_recreate: bool, debug: bool) -> None:
@@ -272,6 +295,7 @@ def process_and_load_dataframe(
suffix: str = "",
debug: bool = False,
remove_invalid_files: bool = False,
filter_timed_by_electron: bool = True,
) -> tuple[dd.DataFrame, dd.DataFrame]:
"""
Runs the buffer file creation process.
@@ -284,11 +308,14 @@
force_recreate (bool): Flag to force recreation of buffer files.
suffix (str): Suffix for buffer file names.
debug (bool): Flag to enable debug mode.
remove_invalid_files (bool): Flag to remove invalid files.
filter_timed_by_electron (bool): Flag to filter timed data by valid electron events.
Returns:
Tuple[dd.DataFrame, dd.DataFrame]: The electron and timed dataframes.
"""
self.fp = BufferFilePaths(self._config, h5_paths, folder, suffix, remove_invalid_files)
self.filter_timed_by_electron = filter_timed_by_electron

if not force_recreate:
schema_set = set(
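A hedged sketch of driving the handler directly with the new flag; normally `FlashLoader.read_dataframe` does this, and `buffer_handler` plus the paths below are placeholders:

    from pathlib import Path

    # Assumes an existing, configured BufferHandler instance
    df_electron, df_timed = buffer_handler.process_and_load_dataframe(
        h5_paths=[Path("raw/run_44498.h5")],  # placeholder raw files
        folder=Path("processed"),             # where the buffer parquet files land
        filter_timed_by_electron=False,       # keep pulses with no electron events
    )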
5 changes: 5 additions & 0 deletions src/sed/loader/flash/loader.py
@@ -330,6 +330,9 @@ def read_dataframe(
remove_invalid_files (bool, optional): Whether to exclude invalid files.
Defaults to False.
scicat_token (str, optional): The scicat token to use for fetching metadata.
filter_timed_by_electron (bool, optional): When True, the timed dataframe will only
contain data points where valid electron events were detected. When False, all
timed data points are included regardless of electron detection. Defaults to True.
Returns:
tuple[dd.DataFrame, dd.DataFrame, dict]: A tuple containing the concatenated DataFrame
@@ -345,6 +348,7 @@
debug = kwds.pop("debug", False)
remove_invalid_files = kwds.pop("remove_invalid_files", False)
scicat_token = kwds.pop("scicat_token", None)
filter_timed_by_electron = kwds.pop("filter_timed_by_electron", True)

if len(kwds) > 0:
raise ValueError(f"Unexpected keyword arguments: {kwds.keys()}")
@@ -391,6 +395,7 @@
suffix=detector,
debug=debug,
remove_invalid_files=remove_invalid_files,
filter_timed_by_electron=filter_timed_by_electron,
)

if self.instrument == "wespe":
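At the loader level the new option is just another keyword, popped from `kwds` as shown above. A sketch assuming an already constructed FlashLoader:

    # Hypothetical setup: loader = FlashLoader(config=config)
    df, df_timed, metadata = loader.read_dataframe(
        runs=[44498],                    # placeholder run number
        filter_timed_by_electron=False,  # timed dataframe keeps electron-free pulses
    )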
4 changes: 2 additions & 2 deletions tutorial/10_hextof_workflow_trXPS_bam_correction.ipynb
@@ -228,7 +228,7 @@
"axes = ['energy', 'delayStage']\n",
"ranges = [[-37.5,-27.5], [1446.75,1449.15]]\n",
"bins = [200,40]\n",
"res = sp_44498.compute(bins=bins, axes=axes, ranges=ranges)"
"res = sp_44498.compute(bins=bins, axes=axes, ranges=ranges, normalize_to_acquisition_time=\"delayStage\")"
]
},
{
@@ -394,7 +394,7 @@
"axes = ['energy', 'delayStage']\n",
"ranges = [[-37.5,-27.5], [-1.5,1.5]]\n",
"bins = [200,60]\n",
"res_corr = sp_44498.compute(bins=bins, axes=axes, ranges=ranges)"
"res_corr = sp_44498.compute(bins=bins, axes=axes, ranges=ranges, normalize_to_acquisition_time=\"delayStage\")"
]
},
{
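The notebook changes in this commit all add `normalize_to_acquisition_time="delayStage"` to the `compute` calls: the binned counts are divided by the time spent in each delayStage bin, which is derived from the timed dataframe. This is where `filter_timed_by_electron` matters, since with the default True, pulses without detected electrons are dropped from the timed dataframe and therefore no longer inflate the acquisition time. Conceptually (a sketch, not sed's actual implementation):

    import numpy as np

    def normalize_by_acquisition_time(counts: np.ndarray, seconds_per_bin: np.ndarray) -> np.ndarray:
        """Divide each delayStage column of an (energy, delayStage) histogram
        by that bin's acquisition time; empty bins become NaN."""
        safe = np.where(seconds_per_bin > 0, seconds_per_bin, np.nan)
        return counts / safe[np.newaxis, :]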
@@ -325,7 +325,7 @@
"axes = ['dldTimeSteps', 'delayStage']\n",
"ranges = [[3900,4200], [-1.5,1.5]]\n",
"bins = [100,60]\n",
"res_corr = sp_44498.compute(bins=bins, axes=axes, ranges=ranges)\n",
"res_corr = sp_44498.compute(bins=bins, axes=axes, ranges=ranges, normalize_to_acquisition_time=\"delayStage\")\n",
"\n",
"fig,ax = plt.subplots(1,2,figsize=(8,3), layout='constrained')\n",
"fig.suptitle(f\"Run {run_number}: W 4f, side bands\")\n",
@@ -441,7 +441,7 @@
"axes = ['energy', 'delayStage']\n",
"ranges = [[-37.5,-27.5], [-1.5,1.5]]\n",
"bins = [200,60]\n",
"res_corr = sp_44498.compute(bins=bins, axes=axes, ranges=ranges)\n",
"res_corr = sp_44498.compute(bins=bins, axes=axes, ranges=ranges, normalize_to_acquisition_time=\"delayStage\")\n",
"\n",
"fig,ax = plt.subplots(1,2,figsize=(8,3), layout='constrained')\n",
"fig.suptitle(f\"Run {run_number}: W 4f, side bands\")\n",
21 changes: 20 additions & 1 deletion tutorial/4_hextof_workflow.ipynb
@@ -107,6 +107,13 @@
"assert config_file.exists()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The path to the processed folder can also be defined as a keyword argument later."
]
},
{
"cell_type": "code",
"execution_count": null,
@@ -162,7 +169,19 @@
"metadata": {},
"source": [
"### Generate the Processor instance\n",
"this cell generates an instance of the `SedProcessor` class. It will be our workhorse for the entire workflow."
"this cell generates an instance of the `SedProcessor` class. It will be our workhorse for the entire workflow.\n",
"\n",
"#### Important note\n",
"The following extra arguments are available for FlashLoader. None of which are necessary to give but helpful to know.\n",
"- **force_recreate**: Probably the most useful. In case the config is changed, this allows to reduce the raw h5 files to the the intermediate parquet format again. Otherwise, the schema between the saved dataframe and config differs.\n",
"- **debug**: Setting this runs the reduction process in serial, so the errors are easier to find.\n",
"- **remove_invalid_files**: Sometimes some critical channels defined in the config are missing in some raw files. Setting this will make sure to ignore such files.\n",
"- **filter_timed_by_electron**: Defaults to True. When True, the timed dataframe will only\n",
" contain data points where valid electron events were detected. When False, all\n",
" timed data points are included regardless of electron detection (see https://github.com/OpenCOMPES/sed/issues/307)\n",
"- **processed_dir**: Location to save the reduced parquet files. \n",
"- **scicat_token**: Token from your scicat account.\n",
"- **detector**: '1Q' and '4Q' detector for example. Useful when there are separate raw files for each detector."
]
},
{
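A hedged usage sketch of the keyword arguments listed above, assuming `SedProcessor` forwards loader keywords the way the tutorial cells do (run number, config file, and folders are placeholders):

    from sed import SedProcessor

    sp = SedProcessor(
        runs=[44498],                    # placeholder run
        config="config_flash.yaml",      # placeholder config file
        processed_dir="processed",       # where reduced parquet files are saved
        force_recreate=True,             # re-reduce the raw h5 after a config change
        debug=False,                     # True runs the reduction serially
        remove_invalid_files=False,      # skip raw files missing critical channels
        filter_timed_by_electron=True,   # default; see issue #307
    )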
2 changes: 1 addition & 1 deletion tutorial/9_hextof_workflow_trXPD.ipynb
@@ -281,7 +281,7 @@
"axes = ['energy', 'delayStage']\n",
"ranges = [[-37.5,-27.5], [-1.5,1.5]]\n",
"bins = [200,60]\n",
"res_corr = sp_44498.compute(bins=bins, axes=axes, ranges=ranges)"
"res_corr = sp_44498.compute(bins=bins, axes=axes, ranges=ranges, normalize_to_acquisition_time=\"delayStage\")"
]
},
{
