
Commit

Restored gzip and better monitoring
micheles committed Aug 27, 2024
1 parent 798a70d commit fa5b608
Showing 1 changed file with 32 additions and 32 deletions.
64 changes: 32 additions & 32 deletions openquake/calculators/classical.py
@@ -46,7 +46,7 @@
 I64 = numpy.int64
 TWO24 = 2 ** 24
 TWO32 = 2 ** 32
-GZIP = None
+GZIP = 'gzip'
 BUFFER = 1.5  # enlarge the pointsource_distance sphere to fix the weight;
 # with BUFFER = 1 we would have lots of apparently light sources
 # collected together in an extra-slow task, as it happens in SHARE
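
The hunk above flips the module-level GZIP constant from None back to 'gzip', re-enabling compression for whatever datasets are written with it (the "Restored gzip" of the commit title). A minimal sketch, assuming the constant ends up as the compression argument of h5py's create_dataset; the file path and dataset name below are illustrative, not the engine's:

    import h5py
    import numpy

    GZIP = 'gzip'  # with GZIP = None the gzip filter would be disabled

    with h5py.File('/tmp/rates.hdf5', 'w') as f:
        rates = numpy.random.random((1000, 4))
        # h5py accepts compression=None (store raw) or compression='gzip'
        f.create_dataset('rates', data=rates, compression=GZIP)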
@@ -340,36 +340,36 @@ def agg_dicts(self, acc, dic):
         if dic is None:
             raise MemoryError('You ran out of memory!')
 
-        with self.monitor('aggregating rates', measuremem=True):
-            sdata = dic['source_data']
-            self.source_data += sdata
-            grp_id = dic.pop('grp_id')
-            self.rel_ruptures[grp_id] += sum(sdata['nrupts'])
-            cfactor = dic.pop('cfactor')
-            if cfactor[1] != cfactor[0]:
-                print('ctxs_per_mag = {:.0f}, cfactor_per_task = {:.1f}'.format(
-                    cfactor[1] / cfactor[2], cfactor[1] / cfactor[0]))
-            self.cfactor += cfactor
-
-            # store rup_data if there are few sites
-            if self.few_sites and len(dic['rup_data']):
-                with self.monitor('saving rup_data'):
-                    store_ctxs(self.datastore, dic['rup_data'], grp_id)
-
-            rmap = dic.pop('pnemap', None)
-            source_id = dic.pop('basename', '')  # non-empty for disagg_by_src
-            if source_id:
-                # accumulate the rates for the given source
-                acc[source_id] += self.haz.get_rates(rmap, grp_id)
-            if rmap is None:
-                # already stored in the workers, case_22
-                pass
-            elif isinstance(rmap, numpy.ndarray):
-                # store the rates directly, case_03
-                with self.monitor('storing rates', measuremem=True):
-                    self.store(rmap, self.gids[grp_id])
-            else:
-                # add the rates
-                self.rmap += rmap
+        sdata = dic['source_data']
+        self.source_data += sdata
+        grp_id = dic.pop('grp_id')
+        self.rel_ruptures[grp_id] += sum(sdata['nrupts'])
+        cfactor = dic.pop('cfactor')
+        if cfactor[1] != cfactor[0]:
+            print('ctxs_per_mag = {:.0f}, cfactor_per_task = {:.1f}'.format(
+                cfactor[1] / cfactor[2], cfactor[1] / cfactor[0]))
+        self.cfactor += cfactor
+
+        # store rup_data if there are few sites
+        if self.few_sites and len(dic['rup_data']):
+            with self.monitor('saving rup_data'):
+                store_ctxs(self.datastore, dic['rup_data'], grp_id)
+
+        rmap = dic.pop('pnemap', None)
+        source_id = dic.pop('basename', '')  # non-empty for disagg_by_src
+        if source_id:
+            # accumulate the rates for the given source
+            acc[source_id] += self.haz.get_rates(rmap, grp_id)
+        if rmap is None:
+            # already stored in the workers, case_22
+            pass
+        elif isinstance(rmap, numpy.ndarray):
+            # store the rates directly, case_03
+            with self.monitor('storing rates', measuremem=True):
+                self.store(rmap, self.gids[grp_id])
+        else:
+            # aggregating rates is ultra-fast compared to storing
+            self.rmap += rmap
         return acc
 
     def create_rup(self):
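
The hunk above removes the coarse 'aggregating rates' monitor that wrapped the whole method body and keeps only the fine-grained 'storing rates' monitor around the expensive self.store call; as the new comment notes, aggregating rates in memory is ultra-fast compared to storing them, so the outer timer mostly measured storage anyway. A standalone sketch of the pattern, using a plain timing context manager rather than OpenQuake's Monitor (which also samples memory when measuremem=True):

    import time
    from contextlib import contextmanager

    @contextmanager
    def monitor(label):
        # time-only stand-in for self.monitor(label, measuremem=True)
        t0 = time.perf_counter()
        try:
            yield
        finally:
            print('%s: %.3fs' % (label, time.perf_counter() - t0))

    rmap = [1e-6] * 1_000_000
    acc = sum(rmap)  # cheap in-memory aggregation, left unmonitored

    with monitor('storing rates'):
        time.sleep(0.1)  # stand-in for the expensive self.store(...) call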
@@ -552,8 +552,8 @@ def execute_reg(self):
         self.datastore.swmr_on()  # must come before the Starmap
         smap = parallel.Starmap(classical, allargs, h5=self.datastore.hdf5)
         acc = smap.reduce(self.agg_dicts, AccumDict(accum=0.))
-        with self.monitor('storing rates', measuremem=True):
-            for g in self.rmap.acc:
+        for g in self.rmap.acc:
+            with self.monitor('storing rates', measuremem=True):
                 self.store(self.rmap.to_array([g]), [g])
         del self.rmap
         if oq.disagg_by_src:
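
The last hunk swaps the nesting of the loop and the monitor in execute_reg, so 'storing rates' is re-entered once per group g and the engine records one measurement (and one memory figure, via measuremem=True) per stored group rather than a single aggregate number, presumably the "better monitoring" of the commit title. A standalone sketch of the difference; the names below are illustrative:

    import time

    group_rates = {0: [0.1] * 100, 1: [0.2] * 200}  # stand-in for self.rmap.acc

    # one measurement per group, instead of a single one around the whole loop
    for g, rates in group_rates.items():
        t0 = time.perf_counter()
        total = sum(rates)  # stand-in for self.store(self.rmap.to_array([g]), [g])
        print('storing rates for group %d: %.6fs' % (g, time.perf_counter() - t0))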
