From fa5b608674cd4226b9dfc5fedb0f6c50efa27543 Mon Sep 17 00:00:00 2001 From: Michele Simionato Date: Tue, 27 Aug 2024 08:58:01 +0200 Subject: [PATCH] Restored gzip and better monitoring --- openquake/calculators/classical.py | 64 +++++++++++++++--------------- 1 file changed, 32 insertions(+), 32 deletions(-) diff --git a/openquake/calculators/classical.py b/openquake/calculators/classical.py index 4168c535150..8888f4cdbb7 100644 --- a/openquake/calculators/classical.py +++ b/openquake/calculators/classical.py @@ -46,7 +46,7 @@ I64 = numpy.int64 TWO24 = 2 ** 24 TWO32 = 2 ** 32 -GZIP = None +GZIP = 'gzip' BUFFER = 1.5 # enlarge the pointsource_distance sphere to fix the weight; # with BUFFER = 1 we would have lots of apparently light sources # collected together in an extra-slow task, as it happens in SHARE @@ -340,36 +340,36 @@ def agg_dicts(self, acc, dic): if dic is None: raise MemoryError('You ran out of memory!') - with self.monitor('aggregating rates', measuremem=True): - sdata = dic['source_data'] - self.source_data += sdata - grp_id = dic.pop('grp_id') - self.rel_ruptures[grp_id] += sum(sdata['nrupts']) - cfactor = dic.pop('cfactor') - if cfactor[1] != cfactor[0]: - print('ctxs_per_mag = {:.0f}, cfactor_per_task = {:.1f}'.format( - cfactor[1] / cfactor[2], cfactor[1] / cfactor[0])) - self.cfactor += cfactor - - # store rup_data if there are few sites - if self.few_sites and len(dic['rup_data']): - with self.monitor('saving rup_data'): - store_ctxs(self.datastore, dic['rup_data'], grp_id) - - rmap = dic.pop('pnemap', None) - source_id = dic.pop('basename', '') # non-empty for disagg_by_src - if source_id: - # accumulate the rates for the given source - acc[source_id] += self.haz.get_rates(rmap, grp_id) - if rmap is None: - # already stored in the workers, case_22 - pass - elif isinstance(rmap, numpy.ndarray): - # store the rates directly, case_03 + sdata = dic['source_data'] + self.source_data += sdata + grp_id = dic.pop('grp_id') + self.rel_ruptures[grp_id] += sum(sdata['nrupts']) + cfactor = dic.pop('cfactor') + if cfactor[1] != cfactor[0]: + print('ctxs_per_mag = {:.0f}, cfactor_per_task = {:.1f}'.format( + cfactor[1] / cfactor[2], cfactor[1] / cfactor[0])) + self.cfactor += cfactor + + # store rup_data if there are few sites + if self.few_sites and len(dic['rup_data']): + with self.monitor('saving rup_data'): + store_ctxs(self.datastore, dic['rup_data'], grp_id) + + rmap = dic.pop('pnemap', None) + source_id = dic.pop('basename', '') # non-empty for disagg_by_src + if source_id: + # accumulate the rates for the given source + acc[source_id] += self.haz.get_rates(rmap, grp_id) + if rmap is None: + # already stored in the workers, case_22 + pass + elif isinstance(rmap, numpy.ndarray): + # store the rates directly, case_03 + with self.monitor('storing rates', measuremem=True): self.store(rmap, self.gids[grp_id]) - else: - # add the rates - self.rmap += rmap + else: + # aggregating rates is ultra-fast compared to storing + self.rmap += rmap return acc def create_rup(self): @@ -552,8 +552,8 @@ def execute_reg(self): self.datastore.swmr_on() # must come before the Starmap smap = parallel.Starmap(classical, allargs, h5=self.datastore.hdf5) acc = smap.reduce(self.agg_dicts, AccumDict(accum=0.)) - with self.monitor('storing rates', measuremem=True): - for g in self.rmap.acc: + for g in self.rmap.acc: + with self.monitor('storing rates', measuremem=True): self.store(self.rmap.to_array([g]), [g]) del self.rmap if oq.disagg_by_src: