Skip to content

Commit

Permalink
Merge pull request #9067 from gem/cache_count_ruptures
Browse files Browse the repository at this point in the history
Speeding up count_ruptures in event_based
  • Loading branch information
micheles committed Oct 4, 2023
2 parents 71713ae + 9bdf3ee commit e7280d5
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 122 deletions.
4 changes: 4 additions & 0 deletions debian/changelog
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
[Michele Simionato]
* Speeding up `count_ruptures` for multifault sources, which gives a *huge*
in some calculations (like event_based for Dominican Republic)

[Marco Pagani]
* Fixed a bug in the code that resamples a line, with a small effect
on the hazard of models containing simple fault sources
Expand Down
12 changes: 5 additions & 7 deletions openquake/calculators/event_based.py
Original file line number Diff line number Diff line change
Expand Up @@ -396,23 +396,21 @@ def build_events_from_sources(self):
Prefilter the composite source model and store the source_info
"""
oq = self.oqparam
logging.info('Counting the ruptures in the CompositeSourceModel')
with self.monitor('counting ruptures', measuremem=True):
self.csm.fix_src_offset() # NB: essential
sources = self.csm.get_sources()

# weighting the heavy sources
logging.info('Counting the ruptures in the CompositeSourceModel')
self.datastore.swmr_on()
nrups = parallel.Starmap(
count_ruptures, [(src,) for src in sources if src.code in b'AMC'],
nrups = parallel.Starmap( # weighting the heavy sources
count_ruptures, [(src,) for src in sources if src.code != b'P'],
progress=logging.debug
).reduce()
for src in sources:
try:
src.num_ruptures = nrups[src.source_id]
except KeyError: # light source
except KeyError: # point source
src.num_ruptures = src.count_ruptures()
src.weight = src.num_ruptures
self.csm.fix_src_offset() # NB: essential
maxweight = sum(sg.weight for sg in self.csm.src_groups) / (
self.oqparam.concurrent_tasks or 1)
eff_ruptures = AccumDict(accum=0) # grp_id => potential ruptures
Expand Down
2 changes: 2 additions & 0 deletions openquake/hazardlib/source/area.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ def count_ruptures(self):
:meth:`openquake.hazardlib.source.base.BaseSeismicSource.count_ruptures`
for description of parameters and return value.
"""
if self.num_ruptures:
return self.num_ruptures
polygon_mesh = self.polygon.discretize(self.area_discretization)
return (len(polygon_mesh) *
len(self.get_annual_occurrence_rates()) *
Expand Down
5 changes: 3 additions & 2 deletions openquake/hazardlib/source/complex_fault.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,8 @@ def count_ruptures(self):
See :meth:
`openquake.hazardlib.source.base.BaseSeismicSource.count_ruptures`.
"""
if self.num_ruptures:
return self.num_ruptures
whole_fault_surface = ComplexFaultSurface.from_fault_data(
self.edges, self.rupture_mesh_spacing)
whole_fault_mesh = whole_fault_surface.mesh
Expand Down Expand Up @@ -248,8 +250,7 @@ def __iter__(self):
if len(mag_rates) == 1: # not splittable
yield self
return
if not hasattr(self, '_nr'):
self.count_ruptures()
self.count_ruptures()
for i, (mag, rate) in enumerate(mag_rates):
src = copy.copy(self)
src.mfd = mfd.ArbitraryMFD([mag], [rate])
Expand Down
193 changes: 82 additions & 111 deletions openquake/hazardlib/source/kite_fault.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
:class:`KiteFaultSource`.
"""
import copy
import collections
import numpy as np
from typing import Tuple
from openquake.hazardlib import mfd
Expand All @@ -31,11 +32,58 @@
as ppr


def _get_meshes(omsh, rup_s, rup_d, f_strike, f_dip):
meshes = []

# When f_strike is negative, the floating distance is interpreted as
# a fraction of the rupture length (i.e. a multiple of the sampling
# distance)
if f_strike < 0:
f_strike = int(np.floor(rup_s * abs(f_strike) + 1e-5))
if f_strike < 1:
f_strike = 1

# See f_strike comment above
if f_dip < 0:
f_dip = int(np.floor(rup_d * abs(f_dip) + 1e-5))
if f_dip < 1:
f_dip = 1

# Float the rupture on the mesh describing the surface of the fault
mesh_x_len = omsh.lons.shape[1] - rup_s + 1
mesh_y_len = omsh.lons.shape[0] - rup_d + 1
x_nodes = np.arange(0, mesh_x_len, f_strike)
y_nodes = np.arange(0, mesh_y_len, f_dip)

while (len(x_nodes) > 0 and f_strike > 1
and x_nodes[-1] != omsh.lons.shape[1] - rup_s):
f_strike -= 1
x_nodes = np.arange(0, mesh_x_len, f_strike)

while (len(y_nodes) > 0 and f_dip > 1
and y_nodes[-1] != omsh.lons.shape[0] - rup_d):
f_dip -= 1
y_nodes = np.arange(0, mesh_y_len, f_dip)

for i in x_nodes:
for j in y_nodes:
nel = np.size(omsh.lons[j:j + rup_d, i:i + rup_s])
nna = np.sum(np.isfinite(omsh.lons[j:j + rup_d, i:i + rup_s]))
prc = nna / nel * 100.

# Yield only the ruptures that do not contain NaN
if prc > 99.99 and nna >= 4:
msh = Mesh(omsh.lons[j:j + rup_d, i:i + rup_s],
omsh.lats[j:j + rup_d, i:i + rup_s],
omsh.depths[j:j + rup_d, i:i + rup_s])
meshes.append(msh)
return meshes


class KiteFaultSource(ParametricSeismicSource):
"""
Kite fault source
"""

code = b'K'
MODIFICATIONS = {'adjust_mfd_from_slip'}

Expand All @@ -53,14 +101,11 @@ def __init__(self, source_id, name, tectonic_region_type, mfd,
# TODO add checks
self.profiles = profiles
if profiles_sampling is None:
self.profiles_sampling = (rupture_mesh_spacing /
rupture_aspect_ratio)
self.profiles_sampling = rupture_mesh_spacing / rupture_aspect_ratio
self.rake = rake
self.floating_x_step = floating_x_step
self.floating_y_step = floating_y_step

min_mag, max_mag = self.mfd.get_min_max_mag()

@classmethod
def as_simple_fault(cls, source_id, name, tectonic_region_type,
mfd, rupture_mesh_spacing,
Expand Down Expand Up @@ -102,42 +147,43 @@ def count_ruptures(self) -> int:
:returns:
The number of ruptures that this source generates
"""

if self.num_ruptures:
return self.num_ruptures

# Counting ruptures and rates
rates = {}
count = {}
for rup in self.iter_ruptures():
mag = rup.mag
mag_lab = '{:.2f}'.format(mag)
if mag_lab in rates:
count[mag_lab] += 1
rates[mag_lab] += rup.occurrence_rate
else:
count[mag_lab] = 1
rates[mag_lab] = rup.occurrence_rate

# Saving
self._rupture_rates = rates
self._rupture_count = count

return sum(count[k] for k in count)
self._rupture_count = collections.Counter()
self._rupture_rates = collections.Counter()
for mag, occ_rate, meshes in self._gen_meshes():
n = len(meshes)
mag_str = '{:.2f}'.format(mag)
self._rupture_count[mag_str] += n
self._rupture_rates[mag_str] += occ_rate * n
return sum(self._rupture_count.values())

def iter_ruptures(self, **kwargs):
"""
See :meth:
`openquake.hazardlib.source.base.BaseSeismicSource.iter_ruptures`.
"""

# Set magnitude scaling relationship, temporal occurrence model and
# mesh of the fault surface
msr = self.magnitude_scaling_relationship
tom = self.temporal_occurrence_model
surface = self.surface
step = kwargs.get('step', 1)
for mag, occ_rate, meshes in self._gen_meshes(step):
for msh in meshes[::step]:
surf = KiteSurface(msh)
hypocenter = surf.get_center()
# Yield an instance of a ParametricProbabilisticRupture
yield ppr(mag, self.rake, self.tectonic_region_type,
hypocenter, surf, occ_rate,
self.temporal_occurrence_model)

def _gen_meshes(self, step=1):
surface = self.surface
for mag, mag_occ_rate in self.get_annual_occurrence_rates()[::step]:

# Compute the area, length and width of the ruptures
area = msr.get_median_area(mag=mag, rake=self.rake)
area = self.magnitude_scaling_relationship.get_median_area(
mag=mag, rake=self.rake)
lng, wdt = get_discrete_dimensions(
area, self.rupture_mesh_spacing,
self.rupture_aspect_ratio, self.profiles_sampling)
Expand Down Expand Up @@ -166,84 +212,9 @@ def iter_ruptures(self, **kwargs):

# Get the geometry of all the ruptures that the fault surface
# accommodates
ruptures = []
for rup in self._get_ruptures(surface.mesh, rup_len, rup_wid,
f_strike=fstrike, f_dip=fdip):
ruptures.append(rup)
if len(ruptures) < 1:
continue
occurrence_rate = mag_occ_rate / len(ruptures)

# Rupture generator
for rup in ruptures[::step]:
hypocenter = rup[0].get_center()
# Yield an instance of a ParametricProbabilisticRupture
yield ppr(mag, self.rake, self.tectonic_region_type,
hypocenter, rup[0], occurrence_rate, tom)

def _get_ruptures(self, omsh, rup_s, rup_d, f_strike=1, f_dip=1):
"""
Returns all the ruptures admitted by a given geometry i.e. number of
nodes along strike and dip
:param omsh:
A :class:`~openquake.hazardlib.geo.mesh.Mesh` instance describing
the fault surface
:param rup_s:
Number of cols composing the rupture
:param rup_d:
Number of rows composing the rupture
:param f_strike:
Floating distance along strike (multiple of sampling distance)
:param f_dip:
Floating distance along dip (multiple of sampling distance)
:returns:
A tuple containing the rupture and the indexes of the top right
node of the mesh representing the rupture.
"""

# When f_strike is negative, the floating distance is interpreted as
# a fraction of the rupture length (i.e. a multiple of the sampling
# distance)
if f_strike < 0:
f_strike = int(np.floor(rup_s * abs(f_strike) + 1e-5))
if f_strike < 1:
f_strike = 1

# See f_strike comment above
if f_dip < 0:
f_dip = int(np.floor(rup_d * abs(f_dip) + 1e-5))
if f_dip < 1:
f_dip = 1

# Float the rupture on the mesh describing the surface of the fault
mesh_x_len = omsh.lons.shape[1] - rup_s + 1
mesh_y_len = omsh.lons.shape[0] - rup_d + 1
x_nodes = np.arange(0, mesh_x_len, f_strike)
y_nodes = np.arange(0, mesh_y_len, f_dip)

while (len(x_nodes) > 0 and f_strike > 1
and x_nodes[-1] != omsh.lons.shape[1] - rup_s):
f_strike -= 1
x_nodes = np.arange(0, mesh_x_len, f_strike)

while (len(y_nodes) > 0 and f_dip > 1
and y_nodes[-1] != omsh.lons.shape[0] - rup_d):
f_dip -= 1
y_nodes = np.arange(0, mesh_y_len, f_dip)

for i in x_nodes:
for j in y_nodes:
nel = np.size(omsh.lons[j:j + rup_d, i:i + rup_s])
nna = np.sum(np.isfinite(omsh.lons[j:j + rup_d, i:i + rup_s]))
prc = nna/nel*100.

# Yield only the ruptures that do not contain NaN
if prc > 99.99 and nna >= 4:
msh = Mesh(omsh.lons[j:j + rup_d, i:i + rup_s],
omsh.lats[j:j + rup_d, i:i + rup_s],
omsh.depths[j:j + rup_d, i:i + rup_s])
yield (KiteSurface(msh), j, i)
meshes = _get_meshes(surface.mesh, rup_len, rup_wid, fstrike, fdip)
if len(meshes):
yield mag, mag_occ_rate / len(meshes), meshes

def get_fault_surface_area(self) -> float:
"""
Expand All @@ -261,13 +232,13 @@ def __iter__(self):
if len(self._rupture_rates) == 1: # not splittable
yield self
return
for mag_lab in self._rupture_count:
if self._rupture_rates[mag_lab] == 0:
for mag_str in self._rupture_count:
if self._rupture_rates[mag_str] == 0:
continue
src = copy.copy(self)
mag = float(mag_lab)
src.mfd = mfd.ArbitraryMFD([mag], [self._rupture_rates[mag_lab]])
src.num_ruptures = self._rupture_count[mag_lab]
mag = float(mag_str)
src.mfd = mfd.ArbitraryMFD([mag], [self._rupture_rates[mag_str]])
src.num_ruptures = self._rupture_count[mag_str]
yield src

@property
Expand Down
5 changes: 3 additions & 2 deletions openquake/hazardlib/source/simple_fault.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,8 @@ def count_ruptures(self):
See :meth:
`openquake.hazardlib.source.base.BaseSeismicSource.count_ruptures`.
"""
if self.num_ruptures:
return self.num_ruptures
whole_fault_surface = SimpleFaultSurface.from_fault_data(
self.fault_trace, self.upper_seismogenic_depth,
self.lower_seismogenic_depth, self.dip, self.rupture_mesh_spacing)
Expand Down Expand Up @@ -328,8 +330,7 @@ def __iter__(self):
if len(mag_rates) == 1: # not splittable
yield self
return
if not hasattr(self, '_nr'):
self.count_ruptures()
self.count_ruptures()
for i, (mag, rate) in enumerate(mag_rates):
# This is needed in order to reproduce the logic in the
# `rupture_count` method
Expand Down

0 comments on commit e7280d5

Please sign in to comment.