Storing the distance matrix in the shared memory in conditioned GMFs calculations #10504

Merged 6 commits on Apr 9, 2025
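
The gist of the change: the pairwise distance matrices between target sites and station sites are now computed once by the new `compute_distance_matrix` helper in `get_me_ta_ph`, registered with `smap.share(YY=..., YD=..., DY=...)`, and read back inside each `get_mu_tau_phi` task via `monitor.shared[...]`, instead of being rebuilt on every call to `compute_spatial_cross_covariance_matrix`; the station-station matrix `DD` is computed once in `get_me_ta_ph` and passed to `create_result` directly. The sketch below illustrates the general shared-memory idea with numpy and the standard library only; it is not the `openquake.baselib.parallel` implementation, and the helper names `share_array`/`attach_array` are invented for the example.

```python
# Minimal sketch of sharing a large float32 distance matrix between
# processes. This is NOT the openquake.baselib.parallel API; the helpers
# share_array/attach_array are hypothetical names used only here.
import numpy
from multiprocessing import shared_memory


def share_array(arr):
    """Copy `arr` into a shared-memory block and return it with its metadata."""
    shm = shared_memory.SharedMemory(create=True, size=arr.nbytes)
    view = numpy.ndarray(arr.shape, dtype=arr.dtype, buffer=shm.buf)
    view[:] = arr  # one copy, paid once by the parent process
    return shm, (shm.name, arr.shape, str(arr.dtype))


def attach_array(meta):
    """Reattach to an existing block from a worker, without pickling the data."""
    name, shape, dtype = meta
    shm = shared_memory.SharedMemory(name=name)
    return shm, numpy.ndarray(shape, dtype=dtype, buffer=shm.buf)


if __name__ == '__main__':
    dist = numpy.random.random((1000, 1000)).astype(numpy.float32)
    parent_shm, meta = share_array(dist)    # analogous to smap.share(...)
    worker_shm, view = attach_array(meta)   # analogous to monitor.shared[...]
    assert numpy.allclose(view, dist)
    del view  # drop the array reference before closing the shared block
    worker_shm.close()
    parent_shm.close()
    parent_shm.unlink()
```

In the diff below this shows up as `smap.share(YY=..., YD=..., DY=...)` in `get_me_ta_ph` and as `with monitor.shared['YD'] as YD:` blocks inside `get_mu_tau_phi`, so every task reads the same matrices instead of receiving its own pickled copy.
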
9 changes: 6 additions & 3 deletions debian/changelog
@@ -1,7 +1,10 @@
[Michele Simionato]
* Optimized memory consumption in conditioned GMFs calculations

[Anirudh Rao]
* Sort observed imtls and imts and ensure all of the target imts go
into the context maker for the target sites in the conditioned gmfs
calculator - helps in debugging mode to prevent accidental
reordering of the imts

[Michele Simionato]
98 changes: 64 additions & 34 deletions openquake/hazardlib/calc/conditioned_gmfs.py
@@ -107,9 +107,9 @@
"""

import logging
from functools import partial
from dataclasses import dataclass

import psutil
import numpy
from openquake.baselib import parallel
from openquake.hazardlib import correlation, cross_correlation
@@ -282,13 +282,15 @@ def _create_result(g, m, target_imt, observed_imts, station_data_filtered):
f"The station data contains {num_null_values}"
f" null values for {target_imt.string}."
" Please fill or discard these rows.")
t = TempResult(g, m, bracketed_imts, conditioning_imts, native_data_available)
t = TempResult(g, m, bracketed_imts, conditioning_imts,
native_data_available)
return t


def create_result(g, m, target_imt, target_imts, observed_imts,
station_data, sitecol, station_sitecol,
compute_cov, cross_correl_between):
spatial_correl, cross_correl_within,
cross_correl_between, DD):
"""
:returns: a TempResult
"""
@@ -335,8 +337,9 @@ def create_result(g, m, target_imt, target_imts, observed_imts,
t.zeta_D = yD - mu_yD
t.phi_D_diag = numpy.diag(phi_D.flatten())

cov_WD_WD = compute_cov(station_sitecol, station_sitecol,
t.conditioning_imts, t.conditioning_imts, t.phi_D_diag, t.phi_D_diag)
cov_WD_WD = compute_spatial_cross_covariance_matrix(
spatial_correl, cross_correl_within, DD,
t.conditioning_imts, t.conditioning_imts, t.phi_D_diag, t.phi_D_diag)

# Add on the additional variance of the residuals
# for the cases where the station data is uncertain
@@ -364,8 +367,28 @@ def create_result(g, m, target_imt, target_imts, observed_imts,
return t


def compute_distance_matrix(sites1, sites2):
"""
:param sites1: N1 sites
:param sites2: N2 sites
:returns:
a matrix of shape N1 x N2 of float32 distances (~37 GB for 100k sites)
"""
avail_gb = psutil.virtual_memory().available / 1024**3
req_gb = len(sites1) * len(sites2) * 8 / 1024**3
if req_gb > avail_gb:
raise MemoryError('The distance_matrix of shape (%d, %d) is too large!'
% (len(sites1), len(sites2)))
distance_matrix = geodetic_distance(
sites1.lons.reshape(sites1.lons.shape + (1,)),
sites1.lats.reshape(sites1.lats.shape + (1,)),
sites2.lons,
sites2.lats)
return distance_matrix.astype(F32)
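
# Illustrative arithmetic for the figures above, not part of the patch:
# geodetic_distance presumably produces a float64 matrix before the final
# .astype(F32) cast, hence the factor 8 in the memory guard, while the
# float32 matrix that is actually kept is half that size:
#
#     100_000 * 100_000 * 8 / 1024**3  ~= 74.5 GB   (float64 intermediate)
#     100_000 * 100_000 * 4 / 1024**3  ~= 37.3 GB   (float32 result)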


def compute_spatial_cross_covariance_matrix(
spatial_correl, cross_correl_within, sites1, sites2,
spatial_correl, cross_correl_within, distance_matrix,
imts1, imts2, diag1, diag2):
# The correlation structure for IMs of differing types at differing
# locations can be reasonably assumed as Markovian in nature, and we
@@ -374,14 +397,9 @@ def compute_spatial_cross_covariance_matrix(
# at the same location and the spatial correlation due to the distance
# between sites m and n. Can be refactored down the line to support direct
# spatial cross-correlation models
distance_matrix = geodetic_distance(
sites1.lons.reshape(sites1.lons.shape + (1,)),
sites1.lats.reshape(sites1.lats.shape + (1,)),
sites2.lons,
sites2.lats)
rho = numpy.block([[
_compute_spatial_cross_correlation_matrix(
distance_matrix, imt_1, imt_2, spatial_correl, cross_correl_within)
imt_1, imt_2, spatial_correl, cross_correl_within, distance_matrix)
for imt_2 in imts2] for imt_1 in imts1])
return numpy.linalg.multi_dot([diag1, rho, diag2])
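
# Hedged restatement of the Markovian assumption described in the comment
# above (illustration only, not part of the patch): for IMs i and j at
# sites m and n separated by the distance d_mn stored in distance_matrix,
#
#     rho[W_i(m), W_j(n)] ~= rho_within(i, j) *
#                            max(rho_spatial(d_mn, i), rho_spatial(d_mn, j))
#
# where the max over the two IMs is what _compute_spatial_cross_correlation_matrix
# builds with numpy.maximum, and the multi_dot([diag1, rho, diag2]) above
# rescales the correlation blocks by the phi diagonals into covariances.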

@@ -395,7 +413,8 @@ def compute_spatial_cross_covariance_matrix(
# NB: this is run in parallel
def get_mu_tau_phi(target_imt, gsim, mean_stds,
target_imts, observed_imts, station_data,
target_sitecol, station_sitecol, compute_cov, r, monitor):
target_sitecol, station_sitecol, spatial_correl,
cross_correl_within, r, monitor):
# Using Bayes rule, compute the posterior distribution of the
# normalized between-event residual H|YD=yD, employing
# Engler et al. (2022), eqns B8 and B9 (also B18 and B19),
@@ -417,9 +436,10 @@ def get_mu_tau_phi(target_imt, gsim, mean_stds,
nominal_bias_mean = numpy.mean(mu_BD_yD)
nominal_bias_stddev = numpy.sqrt(numpy.mean(numpy.diag(cov_BD_BD_yD)))

msg = ("GSIM: %s, IMT: %s, Nominal bias mean: %.3f, Nominal bias stddev: %.3f"
% (gsim.gmpe if hasattr(gsim, 'gmpe') else gsim,
target_imt, nominal_bias_mean, nominal_bias_stddev))
msg = (
"GSIM: %s, IMT: %s, Nominal bias mean: %.3f, Nominal bias stddev: %.3f"
% (gsim.gmpe if hasattr(gsim, 'gmpe') else gsim,
target_imt, nominal_bias_mean, nominal_bias_stddev))

# Predicted mean at the target sites, from GSIM
mu_Y = mean_stds[0, 0][:, None]
@@ -431,10 +451,15 @@ def get_mu_tau_phi(target_imt, gsim, mean_stds,
# Compute the within-event covariance matrices for the
# target sites and observation sites; the shapes are
# (nsites, nstations) and (nstations, nsites) respectively
cov_WY_WD = compute_cov(target_sitecol, station_sitecol,
[target_imt], r.conditioning_imts, phi_Y_diag, r.phi_D_diag)
cov_WD_WY = compute_cov(station_sitecol, target_sitecol,
r.conditioning_imts, [target_imt], r.phi_D_diag, phi_Y_diag)
with monitor.shared['YD'] as YD:
cov_WY_WD = compute_spatial_cross_covariance_matrix(
spatial_correl, cross_correl_within, YD,
[target_imt], r.conditioning_imts, phi_Y_diag, r.phi_D_diag)

with monitor.shared['DY'] as DY:
cov_WD_WY = compute_spatial_cross_covariance_matrix(
spatial_correl, cross_correl_within, DY,
r.conditioning_imts, [target_imt], r.phi_D_diag, phi_Y_diag)

# Compute the regression coefficient matrix [cov_WY_WD × cov_WD_WD_inv]
RC = cov_WY_WD @ r.cov_WD_WD_inv # shape (nsites, nstations)
@@ -450,8 +475,11 @@ def get_mu_tau_phi(target_imt, gsim, mean_stds,

# Compute the within-event covariance matrix for the
# target sites (apriori) (nsites, nsites)
cov_WY_WY = compute_cov(target_sitecol, target_sitecol,
[target_imt], [target_imt], phi_Y_diag, phi_Y_diag)

with monitor.shared['YY'] as YY:
cov_WY_WY = compute_spatial_cross_covariance_matrix(
spatial_correl, cross_correl_within, YY,
[target_imt], [target_imt], phi_Y_diag, phi_Y_diag)

# Both conditioned covariance matrices can contain extremely
# small negative values due to limitations of floating point
@@ -477,14 +505,19 @@

def get_me_ta_ph(cmaker, sdata, observed_imts, target_imts,
mean_stds_D, mean_stds_Y, target, station_filtered,
compute_cov, cross_correl_between, h5):
spatial_correl, cross_correl_within, cross_correl_between, h5):
G = len(cmaker.gsims)
M = len(target_imts)
N = mean_stds_Y.shape[-1]
me = numpy.zeros((G, M, N, 1))
ta = numpy.zeros((G, M, N, N))
ph = numpy.zeros((G, M, N, N))
smap = parallel.Starmap(get_mu_tau_phi, h5=h5)
smap.share(YY=compute_distance_matrix(target, target),
YD=compute_distance_matrix(target, station_filtered),
DY=compute_distance_matrix(station_filtered, target))
DD = compute_distance_matrix(station_filtered, station_filtered)

for g, gsim in enumerate(cmaker.gsims):
if gsim.DEFINED_FOR_STANDARD_DEVIATION_TYPES == {StdDev.TOTAL}:
if not (type(gsim).__name__ == "ModifiableGMPE"
@@ -504,10 +537,12 @@ def get_me_ta_ph(cmaker, sdata, observed_imts, target_imts,
result = create_result(
g, m, target_imt, target_imts, observed_imts,
sdata, target, station_filtered,
compute_cov, cross_correl_between)
spatial_correl, cross_correl_within, cross_correl_between,
DD)
smap.submit(
(target_imt, gsim, mean_stds_Y[:, g], target_imts, observed_imts,
sdata, target, station_filtered, compute_cov, result))
(target_imt, gsim, mean_stds_Y[:, g], target_imts,
observed_imts, sdata, target, station_filtered,
spatial_correl, cross_correl_within, result))
for (g, m), (mu, tau, phi, msg) in smap.reduce().items():
me[g, m] = mu
ta[g, m] = tau
@@ -555,13 +590,10 @@ def get_mean_covs(
numpy.isin(target_sitecol.sids, ctx_Y.sids))
mask = numpy.isin(station_sitecol.sids, ctx_D.sids)
station_filtered = station_sitecol.filter(mask)

compute_cov = partial(compute_spatial_cross_covariance_matrix,
spatial_correl, cross_correl_within)
me, ta, ph = get_me_ta_ph(
cmaker, station_data[mask].copy(), observed_imts, target_imts,
mean_stds_D, mean_stds_Y, target, station_filtered,
compute_cov, cross_correl_between, h5)
spatial_correl, cross_correl_within, cross_correl_between, h5)
if sigma:
return [me, ta + ph, ta, ph]
else:
@@ -570,12 +602,10 @@


def _compute_spatial_cross_correlation_matrix(
distance_matrix, imt_1, imt_2, spatial_correl, cross_correl_within):
imt_1, imt_2, spatial_correl, cross_correl_within, distance_matrix):
if imt_1 == imt_2:
# since we have a single IMT, there are no cross-correlation terms
spatial_correlation_matrix = spatial_correl._get_correlation_matrix(
distance_matrix, imt_1)
return spatial_correlation_matrix
return spatial_correl._get_correlation_matrix(distance_matrix, imt_1)
matrix1 = spatial_correl._get_correlation_matrix(distance_matrix, imt_1)
matrix2 = spatial_correl._get_correlation_matrix(distance_matrix, imt_2)
spatial_correlation_matrix = numpy.maximum(matrix1, matrix2)