Merge pull request #10059 from gem/pick-model
ARISTOTLE: if the hypocenter is close to the boundaries of many mosaic models, allow selecting one of them...
ptormene authored Oct 16, 2024
2 parents cfda150 + bf55392 commit f157880
Showing 11 changed files with 243 additions and 117 deletions.
5 changes: 3 additions & 2 deletions openquake/calculators/checkers.py
@@ -71,8 +71,9 @@ def assert_close(tbl, fname):
with open(str(fname) + '~', 'w') as f:
f.write(txt)
with open(fname) as f:
exp = f.read()
aac(_data2rows(exp), _data2rows(txt), atol=1E-5, rtol=1E-3)
expected = f.read()
for exp, got in zip(_data2rows(expected), _data2rows(txt)):
aac(exp, got, atol=1E-5, rtol=1E-3)


def check(ini, hc_id=None, exports='', what='', prefix=''):
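The change above replaces a single whole-table assert_allclose call with a row-by-row comparison, presumably so that tables whose rows differ in length can still be checked cell by cell. A minimal sketch of the pattern, with made-up rows standing in for the output of _data2rows:

```python
# Illustrative only: `_data2rows` normally parses the expected/actual text
# tables; here two small ragged row lists stand in for its output.
from numpy.testing import assert_allclose as aac

expected_rows = [[1.0, 2.0, 3.0], [4.0, 5.0]]          # rows of unequal length
got_rows = [[1.0, 2.000001, 3.0], [4.00001, 5.0]]

for exp, got in zip(expected_rows, got_rows):
    # each row is compared on its own, so no rectangular array is required
    aac(exp, got, atol=1E-5, rtol=1E-3)
```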
29 changes: 18 additions & 11 deletions openquake/calculators/event_based.py
@@ -28,6 +28,7 @@
from openquake.baselib import config, hdf5, parallel, python3compat
from openquake.baselib.general import (
AccumDict, humansize, groupby, block_splitter)
from openquake.engine.aristotle import get_close_mosaic_models
from openquake.hazardlib.geo.packager import fiona
from openquake.hazardlib.map_array import MapArray, get_mean_curve
from openquake.hazardlib.stats import geom_avg_std, compute_stats
@@ -38,7 +39,6 @@
from openquake.hazardlib.calc.gmf import GmfComputer
from openquake.hazardlib.calc.conditioned_gmfs import ConditionedGmfComputer
from openquake.hazardlib import logictree, InvalidFile
from openquake.hazardlib.geo.utils import geolocate
from openquake.hazardlib.calc.stochastic import get_rup_array, rupture_dt
from openquake.hazardlib.source.rupture import (
RuptureProxy, EBRupture, get_ruptures)
@@ -218,7 +218,8 @@ def _event_based(proxies, cmaker, stations, srcfilter, shr,
if stations and stations[0] is not None: # conditioned GMFs
assert cmaker.scenario
with shr['mea'] as mea, shr['tau'] as tau, shr['phi'] as phi:
df = computer.compute_all([mea, tau, phi], max_iml, mmon, cmon, umon)
df = computer.compute_all(
[mea, tau, phi], max_iml, mmon, cmon, umon)
else: # regular GMFs
df = computer.compute_all(None, max_iml, mmon, cmon, umon)
if oq.mea_tau_phi:
@@ -262,7 +263,8 @@ def event_based(proxies, cmaker, stations, dstore, monitor):
sitecol = dstore['sitecol']
if 'complete' in dstore:
sitecol.complete = dstore['complete']
srcfilter = SourceFilter(sitecol.complete, oq.maximum_distance(cmaker.trt))
srcfilter = SourceFilter(
sitecol.complete, oq.maximum_distance(cmaker.trt))
dset = dstore['rupgeoms']
for proxy in proxies:
proxy.geom = dset[proxy['geom_id']]
@@ -592,13 +594,17 @@ def _read_scenario_ruptures(self):
gsim_lt = readinput.get_gsim_lt(oq)
if oq.aristotle:
# the gsim_lt is read from the exposure.hdf5 file
mosaic_df = readinput.read_mosaic_df(buffer=1)
if oq.rupture_dict:
lonlat = [[oq.rupture_dict['lon'], oq.rupture_dict['lat']]]
elif oq.rupture_xml:
hypo = readinput.get_rupture(oq).hypocenter
lonlat = [[hypo.x, hypo.y]]
[oq.mosaic_model] = geolocate(F32(lonlat), mosaic_df)
if not oq.mosaic_model:
if oq.rupture_dict:
lon, lat = [oq.rupture_dict['lon'], oq.rupture_dict['lat']]
elif oq.rupture_xml:
hypo = readinput.get_rupture(oq).hypocenter
lon, lat = [hypo.x, hypo.y]
mosaic_models = get_close_mosaic_models(lon, lat, 100)
# NOTE: using the first mosaic model
oq.mosaic_model = mosaic_models[0]
if len(mosaic_models) > 1:
logging.info('Using the "%s" model' % oq.mosaic_model)
[expo_hdf5] = oq.inputs['exposure']
if oq.mosaic_model == '???':
raise ValueError(
@@ -717,7 +723,8 @@ def execute(self):
dstore.create_dset('gmf_data/slice_by_event', slice_dt)

# event_based in parallel
smap = starmap_from_rups(event_based, oq, self.full_lt, self.sitecol, dstore)
smap = starmap_from_rups(
event_based, oq, self.full_lt, self.sitecol, dstore)
acc = smap.reduce(self.agg_dicts)
if 'gmf_data' not in dstore:
return acc
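In _read_scenario_ruptures the hard-coded geolocate call is replaced by a fallback: only when oq.mosaic_model is empty does the engine take the hypocenter (from rupture_dict or from the rupture XML) and keep the first of the models returned by get_close_mosaic_models within 100 km. A hedged, self-contained sketch of that selection step, with a stubbed lookup standing in for the real function:

```python
# Sketch only: `find_models` stands in for
# openquake.engine.aristotle.get_close_mosaic_models(lon, lat, 100).
import logging

def pick_mosaic_model(mosaic_model, lon, lat, find_models):
    if mosaic_model:                    # already set, e.g. by the user
        return mosaic_model
    models = find_models(lon, lat)      # sorted codes, e.g. ['EUR', 'MIE']
    chosen = models[0]                  # NOTE: the engine keeps the first one
    if len(models) > 1:
        logging.info('Using the "%s" model', chosen)
    return chosen

# usage with a stubbed lookup returning two candidate models
print(pick_mosaic_model('', 26.0, 38.4, lambda lon, lat: ['EUR', 'MIE']))
```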
10 changes: 8 additions & 2 deletions openquake/calculators/views.py
@@ -165,6 +165,10 @@ def _gen_table(self):
yield '</table>\n'


def fix_newlines(row):
return tuple(col.replace('\n', '\\n') for col in row)


def text_table(data, header=None, fmt=None, ext='rst'):
"""
Build a .rst (or .org) table from a matrix or a DataFrame
@@ -204,6 +208,8 @@ def text_table(data, header=None, fmt=None, ext='rst'):
fmt = functools.partial(scientificformat, fmt=fmt) if fmt else form
for row in data:
tup = tuple(fmt(c) for c in row)
if ext in 'rst org':
tup = fix_newlines(tup)
for (i, col) in enumerate(tup):
col_sizes[i] = max(col_sizes[i], len(col))
if len(tup) != len(col_sizes):
@@ -227,8 +233,8 @@ def text_table(data, header=None, fmt=None, ext='rst'):
lines = [','.join(header)]
else:
lines = [sepline]
for row in body:
lines.append(templ % row)
for tup in body:
lines.append(templ % tup)
if ext == 'rst':
lines.append(sepline)
return '\n'.join(lines)
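text_table now escapes embedded newlines before measuring column widths, but only for the fixed-width rst/org layouts (csv and html are unaffected); without the escaping, a multi-line cell would break the column alignment. The helper is shown verbatim below with a tiny usage example:

```python
# fix_newlines as added in views.py: embedded newlines become the literal
# two-character sequence \n, keeping every cell on one physical line.
def fix_newlines(row):
    return tuple(col.replace('\n', '\\n') for col in row)

print(fix_newlines(('ABC', 'first line\nsecond line')))
# ('ABC', 'first line\\nsecond line')
```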
4 changes: 4 additions & 0 deletions openquake/commands/mosaic.py
@@ -280,6 +280,7 @@ def callback(job_id, params, exc=None):
def aristotle(exposure_hdf5=None, *,
rupfname: str=FAMOUS,
stations: str=None,
mosaic_model: str=None,
maximum_distance: float=300.,
maximum_distance_stations: float=None,
asset_hazard_distance: float=15.,
@@ -311,6 +312,7 @@ def aristotle(exposure_hdf5=None, *,
main_cmd(
usgs_id, rupture_file, rupdic, callback,
maximum_distance=maximum_distance,
mosaic_model=mosaic_model,
trt=trt, truncation_level=truncation_level,
number_of_ground_motion_fields=number_of_ground_motion_fields,
asset_hazard_distance=asset_hazard_distance,
@@ -320,6 +322,7 @@ def aristotle(exposure_hdf5=None, *,
else: # assume .xml
main_cmd('WithRuptureFile', rupfname, None, callback,
maximum_distance=maximum_distance,
mosaic_model=mosaic_model,
trt=trt, truncation_level=truncation_level,
number_of_ground_motion_fields=number_of_ground_motion_fields,
asset_hazard_distance=asset_hazard_distance,
@@ -338,6 +341,7 @@ def aristotle(exposure_hdf5=None, *,
aristotle.rupfname = ('Filename with the same format as famous_ruptures.csv '
'or file rupture_model.xml')
aristotle.stations = 'Path to a csv file with the station data'
aristotle.mosaic_model = 'Mosaic model 3-characters code'
aristotle.maximum_distance = 'Maximum distance in km'
aristotle.maximum_distance_stations = "Maximum distance from stations in km"
aristotle.number_of_ground_motion_fields = 'Number of ground motion fields'
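The aristotle command simply forwards the new mosaic_model keyword to main_cmd, so the automatic pick can be overridden when launching a calculation. A hedged programmatic sketch follows; the paths and the 'EUR' code are placeholders, a configured mosaic directory with exposure.hdf5 is assumed, and via the oq wrapper the same keyword is expected to surface as a --mosaic-model option:

```python
# Hypothetical call; every argument value below is a placeholder.
from openquake.commands.mosaic import aristotle

aristotle('exposure.hdf5',
          rupfname='rupture_model.xml',   # or a famous_ruptures.csv-like file
          mosaic_model='EUR',             # skip the automatic model selection
          maximum_distance=300.)
```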
156 changes: 100 additions & 56 deletions openquake/engine/aristotle.py
@@ -22,6 +22,7 @@
import os
import getpass
import logging
from dataclasses import dataclass
import numpy
from json.decoder import JSONDecodeError
from urllib.error import HTTPError
@@ -35,24 +36,62 @@
CDIR = os.path.dirname(__file__) # openquake/engine


def get_trts_around(rupdic, exposure_hdf5):
@dataclass
class AristotleParam:
rupture_dict: dict
time_event: str
maximum_distance: float
mosaic_model: str
trt: str
truncation_level: float
number_of_ground_motion_fields: int
asset_hazard_distance: float
ses_seed: int
local_timestamp: str = None
exposure_hdf5: str = None
station_data_file: str = None
maximum_distance_stations: float = None
ignore_shakemap: bool = False


def get_close_mosaic_models(lon, lat, max_dist):
"""
:returns: list of TRTs for the mosaic model covering lon, lat
:param lon: longitude
:param lat: latitude
:param max_dist: maximum distance (km) from a model for it to be considered relevant
:returns: list of mosaic models within max_dist from the given point
"""
lon, lat = rupdic['lon'], rupdic['lat']
usgs_id = rupdic.get('usgs_id', '')
lonlats = numpy.array([[lon, lat]])
mosaic_df = readinput.read_mosaic_df(buffer=1)
[mosaic_model] = geo.utils.geolocate(lonlats, mosaic_df)
if mosaic_model == '???':
raise ValueError(f'({lon}, {lat}) is not covered by the mosaic!')
hypocenter = geo.Point(lon, lat)
minx, miny, maxx, maxy = geo.utils.get_bounding_box([hypocenter], max_dist)
close_mosaic_models = set()
bbox_vertices = [(minx, maxy), (maxx, maxy), (maxx, miny), (minx, miny)]
for vertex in bbox_vertices:
lonlats = numpy.array([vertex[0], vertex[1]])
[mosaic_model] = geo.utils.geolocate([lonlats], mosaic_df)
close_mosaic_models.add(mosaic_model)
close_mosaic_models = sorted([model for model in close_mosaic_models
if model != '???'])
if not close_mosaic_models:
raise ValueError(
f'({lon}, {lat}) is farther than {max_dist}km'
f' from any mosaic model!')
elif len(close_mosaic_models) > 1:
logging.info(
'(%s, %s) is closer than %skm with respect to the following'
' mosaic models: %s' % (lon, lat, max_dist, close_mosaic_models))
return close_mosaic_models

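get_close_mosaic_models builds a max_dist bounding box around the hypocenter, geolocates the four corners of the box against the mosaic geometries and returns the sorted distinct model codes, raising an error if every corner falls outside the mosaic. A hedged usage sketch; the coordinates and output are illustrative, and a configured mosaic directory is assumed so that read_mosaic_df can load the geometries:

```python
# Illustrative usage; the coordinates sit near a model boundary on purpose.
from openquake.engine.aristotle import get_close_mosaic_models

models = get_close_mosaic_models(lon=26.0, lat=38.4, max_dist=100)
print(models)   # e.g. ['EUR', 'MIE'] when the 100 km box crosses a boundary
```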

def get_trts_around(mosaic_model, exposure_hdf5):
"""
:returns: list of TRTs for the given mosaic model
"""
with hdf5.File(exposure_hdf5) as f:
df = f.read_df('model_trt_gsim_weight',
sel={'model': mosaic_model.encode()})
logging.info('Considering %s[%s]: (%s, %s)',
usgs_id, mosaic_model, lon, lat)
trts = [trt.decode('utf8') for trt in df.trt.unique()]
return trts, mosaic_model
return trts


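get_trts_around now takes the (possibly user-selected) model code directly instead of deriving it from the rupture dictionary, and it no longer returns the model alongside the TRTs. A hedged usage sketch, where the 'EUR' code and the path are placeholders:

```python
# Illustrative usage; the caller in get_aristotle_params keeps trts[0].
from openquake.engine.aristotle import get_trts_around

trts = get_trts_around('EUR', '/path/to/mosaic/exposure.hdf5')
print(trts)   # e.g. ['Active Shallow Crust', ...]
```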
def get_tmap_keys(exposure_hdf5, countries):
@@ -99,28 +138,22 @@ def get_rupture_dict(dic, ignore_shakemap=False):
return rupdic


def get_aristotle_allparams(rupture_dict, time_event,
maximum_distance, trt,
truncation_level, number_of_ground_motion_fields,
asset_hazard_distance, ses_seed,
local_timestamp=None,
exposure_hdf5=None, station_data_file=None,
maximum_distance_stations=None,
ignore_shakemap=False):
def get_aristotle_params(arist):
"""
:param arist: an instance of AristotleParam
:returns: a list of dictionaries suitable for an Aristotle calculation
"""
if exposure_hdf5 is None:
exposure_hdf5 = os.path.join(
if arist.exposure_hdf5 is None:
arist.exposure_hdf5 = os.path.join(
config.directory.mosaic_dir, 'exposure.hdf5')
inputs = {'exposure': [exposure_hdf5],
inputs = {'exposure': [arist.exposure_hdf5],
'job_ini': '<in-memory>'}
rupdic = get_rupture_dict(rupture_dict, ignore_shakemap)
if station_data_file is None:
rupdic = get_rupture_dict(arist.rupture_dict, arist.ignore_shakemap)
if arist.station_data_file is None:
# NOTE: giving precedence to the station_data_file uploaded via form
try:
station_data_file = download_station_data_file(
rupture_dict['usgs_id'])
arist.station_data_file = download_station_data_file(
arist.rupture_dict['usgs_id'])
except HTTPError as exc:
logging.info(f'Station data is not available: {exc}')
except (KeyError, LookupError, UnicodeDecodeError,
@@ -129,40 +162,49 @@ def get_aristotle_allparams(rupture_dict, time_event,
rupture_file = rupdic.pop('rupture_file')
if rupture_file:
inputs['rupture_model'] = rupture_file
if station_data_file:
inputs['station_data'] = station_data_file
if trt is None:
trts, _ = get_trts_around(rupdic, exposure_hdf5)
trt = trts[0]
if arist.station_data_file:
inputs['station_data'] = arist.station_data_file
if not arist.mosaic_model:
lon, lat = rupdic['lon'], rupdic['lat']
mosaic_models = get_close_mosaic_models(lon, lat, 100)
# NOTE: using the first mosaic model
arist.mosaic_model = mosaic_models[0]
if len(mosaic_models) > 1:
logging.info('Using the "%s" model' % arist.mosaic_model)

if arist.trt is None:
# NOTE: using the first tectonic region type
arist.trt = get_trts_around(arist.mosaic_model, arist.exposure_hdf5)[0]
params = dict(
calculation_mode='scenario_risk',
rupture_dict=str(rupdic),
time_event=time_event,
maximum_distance=str(maximum_distance),
tectonic_region_type=trt,
truncation_level=str(truncation_level),
number_of_ground_motion_fields=str(number_of_ground_motion_fields),
asset_hazard_distance=str(asset_hazard_distance),
ses_seed=str(ses_seed),
time_event=arist.time_event,
maximum_distance=str(arist.maximum_distance),
mosaic_model=arist.mosaic_model,
tectonic_region_type=arist.trt,
truncation_level=str(arist.truncation_level),
number_of_ground_motion_fields=str(arist.number_of_ground_motion_fields),
asset_hazard_distance=str(arist.asset_hazard_distance),
ses_seed=str(arist.ses_seed),
inputs=inputs)
if local_timestamp is not None:
params['local_timestamp'] = local_timestamp
if maximum_distance_stations is not None:
params['maximum_distance_stations'] = str(maximum_distance_stations)
if arist.local_timestamp is not None:
params['local_timestamp'] = arist.local_timestamp
if arist.maximum_distance_stations is not None:
params['maximum_distance_stations'] = str(arist.maximum_distance_stations)
oq = readinput.get_oqparam(params)
# NB: fake h5 to cache `get_site_model` and avoid multiple associations
_sitecol, assetcol, _discarded, _exp = readinput.get_sitecol_assetcol(
oq, h5={'performance_data': hdf5.FakeDataset()})
id0s = numpy.unique(assetcol['ID_0'])
countries = set(assetcol.tagcol.ID_0[i] for i in id0s)
tmap_keys = get_tmap_keys(exposure_hdf5, countries)
tmap_keys = get_tmap_keys(arist.exposure_hdf5, countries)
if not tmap_keys:
raise LookupError(f'No taxonomy mapping was found for {countries}')
logging.root.handlers = [] # avoid breaking the logs
params['description'] = (
f'{rupdic["usgs_id"]} ({rupdic["lat"]}, {rupdic["lon"]})'
f' M{rupdic["mag"]}')
return [params]
return params

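The long positional signature of get_aristotle_allparams is folded into the AristotleParam dataclass, and the renamed get_aristotle_params returns a single params dict rather than a one-element list. A hedged construction sketch; all values are placeholders, and exposure.hdf5, the mosaic data and (for a bare USGS id) access to the ShakeMap service are assumed:

```python
# Illustrative only; 'us6000m0xl' is the example id from the parameter docs.
from openquake.engine.aristotle import AristotleParam, get_aristotle_params

arist = AristotleParam(
    rupture_dict=dict(usgs_id='us6000m0xl', rupture_file=None),
    time_event='day',
    maximum_distance=300.,
    mosaic_model=None,   # None triggers get_close_mosaic_models(lon, lat, 100)
    trt=None,            # None picks the first TRT of the chosen model
    truncation_level=3.,
    number_of_ground_motion_fields=10,
    asset_hazard_distance=15.,
    ses_seed=42,
)
params = get_aristotle_params(arist)   # a single dict, no longer a list
```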

def main_web(allparams, jobctxs,
@@ -184,7 +226,8 @@ def main_web(allparams, jobctxs,
def main_cmd(usgs_id, rupture_file=None, rupture_dict=None,
callback=trivial_callback, *,
time_event='day',
maximum_distance='300', trt=None, truncation_level='3',
maximum_distance='300', mosaic_model=None, trt=None,
truncation_level='3',
number_of_ground_motion_fields='10', asset_hazard_distance='15',
ses_seed='42',
local_timestamp=None, exposure_hdf5=None, station_data_file=None,
@@ -195,25 +238,25 @@ def main_cmd(usgs_id, rupture_file=None, rupture_dict=None,
if rupture_dict is None:
rupture_dict = dict(usgs_id=usgs_id, rupture_file=rupture_file)
try:
allparams = get_aristotle_allparams(
rupture_dict, time_event, maximum_distance, trt,
truncation_level,
arist = AristotleParam(
rupture_dict, time_event, maximum_distance, mosaic_model,
trt, truncation_level,
number_of_ground_motion_fields, asset_hazard_distance,
ses_seed, local_timestamp, exposure_hdf5, station_data_file,
maximum_distance_stations, ignore_shakemap)
oqparams = get_aristotle_params(arist)
except Exception as exc:
callback(None, dict(usgs_id=usgs_id), exc=exc)
return
# in testing mode create new job contexts
user = getpass.getuser()
jobctxs = engine.create_jobs(allparams, 'warn', None, user, None)
for params, job in zip(allparams, jobctxs):
try:
engine.run_jobs([job])
except Exception as exc:
callback(job.calc_id, params, exc=exc)
else:
callback(job.calc_id, params, exc=None)
[job] = engine.create_jobs([oqparams], 'warn', None, user, None)
try:
engine.run_jobs([job])
except Exception as exc:
callback(job.calc_id, oqparams, exc=exc)
else:
callback(job.calc_id, oqparams, exc=None)


main_cmd.usgs_id = 'ShakeMap ID' # i.e. us6000m0xl
@@ -222,6 +265,7 @@ def main_cmd(usgs_id, rupture_file=None, rupture_dict=None,
main_cmd.callback = ''
main_cmd.time_event = 'Time of the event (avg, day, night or transit)'
main_cmd.maximum_distance = 'Maximum distance in km'
main_cmd.mosaic_model = 'Mosaic model 3-characters code (optional)'
main_cmd.trt = 'Tectonic region type'
main_cmd.truncation_level = 'Truncation level'
main_cmd.number_of_ground_motion_fields = 'Number of ground motion fields'
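main_cmd now builds one AristotleParam, creates a single job and runs it (the old loop over allparams is gone), and it accepts the mosaic_model keyword so the automatic pick can be overridden. A hedged invocation sketch; the id is the example from the parameter docs above, 'MIE' is a placeholder code, and a working mosaic setup is assumed:

```python
# Illustrative only: with just a USGS id, the engine is expected to fetch
# the rupture data from the ShakeMap service before running the job.
from openquake.engine.aristotle import main_cmd

main_cmd('us6000m0xl', mosaic_model='MIE', maximum_distance='300')
```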