Merge pull request #10069 from gem/export_job_zip
Export job zip/1
micheles authored Oct 18, 2024
2 parents f939746 + f99bbe2 commit 6e27123
Showing 12 changed files with 236 additions and 98 deletions.
2 changes: 0 additions & 2 deletions openquake/calculators/event_based_risk.py
@@ -380,8 +380,6 @@ def pre_execute(self):
self.events_per_sid = numpy.zeros(self.N, U32)
self.datastore.swmr_on()
set_oqparam(oq, self.assetcol, self.datastore)
ct = oq.concurrent_tasks or 1
oq.maxweight = int(oq.ebrisk_maxsize / ct)
self.A = A = len(self.assetcol)
self.L = L = len(oq.loss_types)
ELT = len(oq.ext_loss_types)
3 changes: 2 additions & 1 deletion openquake/calculators/export/__init__.py
@@ -25,8 +25,9 @@
'asce07': 'ASCE 7 Parameters',
'asce41': 'ASCE 41 Parameters',
'mag_dst_eps_sig': "Deterministic Earthquake Scenarios",
'assetcol': 'Exposure',
'job': 'job.zip',
'asset_risk': 'Exposure + Risk',
'assetcol': 'Exposure CSV',
'gmf_data': 'Ground Motion Fields',
'damages-rlzs': 'Asset Risk Distributions',
'damages-stats': 'Asset Risk Statistics',
10 changes: 10 additions & 0 deletions openquake/calculators/export/hazard.py
@@ -456,6 +456,16 @@ def export_gmf_data_csv(ekey, dstore):
return [fname, f]


@export.add(('site_model', 'csv'))
def export_site_model_csv(ekey, dstore):
sitecol = dstore['sitecol']
fname = dstore.build_fname(ekey[0], '', ekey[1])
writers.CsvWriter(fmt=writers.FIVEDIGITS).save(
sitecol.array, fname, comment=dstore.metadata)
return [fname]
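
For illustration (not part of the diff): any (key, fmt) pair registered via export.add can be driven through the export CallableDict. A minimal sketch, assuming dstore is the open datastore of a completed calculation:

from openquake.calculators.export import export
# writes the site collection as CSV and returns the written file names
fnames = export(('site_model', 'csv'), dstore)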



@export.add(('gmf_data', 'hdf5'))
def export_gmf_data_hdf5(ekey, dstore):
fname = dstore.build_fname('gmf', 'data', 'hdf5')
157 changes: 129 additions & 28 deletions openquake/calculators/export/risk.py
@@ -15,14 +15,19 @@
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.

import os
import json
import shutil
import tempfile
import itertools
import collections
import numpy
import pandas

from openquake.baselib import hdf5, writers, general
from openquake.baselib import hdf5, writers, general, node, config
from openquake.baselib.python3compat import decode
from openquake.hazardlib import nrml
from openquake.hazardlib.stats import compute_stats2
from openquake.risklib import scientific
from openquake.calculators.extract import (
@@ -654,33 +659,6 @@ def export_aggcurves_csv(ekey, dstore):
return fnames


@export.add(('assetcol', 'csv'))
def export_assetcol_csv(ekey, dstore):
"""
:param ekey: export key, i.e. a pair (datastore key, fmt)
:param dstore: datastore object
"""
assetcol = dstore['assetcol'].array
writer = writers.CsvWriter(fmt=writers.FIVEDIGITS)
df = pandas.DataFrame(assetcol)
tagcol = dstore['assetcol'].tagcol
tagnames = tagcol.tagnames
sorted_cols = sorted([col for col in tagnames if col in df.columns])
unsorted_cols = [col for col in df.columns if col not in tagnames]
df = df[unsorted_cols + sorted_cols]
for asset_idx in range(len(assetcol)):
for tagname in tagnames:
tag_id = df[tagname][asset_idx]
tag_str = tagcol.get_tag(tagname, tag_id).split('=')[1]
df.loc[asset_idx, tagname] = tag_str
df.drop(columns=['ordinal', 'site_id'], inplace=True)
df['id'] = df['id'].apply(lambda x: x.decode('utf8'))
dest = dstore.export_path('%s.%s' % ekey)
md = dstore.metadata
writer.save(df, dest, comment=md)
return [dest]


@export.add(('reinsurance-risk_by_event', 'csv'),
('reinsurance-aggcurves', 'csv'),
('reinsurance-avg_portfolio', 'csv'),
@@ -720,3 +698,126 @@ def export_node_el(ekey, dstore):
writer = writers.CsvWriter(fmt=writers.FIVEDIGITS)
writer.save(df, dest, comment=dstore.metadata)
return writer.getsaved()


def convert_df_to_vulnerability(loss_type, df):
N = node.Node
root = N('vulnerabilityModel', {'id': "vulnerability_model",
'assetCategory': "buildings",
"lossCategory": loss_type})
descr = N('description', {}, f"{loss_type} vulnerability model")
root.append(descr)
for riskfunc in df.riskfunc:
rfunc = json.loads(riskfunc)['openquake.risklib.scientific.VulnerabilityFunction']
vfunc = N('vulnerabilityFunction',
{'id': rfunc['id'], 'dist': rfunc['distribution_name']})
imls = N('imls', {'imt': rfunc['imt']}, rfunc['imls'])
vfunc.append(imls)
vfunc.append(N('meanLRs', {}, rfunc['mean_loss_ratios']))
vfunc.append(N('covLRs', {}, rfunc['covs']))
root.append(vfunc)
return root
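
Each row of the 'crm' dataframe carries a risk function serialized as JSON in its riskfunc column; a sketch of the payload the converter above expects, with invented values (the key names are exactly those read by the code):

# illustrative payload only; the numbers are made up
riskfunc = '''{"openquake.risklib.scientific.VulnerabilityFunction": {
    "id": "W-res", "imt": "PGA", "distribution_name": "LN",
    "imls": [0.1, 0.2, 0.4],
    "mean_loss_ratios": [0.05, 0.15, 0.50],
    "covs": [0.0, 0.0, 0.0]}}'''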


def export_vulnerability_xml(dstore, edir):
fnames = []
for loss_type, df in dstore.read_df('crm').groupby('loss_type'):
nodeobj = convert_df_to_vulnerability(loss_type, df)
dest = os.path.join(edir, '%s_vulnerability.xml' % loss_type)
with open(dest, 'wb') as out:
nrml.write([nodeobj], out)
fnames.append(dest)
return fnames



@export.add(('assetcol', 'csv'))
def export_assetcol_csv(ekey, dstore):
assetcol = dstore['assetcol'].array
writer = writers.CsvWriter(fmt=writers.FIVEDIGITS)
df = pandas.DataFrame(assetcol)
tagcol = dstore['assetcol'].tagcol
tagnames = tagcol.tagnames
sorted_cols = sorted([col for col in tagnames if col in df.columns])
unsorted_cols = [col for col in df.columns if col not in tagnames]
df = df[unsorted_cols + sorted_cols]
for asset_idx in range(len(assetcol)):
for tagname in tagnames:
tag_id = df[tagname][asset_idx]
tag_str = tagcol.get_tag(tagname, tag_id).split('=')[1]
df.loc[asset_idx, tagname] = tag_str
df.drop(columns=['ordinal', 'site_id'], inplace=True)
df['id'] = df['id'].apply(lambda x: x.decode('utf8'))
dest_csv = dstore.export_path('%s.%s' % ekey)
writer.save(df, dest_csv)
return [dest_csv]


def export_exposure(dstore, edir):
"""
:param dstore: datastore object
"""
[dest] = export(('assetcol', 'csv'), dstore)
assetcol_csv = os.path.join(edir, 'assetcol.csv')
shutil.move(dest, assetcol_csv)
tagnames = dstore['assetcol/tagcol'].tagnames
cost_types = dstore.getitem('exposure') # cost_type, area_type, unit
N = node.Node
root = N('exposureModel', {'id': 'exposure', 'category': 'buildings'})
root.append(N('description', {}, 'Generated exposure'))
conversions = N('conversions', {})
costtypes = N('costTypes', {})
for ct in cost_types:
costtypes.append(N('costType', {
'name': ct['loss_type'],
'type': ct['cost_type'],
'unit': ct['unit']}))
conversions.append(costtypes)
root.append(conversions)
root.append(N('occupancyPeriods', {}, 'night'))
root.append(N('tagNames', {}, tagnames))
root.append(N('assets', {}, 'assetcol.csv'))
exposure_xml = os.path.join(edir, 'exposure.xml')
with open(exposure_xml, 'wb') as out:
nrml.write([root], out)
return [exposure_xml, assetcol_csv]
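
export_exposure thus produces an NRML exposureModel node (description, conversions/costTypes, occupancyPeriods, tagNames) pointing at the relocated assetcol.csv. A hedged usage sketch, assuming dstore comes from a calculation with an exposure:

import tempfile
# illustration only: write exposure.xml and assetcol.csv to a scratch dir
edir = tempfile.mkdtemp()
exposure_xml, assetcol_csv = export_exposure(dstore, edir)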


@export.add(('job', 'zip'))
def export_job_zip(ekey, dstore):
"""
Exports:
- job.ini
- rupture.csv
- gsim_logic_tree.xml
- exposure.xml and assetcol.csv
- one vulnerability XML file per loss type
- taxonomy_mapping.csv
"""
oq = dstore['oqparam']
edir = tempfile.mkdtemp(dir=config.directory.custom_tmp or tempfile.gettempdir())
fnames = export_exposure(dstore, edir)
job_ini = os.path.join(edir, 'job.ini')
with open(job_ini, 'w') as out:
out.write(oq.to_ini(exposure='exposure.xml'))
fnames.append(job_ini)
csv = extract(dstore, 'ruptures?slice=0&slice=1').array
dest = os.path.join(edir, 'rupture.csv')
with open(dest, 'w') as out:
out.write(csv)
fnames.append(dest)
gsim_lt = dstore['full_lt'].gsim_lt
dest = os.path.join(edir, 'gsim_logic_tree.xml')
with open(dest, 'wb') as out:
nrml.write([gsim_lt.to_node()], out)
fnames.append(dest)
fnames.extend(export_vulnerability_xml(dstore, edir))

dest = os.path.join(edir, 'taxonomy_mapping.csv')
taxmap = dstore.read_df('taxmap')
writer = writers.CsvWriter(fmt=writers.FIVEDIGITS)
del taxmap['taxi']
writer.save(taxmap, dest)
fnames.append(dest)
return fnames
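
A sketch of driving the new exporter (illustration, not part of the diff); per the test below, for the first aristotle calculation this returns exposure.xml, assetcol.csv, job.ini, rupture.csv, gsim_logic_tree.xml, one vulnerability XML per loss type and taxonomy_mapping.csv:

from openquake.calculators.export import export
# dstore is assumed to be the datastore of a completed risk calculation
fnames = export(('job', 'zip'), dstore)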
7 changes: 6 additions & 1 deletion openquake/calculators/extract.py
@@ -1504,8 +1504,13 @@ def extract_ruptures(dstore, what):
for rgetter in getters.get_rupture_getters(dstore, rupids=rup_ids):
ebrups.extend(rupture.get_ebr(proxy.rec, proxy.geom, rgetter.trt)
for proxy in rgetter.get_proxies(min_mag))
if 'slice' in qdict:
s0, s1 = qdict['slice']
slc = slice(s0, s1)
else:
slc = slice(None)
bio = io.StringIO()
arr = rupture.to_csv_array(ebrups)
arr = rupture.to_csv_array(ebrups[slc])
writers.write_csv(bio, arr, comment=comment)
return bio.getvalue()
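
The two slice parameters become the start and stop of a Python slice over the extracted ruptures; this is what export_job_zip above relies on to emit a single-rupture CSV. A sketch (illustration only, dstore assumed open):

from openquake.calculators.extract import extract
# slice=0&slice=1 selects ebrups[0:1], i.e. only the first rupture
csv = extract(dstore, 'ruptures?slice=0&slice=1').array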

12 changes: 5 additions & 7 deletions openquake/commonlib/oqvalidation.py
@@ -263,9 +263,6 @@
Example: *distance_bin_width = 20*.
Default: no default
ebrisk_maxsize:
INTERNAL
epsilon_star:
A boolean controlling the typology of disaggregation output to be provided.
When True, disaggregation is performed in terms of epsilon* rather than
@@ -1149,7 +1146,6 @@ class OqParam(valid.ParamSet):
split_sources = valid.Param(valid.boolean, True)
split_by_gsim = valid.Param(valid.positiveint, 0)
outs_per_task = valid.Param(valid.positiveint, 4)
ebrisk_maxsize = valid.Param(valid.positivefloat, 2E10) # used in ebrisk
tectonic_region_type = valid.Param(valid.utf8, '*')
time_event = valid.Param(
valid.Choice('avg', 'day', 'night', 'transit'), 'avg')
@@ -2232,15 +2228,17 @@ def docs(cls):
return dic

# tested in run-demos.sh
def to_ini(self):
def to_ini(self, **inputs):
"""
Converts the parameters into a string in .ini format
"""
dic = {k: v for k, v in vars(self).items() if not k.startswith('_')}
dic['inputs'].update(inputs)
del dic['base_path']
del dic['req_site_params']
dic.pop('export_dir', None)
dic.pop('all_cost_types', None)
for k in 'export_dir exports all_cost_types hdf5path ideduc M K A'.split():
dic.pop(k, None)

if 'secondary_perils' in dic:
dic['secondary_perils'] = ' '.join(dic['secondary_perils'])
if 'aggregate_by' in dic:
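
With the new signature, callers can override entries of the inputs dictionary when regenerating the .ini file; this is how export_job_zip points the exposure at the exported XML. A sketch, assuming oq is an OqParam instance:

# illustration only: rewrite inputs['exposure'] in the regenerated ini
with open('job.ini', 'w') as out:
    out.write(oq.to_ini(exposure='exposure.xml'))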
23 changes: 22 additions & 1 deletion openquake/engine/tests/aristotle_test.py
@@ -21,12 +21,33 @@
import unittest
import pytest
from openquake.calculators.checkers import check
from openquake.calculators.export import export


cd = pathlib.Path(__file__).parent

def check_export_job(dstore):
fnames = export(('job', 'zip'), dstore)
fnames = [os.path.basename(f) for f in fnames]
assert fnames == ['exposure.xml',
'assetcol.csv',
'job.ini',
'rupture.csv',
'gsim_logic_tree.xml',
'area_vulnerability.xml',
'contents_vulnerability.xml',
'nonstructural_vulnerability.xml',
'number_vulnerability.xml',
'occupants_vulnerability.xml',
'residents_vulnerability.xml',
'structural_vulnerability.xml',
'taxonomy_mapping.csv']


@pytest.mark.parametrize('n', [1, 2, 3])
def test_aristotle(n):
if not os.path.exists(cd.parent.parent.parent / 'exposure.hdf5'):
raise unittest.SkipTest('Please download exposure.hdf5')
check(cd / f'aristotle{n}/job.ini', what='aggrisk')
calc = check(cd / f'aristotle{n}/job.ini', what='aggrisk')
if n == 1:
check_export_job(calc.datastore)
17 changes: 10 additions & 7 deletions openquake/hazardlib/contexts.py
@@ -193,11 +193,19 @@ def trivial(ctx, name):


class Oq(object):
"""
A mock for OqParam
"""
mea_tau_phi = False
split_sources = True

def __init__(self, **hparams):
vars(self).update(hparams)

@property
def min_iml(self):
return numpy.array([1E-10 for imt in self.imtls])

def get_reqv(self):
if 'reqv' not in self.inputs:
return
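
The min_iml property is new: the mock now derives a vector of tiny minimum intensities from its imtls, so code reading oq.min_iml (see _init2 below) works with both real OqParam objects and this stand-in. A sketch (illustration only):

from openquake.hazardlib.contexts import Oq
oq = Oq(imtls={'PGA': [0.1, 0.2, 0.4]}, inputs={})
print(oq.min_iml)  # array([1.e-10]), one entry per IMT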
@@ -532,8 +540,6 @@ def __init__(self, trt, gsims, oq, monitor=Monitor(), extraparams=()):
self.cross_correl = param.get('cross_correl') # cond_spectra_test
else: # OqParam
param = vars(oq)
param['split_sources'] = oq.split_sources
param['min_iml'] = oq.min_iml
param['reqv'] = oq.get_reqv()
param['af'] = getattr(oq, 'af', None)
self.cross_correl = oq.cross_correl
@@ -587,7 +593,7 @@ def _init1(self, param):
self.num_epsilon_bins = param.get('num_epsilon_bins', 1)
self.disagg_bin_edges = param.get('disagg_bin_edges', {})
self.ps_grid_spacing = param.get('ps_grid_spacing')
self.split_sources = param.get('split_sources')
self.split_sources = self.oq.split_sources

def _init2(self, param, extraparams):
for gsim in self.gsims:
@@ -617,10 +623,7 @@ def _init2(self, param, extraparams):
reqset.add('ch_phiss03')
reqset.add('ch_phiss06')
setattr(self, 'REQUIRES_' + req, reqset)
try:
self.min_iml = param['min_iml']
except KeyError:
self.min_iml = numpy.array([0. for imt in self.imtls])
self.min_iml = self.oq.min_iml
self.reqv = param.get('reqv')
if self.reqv is not None:
self.REQUIRES_DISTANCES.add('repi')
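
Correspondingly, the ContextMaker no longer pops min_iml and split_sources out of the param dict but reads them from self.oq; the Oq mock above stands in for OqParam when ContextMaker receives a plain dict of parameters. A hedged construction sketch (the GSIM name is only an example):

from openquake.hazardlib import valid
from openquake.hazardlib.contexts import ContextMaker
gsims = [valid.gsim('BooreAtkinson2008')]
cmaker = ContextMaker('Active Shallow Crust', gsims,
                      {'imtls': {'PGA': [0.1, 0.2, 0.4]}})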