Merge pull request #9062 from gem/_rates
Stored _rates instead of _poes
micheles committed Oct 3, 2023
2 parents 25dbc86 + c373d67 commit c3747b5
Showing 18 changed files with 87 additions and 87 deletions.
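
Note: the substance of this commit is a storage-format change: the columnar '_poes' dataset (probabilities of exceedance) becomes a '_rates' dataset. The sketch below shows the conversion pair the diff relies on, assuming the usual Poisson relation rate = -ln(1 - poe); the real implementations live in openquake.hazardlib.calc.disagg.

import numpy

def to_rates(poes):
    # assumed conversion: log1p(-p) == log(1 - p), accurate for small p
    return -numpy.log1p(-poes)

def to_probs(rates):
    # assumed inverse: expm1(-r) == exp(-r) - 1
    return -numpy.expm1(-rates)

poes = numpy.array([0., .1, .5])
assert numpy.allclose(to_probs(to_rates(poes)), poes)  # round trip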
2 changes: 1 addition & 1 deletion openquake/baselib/performance.py
@@ -447,7 +447,7 @@ def compile(sigstr):
         return lambda func: func


-# used when reading _poes/sid
+# used when reading _rates/sid
 @compile(["int64[:, :](uint8[:])",
           "int64[:, :](uint16[:])",
           "int64[:, :](uint32[:])",
13 changes: 8 additions & 5 deletions openquake/calculators/base.py
@@ -40,6 +40,7 @@
 from openquake.hazardlib.site_amplification import Amplifier
 from openquake.hazardlib.site_amplification import AmplFunction
 from openquake.hazardlib.calc.filters import SourceFilter, getdefault
+from openquake.hazardlib.calc.disagg import to_rates
 from openquake.hazardlib.source import rupture
 from openquake.hazardlib.shakemap.maps import get_sitecol_shakemap
 from openquake.hazardlib.shakemap.gmfs import to_gmfs
@@ -574,8 +575,9 @@ def pre_execute(self):
             haz_sitecol = readinput.get_site_collection(oq, self.datastore)
             self.load_crmodel()  # must be after get_site_collection
             self.read_exposure(haz_sitecol)  # define .assets_by_site
-            self.datastore.create_df('_poes',
-                                     readinput.Global.pmap.to_dframe())
+            df = readinput.Global.pmap.to_dframe()
+            df.rate = to_rates(df.rate)
+            self.datastore.create_df('_rates', df)
             self.datastore['assetcol'] = self.assetcol
             self.datastore['full_lt'] = fake = logictree.FullLogicTree.fake()
             self.datastore['trt_rlzs'] = U32([[0]])
@@ -1059,7 +1061,7 @@ def _gen_riskinputs(self, dstore):
         full_lt = dstore['full_lt'].init()
         out = []
         asset_df = self.assetcol.to_dframe('site_id')
-        slices = performance.get_slices(dstore['_poes/sid'][:])
+        slices = performance.get_slices(dstore['_rates/sid'][:])
         for sid, assets in asset_df.groupby(asset_df.index):
             # hcurves, shape (R, N)
             getter = getters.PmapGetter(
@@ -1217,8 +1219,9 @@ def import_gmfs_hdf5(dstore, oqparam):
     :param oqparam: an OqParam instance
     :returns: event_ids
     """
-    # NB: you cannot access an external link if the file it points to is already open,
-    # therefore you cannot run in parallel two calculations starting from the same GMFs
+    # NB: you cannot access an external link if the file it points to is
+    # already open, therefore you cannot run in parallel two calculations
+    # starting from the same GMFs
     dstore['gmf_data'] = h5py.ExternalLink(oqparam.inputs['gmfs'], "gmf_data")
     attrs = _getset_attrs(oqparam)
     oqparam.hazard_imtls = {imt: [0] for imt in attrs['imts']}
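
Note: the rewrapped NB comment above describes a real h5py constraint. A minimal sketch with hypothetical file names (not part of the diff):

import h5py

# create a source file with a gmf_data group
with h5py.File('gmfs.hdf5', 'w') as f:
    f['gmf_data/data'] = [1., 2., 3.]

# link it from the calculation datastore, as import_gmfs_hdf5 does
with h5py.File('calc.hdf5', 'w') as dstore:
    dstore['gmf_data'] = h5py.ExternalLink('gmfs.hdf5', 'gmf_data')

# the link is resolved lazily: dereferencing it fails if another process
# already holds gmfs.hdf5 open, hence two calculations cannot start in
# parallel from the same GMF file
with h5py.File('calc.hdf5', 'r') as dstore:
    print(dstore['gmf_data/data'][:])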
33 changes: 17 additions & 16 deletions openquake/calculators/classical.py
@@ -37,7 +37,7 @@
 from openquake.hazardlib.contexts import read_cmakers, basename, get_maxsize
 from openquake.hazardlib.calc.hazard_curve import classical as hazclassical
 from openquake.hazardlib.calc import disagg
-from openquake.hazardlib.probability_map import ProbabilityMap, poes_dt
+from openquake.hazardlib.probability_map import ProbabilityMap, rates_dt
 from openquake.commonlib import calc
 from openquake.calculators import base, getters

@@ -177,10 +177,10 @@ def postclassical(pgetter, N, hstats, individual_rlzs,
     for sid in sids:
         idx = sidx[sid]
         with combine_mon:
-            pc = pgetter.get_pcurve(sid)  # shape (L, R)
+            pc = pgetter.get_hcurve(sid)  # shape (L, R)
             if amplifier:
                 pc = amplifier.amplify(ampcode[sid], pc)
-            # NB: the pcurve have soil levels != IMT levels
+            # NB: the hcurve have soil levels != IMT levels
             if pc.array.sum() == 0:  # no data
                 continue
         with compute_mon:
@@ -265,7 +265,7 @@ def get_rates(self, pmap):

     def store_poes(self, pnes, the_sids, gid=0):
         """
-        Store 1-pnes inside the _poes dataset
+        Store 1-pnes inside the _rates dataset
         """
         avg_poe = 0
         # store by IMT to save memory
@@ -286,15 +286,16 @@ def store_poes(self, pnes, the_sids, gid=0):
             if len(idxs) == 0:  # happens in case_60
                 return 0
             sids = the_sids[idxs]
-            hdf5.extend(self.datastore['_poes/sid'], sids)
-            hdf5.extend(self.datastore['_poes/gid'], gids + gid)
-            hdf5.extend(self.datastore['_poes/lid'], lids + slc.start)
-            hdf5.extend(self.datastore['_poes/poe'], poes[idxs, lids, gids])
+            hdf5.extend(self.datastore['_rates/sid'], sids)
+            hdf5.extend(self.datastore['_rates/gid'], gids + gid)
+            hdf5.extend(self.datastore['_rates/lid'], lids + slc.start)
+            hdf5.extend(self.datastore['_rates/rate'],
+                        disagg.to_rates(poes[idxs, lids, gids]))

             # slice_by_sid contains 3x6=18 slices in classical/case_22
             # which has 6 IMTs each one with 20 levels
             sbs = build_slice_by_sid(sids, self.offset)
-            hdf5.extend(self.datastore['_poes/slice_by_sid'], sbs)
+            hdf5.extend(self.datastore['_rates/slice_by_sid'], sbs)
             self.offset += len(sids)
             avg_poe += poes.mean(axis=(0, 2)) @ self.level_weights[slc]
         self.acc['avg_poe'] = avg_poe
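
Note: store_poes appends one row per nonzero (site, level, gsim) triple. A toy sketch of that columnar layout, with illustrative names and the to_rates conversion assumed above:

import numpy

poes = numpy.zeros((2, 3, 2))      # (num_sites, num_levels, num_gsims)
poes[0, 1, 0] = .1
poes[1, 2, 1] = .2
idxs, lids, gids = poes.nonzero()  # the rows worth storing
rates = -numpy.log1p(-poes[idxs, lids, gids])
# the columns appended to the '_rates' dataset: sid, gid, lid, rate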
@@ -395,8 +396,8 @@ def init_poes(self):
         self.cmakers = read_cmakers(self.datastore, self.csm)
         self.cfactor = numpy.zeros(3)
         self.rel_ruptures = AccumDict(accum=0)  # grp_id -> rel_ruptures
-        self.datastore.create_df('_poes', poes_dt.items())
-        self.datastore.create_dset('_poes/slice_by_sid', slice_dt)
+        self.datastore.create_df('_rates', rates_dt.items())
+        self.datastore.create_dset('_rates/slice_by_sid', slice_dt)
         # NB: compressing the dataset causes a big slowdown in writing :-(

         oq = self.oqparam
@@ -444,7 +445,7 @@ def execute(self):
             oq.mags_by_trt = {
                 trt: python3compat.decode(dset[:])
                 for trt, dset in parent['source_mags'].items()}
-            if '_poes' in parent:
+            if '_rates' in parent:
                 self.build_curves_maps()  # repeat post-processing
                 return {}
             else:
@@ -475,7 +476,7 @@ def execute(self):
         logging.info('cfactor = {:_d}/{:_d} = {:.1f}'.format(
             int(self.cfactor[1]), int(self.cfactor[0]),
             self.cfactor[1] / self.cfactor[0]))
-        if '_poes' in self.datastore:
+        if '_rates' in self.datastore:
             self.build_curves_maps()
         if not oq.hazard_calculation_id:
             self.classical_time = time.time() - t0
@@ -663,19 +664,19 @@ def build_curves_maps(self):
         if not oq.hazard_curves:  # do nothing
             return
         N, S, M, P, L1, individual = self._create_hcurves_maps()
-        poes_gb = self.datastore.getsize('_poes') / 1024**3
+        poes_gb = self.datastore.getsize('_rates') / 1024**3
         if poes_gb < 1:
             ct = int(poes_gb * 32) or 1
         else:
             ct = int(poes_gb) + 32  # number of tasks > number of GB
         if ct > 1:
             logging.info('Producing %d postclassical tasks', ct)
-        if '_poes' in set(self.datastore):
+        if '_rates' in set(self.datastore):
             dstore = self.datastore
         else:
             dstore = self.datastore.parent
         sites_per_task = int(numpy.ceil(self.N / ct))
-        sbs = dstore['_poes/slice_by_sid'][:]
+        sbs = dstore['_rates/slice_by_sid'][:]
         sbs['sid'] //= sites_per_task
         # NB: there is a genious idea here, to split in tasks by using
         # the formula ``taskno = sites_ids // sites_per_task`` and then
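
Note: the NB comment above refers to the integer-division trick for grouping sites into postclassical tasks; a one-liner makes it concrete:

import numpy

sites_per_task = 3
sids = numpy.arange(8)
tasknos = sids // sites_per_task  # array([0, 0, 0, 1, 1, 1, 2, 2])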
4 changes: 2 additions & 2 deletions openquake/calculators/classical_bcr.py
@@ -48,8 +48,8 @@ def classical_bcr(riskinputs, param, monitor):
         haz = ri.hazard_getter.get_hazard()
         for taxo, assets in ri.asset_df.groupby('taxonomy'):
             for rlz in range(R):
-                pcurve = haz.extract(rlz)
-                out = crmodel.get_output(assets, pcurve)
+                hcurve = haz.extract(rlz)
+                out = crmodel.get_output(assets, hcurve)
                 for asset, (eal_orig, eal_retro, bcr) in zip(
                         assets.to_records(), out['structural']):
                     aval = asset['value-structural']
4 changes: 2 additions & 2 deletions openquake/calculators/classical_damage.py
@@ -49,8 +49,8 @@ def classical_damage(riskinputs, param, monitor):
         haz = ri.hazard_getter.get_hazard()
         for taxo, assets in ri.asset_df.groupby('taxonomy'):
             for rlz in range(R):
-                pcurve = haz.extract(rlz)
-                out = crmodel.get_output(assets, pcurve)
+                hcurve = haz.extract(rlz)
+                out = crmodel.get_output(assets, hcurve)
                 for li, loss_type in enumerate(crmodel.loss_types):
                     for a, frac in zip(assets.ordinal, out[loss_type]):
                         result[a][rlz, li] = frac
6 changes: 3 additions & 3 deletions openquake/calculators/classical_risk.py
@@ -50,8 +50,8 @@ def classical_risk(riskinputs, oqparam, monitor):
         haz = ri.hazard_getter.get_hazard()
         for taxo, asset_df in ri.asset_df.groupby('taxonomy'):
             for rlz in range(R):
-                pcurve = haz.extract(rlz)
-                out = crmodel.get_output(asset_df, pcurve)
+                hcurve = haz.extract(rlz)
+                out = crmodel.get_output(asset_df, hcurve)
                 for li, loss_type in enumerate(crmodel.loss_types):
                     # loss_curves has shape (A, C)
                     for i, asset in enumerate(asset_df.to_records()):
@@ -92,7 +92,7 @@ def pre_execute(self):
         """
         oq = self.oqparam
         super().pre_execute()
-        if '_poes' not in self.datastore:  # when building short report
+        if '_rates' not in self.datastore:  # when building short report
             return
         full_lt = self.datastore['full_lt']
         self.realizations = full_lt.get_realizations()
12 changes: 6 additions & 6 deletions openquake/calculators/disaggregation.py
@@ -167,9 +167,9 @@ def get_curve(self, sid, rlzs):
         :returns: a list of Z arrays of PoEs
         """
         poes = []
-        pcurve = self.pgetter.get_pcurve(sid)
+        hcurve = self.pgetter.get_hcurve(sid)
         for z, rlz in enumerate(rlzs):
-            pc = pcurve.extract(rlz)
+            pc = hcurve.extract(rlz)
             if z == 0:
                 self.curves.append(pc.array[:, 0])
             poes.append(pc.array[:, 0])
@@ -197,7 +197,7 @@ def full_disaggregation(self):
         self.M = len(self.imts)
         dstore = (self.datastore.parent if self.datastore.parent
                   else self.datastore)
-        nrows = len(dstore['_poes/sid'])
+        nrows = len(dstore['_rates/sid'])
         self.pgetter = getters.PmapGetter(
             dstore, full_lt, [(0, nrows + 1)], oq.imtls, oq.poes)

@@ -207,12 +207,12 @@
         rlzs = numpy.zeros((self.N, Z), int)
         if self.R > 1:
             for sid in self.sitecol.sids:
-                pcurve = self.pgetter.get_pcurve(sid)
+                hcurve = self.pgetter.get_hcurve(sid)
                 mean = getters.build_stat_curve(
-                    pcurve, oq.imtls, stats.mean_curve, full_lt.weights)
+                    hcurve, oq.imtls, stats.mean_curve, full_lt.weights)
                 # get the closest realization to the mean
                 rlzs[sid] = util.closest_to_ref(
-                    pcurve.array.T, mean.array)[:Z]
+                    hcurve.array.T, mean.array)[:Z]
             self.datastore['best_rlzs'] = rlzs
         else:
             Z = len(oq.rlz_index)
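
Note: the renamed code keeps the logic of picking the realizations closest to the mean curve. A sketch of the idea, assuming a Euclidean distance (util.closest_to_ref may use a different metric):

import numpy

curves = numpy.array([[.10, .01],   # hazard curves, shape (R, L)
                      [.30, .05],
                      [.18, .02]])
mean = curves.mean(axis=0)
ranking = numpy.argsort(numpy.linalg.norm(curves - mean, axis=1))
best = ranking[:2]  # the two realizations closest to the mean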
42 changes: 23 additions & 19 deletions openquake/calculators/getters.py
@@ -23,7 +23,7 @@
 from openquake.hazardlib import probability_map, stats
 from openquake.hazardlib.calc.disagg import to_rates, to_probs
 from openquake.hazardlib.source.rupture import (
-    BaseRupture, RuptureProxy, EBRupture, get_ebr)
+    BaseRupture, RuptureProxy, get_ebr)
 from openquake.commonlib import datastore

 U16 = numpy.uint16
@@ -40,11 +40,11 @@ class NotFound(Exception):
     pass


-def build_stat_curve(pcurve, imtls, stat, weights, use_rates=False):
+def build_stat_curve(hcurve, imtls, stat, weights, use_rates=False):
     """
     Build statistics by taking into account IMT-dependent weights
     """
-    poes = pcurve.array.T  # shape R, L
+    poes = hcurve.array.T  # shape R, L
     assert len(poes) == len(weights), (len(poes), len(weights))
     L = imtls.size
     array = numpy.zeros((L, 1))
@@ -99,11 +99,11 @@ def get_hcurve(self, src_id, imt=None, site_id=0, gsim_idx=None):
         assert ';' in src_id, src_id  # must be a realization specific src_id
         imt_slc = self.imtls(imt) if imt else slice(None)
         start, gsims, weights = self.bysrc[src_id]
-        dset = self.dstore['_poes']
+        dset = self.dstore['_rates']
         if gsim_idx is None:
             curves = dset[start:start + len(gsims), site_id, imt_slc]
             return weights @ curves
-        return dset[start + gsim_idx, site_id, imt_slc]
+        return to_probs(dset[start + gsim_idx, site_id, imt_slc])

     # NB: not used right now
     def get_hcurves(self, src, imt=None, site_id=0, gsim_idx=None):
@@ -192,15 +192,15 @@ def init(self):
         G = len(self.trt_rlzs)
         with hdf5.File(self.filename) as dstore:
             for start, stop in self.slices:
-                poes_df = dstore.read_df('_poes', slc=slice(start, stop))
-                for sid, df in poes_df.groupby('sid'):
+                rates_df = dstore.read_df('_rates', slc=slice(start, stop))
+                for sid, df in rates_df.groupby('sid'):
                     try:
                         array = self._pmap[sid].array
                     except KeyError:
                         array = numpy.zeros((self.L, G))
                         self._pmap[sid] = probability_map.ProbabilityCurve(
                             array)
-                    array[df.lid, df.gid] = df.poe
+                    array[df.lid, df.gid] = to_probs(df.rate)
         return self._pmap

     # used in risk calculations where there is a single site per getter
@@ -215,20 +215,24 @@ def get_hazard(self, gsim=None):
             # classical_risk/case_3 for the first site
             return probability_map.ProbabilityCurve(
                 numpy.zeros((self.L, self.num_rlzs)))
-        return self.get_pcurve(self.sids[0])
+        return self.get_hcurve(self.sids[0])

-    def get_pcurve(self, sid):  # used in classical
+    def get_hcurve(self, sid):  # used in classical
         """
-        :returns: a ProbabilityCurve of shape L, R
+        :param sid: a site ID
+        :returns: a ProbabilityCurve of shape L, R for the given site ID
         """
         pmap = self.init()
         pc0 = probability_map.ProbabilityCurve(
             numpy.zeros((self.L, self.num_rlzs)))
         if sid not in pmap:  # no hazard for sid
             return pc0
-        for g, trs in enumerate(self.trt_rlzs):
-            probability_map.combine_probs(
-                pc0.array, pmap[sid].array[:, g], trs % TWO24)
+        for g, t_rlzs in enumerate(self.trt_rlzs):
+            rlzs = t_rlzs % TWO24
+            rates = to_rates(pmap[sid].array[:, g])
+            for rlz in rlzs:
+                pc0.array[:, rlz] += rates
+        pc0.array = to_probs(pc0.array)
         return pc0

     def get_mean(self):
@@ -243,16 +247,16 @@ def get_mean(self):
         if len(self.weights) == 1:  # one realization
             # the standard deviation is zero
             pmap = self.get(0)
-            for sid, pcurve in pmap.items():
-                array = numpy.zeros(pcurve.array.shape)
-                array[:, 0] = pcurve.array[:, 0]
-                pcurve.array = array
+            for sid, hcurve in pmap.items():
+                array = numpy.zeros(hcurve.array.shape)
+                array[:, 0] = hcurve.array[:, 0]
+                hcurve.array = array
             return pmap
         L = self.imtls.size
         pmap = probability_map.ProbabilityMap(self.sids, L, 1)
         for sid in self.sids:
             pmap[sid] = build_stat_curve(
-                self.get_pcurve(sid),
+                self.get_hcurve(sid),
                 self.imtls, stats.mean_curve, self.weights)
         return pmap
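
Note: the new get_hcurve replaces combine_probs with rate accumulation; multiplying probabilities of non-exceedance across independent groups is equivalent to adding exceedance rates. A sketch of the equivalence, under the same assumed conversions:

import numpy

p1, p2 = .1, .2                     # PoEs from two independent groups
rate_sum = -numpy.log1p(-p1) - numpy.log1p(-p2)
combined = -numpy.expm1(-rate_sum)  # back to a probability
assert numpy.isclose(combined, 1 - (1 - p1) * (1 - p2))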
6 changes: 2 additions & 4 deletions openquake/calculators/tests/classical_damage_test.py
@@ -16,15 +16,13 @@
 # You should have received a copy of the GNU Affero General Public License
 # along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.

+import numpy
 from openquake.qa_tests_data.classical_damage import (
     case_1, case_2, case_1a, case_1b, case_1c, case_2a, case_2b, case_3a,
     case_4a, case_4b, case_4c, case_5a, case_6a, case_6b, case_7a, case_7b,
     case_7c, case_8a, case_master)
 from openquake.calculators.export import export
-from openquake.calculators.tests import (
-    CalculatorTestCase, strip_calc_id, NOT_DARWIN)
-
-import numpy
+from openquake.calculators.tests import CalculatorTestCase, strip_calc_id

 aae = numpy.testing.assert_almost_equal
5 changes: 0 additions & 5 deletions openquake/calculators/tests/classical_risk_test.py
@@ -68,11 +68,6 @@ def test_case_5(self):
         # test with different curve resolution for different taxonomies
         self.run_calc(case_5.__file__, 'job_h.ini,job_r.ini')

-        # check the cutoff in classical.fix_ones
-        df = self.calc.datastore.read_df('_poes')
-        num_ones = (df.poe == 1.).sum()
-        self.assertEqual(num_ones, 0)
-
         # check mean loss curves
         [fname] = export(('loss_curves/mean', 'csv'), self.calc.datastore)
         self.assertEqualFiles('expected/loss_curves-mean.csv', fname)
6 changes: 3 additions & 3 deletions openquake/calculators/tests/classical_test.py
@@ -21,7 +21,7 @@
 from unittest import mock
 from openquake.baselib import parallel, general, config
 from openquake.baselib.python3compat import decode
-from openquake.hazardlib import InvalidFile, nrml
+from openquake.hazardlib import InvalidFile, nrml, calc
 from openquake.hazardlib.source.rupture import get_ruptures
 from openquake.hazardlib.sourcewriter import write_source_model
 from openquake.calculators.views import view, text_table
@@ -620,13 +620,13 @@ def test_case_76(self):
         L1 = L // len(oq.imtls)
         branches = self.calc.datastore['full_lt/gsim_lt'].branches
         gsims = [br.gsim for br in branches]
-        df = self.calc.datastore.read_df('_poes')
+        df = self.calc.datastore.read_df('_rates')
         del df['sid']
         for g, gsim in enumerate(gsims):
             curve = numpy.zeros(L1, oq.imt_dt())
             df_for_g = df[df.gid == g]
             poes = numpy.zeros(L)
-            poes[df_for_g.lid] = df_for_g.poe
+            poes[df_for_g.lid] = calc.disagg.to_probs(df_for_g.rate)
             for im in oq.imtls:
                 curve[im] = poes[oq.imtls(im)]
             gs = gsim.__class__.__name__