diff --git a/sed/develop/_modules/index.html b/sed/develop/_modules/index.html
index 266e17f..a0ce1b5 100644
--- a/sed/develop/_modules/index.html
+++ b/sed/develop/_modules/index.html
@@ -2,12 +2,12 @@
-
+
-SED 0.4.0.dev0 documentation
+SED 0.4.1.dev383+g27234e0 documentation
@@ -485,7 +485,7 @@
-SED 0.4.0.dev0 documentation
+SED 0.4.1.dev383+g27234e0 documentation
@@ -415,54 +415,49 @@
"""This module contains the binning functions of the sed.binning module
-
"""
-import gc
-from functools import reduce
-from typing import cast
-from typing import List
-from typing import Sequence
-from typing import Tuple
-from typing import Union
-
-import dask.dataframe
-import numpy as np
-import pandas as pd
-import psutil
-import xarray as xr
-from threadpoolctl import threadpool_limits
-from tqdm.auto import tqdm
-
-from .numba_bin import numba_histogramdd
-from .utils import _arraysum
-from .utils import bin_centers_to_bin_edges
-from .utils import simplify_binning_arguments
+from __future__ import annotations
+
+import gc
+from collections.abc import Sequence
+from functools import reduce
+from typing import cast
+from typing import Union
+
+import dask.dataframe
+import numpy as np
+import pandas as pd
+import psutil
+import xarray as xr
+from threadpoolctl import threadpool_limits
+from tqdm.auto import tqdm
+
+from .numba_bin import numba_histogramdd
+from .utils import _arraysum
+from .utils import bin_centers_to_bin_edges
+from .utils import simplify_binning_arguments
N_CPU = psutil.cpu_count()
-[docs]def bin_partition(
- part: Union[dask.dataframe.DataFrame, pd.DataFrame],
- bins: Union[
- int,
- dict,
- Sequence[int],
- Sequence[np.ndarray],
- Sequence[tuple],
- ] = 100,
+
+[docs]
+def bin_partition(
+ part: dask.dataframe.DataFrame | pd.DataFrame,
+ bins: int | dict | Sequence[int] | Sequence[np.ndarray] | Sequence[tuple] = 100,
axes: Sequence[str] = None,
- ranges: Sequence[Tuple[float, float]] = None,
+ ranges: Sequence[tuple[float, float]] = None,
hist_mode: str = "numba",
- jitter: Union[list, dict] = None,
+ jitter: list | dict = None,
return_edges: bool = False,
skip_test: bool = False,
-) -> Union[np.ndarray, Tuple[np.ndarray, list]]:
+) -> np.ndarray | tuple[np.ndarray, list]:
"""Compute the n-dimensional histogram of a single dataframe partition.
Args:
- part (Union[dask.dataframe.DataFrame, pd.DataFrame]): dataframe on which
+ part (dask.dataframe.DataFrame | pd.DataFrame): dataframe on which
to perform the histogram. Usually a partition of a dask DataFrame.
- bins (int, dict, Sequence[int], Sequence[np.ndarray], Sequence[tuple], optional):
+ bins (int | dict | Sequence[int] | Sequence[np.ndarray] | Sequence[tuple], optional):
Definition of the bins. Can be any of the following cases:
- an integer describing the number of bins for all dimensions. This
@@ -486,7 +481,7 @@ Source code for sed.binning.binning
the order of the dimensions in the resulting array. Not required if
bins are provided as a dictionary containing the axis names.
Defaults to None.
- ranges (Sequence[Tuple[float, float]], optional): Sequence of tuples containing
+ ranges (Sequence[tuple[float, float]], optional): Sequence of tuples containing
the start and end point of the binning range. Required if bins given as
int or Sequence[int]. Defaults to None.
hist_mode (str, optional): Histogram calculation method.
@@ -495,7 +490,7 @@ Source code for sed.binning.binning
- "numba" use a numba powered similar method.
Defaults to "numba".
- jitter (Union[list, dict], optional): a list of the axes on which to apply
+ jitter (list | dict, optional): a list of the axes on which to apply
jittering. To specify the jitter amplitude or method (normal or uniform
noise) a dictionary can be passed. This should look like
jitter={'axis':{'amplitude':0.5,'mode':'uniform'}}.
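To illustrate the jitter specification above, here is a minimal sketch of the two accepted forms and the noise they request; the column name "X" and the defaults shown are assumptions, not taken from the source.

import numpy as np

# Two ways to request jittering (hypothetical column name "X"):
jitter_list = ["X"]                                         # library defaults
jitter_dict = {"X": {"amplitude": 0.5, "mode": "uniform"}}  # explicit spec

# Sketch of the requested noise on a standalone array:
rng = np.random.default_rng(0)
col = np.arange(10, dtype=float)
spec = jitter_dict["X"]
if spec["mode"] == "uniform":
    col += spec["amplitude"] * rng.uniform(-1, 1, size=col.size)
else:  # "normal"
    col += spec["amplitude"] * rng.normal(0, 1, size=col.size)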
@@ -518,8 +513,8 @@ Source code for sed.binning.binning
present in the dataframe
Returns:
- Union[np.ndarray, Tuple[np.ndarray, list]]: 2-element tuple returned only when
- returnEdges is True. Otherwise only hist is returned.
+ np.ndarray | tuple[np.ndarray, list]: 2-element tuple returned only when
+ return_edges is True. Otherwise only hist is returned.
- **hist**: The result of the n-dimensional binning
- **edges**: A list of D arrays describing the bin edges for each dimension.
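A hypothetical end-to-end call of bin_partition on a plain pandas DataFrame; the import path and column names are assumptions.

import numpy as np
import pandas as pd
from sed.binning import bin_partition  # assumed import path

part = pd.DataFrame({"X": np.random.rand(1000), "Y": np.random.rand(1000)})
hist, edges = bin_partition(
    part,
    bins=[10, 10],
    axes=["X", "Y"],
    ranges=[(0.0, 1.0), (0.0, 1.0)],
    return_edges=True,
)
# hist.shape == (10, 10); edges holds one edge array per dimension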
@@ -538,17 +533,17 @@ Source code for sed.binning.binning
raise TypeError(
"axes needs to be of type 'List[str]' if tests are skipped!",
)
- bins = cast(Union[List[int], List[np.ndarray]], bins)
- axes = cast(List[str], axes)
- ranges = cast(List[Tuple[float, float]], ranges)
+ bins = cast(Union[list[int], list[np.ndarray]], bins)
+ axes = cast(list[str], axes)
+ ranges = cast(list[tuple[float, float]], ranges)
# convert bin centers to bin edges:
if all(isinstance(x, np.ndarray) for x in bins):
- bins = cast(List[np.ndarray], bins)
+ bins = cast(list[np.ndarray], bins)
for i, bin_centers in enumerate(bins):
bins[i] = bin_centers_to_bin_edges(bin_centers)
else:
- bins = cast(List[int], bins)
+ bins = cast(list[int], bins)
# shift ranges by half a bin size to align the bin centers to the given ranges,
# as the histogram functions interpret the ranges as limits for the edges.
for i, nbins in enumerate(bins):
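A small numpy sketch of the half-bin shift performed in this loop, assuming equidistant bins: moving both range limits down by half a bin width makes the bin centers coincide with the requested range.

import numpy as np

start, end, nbins = 0.0, 1.0, 10
half_bin = (end - start) / nbins / 2
edges = np.linspace(start - half_bin, end - half_bin, nbins + 1)
centers = (edges[:-1] + edges[1:]) / 2
assert np.allclose(centers, np.linspace(start, end, nbins, endpoint=False))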
@@ -617,26 +612,23 @@ Source code for sed.binning.binning
return hist_partition
-[docs]def bin_dataframe(
+
+
+[docs]
+def bin_dataframe(
df: dask.dataframe.DataFrame,
- bins: Union[
- int,
- dict,
- Sequence[int],
- Sequence[np.ndarray],
- Sequence[tuple],
- ] = 100,
+ bins: int | dict | Sequence[int] | Sequence[np.ndarray] | Sequence[tuple] = 100,
axes: Sequence[str] = None,
- ranges: Sequence[Tuple[float, float]] = None,
+ ranges: Sequence[tuple[float, float]] = None,
hist_mode: str = "numba",
mode: str = "fast",
- jitter: Union[list, dict] = None,
+ jitter: list | dict = None,
pbar: bool = True,
n_cores: int = N_CPU - 1,
threads_per_worker: int = 4,
threadpool_api: str = "blas",
return_partitions: bool = False,
- **kwds,
+ compute_kwds: dict = {},
) -> xr.DataArray:
"""Computes the n-dimensional histogram on columns of a dataframe,
parallelized.
@@ -644,7 +636,7 @@ Source code for sed.binning.binning
Args:
df (dask.dataframe.DataFrame): a dask.DataFrame on which to perform the
histogram.
- bins (int, dict, Sequence[int], Sequence[np.ndarray], Sequence[tuple], optional):
+ bins (int | dict | Sequence[int] | Sequence[np.ndarray] | Sequence[tuple], optional):
Definition of the bins. Can be any of the following cases:
- an integer describing the number of bins for all dimensions. This
@@ -668,7 +660,7 @@ Source code for sed.binning.binning
the order of the dimensions in the resulting array. Not required if
bins are provided as a dictionary containing the axis names.
Defaults to None.
- ranges (Sequence[Tuple[float, float]], optional): Sequence of tuples containing
+ ranges (Sequence[tuple[float, float]], optional): Sequence of tuples containing
the start and end point of the binning range. Required if bins given as
int or Sequence[int]. Defaults to None.
hist_mode (str, optional): Histogram calculation method.
@@ -685,7 +677,7 @@ Source code for sed.binning.binning
- 'legacy': Single-core recombination of partition results.
Defaults to "fast".
- jitter (Union[list, dict], optional): a list of the axes on which to apply
+ jitter (list | dict, optional): a list of the axes on which to apply
jittering. To specify the jitter amplitude or method (normal or uniform
noise) a dictionary can be passed. This should look like
jitter={'axis':{'amplitude':0.5,'mode':'uniform'}}.
@@ -696,7 +688,7 @@ Source code for sed.binning.binning
Defaults to None.
pbar (bool, optional): Option to show the tqdm progress bar. Defaults to True.
n_cores (int, optional): Number of CPU cores to use for parallelization.
- Defaults to all but one of the available cores. Defaults to N_CPU-1.
+ Defaults to all but one of the available cores.
threads_per_worker (int, optional): Limit the number of threads that
multiprocessing can spawn. Defaults to 4.
threadpool_api (str, optional): The API to use for multiprocessing.
@@ -704,7 +696,7 @@ Source code for sed.binning.binning
return_partitions (bool, optional): Option to return a hypercube of dimension
n+1, where the last dimension corresponds to the dataframe partitions.
Defaults to False.
- **kwds: Keyword arguments passed to ``dask.compute()``
+ compute_kwds (dict, optional): Dictionary of keyword arguments passed to ``dask.compute()``.
Raises:
Warning: Warns if there are unimplemented features the user is trying to use.
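A hypothetical invocation of bin_dataframe with the new compute_kwds argument; the import path, column name, and scheduler choice are assumptions.

import dask.dataframe
import numpy as np
import pandas as pd
from sed.binning import bin_dataframe  # assumed import path

pdf = pd.DataFrame({"X": np.random.rand(10_000)})
df = dask.dataframe.from_pandas(pdf, npartitions=4)
result = bin_dataframe(
    df,
    bins=[100],
    axes=["X"],
    ranges=[(0.0, 1.0)],
    compute_kwds={"scheduler": "threads"},  # forwarded to dask.compute()
)
# result is an xarray.DataArray with a 100-point "X" coordinate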
@@ -720,14 +712,14 @@ Source code for sed.binning.binning
# create the coordinate axes for the xarray output
# if provided as array, they are interpreted as bin centers
if isinstance(bins[0], np.ndarray):
- bins = cast(List[np.ndarray], bins)
+ bins = cast(list[np.ndarray], bins)
coords = dict(zip(axes, bins))
elif ranges is None:
raise ValueError(
"bins is not an array and range is none. this shouldn't happen.",
)
else:
- bins = cast(List[int], bins)
+ bins = cast(list[int], bins)
coords = {
ax: np.linspace(r[0], r[1], n, endpoint=False) for ax, r, n in zip(axes, ranges, bins)
}
@@ -764,7 +756,7 @@ Source code for sed.binning.binning
)
if len(core_tasks) > 0:
- core_results = dask.compute(*core_tasks, **kwds)
+ core_results = dask.compute(*core_tasks, **compute_kwds)
if return_partitions:
for core_result in core_results:
@@ -805,7 +797,7 @@ Source code for sed.binning.binning
combine_tasks.append(
dask.delayed(reduce)(_arraysum, combine_parts),
)
- combine_results = dask.compute(*combine_tasks, **kwds)
+ combine_results = dask.compute(*combine_tasks, **compute_kwds)
# Directly fill into target array. This is much faster than
# the (not so parallel) reduce/concatenation used before,
# and uses less memory.
@@ -853,7 +845,8 @@ Source code for sed.binning.binning
return data_array
-def normalization_histogram_from_timestamps(
+
+def normalization_histogram_from_timestamps(
df: dask.dataframe.DataFrame,
axis: str,
bin_centers: np.ndarray,
@@ -889,7 +882,7 @@ Source code for sed.binning.binning
return data_array
-def normalization_histogram_from_timed_dataframe(
+def normalization_histogram_from_timed_dataframe(
df: dask.dataframe.DataFrame,
axis: str,
bin_centers: np.ndarray,
@@ -924,8 +917,8 @@ Source code for sed.binning.binning
return data_array
-def apply_jitter_on_column(
- df: Union[dask.dataframe.core.DataFrame, pd.DataFrame],
+def apply_jitter_on_column(
+ df: dask.dataframe.core.DataFrame | pd.DataFrame,
amp: float,
col: str,
mode: str = "uniform",
@@ -1001,7 +994,7 @@ Source code for sed.binning.binning
- sed.binning.numba_bin — SED 0.4.0.dev0 documentation
+ sed.binning.numba_bin — SED 0.4.1.dev383+g27234e0 documentation
@@ -29,7 +29,7 @@
-
+
@@ -37,16 +37,16 @@
-
-
-
+
+
+
-
+
@@ -54,7 +54,7 @@
-
+
@@ -116,7 +116,7 @@
- SED 0.4.0.dev0 documentation
+ SED 0.4.1.dev383+g27234e0 documentation
diff --git a/sed/develop/_modules/sed/binning/numba_bin.html b/sed/develop/_modules/sed/binning/numba_bin.html
index 8166cab..b69fd6d 100644
--- a/sed/develop/_modules/sed/binning/numba_bin.html
+++ b/sed/develop/_modules/sed/binning/numba_bin.html
@@ -2,12 +2,12 @@
-
+
-
@@ -416,21 +416,19 @@
Source code for sed.binning.numba_bin
"""This file contains code for binning using numba precompiled code for the
sed.binning module
-
"""
-from typing import Any
-from typing import cast
-from typing import List
-from typing import Sequence
-from typing import Tuple
-from typing import Union
+from __future__ import annotations
+
+from collections.abc import Sequence
+from typing import Any
+from typing import cast
-import numba
-import numpy as np
+import numba
+import numpy as np
@numba.jit(nogil=True, nopython=True)
-def _hist_from_bin_range(
+def _hist_from_bin_range(
sample: np.ndarray,
bins: Sequence[int],
ranges: np.ndarray,
@@ -487,8 +485,10 @@ Source code for sed.binning.numba_bin
return hist
-[docs]@numba.jit(nogil=True, parallel=False, nopython=True)
-def binsearch(bins: np.ndarray, val: float) -> int:
+
+[docs]
+@numba.jit(nogil=True, parallel=False, nopython=True)
+def binsearch(bins: np.ndarray, val: float) -> int:
"""Bisection index search function.
Finds the index of the bin with the highest value below val, i.e. the left edge.
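A pure-Python sketch of the bisection described here; the compiled numba implementation remains authoritative, and the edge handling below is inferred from the docstring.

import numpy as np

def binsearch_sketch(bins: np.ndarray, val: float) -> int:
    # Index of the left edge of the bin containing val,
    # or -1 if val lies below the first edge.
    low, high = 0, len(bins) - 1
    while low < high:
        mid = (low + high) // 2
        if bins[mid] <= val:
            low = mid + 1
        else:
            high = mid
    return low - 1

edges = np.array([0.0, 1.0, 2.0, 3.0])
assert binsearch_sketch(edges, 1.5) == 1    # 1.5 lies in bin [1.0, 2.0)
assert binsearch_sketch(edges, -0.5) == -1  # below the first edge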
@@ -520,11 +520,12 @@ Source code for sed.binning.numba_bin
mid = (low + high) // 2
+
@numba.jit(nopython=True, nogil=True, parallel=False)
-def _hist_from_bins(
+def _hist_from_bins(
sample: np.ndarray,
bins: Sequence[np.ndarray],
- shape: Tuple,
+ shape: tuple,
) -> np.ndarray:
"""Numba powered binning method, similar to np.histogramdd.
@@ -534,7 +535,7 @@ Source code for sed.binning.numba_bin
sample (np.ndarray) : the array of shape (N,D) on which to compute the histogram
bins (Sequence[np.ndarray]): array of shape (N,D) defining the D bins on which
to compute the histogram, i.e. the desired output axes.
- shape (Tuple): shape of the resulting array. Workaround for the fact numba
+ shape (tuple): shape of the resulting array. Workaround for the fact numba
does not allow to create tuples.
Returns:
hist: the computed n-dimensional histogram
@@ -568,11 +569,13 @@ Source code for sed.binning.numba_bin
return hist
-[docs]def numba_histogramdd(
+
+[docs]
+def numba_histogramdd(
sample: np.ndarray,
- bins: Union[int, Sequence[int], Sequence[np.ndarray], np.ndarray],
+ bins: int | Sequence[int] | Sequence[np.ndarray] | np.ndarray,
ranges: Sequence = None,
-) -> Tuple[np.ndarray, List[np.ndarray]]:
+) -> tuple[np.ndarray, list[np.ndarray]]:
"""Multidimensional histogram function, powered by Numba.
Behaves in most respects like numpy.histogramdd. Returns uint32 arrays.
@@ -584,7 +587,7 @@ Source code for sed.binning.numba_bin
Args:
sample (np.ndarray): The data to be histogram'd with shape N,D
- bins (Union[int, Sequence[int], Sequence[np.ndarray], np.ndarray]): The number
+ bins (int | Sequence[int] | Sequence[np.ndarray] | np.ndarray): The number
of bins for each dimension D, or a sequence of bin edges on which to calculate
the histogram.
ranges (Sequence, optional): The range(s) to use for binning when bins is a sequence
@@ -597,7 +600,7 @@ Source code for sed.binning.numba_bin
RuntimeError: Internal shape error after binning
Returns:
- Tuple[np.ndarray, List[np.ndarray]]: 2-element tuple of The computed histogram
+ tuple[np.ndarray, list[np.ndarray]]: 2-element tuple of the computed histogram
and a list of D arrays describing the bin edges for each dimension.
- **hist**: The computed histogram
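A hypothetical comparison against numpy.histogramdd; the import path is an assumption.

import numpy as np
from sed.binning.numba_bin import numba_histogramdd  # assumed import path

sample = np.random.rand(1000, 2)
hist, edges = numba_histogramdd(sample, bins=[10, 10], ranges=[(0, 1), (0, 1)])
ref, _ = np.histogramdd(sample, bins=[10, 10], range=[(0, 1), (0, 1)])
print(np.array_equal(hist, ref.astype(np.uint32)))  # expected: True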
@@ -629,7 +632,7 @@ Source code for sed.binning.numba_bin
# method == "array"
if isinstance(bins[0], np.ndarray):
- bins = cast(List[np.ndarray], list(bins))
+ bins = cast(list[np.ndarray], list(bins))
hist = _hist_from_bins(
sample,
tuple(bins),
@@ -655,7 +658,7 @@ Source code for sed.binning.numba_bin
bins = tuple(bins)
# Create edge arrays
- edges: List[Any] = []
+ edges: list[Any] = []
nbin = np.empty(num_cols, int)
for i in range(num_cols):
@@ -669,6 +672,7 @@ Source code for sed.binning.numba_bin
raise RuntimeError("Internal Shape Error")
return hist, edges
+
@@ -719,7 +723,7 @@ Source code for sed.binning.numba_bin
- sed.binning.utils — SED 0.4.0.dev0 documentation
+ sed.binning.utils — SED 0.4.1.dev383+g27234e0 documentation
@@ -29,7 +29,7 @@
-
+
@@ -37,16 +37,16 @@
-
-
-
+
+
+
-
+
@@ -54,7 +54,7 @@
-
+
@@ -116,7 +116,7 @@
- SED 0.4.0.dev0 documentation
+ SED 0.4.1.dev383+g27234e0 documentation
diff --git a/sed/develop/_modules/sed/binning/utils.html b/sed/develop/_modules/sed/binning/utils.html
index 58ac64c..e8125d3 100644
--- a/sed/develop/_modules/sed/binning/utils.html
+++ b/sed/develop/_modules/sed/binning/utils.html
@@ -2,12 +2,12 @@
-
+
-
@@ -415,33 +415,27 @@
Source code for sed.binning.utils
"""This file contains helper functions for the sed.binning module
-
"""
-from typing import cast
-from typing import List
-from typing import Sequence
-from typing import Tuple
-from typing import Union
+from __future__ import annotations
+
+from collections.abc import Sequence
+from typing import cast
-import numpy as np
+import numpy as np
-def _arraysum(array_a, array_b):
+def _arraysum(array_a, array_b):
"""Calculate the sum of two arrays."""
return array_a + array_b
-[docs]def simplify_binning_arguments(
- bins: Union[
- int,
- dict,
- Sequence[int],
- Sequence[np.ndarray],
- Sequence[tuple],
- ],
+
+[docs]
+def simplify_binning_arguments(
+ bins: int | dict | Sequence[int] | Sequence[np.ndarray] | Sequence[tuple],
axes: Sequence[str] = None,
- ranges: Sequence[Tuple[float, float]] = None,
-) -> Tuple[Union[List[int], List[np.ndarray]], List[str], List[Tuple[float, float]]]:
+ ranges: Sequence[tuple[float, float]] = None,
+) -> tuple[list[int] | list[np.ndarray], list[str], list[tuple[float, float]]]:
"""Convert the flexible input for defining bins into a
simple "axes" "bins" "ranges" tuple.
@@ -449,7 +443,7 @@ Source code for sed.binning.utils
binning functions defined here.
Args:
- bins (int, dict, Sequence[int], Sequence[np.ndarray], Sequence[tuple]):
+ bins (int | dict | Sequence[int] | Sequence[np.ndarray] | Sequence[tuple]):
Definition of the bins. Can be any of the following cases:
- an integer describing the number of bins for all dimensions. This
@@ -472,7 +466,7 @@ Source code for sed.binning.utils
the order of the dimensions in the resulting array. Not required if
bins are provided as a dictionary containing the axis names.
Defaults to None.
- ranges (Sequence[Tuple[float, float]], optional): Sequence of tuples containing
+ ranges (Sequence[tuple[float, float]], optional): Sequence of tuples containing
the start and end point of the binning range. Required if bins given as
int or Sequence[int]. Defaults to None.
@@ -483,7 +477,7 @@ Source code for sed.binning.utils
AttributeError: Shape mismatch
Returns:
- Tuple[Union[List[int], List[np.ndarray]], List[Tuple[float, float]]]: Tuple
+ tuple[list[int] | list[np.ndarray], list[str], list[tuple[float, float]]]: Tuple
containing lists of bin centers, axes, and ranges.
"""
# if bins is a dictionary: unravel to axes and bins
@@ -529,7 +523,7 @@ Source code for sed.binning.utils
# if bins are provided as int, check that ranges are present
if all(isinstance(x, (int, np.int64)) for x in bins):
- bins = cast(List[int], list(bins))
+ bins = cast(list[int], list(bins))
if ranges is None:
raise AttributeError(
"Must provide a range if bins is an integer or list of integers",
@@ -541,7 +535,7 @@ Source code for sed.binning.utils
# otherwise, all bins should be of type np.ndarray here
elif all(isinstance(x, np.ndarray) for x in bins):
- bins = cast(List[np.ndarray], list(bins))
+ bins = cast(list[np.ndarray], list(bins))
else:
raise TypeError(f"Could not interpret bins of type {type(bins)}")
@@ -554,7 +548,10 @@ Source code for sed.binning.utils
return bins, list(axes), list(ranges) if ranges else None
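A minimal example of the normalization this function performs, assuming it is importable from sed.binning.utils; the axis name is hypothetical.

from sed.binning.utils import simplify_binning_arguments  # assumed import path

bins, axes, ranges = simplify_binning_arguments(
    bins=100,
    axes=["X"],          # hypothetical axis name
    ranges=[(0.0, 1.0)],
)
print(bins, axes, ranges)  # expected: [100] ['X'] [(0.0, 1.0)]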
-[docs]def bin_edges_to_bin_centers(bin_edges: np.ndarray) -> np.ndarray:
+
+
+[docs]
+def bin_edges_to_bin_centers(bin_edges: np.ndarray) -> np.ndarray:
"""Converts a list of bin edges into corresponding bin centers
Args:
@@ -568,7 +565,10 @@ Source code for sed.binning.utils
return bin_centers
-[docs]def bin_centers_to_bin_edges(bin_centers: np.ndarray) -> np.ndarray:
+
+
+[docs]
+def bin_centers_to_bin_edges(bin_centers: np.ndarray) -> np.ndarray:
"""Converts a list of bin centers into corresponding bin edges
Args:
@@ -591,6 +591,7 @@ Source code for sed.binning.utils
)
return bin_edges
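A quick round-trip check of the two converters, assuming both are importable from sed.binning.utils; for equidistant centers the round trip should be exact.

import numpy as np
from sed.binning.utils import (  # assumed import path
    bin_centers_to_bin_edges,
    bin_edges_to_bin_centers,
)

centers = np.linspace(0.0, 0.9, 10)
edges = bin_centers_to_bin_edges(centers)  # one more edge than centers
assert np.allclose(bin_edges_to_bin_centers(edges), centers)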
+
@@ -641,7 +642,7 @@ Source code for sed.binning.utils
- sed.calibrator.delay — SED 0.4.0.dev0 documentation
+ sed.calibrator.delay — SED 0.4.1.dev383+g27234e0 documentation
@@ -29,7 +29,7 @@
-
+
@@ -37,16 +37,16 @@
-
-
-
+
+
+
-
+
@@ -54,7 +54,7 @@
-
+
@@ -116,7 +116,7 @@
- SED 0.4.0.dev0 documentation
+ SED 0.4.1.dev383+g27234e0 documentation
diff --git a/sed/develop/_modules/sed/calibrator/delay.html b/sed/develop/_modules/sed/calibrator/delay.html
index abc6393..2b48cc1 100644
--- a/sed/develop/_modules/sed/calibrator/delay.html
+++ b/sed/develop/_modules/sed/calibrator/delay.html
@@ -2,12 +2,12 @@
-
+
-
@@ -416,91 +416,125 @@
Source code for sed.calibrator.delay
"""sed.calibrator.delay module. Code for delay calibration.
"""
-from copy import deepcopy
-from datetime import datetime
-from typing import Any
-from typing import Dict
-from typing import List
-from typing import Sequence
-from typing import Tuple
-from typing import Union
+from __future__ import annotations
-import dask.dataframe
-import h5py
-import numpy as np
-import pandas as pd
+from collections.abc import Sequence
+from copy import deepcopy
+from datetime import datetime
+from typing import Any
-from sed.core import dfops
+import dask.dataframe
+import h5py
+import numpy as np
+import pandas as pd
+from sed.core import dfops
+from sed.core.logging import set_verbosity
+from sed.core.logging import setup_logging
-[docs]class DelayCalibrator:
+# Configure logging
+logger = setup_logging("delay")
+
+
+
+[docs]
+class DelayCalibrator:
"""
Pump-Probe delay calibration methods.
Initialization of the DelayCalibrator class passes the config.
Args:
config (dict, optional): Config dictionary. Defaults to None.
+ verbose (bool, optional): Option to print out diagnostic information.
+ Defaults to True.
"""
- def __init__(
+ def __init__(
self,
config: dict = None,
+ verbose: bool = True,
) -> None:
"""Initialization of the DelayCalibrator class passes the config.
Args:
config (dict, optional): Config dictionary. Defaults to None.
+ verbose (bool, optional): Option to print out diagnostic information.
+ Defaults to True.
"""
if config is not None:
self._config = config
else:
self._config = {}
- self.adc_column: str = self._config["dataframe"].get("adc_column", None)
- self.delay_column: str = self._config["dataframe"]["delay_column"]
- self.corrected_delay_column = self._config["dataframe"].get(
- "corrected_delay_column",
+ self._verbose = verbose
+ set_verbosity(logger, self._verbose)
+
+ self.adc_column: str = self._config["dataframe"]["columns"]["adc"]
+ self.delay_column: str = self._config["dataframe"]["columns"]["delay"]
+ self.corrected_delay_column = self._config["dataframe"]["columns"].get(
+ "corrected_delay",
self.delay_column,
)
- self.calibration: Dict[str, Any] = self._config["delay"].get("calibration", {})
- self.offsets: Dict[str, Any] = self._config["delay"].get("offsets", {})
+ self.calibration: dict[str, Any] = self._config["delay"].get("calibration", {})
+ self.offsets: dict[str, Any] = self._config["delay"].get("offsets", {})
+
+ @property
+ def verbose(self) -> bool:
+ """Accessor to the verbosity flag.
+
+ Returns:
+ bool: Verbosity flag.
+ """
+ return self._verbose
+
+ @verbose.setter
+ def verbose(self, verbose: bool):
+ """Setter for the verbosity.
+
+ Args:
+ verbose (bool): Option to turn on verbose output. Sets loglevel to INFO.
+ """
+ self._verbose = verbose
+ set_verbosity(logger, self._verbose)
-[docs] def append_delay_axis(
+
+[docs]
+ def append_delay_axis(
self,
- df: Union[pd.DataFrame, dask.dataframe.DataFrame],
+ df: pd.DataFrame | dask.dataframe.DataFrame,
adc_column: str = None,
delay_column: str = None,
- calibration: Dict[str, Any] = None,
- adc_range: Union[Tuple, List, np.ndarray] = None,
- delay_range: Union[Tuple, List, np.ndarray] = None,
+ calibration: dict[str, Any] = None,
+ adc_range: tuple | list | np.ndarray = None,
+ delay_range: tuple | list | np.ndarray = None,
time0: float = None,
- delay_range_mm: Union[Tuple, List, np.ndarray] = None,
+ delay_range_mm: tuple | list | np.ndarray = None,
datafile: str = None,
p1_key: str = None,
p2_key: str = None,
t0_key: str = None,
- verbose: bool = True,
- ) -> Tuple[Union[pd.DataFrame, dask.dataframe.DataFrame], dict]:
+ suppress_output: bool = False,
+ ) -> tuple[pd.DataFrame | dask.dataframe.DataFrame, dict]:
"""Calculate and append the delay axis to the events dataframe, by converting
values from an analog-digital-converter (ADC).
Args:
- df (Union[pd.DataFrame, dask.dataframe.DataFrame]): The dataframe where
+ df (pd.DataFrame | dask.dataframe.DataFrame): The dataframe where
to apply the delay calibration to.
adc_column (str, optional): Source column for delay calibration.
- Defaults to config["dataframe"]["adc_column"].
+ Defaults to config["dataframe"]["columns"]["adc"].
delay_column (str, optional): Destination column for delay calibration.
- Defaults to config["dataframe"]["delay_column"].
+ Defaults to config["dataframe"]["columns"]["delay"].
calibration (dict, optional): Calibration dictionary with parameters for
delay calibration.
- adc_range (Union[Tuple, List, np.ndarray], optional): The range of used
+ adc_range (tuple | list | np.ndarray, optional): The range of used
ADC values. Defaults to config["delay"]["adc_range"].
- delay_range (Union[Tuple, List, np.ndarray], optional): Range of scanned
+ delay_range (tuple | list | np.ndarray, optional): Range of scanned
delay values in ps. If omitted, the range is calculated from the
delay_range_mm and t0 values.
time0 (float, optional): Pump-Probe overlap value of the delay coordinate.
If omitted, it is searched for in the data files.
- delay_range_mm (Union[Tuple, List, np.ndarray], optional): Range of scanned
+ delay_range_mm (tuple | list | np.ndarray, optional): Range of scanned
delay stage in mm. If omitted, it is searched for in the data files.
datafile (str, optional): Datafile in which delay parameters are searched
for. Defaults to None.
@@ -510,15 +544,14 @@ Source code for sed.calibrator.delay
Defaults to config["delay"]["p2_key"]
t0_key (str, optional): hdf5 key for t0 value (mm).
Defaults to config["delay"]["t0_key"]
- verbose (bool, optional): Option to print out diagnostic information.
- Defaults to True.
+ suppress_output (bool, optional): Option to suppress log output. Defaults to False.
Raises:
ValueError: Raised if delay parameters are not found in the file.
NotImplementedError: Raised if not enough information is passed.
Returns:
- Union[pd.DataFrame, dask.dataframe.DataFrame]: dataframe with added column
+ tuple[pd.DataFrame | dask.dataframe.DataFrame, dict]: dataframe with added column
and delay calibration metadata dictionary.
"""
# pylint: disable=duplicate-code
@@ -533,7 +566,7 @@ Source code for sed.calibrator.delay
or datafile is not None
):
calibration = {}
- calibration["creation_date"] = datetime.now().timestamp()
+ calibration["creation_date"] = datetime.now()
if adc_range is not None:
calibration["adc_range"] = adc_range
if delay_range is not None:
@@ -544,11 +577,9 @@ Source code for sed.calibrator.delay
calibration["delay_range_mm"] = delay_range_mm
else:
# report usage of loaded parameters
- if "creation_date" in calibration and verbose:
- datestring = datetime.fromtimestamp(calibration["creation_date"]).strftime(
- "%m/%d/%Y, %H:%M:%S",
- )
- print(f"Using delay calibration parameters generated on {datestring}")
+ if "creation_date" in calibration and not suppress_output:
+ datestring = calibration["creation_date"].strftime("%m/%d/%Y, %H:%M:%S")
+ logger.info(f"Using delay calibration parameters generated on {datestring}")
if adc_column is None:
adc_column = self.adc_column
@@ -562,9 +593,10 @@ Source code for sed.calibrator.delay
t0_key = self._config["delay"].get("t0_key", "")
if "adc_range" not in calibration.keys():
- calibration["adc_range"] = np.asarray(
- self._config["delay"]["adc_range"],
- ) / 2 ** (self._config["dataframe"]["adc_binning"] - 1)
+ calibration["adc_range"] = (
+ np.asarray(self._config["delay"]["adc_range"])
+ / self._config["dataframe"]["adc_binning"]
+ )
if "delay_range" not in calibration.keys():
if "delay_range_mm" not in calibration.keys() or "time0" not in calibration.keys():
@@ -579,12 +611,12 @@ Source code for sed.calibrator.delay
except KeyError as exc:
raise ValueError(
"Delay stage values not found in file",
- ) from exc
+ ) from exc
calibration["datafile"] = datafile
calibration["delay_range_mm"] = (ret[0], ret[1])
calibration["time0"] = ret[2]
- if verbose:
- print(f"Extract delay range from file '{datafile}'.")
+ if not suppress_output:
+ logger.info(f"Extract delay range from file '{datafile}'.")
else:
raise NotImplementedError(
"Not enough parameters for delay calibration.",
@@ -596,9 +628,9 @@ Source code for sed.calibrator.delay
calibration["time0"],
),
)
- if verbose:
- print(f"Converted delay_range (ps) = {calibration['delay_range']}")
- calibration["creation_date"] = datetime.now().timestamp()
+ if not suppress_output:
+ logger.info(f"Converted delay_range (ps) = {calibration['delay_range']}")
+ calibration["creation_date"] = datetime.now()
if "delay_range" in calibration.keys():
df[delay_column] = calibration["delay_range"][0] + (
@@ -607,8 +639,8 @@ Source code for sed.calibrator.delay
calibration["adc_range"][1] - calibration["adc_range"][0]
)
self.calibration = deepcopy(calibration)
- if verbose:
- print(
+ if not suppress_output:
+ logger.info(
"Append delay axis using delay_range = "
f"[{calibration['delay_range'][0]}, {calibration['delay_range'][1]}]"
" and adc_range = "
@@ -620,42 +652,45 @@ Source code for sed.calibrator.delay
metadata = {"calibration": calibration}
return df, metadata
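At its core, the calibration applied above is a linear map from ADC values to delays; a minimal sketch with made-up numbers:

import numpy as np

adc = np.array([0.0, 0.5, 1.0])  # illustrative ADC readings
adc_range = (0.0, 1.0)
delay_range = (-1.0, 2.0)        # ps, illustrative
delay = delay_range[0] + (adc - adc_range[0]) * (
    delay_range[1] - delay_range[0]
) / (adc_range[1] - adc_range[0])
print(delay)  # [-1.   0.5  2. ]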
-[docs] def add_offsets(
+
+
+[docs]
+ def add_offsets(
self,
df: dask.dataframe.DataFrame,
- offsets: Dict[str, Any] = None,
+ offsets: dict[str, Any] = None,
constant: float = None,
flip_delay_axis: bool = None,
- columns: Union[str, Sequence[str]] = None,
- weights: Union[float, Sequence[float]] = 1.0,
- preserve_mean: Union[bool, Sequence[bool]] = False,
- reductions: Union[str, Sequence[str]] = None,
+ columns: str | Sequence[str] = None,
+ weights: float | Sequence[float] = 1.0,
+ preserve_mean: bool | Sequence[bool] = False,
+ reductions: str | Sequence[str] = None,
delay_column: str = None,
- verbose: bool = True,
- ) -> Tuple[dask.dataframe.DataFrame, dict]:
+ suppress_output: bool = False,
+ ) -> tuple[dask.dataframe.DataFrame, dict]:
"""Apply an offset to the delay column based on a constant or other columns.
Args:
df (dask.dataframe.DataFrame): Dataframe to use.
- offsets (Dict, optional): Dictionary of delay offset parameters.
+ offsets (dict, optional): Dictionary of delay offset parameters.
constant (float, optional): The constant to shift the delay axis by.
flip_delay_axis (bool, optional): Whether to flip the time axis. Defaults to False.
- columns (Union[str, Sequence[str]]): Name of the column(s) to apply the shift from.
- weights (Union[int, Sequence[int]]): weights to apply to the columns.
+ columns (str | Sequence[str]): Name of the column(s) to apply the shift from.
+ weights (float | Sequence[float]): weights to apply to the columns.
Can also be used to flip the sign (e.g. -1). Defaults to 1.
- preserve_mean (bool): Whether to subtract the mean of the column before applying the
- shift. Defaults to False.
- reductions (str): The reduction to apply to the column. Should be an available method
- of dask.dataframe.Series. For example "mean". In this case the function is applied
- to the column to generate a single value for the whole dataset. If None, the shift
- is applied per-dataframe-row. Defaults to None. Currently only "mean" is supported.
+ preserve_mean (bool | Sequence[bool]): Whether to subtract the mean of the column
+ before applying the shift. Defaults to False.
+ reductions (str | Sequence[str]): The reduction to apply to the column. Should be an
+ available method of dask.dataframe.Series. For example "mean". In this case the
+ function is applied to the column to generate a single value for the whole dataset.
+ If None, the shift is applied per-dataframe-row. Defaults to None. Currently only
+ "mean" is supported.
delay_column (str, optional): Name of the column containing the delay values.
- verbose (bool, optional): Option to print out diagnostic information.
- Defaults to True.
+ suppress_output (bool, optional): Option to suppress log output. Defaults to False.
Returns:
- dask.dataframe.DataFrame: Dataframe with the shifted delay axis.
- dict: Metadata dictionary.
+ tuple[dask.dataframe.DataFrame, dict]: Dataframe with the shifted delay axis and
+ Metadata dictionary.
"""
if offsets is None:
offsets = deepcopy(self.offsets)
@@ -663,7 +698,7 @@ Source code for sed.calibrator.delay
if delay_column is None:
delay_column = self.delay_column
- metadata: Dict[str, Any] = {
+ metadata: dict[str, Any] = {
"applied": True,
}
@@ -671,9 +706,10 @@ Source code for sed.calibrator.delay
# pylint:disable=duplicate-code
# use passed parameters, overwrite config
offsets = {}
- offsets["creation_date"] = datetime.now().timestamp()
+ offsets["creation_date"] = datetime.now()
# column-based offsets
if columns is not None:
+ offsets["columns"] = {}
if weights is None:
weights = 1
if isinstance(weights, (int, float, np.integer, np.floating)):
@@ -700,7 +736,7 @@ Source code for sed.calibrator.delay
# store in offsets dictionary
for col, weight, pmean, red in zip(columns, weights, preserve_mean, reductions):
- offsets[col] = {
+ offsets["columns"][col] = {
"weight": weight,
"preserve_mean": pmean,
"reduction": red,
@@ -715,11 +751,9 @@ Source code for sed.calibrator.delay
if flip_delay_axis:
offsets["flip_delay_axis"] = flip_delay_axis
- elif "creation_date" in offsets and verbose:
- datestring = datetime.fromtimestamp(offsets["creation_date"]).strftime(
- "%m/%d/%Y, %H:%M:%S",
- )
- print(f"Using delay offset parameters generated on {datestring}")
+ elif "creation_date" in offsets and not suppress_output:
+ datestring = offsets["creation_date"].strftime("%m/%d/%Y, %H:%M:%S")
+ logger.info(f"Using delay offset parameters generated on {datestring}")
if len(offsets) > 0:
# unpack dictionary
@@ -727,15 +761,13 @@ Source code for sed.calibrator.delay
weights = []
preserve_mean = []
reductions = []
- if verbose:
- print("Delay offset parameters:")
+ log_str = "Delay offset parameters:"
for k, v in offsets.items():
if k == "creation_date":
continue
if k == "constant":
constant = v
- if verbose:
- print(f" Constant: {constant} ")
+ log_str += f"\n Constant: {constant}"
elif k == "flip_delay_axis":
fda = str(v)
if fda.lower() in ["true", "1"]:
@@ -746,25 +778,28 @@ Source code for sed.calibrator.delay
raise ValueError(
f"Invalid value for flip_delay_axis in config: {flip_delay_axis}.",
)
- if verbose:
- print(f" Flip delay axis: {flip_delay_axis} ")
- else:
- columns.append(k)
- try:
- weight = v["weight"]
- except KeyError:
- weight = 1
- weights.append(weight)
- pm = v.get("preserve_mean", False)
- preserve_mean.append(pm)
- red = v.get("reduction", None)
- reductions.append(red)
- if verbose:
- print(
- f" Column[{k}]: Weight={weight}, Preserve Mean: {pm}, ",
- f"Reductions: {red}.",
+ log_str += f"\n Flip delay axis: {flip_delay_axis}"
+ elif k == "columns":
+ for column_name, column_dict in offsets["columns"].items():
+ columns.append(column_name)
+ weight = column_dict.get("weight", 1)
+ if not isinstance(weight, (int, float, np.integer, np.floating)):
+ raise TypeError(
+ f"Invalid type for weight of column {column_name}: {type(weight)}",
+ )
+ weights.append(weight)
+ pm = column_dict.get("preserve_mean", False)
+ preserve_mean.append(pm)
+ red = column_dict.get("reduction", None)
+ reductions.append(red)
+ log_str += (
+ f"\n Column[{column_name}]: Weight={weight}, Preserve Mean: {pm}, "
+ f"Reductions: {red}."
)
+ if not suppress_output:
+ logger.info(log_str)
+
if len(columns) > 0:
df = dfops.offset_by_other_columns(
df=df,
@@ -787,15 +822,19 @@ Source code for sed.calibrator.delay
self.offsets = offsets
metadata["offsets"] = offsets
- return df, metadata
+ return df, metadata
+
+
-[docs]def extract_delay_stage_parameters(
+
+[docs]
+def extract_delay_stage_parameters(
file: str,
p1_key: str,
p2_key: str,
t0_key: str,
-) -> Tuple:
+) -> tuple:
"""
Read delay stage ranges from hdf5 file
@@ -819,22 +858,26 @@ Source code for sed.calibrator.delay
return tuple(values)
-[docs]def mm_to_ps(
- delay_mm: Union[float, np.ndarray],
+
+
+[docs]
+def mm_to_ps(
+ delay_mm: float | np.ndarray,
time0_mm: float,
-) -> Union[float, np.ndarray]:
+) -> float | np.ndarray:
"""Converts a delay stage position in mm into a relative delay in picoseconds
(double pass).
Args:
- delay_mm (Union[float, Sequence[float]]): Delay stage position in mm
+ delay_mm (float | np.ndarray): Delay stage position in mm
time0_mm (float): Delay stage position of pump-probe overlap in mm
Returns:
- Union[float, Sequence[float]]: Relative delay in picoseconds
+ float | np.ndarray: Relative delay in picoseconds
"""
delay_ps = (delay_mm - time0_mm) / 0.15
return delay_ps
+
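The factor 0.15 above is half the speed of light expressed in mm/ps (c ≈ 0.2998 mm/ps): the stage folds the beam path, so each millimeter of travel adds two millimeters of path, about 6.67 ps. A worked call, assuming mm_to_ps is importable from sed.calibrator.delay:

from sed.calibrator.delay import mm_to_ps  # assumed import path

print(mm_to_ps(delay_mm=1.5, time0_mm=0.0))  # 1.5 / 0.15 = 10.0 ps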
@@ -885,7 +928,7 @@ Source code for sed.calibrator.delay
- sed.calibrator.energy — SED 0.4.0.dev0 documentation
+ sed.calibrator.energy — SED 0.4.1.dev383+g27234e0 documentation
@@ -29,7 +29,7 @@
-
+
@@ -37,16 +37,16 @@
-
-
-
+
+
+
-
+
@@ -54,7 +54,7 @@
-
+
@@ -116,7 +116,7 @@
- SED 0.4.0.dev0 documentation
+ SED 0.4.1.dev383+g27234e0 documentation
diff --git a/sed/develop/_modules/sed/calibrator/energy.html b/sed/develop/_modules/sed/calibrator/energy.html
index 4e82b47..2ebd21b 100644
--- a/sed/develop/_modules/sed/calibrator/energy.html
+++ b/sed/develop/_modules/sed/calibrator/energy.html
@@ -2,12 +2,12 @@
-
+
-
@@ -417,47 +417,51 @@ Source code for sed.calibrator.energy
"""sed.calibrator.energy module. Code for energy calibration and
correction. Mostly ported from https://github.com/mpes-kit/mpes.
"""
-import itertools as it
-import warnings as wn
-from copy import deepcopy
-from datetime import datetime
-from functools import partial
-from typing import Any
-from typing import cast
-from typing import Dict
-from typing import List
-from typing import Literal
-from typing import Sequence
-from typing import Tuple
-from typing import Union
-
-import bokeh.plotting as pbk
-import dask.dataframe
-import h5py
-import ipywidgets as ipw
-import matplotlib
-import matplotlib.pyplot as plt
-import numpy as np
-import pandas as pd
-import psutil
-import xarray as xr
-from bokeh.io import output_notebook
-from bokeh.palettes import Category10 as ColorCycle
-from fastdtw import fastdtw
-from IPython.display import display
-from lmfit import Minimizer
-from lmfit import Parameters
-from lmfit.printfuncs import report_fit
-from numpy.linalg import lstsq
-from scipy.signal import savgol_filter
-from scipy.sparse.linalg import lsqr
-
-from sed.binning import bin_dataframe
-from sed.core import dfops
-from sed.loader.base.loader import BaseLoader
-
-
-[docs]class EnergyCalibrator:
+from __future__ import annotations
+
+import itertools as it
+from collections.abc import Sequence
+from copy import deepcopy
+from datetime import datetime
+from functools import partial
+from typing import Any
+from typing import cast
+from typing import Literal
+
+import bokeh.plotting as pbk
+import dask.dataframe
+import h5py
+import ipywidgets as ipw
+import matplotlib
+import matplotlib.pyplot as plt
+import numpy as np
+import pandas as pd
+import psutil
+import xarray as xr
+from bokeh.io import output_notebook
+from bokeh.palettes import Category10 as ColorCycle
+from fastdtw import fastdtw
+from IPython.display import display
+from lmfit import Minimizer
+from lmfit import Parameters
+from lmfit.printfuncs import fit_report
+from numpy.linalg import lstsq
+from scipy.signal import savgol_filter
+from scipy.sparse.linalg import lsqr
+
+from sed.binning import bin_dataframe
+from sed.core import dfops
+from sed.core.logging import set_verbosity
+from sed.core.logging import setup_logging
+from sed.loader.base.loader import BaseLoader
+
+# Configure logging
+logger = setup_logging("energy")
+
+
+
+[docs]
+class EnergyCalibrator:
"""Electron binding energy calibration workflow.
For the initialization of the EnergyCalibrator class an instance of a
@@ -472,15 +476,18 @@ Source code for sed.calibrator.energy
tof (np.ndarray, optional): TOF-values for the data traces.
Defaults to None.
config (dict, optional): Config dictionary. Defaults to None.
+ verbose (bool, optional): Option to print out diagnostic information.
+ Defaults to True.
"""
- def __init__(
+ def __init__(
self,
loader: BaseLoader,
biases: np.ndarray = None,
traces: np.ndarray = None,
tof: np.ndarray = None,
config: dict = None,
+ verbose: bool = True,
):
"""For the initialization of the EnergyCalibrator class an instance of a
loader is required. The data can be loaded using the optional arguments,
@@ -494,7 +501,17 @@ Source code for sed.calibrator.energy
tof (np.ndarray, optional): TOF-values for the data traces.
Defaults to None.
config (dict, optional): Config dictionary. Defaults to None.
+ verbose (bool, optional): Option to print out diagnostic information.
+ Defaults to True.
"""
+ if config is None:
+ config = {}
+
+ self._config = config
+
+ self._verbose = verbose
+ set_verbosity(logger, self._verbose)
+
self.loader = loader
self.biases: np.ndarray = None
self.traces: np.ndarray = None
@@ -504,37 +521,49 @@ Source code for sed.calibrator.energy
if traces is not None and tof is not None and biases is not None:
self.load_data(biases=biases, traces=traces, tof=tof)
- if config is None:
- config = {}
-
- self._config = config
-
- self.featranges: List[Tuple] = [] # Value ranges for feature detection
+ self.featranges: list[tuple] = [] # Value ranges for feature detection
self.peaks: np.ndarray = np.asarray([])
- self.calibration: Dict[str, Any] = self._config["energy"].get("calibration", {})
-
- self.tof_column = self._config["dataframe"]["tof_column"]
- self.tof_ns_column = self._config["dataframe"].get("tof_ns_column", None)
- self.corrected_tof_column = self._config["dataframe"]["corrected_tof_column"]
- self.energy_column = self._config["dataframe"]["energy_column"]
- self.x_column = self._config["dataframe"]["x_column"]
- self.y_column = self._config["dataframe"]["y_column"]
+ self.calibration: dict[str, Any] = self._config["energy"].get("calibration", {})
+
+ self.tof_column = self._config["dataframe"]["columns"]["tof"]
+ self.tof_ns_column = self._config["dataframe"]["columns"].get("tof_ns", None)
+ self.corrected_tof_column = self._config["dataframe"]["columns"]["corrected_tof"]
+ self.energy_column = self._config["dataframe"]["columns"]["energy"]
+ self.x_column = self._config["dataframe"]["columns"]["x"]
+ self.y_column = self._config["dataframe"]["columns"]["y"]
self.binwidth: float = self._config["dataframe"]["tof_binwidth"]
self.binning: int = self._config["dataframe"]["tof_binning"]
self.x_width = self._config["energy"]["x_width"]
self.y_width = self._config["energy"]["y_width"]
- self.tof_width = np.asarray(
- self._config["energy"]["tof_width"],
- ) / 2 ** (self.binning - 1)
- self.tof_fermi = self._config["energy"]["tof_fermi"] / 2 ** (self.binning - 1)
+ self.tof_width = np.asarray(self._config["energy"]["tof_width"]) / self.binning
+ self.tof_fermi = self._config["energy"]["tof_fermi"] / self.binning
self.color_clip = self._config["energy"]["color_clip"]
self.sector_delays = self._config["dataframe"].get("sector_delays", None)
- self.sector_id_column = self._config["dataframe"].get("sector_id_column", None)
- self.offsets: Dict[str, Any] = self._config["energy"].get("offsets", {})
- self.correction: Dict[str, Any] = self._config["energy"].get("correction", {})
+ self.sector_id_column = self._config["dataframe"]["columns"].get("sector_id", None)
+ self.offsets: dict[str, Any] = self._config["energy"].get("offsets", {})
+ self.correction: dict[str, Any] = self._config["energy"].get("correction", {})
@property
- def ntraces(self) -> int:
+ def verbose(self) -> bool:
+ """Accessor to the verbosity flag.
+
+ Returns:
+ bool: Verbosity flag.
+ """
+ return self._verbose
+
+ @verbose.setter
+ def verbose(self, verbose: bool):
+ """Setter for the verbosity.
+
+ Args:
+ verbose (bool): Option to turn on verbose output. Sets loglevel to INFO.
+ """
+ self._verbose = verbose
+ set_verbosity(logger, self._verbose)
+
+ @property
+ def ntraces(self) -> int:
"""Property returning the number of traces.
Returns:
@@ -543,7 +572,7 @@ Source code for sed.calibrator.energy
return len(self.traces)
@property
- def nranges(self) -> int:
+ def nranges(self) -> int:
"""Property returning the number of specified feature ranges which Can be a
multiple of ntraces.
@@ -553,7 +582,7 @@ Source code for sed.calibrator.energy
return len(self.featranges)
@property
- def dup(self) -> int:
+ def dup(self) -> int:
"""Property returning the duplication number, i.e. the number of feature
ranges per trace.
@@ -562,7 +591,9 @@ Source code for sed.calibrator.energy
"""
return int(np.round(self.nranges / self.ntraces))
-[docs] def load_data(
+
+[docs]
+ def load_data(
self,
biases: np.ndarray = None,
traces: np.ndarray = None,
@@ -591,12 +622,15 @@ Source code for sed.calibrator.energy
else:
self.traces = self.traces_normed = np.asarray([])
-[docs] def bin_data(
+
+
+[docs]
+ def bin_data(
self,
- data_files: List[str],
- axes: List[str] = None,
- bins: List[int] = None,
- ranges: Sequence[Tuple[float, float]] = None,
+ data_files: list[str],
+ axes: list[str] = None,
+ bins: list[int] = None,
+ ranges: Sequence[tuple[float, float]] = None,
biases: np.ndarray = None,
bias_key: str = None,
**kwds,
@@ -604,12 +638,12 @@ Source code for sed.calibrator.energy
"""Bin data from single-event files, and load into class.
Args:
- data_files (List[str]): list of file names to bin
- axes (List[str], optional): bin axes. Defaults to
- config["dataframe"]["tof_column"].
- bins (List[int], optional): number of bins.
+ data_files (list[str]): list of file names to bin
+ axes (list[str], optional): bin axes. Defaults to
+ config["dataframe"]["columns"]["tof"].
+ bins (list[int], optional): number of bins.
Defaults to config["energy"]["bins"].
- ranges (Sequence[Tuple[float, float]], optional): bin ranges.
+ ranges (Sequence[tuple[float, float]], optional): bin ranges.
Defaults to config["energy"]["ranges"].
biases (np.ndarray, optional): Bias voltages used.
If not provided, biases are extracted from the file meta data.
@@ -622,26 +656,21 @@ Source code for sed.calibrator.energy
if bins is None:
bins = [self._config["energy"]["bins"]]
if ranges is None:
- ranges_ = [
- np.array(self._config["energy"]["ranges"]) / 2 ** (self.binning - 1),
- ]
- ranges = [cast(Tuple[float, float], tuple(v)) for v in ranges_]
+ ranges_ = [np.array(self._config["energy"]["ranges"]) / self.binning]
+ ranges = [cast(tuple[float, float], tuple(v)) for v in ranges_]
# pylint: disable=duplicate-code
hist_mode = kwds.pop("hist_mode", self._config["binning"]["hist_mode"])
mode = kwds.pop("mode", self._config["binning"]["mode"])
pbar = kwds.pop("pbar", self._config["binning"]["pbar"])
try:
- num_cores = kwds.pop("num_cores", self._config["binning"]["num_cores"])
+ num_cores = kwds.pop("num_cores", self._config["core"]["num_cores"])
except KeyError:
num_cores = psutil.cpu_count() - 1
threads_per_worker = kwds.pop(
"threads_per_worker",
self._config["binning"]["threads_per_worker"],
)
- threadpool_api = kwds.pop(
- "threadpool_API",
- self._config["binning"]["threadpool_API"],
- )
+ threadpool_api = kwds.pop("threadpool_API", self._config["binning"]["threadpool_API"])
read_biases = False
if biases is None:
@@ -652,7 +681,7 @@ Source code for sed.calibrator.energy
except KeyError as exc:
raise ValueError(
"Either Bias Values or a valid bias_key has to be present!",
- ) from exc
+ ) from exc
dataframe, _, _ = self.loader.read_dataframe(
files=data_files,
@@ -679,13 +708,16 @@ Source code for sed.calibrator.energy
except KeyError as exc:
raise ValueError(
"Either Bias Values or a valid bias_key has to be present!",
- ) from exc
+ ) from exc
tof = traces.coords[(axes[0])]
self.traces = self.traces_normed = np.asarray(traces.T)
self.tof = np.asarray(tof)
self.biases = np.asarray(biases)
-[docs] def normalize(self, smooth: bool = False, span: int = 7, order: int = 1):
+
+
+[docs]
+ def normalize(self, smooth: bool = False, span: int = 7, order: int = 1):
"""Normalize the spectra along an axis.
Args:
@@ -701,11 +733,15 @@ Source code for sed.calibrator.energy
smooth=smooth,
span=span,
order=order,
- )
+ )
+ logger.debug("Normalized energy calibration traces.")
+
-[docs] def adjust_ranges(
+
+[docs]
+ def adjust_ranges(
self,
- ranges: Tuple,
+ ranges: tuple,
ref_id: int = 0,
traces: np.ndarray = None,
peak_window: int = 7,
@@ -716,7 +752,7 @@ Source code for sed.calibrator.energy
(containing the peaks) among all traces.
Args:
- ranges (Tuple):
+ ranges (tuple):
Collection of feature detection ranges, within which an algorithm
(i.e. 1D peak detector) will look for the feature.
ref_id (int, optional): Index of the reference trace. Defaults to 0.
@@ -726,8 +762,10 @@ Source code for sed.calibrator.energy
Defaults to 7.
apply (bool, optional): Option to directly apply the provided parameters.
Defaults to False.
- **kwds:
- keyword arguments for trace alignment (see ``find_correspondence()``).
+ **kwds: keyword arguments
+ - *labels*: List of labels for plotting. Default uses the bias voltages.
+ - *figsize*: Figure size.
+ Additional keyword arguments are passed to ``find_correspondence()``.
"""
if traces is None:
traces = self.traces_normed
@@ -736,40 +774,26 @@ Source code for sed.calibrator.energy
ranges=ranges,
ref_id=ref_id,
traces=traces,
- infer_others=True,
- mode="replace",
+ **kwds,
)
self.feature_extract(peak_window=peak_window)
# make plot
labels = kwds.pop("labels", [str(b) + " V" for b in self.biases])
- figsize = kwds.pop("figsize", (8, 4))
+ figsize = kwds.pop("figsize", (6, 4))
plot_segs = []
plot_peaks = []
fig, ax = plt.subplots(figsize=figsize)
- colors = plt.get_cmap("rainbow")(np.linspace(0, 1, len(traces)))
+ colors = it.cycle(plt.rcParams["axes.prop_cycle"].by_key()["color"])
for itr, color in zip(range(len(traces)), colors):
trace = traces[itr, :]
# main traces
- ax.plot(
- self.tof,
- trace,
- ls="-",
- color=color,
- linewidth=1,
- label=labels[itr],
- )
+ ax.plot(self.tof, trace, ls="-", color=color, linewidth=1, label=labels[itr])
# segments:
seg = self.featranges[itr]
cond = (self.tof >= seg[0]) & (self.tof <= seg[1])
tofseg, traceseg = self.tof[cond], trace[cond]
- (line,) = ax.plot(
- tofseg,
- traceseg,
- ls="-",
- color=color,
- linewidth=3,
- )
+ (line,) = ax.plot(tofseg, traceseg, ls="-", color=color, linewidth=3)
plot_segs.append(line)
# markers
(scatt,) = ax.plot(
@@ -783,9 +807,12 @@ Source code for sed.calibrator.energy
plot_peaks.append(scatt)
ax.legend(fontsize=8, loc="upper right")
ax.set_title("")
+ plt.xlabel("Time-of-flight")
+ plt.ylabel("Intensity")
+ plt.tight_layout()
- def update(refid, ranges):
- self.add_ranges(ranges, refid, traces=traces)
+ def update(refid, ranges):
+ self.add_ranges(ranges, refid, traces=traces, **kwds)
self.feature_extract(peak_window=7)
for itr, _ in enumerate(self.traces_normed):
seg = self.featranges[itr]
@@ -797,8 +824,8 @@ Source code for sed.calibrator.energy
plot_segs[itr].set_ydata(traceseg)
plot_segs[itr].set_xdata(tofseg)
- plot_peaks[itr].set_xdata(self.peaks[itr, 0])
- plot_peaks[itr].set_ydata(self.peaks[itr, 1])
+ plot_peaks[itr].set_xdata([self.peaks[itr, 0]])
+ plot_peaks[itr].set_ydata([self.peaks[itr, 1]])
fig.canvas.draw_idle()
@@ -824,13 +851,16 @@ Source code for sed.calibrator.energy
ranges=ranges_slider,
)
- def apply_func(apply: bool): # noqa: ARG001
+ def apply_func(apply: bool): # noqa: ARG001
self.add_ranges(
ranges_slider.value,
refid_slider.value,
traces=self.traces_normed,
+ **kwds,
)
+ logger.info(f"Use feature ranges: {self.featranges}.")
self.feature_extract(peak_window=7)
+ logger.info(f"Extracted energy features: {self.peaks}.")
ranges_slider.close()
refid_slider.close()
apply_button.close()
@@ -843,9 +873,12 @@ Source code for sed.calibrator.energy
if apply:
apply_func(True)
-[docs] def add_ranges(
+
+
+[docs]
+ def add_ranges(
self,
- ranges: Union[List[Tuple], Tuple],
+ ranges: list[tuple] | tuple,
ref_id: int = 0,
traces: np.ndarray = None,
infer_others: bool = True,
@@ -855,7 +888,7 @@ Source code for sed.calibrator.energy
"""Select or extract the equivalent feature ranges (containing the peaks) among all traces.
Args:
- ranges (Union[List[Tuple], Tuple]):
+ ranges (list[tuple] | tuple):
Collection of feature detection ranges, within which an algorithm
(i.e. 1D peak detector) will look for the feature.
ref_id (int, optional): Index of the reference trace. Defaults to 0.
@@ -875,7 +908,7 @@ Source code for sed.calibrator.energy
# Infer the corresponding feature detection range of other traces by alignment
if infer_others:
assert isinstance(ranges, tuple)
- newranges: List[Tuple] = []
+ newranges: list[tuple] = []
for i in range(self.ntraces):
pathcorr = find_correspondence(
@@ -896,16 +929,19 @@ Source code for sed.calibrator.energy
elif mode == "replace":
self.featranges = newranges
-[docs] def feature_extract(
+
+
+[docs]
+ def feature_extract(
self,
- ranges: List[Tuple] = None,
+ ranges: list[tuple] = None,
traces: np.ndarray = None,
peak_window: int = 7,
):
"""Select or extract the equivalent landmarks (e.g. peaks) among all traces.
Args:
- ranges (List[Tuple], optional): List of ranges in each trace to look for
+ ranges (list[tuple], optional): List of ranges in each trace to look for
the peak feature, [start, end]. Defaults to self.featranges.
traces (np.ndarray, optional): Collection of 1D spectra to use for
calibration. Defaults to self.traces_normed.
@@ -928,23 +964,24 @@ Source code for sed.calibrator.energy
pkwindow=peak_window,
)
-[docs] def calibrate(
+
+
+[docs]
+ def calibrate(
self,
- ref_id: int = 0,
+ ref_energy: float = 0,
method: str = "lmfit",
energy_scale: str = "kinetic",
landmarks: np.ndarray = None,
biases: np.ndarray = None,
t: np.ndarray = None,
- verbose: bool = True,
**kwds,
) -> dict:
"""Calculate the functional mapping between time-of-flight and the energy
scale using optimization methods.
Args:
- ref_id (int, optional): The reference trace index (an integer).
- Defaults to 0.
+ ref_energy (float): Binding/kinetic energy of the detected feature.
method (str, optional): Method for determining the energy calibration.
- **'lmfit'**: Energy calibration using lmfit and 1/t^2 form.
@@ -961,8 +998,6 @@ Source code for sed.calibrator.energy
calibration. Defaults to self.peaks.
biases (np.ndarray, optional): Bias values. Defaults to self.biases.
t (np.ndarray, optional): TOF values. Defaults to self.tof.
- verbose (bool, optional): Option to print out diagnostic information.
- Defaults to True.
**kwds: keyword arguments.
See available keywords for ``poly_energy_calibration()`` and
``fit_energy_calibration()``
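A hypothetical call shape for the reworked signature, with ref_energy replacing the old ref_id; `ec` stands for a prepared EnergyCalibrator with traces loaded and features extracted, and the numbers are illustrative.

calibration = ec.calibrate(
    ref_energy=-0.5,        # energy of the tracked feature, eV (illustrative)
    method="lmfit",
    energy_scale="kinetic",
)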
@@ -999,17 +1034,16 @@ Source code for sed.calibrator.energy
sign * biases,
binwidth,
binning,
- ref_id=ref_id,
+ ref_energy=ref_energy,
t=t,
energy_scale=energy_scale,
- verbose=verbose,
**kwds,
)
elif method in ("lstsq", "lsqr"):
self.calibration = poly_energy_calibration(
landmarks,
sign * biases,
- ref_id=ref_id,
+ ref_energy=ref_energy,
aug=self.dup,
method=method,
t=t,
@@ -1019,13 +1053,16 @@ Source code for sed.calibrator.energy
else:
raise NotImplementedError()
- self.calibration["creation_date"] = datetime.now().timestamp()
+ self.calibration["creation_date"] = datetime.now()
return self.calibration
-[docs] def view( # pylint: disable=dangerous-default-value
+
+
+[docs]
+ def view(
self,
traces: np.ndarray,
- segs: List[Tuple] = None,
+ segs: list[tuple] = None,
peaks: np.ndarray = None,
show_legend: bool = True,
backend: str = "matplotlib",
@@ -1039,7 +1076,7 @@ Source code for sed.calibrator.energy
Args:
traces (np.ndarray): Matrix of traces to visualize.
- segs (List[Tuple], optional): Segments to be highlighted in the
+ segs (list[tuple], optional): Segments to be highlighted in the
visualization. Defaults to None.
peaks (np.ndarray, optional): Peak positions for labelling the traces.
Defaults to None.
@@ -1071,15 +1108,19 @@ Source code for sed.calibrator.energy
sign = 1 if energy_scale == "kinetic" else -1
+ figsize = kwds.pop("figsize", (6, 4))
+
if backend == "matplotlib":
- figsize = kwds.pop("figsize", (12, 4))
- fig, ax = plt.subplots(figsize=figsize)
- for itr, trace in enumerate(traces):
+ colors = it.cycle(plt.rcParams["axes.prop_cycle"].by_key()["color"])
+ _, ax = plt.subplots(figsize=figsize)
+ for itr, color in zip(range(len(traces)), colors):
+ trace = traces[itr, :]
if align:
ax.plot(
- xaxis + sign * (self.biases[itr] - self.biases[self.calibration["refid"]]),
+ xaxis + sign * (self.biases[itr]),
trace,
ls="-",
+ color=color,
linewidth=1,
label=lbs[itr],
**linekwds,
@@ -1089,6 +1130,7 @@ Source code for sed.calibrator.energy
xaxis,
trace,
ls="-",
+ color=color,
linewidth=1,
label=lbs[itr],
**linekwds,
@@ -1103,6 +1145,7 @@ Source code for sed.calibrator.energy
tofseg,
traceseg,
ls="-",
+ color=color,
linewidth=2,
**linesegkwds,
)
@@ -1117,7 +1160,7 @@ Source code for sed.calibrator.energy
if show_legend:
try:
- ax.legend(fontsize=12, **legkwds)
+ ax.legend(fontsize=8, loc="upper right", **legkwds)
except TypeError:
pass
@@ -1128,11 +1171,10 @@ Source code for sed.calibrator.energy
colors = it.cycle(ColorCycle[10])
ttp = [("(x, y)", "($x, $y)")]
- figsize = kwds.pop("figsize", (800, 300))
fig = pbk.figure(
title=ttl,
- width=figsize[0],
- height=figsize[1],
+ width=figsize[0] * 100,
+ height=figsize[1] * 100,
tooltips=ttp,
)
# Plotting the main traces
@@ -1140,7 +1182,7 @@ Source code for sed.calibrator.energy
trace = traces[itr, :]
if align:
fig.line(
- xaxis + sign * (self.biases[itr] - self.biases[self.calibration["refid"]]),
+ xaxis + sign * (self.biases[itr]),
trace,
color=color,
line_dash="solid",
@@ -1193,29 +1235,36 @@ Source code for sed.calibrator.energy
pbk.show(fig)
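A hedged sketch of the updated view() call; the attribute holding the normalized traces is an assumption:

# Hypothetical example: aligned traces are now shifted by their full bias values,
# and the bokeh figure size is derived from the shared figsize (inches * 100 px).
ec.view(
    traces=ec.traces_normed,  # assumed attribute with the normalized traces
    backend="matplotlib",     # "bokeh" renders at figsize[0]*100 x figsize[1]*100 px
    align=True,
    figsize=(6, 4),           # new shared default; matplotlib previously used (12, 4)
)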
-[docs] def append_energy_axis(
+
+
+[docs]
+ def append_energy_axis(
self,
- df: Union[pd.DataFrame, dask.dataframe.DataFrame],
+ df: pd.DataFrame | dask.dataframe.DataFrame,
tof_column: str = None,
energy_column: str = None,
calibration: dict = None,
- verbose: bool = True,
+ bias_voltage: float = None,
+ suppress_output: bool = False,
**kwds,
- ) -> Tuple[Union[pd.DataFrame, dask.dataframe.DataFrame], dict]:
+ ) -> tuple[pd.DataFrame | dask.dataframe.DataFrame, dict]:
"""Calculate and append the energy axis to the events dataframe.
Args:
- df (Union[pd.DataFrame, dask.dataframe.DataFrame]):
+ df (pd.DataFrame | dask.dataframe.DataFrame):
Dataframe to apply the energy axis calibration to.
tof_column (str, optional): Label of the source column.
- Defaults to config["dataframe"]["tof_column"].
+ Defaults to config["dataframe"]["columns"]["tof"].
energy_column (str, optional): Label of the destination column.
- Defaults to config["dataframe"]["energy_column"].
+ Defaults to config["dataframe"]["columns"]["energy"].
calibration (dict, optional): Calibration dictionary. If provided,
overrides calibration from class or config.
Defaults to self.calibration or config["energy"]["calibration"].
- verbose (bool, optional): Option to print out diagnostic information.
- Defaults to True.
+ bias_voltage (float, optional): Sample bias voltage of the scan data. If omitted,
+ the bias voltage is read from the dataframe. If it is not found there,
+ a warning is logged and the calibrated data might have an offset.
+ suppress_output (bool, optional): Option to suppress output of diagnostic information.
+ Defaults to False.
**kwds: additional keyword arguments for the energy conversion. They are
added to the calibration dictionary.
@@ -1224,7 +1273,7 @@ Source code for sed.calibrator.energy
NotImplementedError: Raised if an invalid calib_type is found.
Returns:
- Union[pd.DataFrame, dask.dataframe.DataFrame]: dataframe with added column
+ tuple[pd.DataFrame | dask.dataframe.DataFrame, dict]: dataframe with added column
and energy calibration metadata dictionary.
"""
if tof_column is None:
@@ -1246,13 +1295,11 @@ Source code for sed.calibrator.energy
if len(kwds) > 0:
for key, value in kwds.items():
calibration[key] = value
- calibration["creation_date"] = datetime.now().timestamp()
+ calibration["creation_date"] = datetime.now()
- elif "creation_date" in calibration and verbose:
- datestring = datetime.fromtimestamp(calibration["creation_date"]).strftime(
- "%m/%d/%Y, %H:%M:%S",
- )
- print(f"Using energy calibration parameters generated on {datestring}")
+ elif "creation_date" in calibration and not suppress_output:
+ datestring = calibration["creation_date"].strftime("%m/%d/%Y, %H:%M:%S")
+ logger.info(f"Using energy calibration parameters generated on {datestring}")
# try to determine calibration type if not provided
if "calib_type" not in calibration:
@@ -1263,6 +1310,8 @@ Source code for sed.calibrator.energy
elif "coeffs" in calibration and "E0" in calibration:
calibration["calib_type"] = "poly"
+ if "energy_scale" not in calibration:
+ calibration["energy_scale"] = "kinetic"
else:
raise ValueError("No valid calibration parameters provided!")
@@ -1302,36 +1351,78 @@ Source code for sed.calibrator.energy
else:
raise NotImplementedError
+ if not suppress_output:
+ report_dict = {
+ "calib_type": calibration["calib_type"],
+ "fit_function": calibration["fit_function"],
+ "coefficients": calibration["coefficients"],
+ }
+ logger.debug(f"Used energy calibration parameters: {report_dict}.")
+
+ # apply bias offset
+ scale_sign: Literal[-1, 1] = -1 if calibration["energy_scale"] == "binding" else 1
+ if bias_voltage is not None:
+ df[energy_column] = df[energy_column] + scale_sign * bias_voltage
+ if not suppress_output:
+ logger.debug(f"Shifted energy column by constant bias value: {bias_voltage}.")
+ elif self._config["dataframe"]["columns"]["bias"] in df.columns:
+ df = dfops.offset_by_other_columns(
+ df=df,
+ target_column=energy_column,
+ offset_columns=self._config["dataframe"]["columns"]["bias"],
+ weights=scale_sign,
+ )
+ if not suppress_output:
+ logger.debug(
+ "Shifted energy column by bias column: "
+ f"{self._config['dataframe']['columns']['bias']}.",
+ )
+ else:
+ logger.warning(
+ "Sample bias data not found or provided. Calibrated energy might be incorrect.",
+ )
+
metadata = self.gather_calibration_metadata(calibration)
return df, metadata
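A hedged sketch of the new bias handling in append_energy_axis(); the bias value is illustrative:

# Hypothetical example: pass the sample bias explicitly. If omitted, the column
# named in config["dataframe"]["columns"]["bias"] is used; if that is missing
# too, a warning is logged and the energy axis may carry a constant offset.
df, metadata = ec.append_energy_axis(
    df,
    bias_voltage=16.8,        # assumed sample bias of the scan, in volts
    suppress_output=False,    # replaces the removed verbose flag
)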
-[docs] def append_tof_ns_axis(
+
+
+[docs]
+ def append_tof_ns_axis(
self,
- df: Union[pd.DataFrame, dask.dataframe.DataFrame],
+ df: pd.DataFrame | dask.dataframe.DataFrame,
tof_column: str = None,
tof_ns_column: str = None,
**kwds,
- ) -> Tuple[Union[pd.DataFrame, dask.dataframe.DataFrame], dict]:
+ ) -> tuple[pd.DataFrame | dask.dataframe.DataFrame, dict]:
"""Converts the time-of-flight time from steps to time in ns.
Args:
- df (Union[pd.DataFrame, dask.dataframe.DataFrame]): Dataframe to convert.
+ df (pd.DataFrame | dask.dataframe.DataFrame): Dataframe to convert.
tof_column (str, optional): Name of the column containing the
- time-of-flight steps. Defaults to config["dataframe"]["tof_column"].
+ time-of-flight steps. Defaults to config["dataframe"]["columns"]["tof"].
tof_ns_column (str, optional): Name of the column to store the
- time-of-flight in nanoseconds. Defaults to config["dataframe"]["tof_ns_column"].
+ time-of-flight in nanoseconds. Defaults to config["dataframe"]["columns"]["tof_ns"].
- binwidth (float, optional): Time-of-flight binwidth in ns.
- Defaults to config["energy"]["tof_binwidth"].
- binning (int, optional): Time-of-flight binning factor.
- Defaults to config["energy"]["tof_binning"].
+ **kwds:
+
+ - **binwidth** (float): Time-of-flight binwidth in ns.
+ Defaults to config["energy"]["tof_binwidth"].
+ - **binning** (int): Binning factor of the TOF column.
+ Defaults to config["energy"]["tof_binning"].
Returns:
- dask.dataframe.DataFrame: Dataframe with the new columns.
- dict: Metadata dictionary.
+ tuple[pd.DataFrame | dask.dataframe.DataFrame, dict]: Dataframe with the new columns
+ and metadata dictionary.
"""
binwidth = kwds.pop("binwidth", self.binwidth)
binning = kwds.pop("binning", self.binning)
+
+ if len(kwds) > 0:
+ raise TypeError(f"append_tof_ns_axis() got unexpected keyword arguments {kwds.keys()}.")
+
if tof_column is None:
if self.corrected_tof_column in df.columns:
tof_column = self.corrected_tof_column
@@ -1346,14 +1437,17 @@ Source code for sed.calibrator.energy
binning,
df[tof_column].astype("float64"),
)
- metadata: Dict[str, Any] = {
+ metadata: dict[str, Any] = {
"applied": True,
"binwidth": binwidth,
"binning": binning,
}
return df, metadata
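A hedged sketch of append_tof_ns_axis() with the now keyword-only binwidth/binning; the values are assumptions standing in for the instrument config:

# Hypothetical example: unknown keywords now raise a TypeError instead of
# being silently ignored.
df, metadata = ec.append_tof_ns_axis(
    df,
    binwidth=4.125e-12,       # assumed value of config["energy"]["tof_binwidth"]
    binning=1,                # assumed value of config["energy"]["tof_binning"]
)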
-[docs] def gather_calibration_metadata(self, calibration: dict = None) -> dict:
+
+
+[docs]
+ def gather_calibration_metadata(self, calibration: dict = None) -> dict:
"""Collects metadata from the energy calibration
Args:
@@ -1365,7 +1459,7 @@ Source code for sed.calibrator.energy
"""
if calibration is None:
calibration = self.calibration
- metadata: Dict[Any, Any] = {}
+ metadata: dict[Any, Any] = {}
metadata["applied"] = True
metadata["calibration"] = deepcopy(calibration)
metadata["tof"] = deepcopy(self.tof)
@@ -1377,12 +1471,15 @@ Source code for sed.calibrator.energy
return metadata
-[docs] def adjust_energy_correction(
+
+
+[docs]
+ def adjust_energy_correction(
self,
image: xr.DataArray,
correction_type: str = None,
amplitude: float = None,
- center: Tuple[float, float] = None,
+ center: tuple[float, float] = None,
correction: dict = None,
apply: bool = False,
**kwds,
@@ -1402,7 +1499,7 @@ Source code for sed.calibrator.energy
Defaults to config["energy"]["correction_type"].
amplitude (float, optional): Amplitude of the time-of-flight correction
term. Defaults to config["energy"]["correction"]["correction_type"].
- center (Tuple[float, float], optional): Center (x/y) coordinates for the
+ center (tuple[float, float], optional): Center (x/y) coordinates for the
correction. Defaults to config["energy"]["correction"]["center"].
correction (dict, optional): Correction dict. Defaults to the config values
and is updated from provided and adjusted parameters.
@@ -1536,7 +1633,7 @@ Source code for sed.calibrator.energy
step=1,
)
- def update(amplitude, x_center, y_center, **kwds):
+ def update(amplitude, x_center, y_center, **kwds):
nonlocal correction
correction["amplitude"] = amplitude
correction["center"] = (x_center, y_center)
@@ -1553,18 +1650,18 @@ Source code for sed.calibrator.energy
)
trace1.set_ydata(correction_x)
- line1.set_xdata(x=x_center)
+ line1.set_xdata([x_center])
trace2.set_ydata(correction_y)
- line2.set_xdata(x=y_center)
+ line2.set_xdata([y_center])
fig.canvas.draw_idle()
- def common_apply_func(apply: bool): # noqa: ARG001
+ def common_apply_func(apply: bool): # noqa: ARG001
self.correction = {}
self.correction["amplitude"] = correction["amplitude"]
self.correction["center"] = correction["center"]
self.correction["correction_type"] = correction["correction_type"]
- self.correction["creation_date"] = datetime.now().timestamp()
+ self.correction["creation_date"] = datetime.now()
amplitude_slider.close()
x_center_slider.close()
y_center_slider.close()
@@ -1577,7 +1674,7 @@ Source code for sed.calibrator.energy
raise ValueError(
"Parameter 'diameter' required for correction type 'spherical', ",
"but not present!",
- ) from exc
+ ) from exc
diameter_slider = ipw.FloatSlider(
value=correction["diameter"],
@@ -1594,7 +1691,7 @@ Source code for sed.calibrator.energy
diameter=diameter_slider,
)
- def apply_func(apply: bool):
+ def apply_func(apply: bool):
common_apply_func(apply)
self.correction["diameter"] = correction["diameter"]
diameter_slider.close()
@@ -1605,7 +1702,7 @@ Source code for sed.calibrator.energy
except KeyError as exc:
raise ValueError(
"Parameter 'gamma' required for correction type 'Lorentzian', but not present!",
- ) from exc
+ ) from exc
gamma_slider = ipw.FloatSlider(
value=correction["gamma"],
@@ -1622,7 +1719,7 @@ Source code for sed.calibrator.energy
gamma=gamma_slider,
)
- def apply_func(apply: bool):
+ def apply_func(apply: bool):
common_apply_func(apply)
self.correction["gamma"] = correction["gamma"]
gamma_slider.close()
@@ -1633,7 +1730,7 @@ Source code for sed.calibrator.energy
except KeyError as exc:
raise ValueError(
"Parameter 'sigma' required for correction type 'Gaussian', but not present!",
- ) from exc
+ ) from exc
sigma_slider = ipw.FloatSlider(
value=correction["sigma"],
@@ -1650,7 +1747,7 @@ Source code for sed.calibrator.energy
sigma=sigma_slider,
)
- def apply_func(apply: bool):
+ def apply_func(apply: bool):
common_apply_func(apply)
self.correction["sigma"] = correction["sigma"]
sigma_slider.close()
@@ -1673,7 +1770,7 @@ Source code for sed.calibrator.energy
raise ValueError(
"Parameter 'gamma' required for correction type 'Lorentzian_asymmetric', ",
"but not present!",
- ) from exc
+ ) from exc
gamma_slider = ipw.FloatSlider(
value=correction["gamma"],
@@ -1706,7 +1803,7 @@ Source code for sed.calibrator.energy
gamma2=gamma2_slider,
)
- def apply_func(apply: bool):
+ def apply_func(apply: bool):
common_apply_func(apply)
self.correction["gamma"] = correction["gamma"]
self.correction["amplitude2"] = correction["amplitude2"]
@@ -1726,26 +1823,29 @@ Source code for sed.calibrator.energy
if apply:
apply_func(True)
-[docs] def apply_energy_correction(
+
+
+[docs]
+ def apply_energy_correction(
self,
- df: Union[pd.DataFrame, dask.dataframe.DataFrame],
+ df: pd.DataFrame | dask.dataframe.DataFrame,
tof_column: str = None,
new_tof_column: str = None,
correction_type: str = None,
amplitude: float = None,
correction: dict = None,
- verbose: bool = True,
+ suppress_output: bool = False,
**kwds,
- ) -> Tuple[Union[pd.DataFrame, dask.dataframe.DataFrame], dict]:
+ ) -> tuple[pd.DataFrame | dask.dataframe.DataFrame, dict]:
"""Apply correction to the time-of-flight (TOF) axis of single-event data.
Args:
- df (Union[pd.DataFrame, dask.dataframe.DataFrame]): The dataframe where
+ df (pd.DataFrame | dask.dataframe.DataFrame): The dataframe where
to apply the energy correction to.
tof_column (str, optional): Name of the source column to convert.
- Defaults to config["dataframe"]["tof_column"].
+ Defaults to config["dataframe"]["columns"]["tof"].
new_tof_column (str, optional): Name of the destination column to convert.
- Defaults to config["dataframe"]["corrected_tof_column"].
+ Defaults to config["dataframe"]["columns"]["corrected_tof"].
correction_type (str, optional): Type of correction to apply to the TOF
axis. Valid values are:
@@ -1760,8 +1860,8 @@ Source code for sed.calibrator.energy
correction (dict, optional): Correction dictionary containing parameters
for the correction. Defaults to self.correction or
config["energy"]["correction"].
- verbose (bool, optional): Option to print out diagnostic information.
- Defaults to True.
+ suppress_output (bool, optional): Option to suppress output of diagnostic information.
+ Defaults to False.
**kwds: Additional parameters to use for the correction:
- **x_column** (str): Name of the x column.
@@ -1777,7 +1877,7 @@ Source code for sed.calibrator.energy
asymmetric 2D Lorentz profile, X-direction.
Returns:
- Union[pd.DataFrame, dask.dataframe.DataFrame]: dataframe with added column
+ tuple[pd.DataFrame | dask.dataframe.DataFrame, dict]: dataframe with added column
and Energy correction metadata dictionary.
"""
if correction is None:
@@ -1802,18 +1902,19 @@ Source code for sed.calibrator.energy
for key, value in kwds.items():
correction[key] = value
- correction["creation_date"] = datetime.now().timestamp()
+ correction["creation_date"] = datetime.now()
- elif "creation_date" in correction and verbose:
- datestring = datetime.fromtimestamp(correction["creation_date"]).strftime(
- "%m/%d/%Y, %H:%M:%S",
- )
- print(f"Using energy correction parameters generated on {datestring}")
+ elif "creation_date" in correction and not suppress_output:
+ datestring = correction["creation_date"].strftime("%m/%d/%Y, %H:%M:%S")
+ logger.info(f"Using energy correction parameters generated on {datestring}")
missing_keys = {"correction_type", "center", "amplitude"} - set(correction.keys())
if missing_keys:
raise ValueError(f"Required correction parameters '{missing_keys}' missing!")
+ if not suppress_output:
+ logger.debug(f"Correction parameters:\n{correction}")
+
df[new_tof_column] = df[tof_column] + correction_function(
x=df[x_column],
y=df[y_column],
@@ -1823,7 +1924,10 @@ Source code for sed.calibrator.energy
return df, metadata
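A hedged sketch of apply_energy_correction() with an explicit Gaussian profile; all parameter values are illustrative:

# Hypothetical example: center and sigma are passed via **kwds and merged into
# the correction dictionary; diagnostics now go through the module logger.
df, metadata = ec.apply_energy_correction(
    df,
    correction_type="Gaussian",
    amplitude=2.5,            # assumed amplitude of the TOF correction term
    center=(730.0, 730.0),    # assumed detector center (x, y)
    sigma=700.0,              # required for the Gaussian correction type
    suppress_output=True,
)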
-[docs] def gather_correction_metadata(self, correction: dict = None) -> dict:
+
+
+[docs]
+ def gather_correction_metadata(self, correction: dict = None) -> dict:
"""Collect meta data for energy correction
Args:
@@ -1835,33 +1939,36 @@ Source code for sed.calibrator.energy
"""
if correction is None:
correction = self.correction
- metadata: Dict[Any, Any] = {}
+ metadata: dict[Any, Any] = {}
metadata["applied"] = True
metadata["correction"] = deepcopy(correction)
return metadata
-[docs] def align_dld_sectors(
+
+
+[docs]
+ def align_dld_sectors(
self,
df: dask.dataframe.DataFrame,
tof_column: str = None,
sector_id_column: str = None,
sector_delays: np.ndarray = None,
- ) -> Tuple[dask.dataframe.DataFrame, dict]:
+ ) -> tuple[dask.dataframe.DataFrame, dict]:
"""Aligns the time-of-flight axis of the different sections of a detector.
Args:
- df (Union[pd.DataFrame, dask.dataframe.DataFrame]): Dataframe to use.
+ df (dask.dataframe.DataFrame): Dataframe to use.
tof_column (str, optional): Name of the column containing the time-of-flight values.
- Defaults to config["dataframe"]["tof_column"].
+ Defaults to config["dataframe"]["columns"]["tof"].
sector_id_column (str, optional): Name of the column containing the sector id values.
- Defaults to config["dataframe"]["sector_id_column"].
+ Defaults to config["dataframe"]["columns"]["sector_id"].
sector_delays (np.ndarray, optional): Array containing the sector delays. Defaults to
config["dataframe"]["sector_delays"].
Returns:
- dask.dataframe.DataFrame: Dataframe with the new columns.
- dict: Metadata dictionary.
+ tuple[dask.dataframe.DataFrame, dict]: Dataframe with the new columns and metadata
+ dictionary.
"""
if sector_delays is None:
sector_delays = self.sector_delays
@@ -1878,29 +1985,32 @@ Source code for sed.calibrator.energy
# align the 8s sectors
sector_delays_arr = dask.array.from_array(sector_delays)
- def align_sector(x):
+ def align_sector(x):
val = x[tof_column] - sector_delays_arr[x[sector_id_column].values.astype(int)]
return val.astype(np.float32)
df[tof_column] = df.map_partitions(align_sector, meta=(tof_column, np.float32))
- metadata: Dict[str, Any] = {
+ metadata: dict[str, Any] = {
"applied": True,
"sector_delays": sector_delays,
}
return df, metadata
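A hedged sketch of align_dld_sectors() for an 8-sector delay-line detector; the delay values are made up:

import numpy as np

# Hypothetical example: each event's TOF is reduced by the delay of its sector,
# evaluated lazily per dask partition via map_partitions.
df, metadata = ec.align_dld_sectors(
    df,  # assumed dask DataFrame with TOF and sector-id columns
    sector_delays=np.array([0.0, 2.5, -1.0, 0.5, 0.0, 2.5, -1.0, 0.5]),
)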
-[docs] def add_offsets(
+
+
+[docs]
+ def add_offsets(
self,
- df: Union[pd.DataFrame, dask.dataframe.DataFrame] = None,
- offsets: Dict[str, Any] = None,
+ df: pd.DataFrame | dask.dataframe.DataFrame = None,
+ offsets: dict[str, Any] = None,
constant: float = None,
- columns: Union[str, Sequence[str]] = None,
- weights: Union[float, Sequence[float]] = None,
- preserve_mean: Union[bool, Sequence[bool]] = False,
- reductions: Union[str, Sequence[str]] = None,
+ columns: str | Sequence[str] = None,
+ weights: float | Sequence[float] = None,
+ preserve_mean: bool | Sequence[bool] = False,
+ reductions: str | Sequence[str] = None,
energy_column: str = None,
- verbose: bool = True,
- ) -> Tuple[Union[pd.DataFrame, dask.dataframe.DataFrame], dict]:
+ suppress_output: bool = False,
+ ) -> tuple[pd.DataFrame | dask.dataframe.DataFrame, dict]:
"""Apply an offset to the energy column by the values of the provided columns.
If no parameter is passed to this function, the offset is applied as defined in the
@@ -1908,25 +2018,26 @@ Source code for sed.calibrator.energy
and the offset is applied using the ``dfops.apply_offset_from_columns()`` function.
Args:
- df (Union[pd.DataFrame, dask.dataframe.DataFrame]): Dataframe to use.
+ df (pd.DataFrame | dask.dataframe.DataFrame): Dataframe to use.
- offsets (Dict, optional): Dictionary of energy offset parameters.
+ offsets (dict, optional): Dictionary of energy offset parameters.
constant (float, optional): The constant to shift the energy axis by.
- columns (Union[str, Sequence[str]]): Name of the column(s) to apply the shift from.
- weights (Union[float, Sequence[float]]): weights to apply to the columns.
+ columns (str | Sequence[str]): Name of the column(s) to apply the shift from.
+ weights (float | Sequence[float]): weights to apply to the columns.
Can also be used to flip the sign (e.g. -1). Defaults to 1.
- preserve_mean (bool): Whether to subtract the mean of the column before applying the
- shift. Defaults to False.
- reductions (str): The reduction to apply to the column. Should be an available method
- of dask.dataframe.Series. For example "mean". In this case the function is applied
- to the column to generate a single value for the whole dataset. If None, the shift
- is applied per-dataframe-row. Defaults to None. Currently only "mean" is supported.
+ preserve_mean (bool | Sequence[bool]): Whether to subtract the mean of the column
+ before applying the shift. Defaults to False.
+ reductions (str | Sequence[str]): The reduction to apply to the column. Should be an
+ available method of dask.dataframe.Series. For example "mean". In this case the
+ function is applied to the column to generate a single value for the whole dataset.
+ If None, the shift is applied per-dataframe-row. Defaults to None. Currently only
+ "mean" is supported.
energy_column (str, optional): Name of the column containing the energy values.
- verbose (bool, optional): Option to print out diagnostic information.
- Defaults to True.
+ suppress_output (bool, optional): Option to suppress output of diagnostic information.
+ Defaults to False.
Returns:
- dask.dataframe.DataFrame: Dataframe with the new columns.
- dict: Metadata dictionary.
+ tuple[pd.DataFrame | dask.dataframe.DataFrame, dict]: Dataframe with the new columns
+ and metadata dictionary.
"""
if offsets is None:
offsets = deepcopy(self.offsets)
@@ -1934,7 +2045,7 @@ Source code for sed.calibrator.energy
if energy_column is None:
energy_column = self.energy_column
- metadata: Dict[str, Any] = {
+ metadata: dict[str, Any] = {
"applied": True,
}
@@ -1950,9 +2061,13 @@ Source code for sed.calibrator.energy
# pylint:disable=duplicate-code
# use passed parameters, overwrite config
offsets = {}
- offsets["creation_date"] = datetime.now().timestamp()
+ offsets["creation_date"] = datetime.now()
# column-based offsets
if columns is not None:
+ offsets["columns"] = {}
+ if isinstance(columns, str):
+ columns = [columns]
+
if weights is None:
weights = 1
if isinstance(weights, (int, float, np.integer, np.floating)):
@@ -1964,10 +2079,13 @@ Source code for sed.calibrator.energy
if not all(isinstance(s, (int, float, np.integer, np.floating)) for s in weights):
raise TypeError(f"Invalid type for weights: {type(weights)}")
- if isinstance(columns, str):
- columns = [columns]
- if isinstance(preserve_mean, bool):
- preserve_mean = [preserve_mean] * len(columns)
+ if preserve_mean is None:
+ preserve_mean = False
+ if not isinstance(preserve_mean, Sequence):
+ preserve_mean = [preserve_mean]
+ if len(preserve_mean) == 1:
+ preserve_mean = [preserve_mean[0]] * len(columns)
+
if not isinstance(reductions, Sequence):
reductions = [reductions]
if len(reductions) == 1:
@@ -1975,7 +2093,7 @@ Source code for sed.calibrator.energy
# store in offsets dictionary
for col, weight, pmean, red in zip(columns, weights, preserve_mean, reductions):
- offsets[col] = {
+ offsets["columns"][col] = {
"weight": weight,
"preserve_mean": pmean,
"reduction": red,
@@ -1987,11 +2105,9 @@ Source code for sed.calibrator.energy
elif constant is not None:
raise TypeError(f"Invalid type for constant: {type(constant)}")
- elif "creation_date" in offsets and verbose:
- datestring = datetime.fromtimestamp(offsets["creation_date"]).strftime(
- "%m/%d/%Y, %H:%M:%S",
- )
- print(f"Using energy offset parameters generated on {datestring}")
+ elif "creation_date" in offsets and not suppress_output:
+ datestring = offsets["creation_date"].strftime("%m/%d/%Y, %H:%M:%S")
+ logger.info(f"Using energy offset parameters generated on {datestring}")
if len(offsets) > 0:
# unpack dictionary
@@ -2000,43 +2116,39 @@ Source code for sed.calibrator.energy
weights = []
preserve_mean = []
reductions = []
- if verbose:
- print("Energy offset parameters:")
+ log_str = "Energy offset parameters:"
for k, v in offsets.items():
if k == "creation_date":
continue
- if k == "constant":
+ elif k == "constant":
# flip sign if binding energy scale
constant = v * scale_sign
- if verbose:
- print(f" Constant: {constant} ")
- else:
- columns.append(k)
- try:
- weight = v["weight"]
- except KeyError:
- weight = 1
- if not isinstance(weight, (int, float, np.integer, np.floating)):
- raise TypeError(f"Invalid type for weight of column {k}: {type(weight)}")
- # flip sign if binding energy scale
- weight = weight * scale_sign
- weights.append(weight)
- pm = v.get("preserve_mean", False)
- if str(pm).lower() in ["false", "0", "no"]:
- pm = False
- elif str(pm).lower() in ["true", "1", "yes"]:
- pm = True
- preserve_mean.append(pm)
- red = v.get("reduction", None)
- if str(red).lower() in ["none", "null"]:
- red = None
- reductions.append(red)
- if verbose:
- print(
- f" Column[{k}]: Weight={weight}, Preserve Mean: {pm}, ",
- f"Reductions: {red}.",
+ log_str += f"\n Constant: {constant}"
+ elif k == "columns":
+ for column_name, column_dict in offsets["columns"].items():
+ columns.append(column_name)
+ weight = column_dict.get("weight", 1)
+ if not isinstance(weight, (int, float, np.integer, np.floating)):
+ raise TypeError(
+ f"Invalid type for weight of column {column_name}: {type(weight)}",
+ )
+ # flip sign if binding energy scale
+ weight = weight * scale_sign
+ weights.append(weight)
+ pm = column_dict.get("preserve_mean", False)
+ preserve_mean.append(pm)
+ red = column_dict.get("reduction", None)
+ if str(red).lower() in ["none", "null"]:
+ red = None
+ reductions.append(red)
+ log_str += (
+ f"\n Column[{column_name}]: Weight={weight}, Preserve Mean: {pm}, "
+ f"Reductions: {red}."
)
+ if not suppress_output:
+ logger.info(log_str)
+
if len(columns) > 0:
df = dfops.offset_by_other_columns(
df=df,
@@ -2051,28 +2163,29 @@ Source code for sed.calibrator.energy
if constant:
if not isinstance(constant, (int, float, np.integer, np.floating)):
raise TypeError(f"Invalid type for constant: {type(constant)}")
- df[energy_column] = df.map_partitions(
- lambda x: x[energy_column] + constant,
- meta=(energy_column, np.float64),
- )
+ df[energy_column] = df[energy_column] + constant
self.offsets = offsets
metadata["offsets"] = offsets
- return df, metadata
+ return df, metadata
+
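A sketch of the reorganized offsets dictionary: per-column entries now nest under offsets["columns"] instead of sitting at the top level; the column name is an assumption:

# Hypothetical example of the new offsets layout consumed by add_offsets().
offsets = {
    "constant": 0.1,  # sign is flipped automatically on a binding-energy scale
    "columns": {
        "monochromatorPhotonEnergy": {  # assumed column name
            "weight": -1,
            "preserve_mean": True,
            "reduction": None,          # only "mean" is currently supported
        },
    },
}
df, metadata = ec.add_offsets(df, offsets=offsets)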
-[docs]def extract_bias(files: List[str], bias_key: str) -> np.ndarray:
+
+
+[docs]
+def extract_bias(files: list[str], bias_key: str) -> np.ndarray:
"""Read bias values from hdf5 files
Args:
- files (List[str]): List of filenames
+ files (list[str]): List of filenames
bias_key (str): hdf5 path to the bias value
Returns:
np.ndarray: Array of bias values.
"""
- bias_list: List[float] = []
+ bias_list: list[float] = []
for file in files:
with h5py.File(file, "r") as file_handle:
if bias_key[0] == "@":
@@ -2083,22 +2196,25 @@ Source code for sed.calibrator.energy
return np.asarray(bias_list)
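A usage sketch of extract_bias(); the file names and HDF5 key are assumptions:

# Hypothetical example: a leading "@" in bias_key selects an HDF5 attribute
# rather than a dataset, matching the branch above.
biases = extract_bias(
    files=["scan_0001.h5", "scan_0002.h5"],  # assumed file names
    bias_key="@KTOF:Lens:Sample:V",          # assumed attribute path
)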
-[docs]def correction_function(
- x: Union[float, np.ndarray],
- y: Union[float, np.ndarray],
+
+
+[docs]
+def correction_function(
+ x: float | np.ndarray,
+ y: float | np.ndarray,
correction_type: str,
- center: Tuple[float, float],
+ center: tuple[float, float],
amplitude: float,
**kwds,
-) -> Union[float, np.ndarray]:
+) -> float | np.ndarray:
"""Calculate the TOF correction based on the given X/Y coordinates and a model.
Args:
- x (float): x coordinate
- y (float): y coordinate
+ x (float | np.ndarray): x coordinate
+ y (float | np.ndarray): y coordinate
correction_type (str): type of correction. One of
"spherical", "Lorentzian", "Gaussian", or "Lorentzian_asymmetric"
- center (Tuple[int, int]): center position of the distribution (x,y)
+ center (tuple[int, int]): center position of the distribution (x,y)
amplitude (float): Amplitude of the correction
**kwds: Keyword arguments:
@@ -2113,7 +2229,7 @@ Source code for sed.calibrator.energy
asymmetric 2D Lorentz profile, X-direction.
Returns:
- float: calculated correction value
+ float | np.ndarray: calculated correction value
"""
if correction_type == "spherical":
try:
@@ -2122,7 +2238,7 @@ Source code for sed.calibrator.energy
raise ValueError(
f"Parameter 'diameter' required for correction type '{correction_type}' "
"but not provided!",
- ) from exc
+ ) from exc
correction = -(
(
1
@@ -2141,7 +2257,7 @@ Source code for sed.calibrator.energy
raise ValueError(
f"Parameter 'gamma' required for correction type '{correction_type}' "
"but not provided!",
- ) from exc
+ ) from exc
correction = (
100000
* amplitude
@@ -2156,7 +2272,7 @@ Source code for sed.calibrator.energy
raise ValueError(
f"Parameter 'sigma' required for correction type '{correction_type}' "
"but not provided!",
- ) from exc
+ ) from exc
correction = (
20000
* amplitude
@@ -2176,7 +2292,7 @@ Source code for sed.calibrator.energy
raise ValueError(
f"Parameter 'gamma' required for correction type '{correction_type}' "
"but not provided!",
- ) from exc
+ ) from exc
gamma2 = kwds.pop("gamma2", gamma)
amplitude2 = kwds.pop("amplitude2", amplitude)
correction = (
@@ -2198,7 +2314,10 @@ Source code for sed.calibrator.energy
return correction
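A sketch evaluating correction_function() on a detector-sized grid; center, amplitude, and diameter are illustrative values:

import numpy as np

# Hypothetical example: the spherical model requires the extra "diameter"
# keyword, otherwise a ValueError is raised (see the except branch above).
x, y = np.meshgrid(np.arange(0, 1460, 10), np.arange(0, 1460, 10))
corr = correction_function(
    x=x,
    y=y,
    correction_type="spherical",
    center=(730.0, 730.0),   # assumed center of the distribution
    amplitude=1.0,           # assumed amplitude
    diameter=3000.0,         # assumed sphere diameter
)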
-[docs]def normspec(
+
+
+[docs]
+def normspec(
specs: np.ndarray,
smooth: bool = False,
span: int = 7,
@@ -2239,7 +2358,10 @@ Source code for sed.calibrator.energy
return normalized_specs
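A minimal sketch of normspec() on a stack of synthetic traces; the array shape is an assumption:

import numpy as np

# Hypothetical example: smooth each 1D spectrum over a 7-point span, then
# normalize, before running the alignment below.
traces = np.random.rand(5, 1000)  # assumed shape: (n_traces, n_tof_bins)
normalized = normspec(traces, smooth=True, span=7)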
-[docs]def find_correspondence(
+
+
+[docs]
+def find_correspondence(
sig_still: np.ndarray,
sig_mov: np.ndarray,
**kwds,
@@ -2251,6 +2373,8 @@ Source code for sed.calibrator.energy
sig_still (np.ndarray): Reference 1D signals.
sig_mov (np.ndarray): 1D signal to be aligned.
**kwds: keyword arguments for ``fastdtw.fastdtw()``
+ - **dist_metric**: Distance metric to use for time warping. Defaults to None.
+ - **radius**: Radius to use for time warping. Defaults to 1.
Returns:
np.ndarray: Pixel-wise path correspondences between two input 1D arrays
@@ -2258,27 +2382,34 @@ Source code for sed.calibrator.energy
"""
dist = kwds.pop("dist_metric", None)
rad = kwds.pop("radius", 1)
+
+ if len(kwds) > 0:
+ raise TypeError(f"find_correspondence() got unexpected keyword arguments {kwds.keys()}.")
+
_, pathcorr = fastdtw(sig_still, sig_mov, dist=dist, radius=rad)
return np.asarray(pathcorr)
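A sketch of find_correspondence() on two shifted copies of a peak; note that misspelled keywords now fail loudly:

import numpy as np

# Hypothetical example: warp a shifted Gaussian onto a reference trace.
tof = np.linspace(-5.0, 5.0, 500)
sig_still = np.exp(-(tof**2))
sig_mov = np.exp(-((tof - 0.3) ** 2))  # same peak, shifted
pathcorr = find_correspondence(sig_still, sig_mov, radius=2)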
-[docs]def range_convert(
+
+
+[docs]
+def range_convert(
x: np.ndarray,
- xrng: Tuple,
+ xrng: tuple,
pathcorr: np.ndarray,
-) -> Tuple:
+) -> tuple:
"""Convert value range using a pairwise path correspondence (e.g. obtained
from time warping algorithm).
Args:
x (np.ndarray): Values of the x axis (e.g. time-of-flight values).
- xrng (Tuple): Boundary value range on the x axis.
+ xrng (tuple): Boundary value range on the x axis.
pathcorr (np.ndarray): Path correspondence between two 1D arrays in the
following form,
[(id_1_trace_1, id_1_trace_2), (id_2_trace_1, id_2_trace_2), ...]
Returns:
- Tuple: Transformed range according to the path correspondence.
+ tuple: Transformed range according to the path correspondence.
"""
pathcorr = np.asarray(pathcorr)
xrange_trans = []
@@ -2292,7 +2423,10 @@ Source code for sed.calibrator.energy
return tuple(xrange_trans)
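A sketch of range_convert() with a trivial identity warp path; the axis and boundary values are illustrative:

import numpy as np

# Hypothetical example: map a boundary range on a reference trace onto the
# moving trace via a path correspondence (here an identity path, so the
# converted range equals the input range).
tof = np.arange(64000, 64500)                                  # assumed TOF axis
pathcorr = np.stack([np.arange(500), np.arange(500)], axis=1)  # identity warp path
new_rng = range_convert(x=tof, xrng=(64100, 64300), pathcorr=pathcorr)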
-