
Commit: fix the bugs
zain-sohail committed Feb 19, 2025
1 parent dbb7e94 commit df78f69
Showing 4 changed files with 38 additions and 36 deletions.
20 changes: 10 additions & 10 deletions src/sed/config/lab_example_config.yaml
@@ -8,19 +8,19 @@ core:
   # the beamline where experiment took place
   beamline: cfel
   # the ID number of the beamtime
-  beamtime_id: 11019101
+  beamtime_id: 11021732
   # the year of the beamtime
-  year: 2023
+  year: 2025
   # the instrument used
   instrument: hextof # hextof, wespe, etc

   # The paths to the raw and parquet data directories. If these are not
   # provided, the loader will try to find the data based on year beamtimeID etc
   paths:
     # location of the raw data.
-    raw: ""
+    raw: "/asap3/fs-flash-o/gpfs/hextof/2025/data/11021732/raw/"
     # location of the intermediate parquet files.
-    processed: ""
+    processed: "."

   # The beamtime directories for different DAQ systems.
   # (Not to be changed by user)
@@ -32,7 +32,7 @@ core:
 dataframe:
   daq: fl1user3 # DAQ system name to resolve filenames/paths
   ubid_offset: 5 # Offset correction to the pulseId
-  forward_fill_iterations: 2 # Number of iterations to fill the pulseId forward
+  forward_fill_iterations: 0 # Number of iterations to fill the pulseId forward
   split_sector_id_from_dld_time: True # Remove reserved bits for dldSectorID from dldTimeSteps column
   sector_id_reserved_bits: 3 # Bits reserved for dldSectorID in the dldTimeSteps column
   sector_delays: [0., 0., 0., 0., 0., 0., 0., 0.] # Sector delays
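
For context on the sector options above: with sector_id_reserved_bits: 3, each raw dldTimeSteps word carries the detector sector ID in its reserved bits. A minimal sketch of such a split, assuming the sector ID occupies the three least significant bits (names and bit layout are illustrative, not sed's actual implementation):

    import numpy as np

    def split_sector_id(raw: np.ndarray, reserved_bits: int = 3) -> tuple[np.ndarray, np.ndarray]:
        # Mask off the reserved low bits to recover the sector ID,
        # then shift them away to recover the actual time-of-flight steps.
        mask = (1 << reserved_bits) - 1      # 0b111 for 3 reserved bits
        sector_id = raw & mask
        time_steps = raw >> reserved_bits
        return sector_id, time_steps

    raw = np.array([26, 27], dtype=np.uint32)  # 0b11010, 0b11011
    print(split_sector_id(raw))                # sectors [2, 3], time steps [3, 3]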
@@ -42,7 +42,7 @@ dataframe:
   tof_binning: 8 # Binning parameter for time-of-flight data

   # Columns used for jitter correction
-  index: index
+  index: [countId]
   jitter_cols: [dldPosX, dldPosY, dldTimeSteps]
   formats: [per_file, per_train, per_electron]
   fill_formats: [per_train, per_file] # Channels with this format will be forward filled
@@ -102,28 +102,28 @@ dataframe:
     #   format: per_file
     #   dataset_key: "/uncategorised/FLASH.DIAG/TIMINGINFO/TIME1.BUNCH_FIRST_INDEX.1/time"
     # event key
-    index:
+    countId:
       format: per_file
       dataset_key: /DLD/NumOfEvents
     # detector x position
     dldPosX:
       format: per_electron
       dataset_key: /DLD/DLD/xPos
-      dtype: uint32
+      # dtype: uint32

     # detector y position
     dldPosY:
       format: per_electron
       dataset_key: /DLD/DLD/yPos
-      dtype: uint32
+      # dtype: uint32

     # Detector time-of-flight channel
     # if split_sector_id_from_dld_time is set to True, this will also generate
     # the dldSectorID channel
     dldTimeSteps:
       format: per_electron
       dataset_key: /DLD/DLD/times
-      dtype: uint32
+      # dtype: uint32
2 changes: 1 addition & 1 deletion src/sed/core/config_model.py
@@ -140,7 +140,7 @@ class DataframeModel(BaseModel):
     sector_id_reserved_bits: Optional[int] = None
     sector_delays: Optional[Sequence[float]] = None
     daq: Optional[str] = None
-    index: Optional[Union[Sequence[str], str]] = None
+    index: Optional[Sequence[str]] = None
     formats: Optional[Union[Sequence[str], str]] = None
     fill_formats: Optional[Union[Sequence[str], str]] = None
     # SXP specific settings
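
Narrowing the type means a bare string such as the old index: index no longer validates; the config must now supply a list, matching the [countId] change in the example YAML. A quick check with a stripped-down model (hypothetical, not the full DataframeModel; relies on pydantic not treating a plain string as a sequence):

    from typing import Optional, Sequence
    from pydantic import BaseModel, ValidationError

    class Model(BaseModel):
        index: Optional[Sequence[str]] = None

    print(Model(index=["countId"]).index)  # ['countId'] is accepted
    try:
        Model(index="countId")  # a bare string is not a valid Sequence[str]
    except ValidationError:
        print("bare string rejected")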
48 changes: 24 additions & 24 deletions src/sed/loader/flash/buffer_handler.py
@@ -190,8 +190,7 @@ def _create_timed_dataframe(self, df: dd.DataFrame) -> dd.DataFrame:
         # Take all timed data rows without filtering
         df_timed = df[timed_channels]

-        # Take only first electron per event
-        return df_timed.loc[:, :, 0]
+        return df_timed

     def _save_buffer_file(self, paths: dict[str, Path]) -> None:
         """Creates the electron and timed buffer files from the raw H5 file."""
@@ -265,25 +264,26 @@ def _get_dataframes(self) -> None:
         filling = {}
         for typ in DF_TYP:
             # Read the parquet files into a dask dataframe
-            df = dd.read_parquet(self.fp[typ], calculate_divisions=True)
+            df = dd.read_parquet(self.fp[typ])  # , calculate_divisions=True)
             # Get the metadata from the parquet files
             file_stats[typ] = get_parquet_metadata(self.fp[typ])

             # Forward fill the non-electron channels across files
             overlap = min(file["num_rows"] for file in file_stats[typ].values())
             iterations = self._config.get("forward_fill_iterations", 2)
-            df = forward_fill_lazy(
-                df=df,
-                columns=self.fill_channels,
-                before=overlap,
-                iterations=iterations,
-            )
-            # TODO: This dict should be returned by forward_fill_lazy
-            filling[typ] = {
-                "columns": self.fill_channels,
-                "overlap": overlap,
-                "iterations": iterations,
-            }
+            if iterations:
+                df = forward_fill_lazy(
+                    df=df,
+                    columns=self.fill_channels,
+                    before=overlap,
+                    iterations=iterations,
+                )
+                # TODO: This dict should be returned by forward_fill_lazy
+                filling[typ] = {
+                    "columns": self.fill_channels,
+                    "overlap": overlap,
+                    "iterations": iterations,
+                }

             self.df[typ] = df
         self.metadata.update({"file_statistics": file_stats, "filling": filling})
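
The new if iterations: guard skips forward filling entirely when forward_fill_iterations is 0 (as in the updated example config), in which case filling simply has no entry for that type. For readers unfamiliar with the pattern, a rough sketch of a lazy forward fill across partition boundaries — a simplified stand-in for, not a copy of, sed's forward_fill_lazy:

    import dask.dataframe as dd
    import pandas as pd

    def forward_fill_sketch(df: dd.DataFrame, columns: list[str], before: int) -> dd.DataFrame:
        # Each partition is extended by `before` rows from its predecessor so
        # sparse per-train values can propagate across partition boundaries.
        def fill(part: pd.DataFrame) -> pd.DataFrame:
            part[columns] = part[columns].ffill()
            return part

        return df.map_overlap(fill, before=before, after=0)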
@@ -336,15 +336,15 @@ def process_and_load_dataframe(
             get_channels(self._config, formats="all", index=True, extend_aux=True),
         )
         self._schema_check(self.fp["electron"], schema_set)
-        schema_set = set(
-            get_channels(
-                self._config,
-                formats=["per_pulse", "per_train"],
-                index=True,
-                extend_aux=True,
-            ),
-        ) - {"electronId"}
-        self._schema_check(self.fp["timed"], schema_set)
+        # schema_set = set(
+        #     get_channels(
+        #         self._config,
+        #         formats=["per_pulse", "per_train"],
+        #         index=True,
+        #         extend_aux=True,
+        #     ),
+        # ) - {"electronId"}
+        # self._schema_check(self.fp["timed"], schema_set)

         self._save_buffer_files(force_recreate, debug)

4 changes: 3 additions & 1 deletion src/sed/loader/flash/dataframe.py
@@ -434,7 +434,8 @@ def df_train(self) -> pd.DataFrame:
             dataset = self.get_dataset_array(channel)
             # Electron and pulse resolved MultiIndex is created. Since this is train data,
             # the electron and pulse index is always 0
-            index = np.cumsum([0, *self.get_dataset_array("index")[:-1]])
+            index_alias = self._config.get("index", ["countId"])[0]
+            index = np.cumsum([0, *self.get_dataset_array(index_alias)[:-1]])
             # Auxiliary dataset (which is stored in the same dataset as other DLD channels)
             # contains multiple channels inside. Even though they are resolved per train,
             # they come in pulse format, so the extra values are sliced and individual channels are
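
The cumulative sum turns the per-file event counts (read from the configured index channel, countId by default) into row offsets marking where each train's electrons start. A quick worked example with invented counts:

    import numpy as np

    counts = np.array([3, 5, 2])            # electrons recorded per train
    offsets = np.cumsum([0, *counts[:-1]])  # array([0, 3, 8])
    # train 0 owns rows 0-2, train 1 rows 3-7, train 2 rows 8-9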
@@ -491,4 +492,5 @@ def df(self) -> pd.DataFrame:
         # concat offers best performance, almost 3 times faster
         df = pd.concat((self.df_electron, self.df_train), axis=1)
         df[self.df_train.columns] = df[self.df_train.columns].ffill()
+        df.index.name = self._config.get("index", ["countId"])[0]
         return df
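
The added line names the combined index after the configured index channel. The ffill on the train columns matters because df_train carries one value per train, so after the column-wise concat those values must be propagated to every electron row. A toy illustration with invented values:

    import pandas as pd

    electrons = pd.DataFrame({"dldPosX": [10, 11, 12, 13]})
    trains = pd.DataFrame({"delayStage": [1.5, None, 2.5, None]})  # sparse per-train values
    df = pd.concat((electrons, trains), axis=1)
    df["delayStage"] = df["delayStage"].ffill()  # -> 1.5, 1.5, 2.5, 2.5
    df.index.name = "countId"                    # mirrors the committed change
    print(df)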
