Skip to content

Commit

Permalink
Merge pull request #374 from OpenCOMPES/parallel_debug_fix
Browse files Browse the repository at this point in the history
Fix debugging for parallelized dfield generation
  • Loading branch information
rettigl authored Apr 4, 2024
2 parents 5d9724c + 17242e4 commit 16dae43
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 66 deletions.
2 changes: 1 addition & 1 deletion benchmarks/benchmark_targets.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
binning_1d: 3.1223518816799785
binning_4d: 9.514051519199997
inv_dfield: 7.265958606239991
inv_dfield: 5.934024921119999
workflow_1d: 18.886161206160004
workflow_4d: 22.608196924320012
74 changes: 9 additions & 65 deletions sed/calibrator/momentum.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import itertools as it
from copy import deepcopy
from datetime import datetime
from multiprocessing import Pool
from typing import Any
from typing import Dict
from typing import List
Expand All @@ -19,13 +18,14 @@
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import psutil
import scipy.ndimage as ndi
import xarray as xr
from bokeh.colors import RGB
from bokeh.io import output_notebook
from bokeh.palettes import Category10 as ColorCycle
from IPython.display import display
from joblib import delayed
from joblib import Parallel
from matplotlib import cm
from numpy.linalg import norm
from scipy.interpolate import griddata
Expand All @@ -34,8 +34,6 @@
from symmetrize import sym
from symmetrize import tps

N_CPU = psutil.cpu_count()


class MomentumCorrector:
"""
Expand Down Expand Up @@ -72,10 +70,6 @@ def __init__(

self._config = config

self.num_cores = self._config.get("binning", {}).get("num_cores", N_CPU - 1)
if self.num_cores >= N_CPU:
self.num_cores = N_CPU - 1

self.image: np.ndarray = None
self.img_ndim: int = None
self.slice: np.ndarray = None
Expand Down Expand Up @@ -1259,7 +1253,6 @@ def calc_inverse_dfield(self):
self.cdeform_field,
self.bin_ranges,
self.detector_ranges,
self.num_cores,
)

return self.inverse_dfield
Expand Down Expand Up @@ -1749,7 +1742,6 @@ def apply_corrections(
self.cdeform_field,
self.bin_ranges,
self.detector_ranges,
self.num_cores,
)
self.dfield_updated = False

Expand Down Expand Up @@ -2085,7 +2077,6 @@ def generate_inverse_dfield(
cdeform_field: np.ndarray,
bin_ranges: List[Tuple],
detector_ranges: List[Tuple],
num_cores: int,
) -> np.ndarray:
"""Generate inverse deformation field using interpolation with griddata.
Assuming the binning range of the input ``rdeform_field`` and ``cdeform_field``
Expand All @@ -2096,7 +2087,6 @@ def generate_inverse_dfield(
cdeform_field (np.ndarray): Column-wise deformation field.
bin_ranges (List[Tuple]): Detector ranges of the binned coordinates.
detector_ranges (List[Tuple]): Ranges of detector coordinates to interpolate to.
num_cores (int): number of cores to use for parallelization.
Returns:
np.ndarray: The calculated inverse deformation field (row/column)
Expand Down Expand Up @@ -2129,49 +2119,7 @@ def generate_inverse_dfield(
rc_position = [] # row/column position in c/rdeform_field
r_dest = [] # destination pixel row position
c_dest = [] # destination pixel column position
compute_i0 = [(cdeform_field.shape[0] * i) // num_cores for i in np.arange(0, num_cores)]
compute_i1 = [(cdeform_field.shape[0] * i) // num_cores for i in np.arange(1, num_cores + 1)]
data = [
(rdeform_field, cdeform_field, bin_ranges, bin_step, i0, i1)
for (i0, i1) in zip(compute_i0, compute_i1)
]
with Pool(num_cores) as p:
ret = p.map(generate_lists, data)

for pos, rd, cd in ret:
rc_position += pos
r_dest += rd
c_dest += cd

with Pool(2) as p:
ret = p.map(
griddata_,
[
(np.asarray(rc_position), np.asarray(r_dest), (r_mesh, c_mesh)),
(np.asarray(rc_position), np.asarray(c_dest), (r_mesh, c_mesh)),
],
)

inverse_dfield = np.asarray([ret[0], ret[1]])

return inverse_dfield


def generate_lists(args):
"""Function for parallelizing code with multiprocessing.Pool.map
Args:
args: argument tuple containing (rdeform_field, cdeform_field, bin_ranges, bin_step, i0, i1)
Returns:
return tuple of lists (rc_position, r_dest, c_dest)
"""
(rdeform_field, cdeform_field, bin_ranges, bin_step, i0, i1) = args
rc_position = [] # row/column position in c/rdeform_field
r_dest = [] # destination pixel row position
c_dest = [] # destination pixel column position

for i in np.arange(i0, i1):
for i in np.arange(cdeform_field.shape[0]):
for j in np.arange(cdeform_field.shape[1]):
if not np.isnan(rdeform_field[i, j]) and not np.isnan(
cdeform_field[i, j],
Expand All @@ -2188,19 +2136,15 @@ def generate_lists(args):
c_dest.append(
bin_step[1] * j + bin_ranges[1][0],
)
return (rc_position, r_dest, c_dest)


def griddata_(args):
"""Wrapper for griddata to use with multiprocessing.Pool.map
ret = Parallel(n_jobs=2)(
delayed(griddata)(np.asarray(rc_position), np.asarray(arg), (r_mesh, c_mesh))
for arg in [r_dest, c_dest]
)

Args:
args: argument tuple to griddata
inverse_dfield = np.asarray([ret[0], ret[1]])

Returns:
return value of griddata
"""
return griddata(*args)
return inverse_dfield


def load_dfield(file: str) -> Tuple[np.ndarray, np.ndarray]:
Expand Down

0 comments on commit 16dae43

Please sign in to comment.