From 8687e64f2f92e44cbb2ed3eb3292b1ad6ccb40a8 Mon Sep 17 00:00:00 2001
From: drotheram
Date: Mon, 4 Oct 2021 15:51:02 -0700
Subject: [PATCH] Update hexbin and dsra source (#129)

* Updates to new DSRA source repo and hex processing

* changes to DSRA repo references

* Update to same ES/Kibana version as FGP env

* remove hexbin centroid aggregations

* modularize data loading from postgres to ES

* PSRA results for national model & add bld agg

* load national psra instead of p/t subset

* switch from P/T to national layer

* update Kibana space and hexgrid

* add index prefix

* update social_fab loading and add hex

* add global fabric

* Kibana: update index patterns and logs

* Kibana logs & patterns

* add retry on ES timeout

* ES/Kibana updates

* lint style fixes

* lint style fixes 2

* lint style fixes 3

* lint style fixes 4

* lint style fixes 5

* Update lintly-flake8.yml

ignore E501, line too long. Adds undue burden trying to shorten long SQL queries

* lint style fixes 6

Co-authored-by: William Chow <59062243+wkhchow@users.noreply.github.com>
---
 .github/workflows/lintly-flake8.yml          |   2 +-
 docker-compose-run.yml                       |   4 +-
 docker-compose.yml                           |   4 +-
 python/add_data.sh                           | 138 +++++++---
 python/dsraAllScenariosCduid_postgres2es.py  |  45 ++++
 python/dsraAllScenariosCsduid_postgres2es.py |  45 ++++
 python/dsraAllScenariosDauid_postgres2es.py  |  47 ++++
 python/dsraAllScenariosEruid_postgres2es.py  |  45 ++++
 python/dsraAllScenariosSauid_postgres2es.py  |  47 ++++
 python/dsra_postgres2es.py                   |   8 +-
 python/exposure_postgres2es.py               | 220 +++++-----------
 python/hazardThreat_postgres2es.py           |  10 +-
 python/hexgrid_100km_postgres2es.py          |  45 ++++
 python/hexgrid_10km_postgres2es.py           |  45 ++++
 python/hexgrid_25km_postgres2es.py           |  45 ++++
 python/hexgrid_50km_postgres2es.py           |  45 ++++
 python/hexgrid_5km_postgres2es.py            |  45 ++++
 python/hexgrid_sauid_postgres2es.py          |  45 ++++
 python/hmaps_postgres2es.py                  | 211 +++-------------
 python/psra_postgres2es.py                   | 196 +++-----------
 python/riskDynamics_postgres2es.py           |  12 +-
 python/sauid_postgres2es.py                  |  45 ++++
 python/socialFabric_postgres2es.py           | 243 ++++++------------
 python/srcLoss_postgres2es.py                | 147 +++--------
 python/uhs_postgres2es.py                    | 174 +++---------
 python/utils.py                              | 253 +++++++++++++++++++
 sample.env                                   |   1 +
 27 files changed, 1185 insertions(+), 982 deletions(-)
 create mode 100644 python/dsraAllScenariosCduid_postgres2es.py
 create mode 100644 python/dsraAllScenariosCsduid_postgres2es.py
 create mode 100644 python/dsraAllScenariosDauid_postgres2es.py
 create mode 100644 python/dsraAllScenariosEruid_postgres2es.py
 create mode 100644 python/dsraAllScenariosSauid_postgres2es.py
 create mode 100644 python/hexgrid_100km_postgres2es.py
 create mode 100644 python/hexgrid_10km_postgres2es.py
 create mode 100644 python/hexgrid_25km_postgres2es.py
 create mode 100644 python/hexgrid_50km_postgres2es.py
 create mode 100644 python/hexgrid_5km_postgres2es.py
 create mode 100644 python/hexgrid_sauid_postgres2es.py
 create mode 100644 python/sauid_postgres2es.py
 create mode 100644 python/utils.py

diff --git a/.github/workflows/lintly-flake8.yml b/.github/workflows/lintly-flake8.yml
index e5fed724..95c8f6f5 100644
--- a/.github/workflows/lintly-flake8.yml
+++ b/.github/workflows/lintly-flake8.yml
@@ -17,4 +17,4 @@ jobs:
           # Fail if "new" violations detected or "any", default "new"
           failIf: new
           # Additional arguments to pass to flake8, default "." (current directory)
-          args: "--ignore=E121,E123 --per-file-ignores=python/gen_pygeoapi_config.py:E501 ."
+ args: "--ignore=E121,E123,E501 --per-file-ignores=python/gen_pygeoapi_config.py:E501 ." diff --git a/docker-compose-run.yml b/docker-compose-run.yml index 188b97bd..0121170e 100644 --- a/docker-compose-run.yml +++ b/docker-compose-run.yml @@ -6,7 +6,7 @@ volumes: services: kibana-opendrr: - image: kibana:7.7.1 + image: kibana:7.12.0 environment: ELASTICSEARCH_HOSTS: http://elasticsearch-opendrr:9200 @@ -18,7 +18,7 @@ services: - elasticsearch-opendrr elasticsearch-opendrr: - image: elasticsearch:7.7.1 + image: elasticsearch:7.12.0 environment: - discovery.type=single-node diff --git a/docker-compose.yml b/docker-compose.yml index e00ca2eb..467f402e 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -22,7 +22,7 @@ services: kibana-opendrr: - image: kibana:7.7.1 + image: kibana:7.12.0 environment: ELASTICSEARCH_HOSTS: http://elasticsearch-opendrr:9200 @@ -34,7 +34,7 @@ services: - elasticsearch-opendrr elasticsearch-opendrr: - image: elasticsearch:7.7.1 + image: elasticsearch:7.12.0 environment: - discovery.type=single-node diff --git a/python/add_data.sh b/python/add_data.sh index 4eea2af2..da2456a5 100755 --- a/python/add_data.sh +++ b/python/add_data.sh @@ -27,7 +27,7 @@ ENV_VAR_LIST=( ADD_DATA_PRINT_FUNCNAME=${ADD_DATA_PRINT_FUNCNAME:-true} ADD_DATA_PRINT_LINENO=${ADD_DATA_PRINT_LINENO:-true} -DSRA_REPOSITORY=https://github.com/OpenDRR/scenario-catalogue/tree/master/FINISHED +DSRA_REPOSITORY=https://github.com/OpenDRR/earthquake-scenarios/tree/master/FINISHED PT_LIST=(AB BC MB NB NL NS NT NU ON PE QC SK YT) @@ -373,7 +373,7 @@ read_github_token() { status_code=$(curl --write-out "%{http_code}" --silent --output /dev/null -H "Authorization: token ${GITHUB_TOKEN}" \ -O \ - -L https://api.github.com/repos/OpenDRR/scenario-catalogue/contents/deterministic/outputs) + -L https://api.github.com/repos/OpenDRR/earthquake-scenarios/contents/FINISHED) if [[ "$status_code" -ne 200 ]] ; then echo "GitHub token is not valid! Exiting!" 
@@ -385,7 +385,7 @@ read_github_token() { # from the OpenDRR/model-factory repository get_model_factory_scripts() { # TODO: Make this more robust - RUN git clone https://github.com/OpenDRR/model-factory.git --depth 1 || (cd model-factory ; RUN git pull) + RUN git clone https://github.com/OpenDRR/model-factory.git --branch update_hexbin_and_dsra_source --depth 1 || (cd model-factory ; RUN git pull) # Copy model-factory scripts to working directory # TODO: Find ways to keep these scripts in their place without copying them all to WORKDIR @@ -402,7 +402,7 @@ get_git_lfs_pointers_of_csv_files() { rm -rf "$base_dir" mkdir -p "$base_dir" ( cd "$base_dir" && \ - for repo in canada-srm2 model-inputs openquake-inputs scenario-catalogue; do + for repo in canada-srm2 model-inputs openquake-inputs earthquake-scenarios; do RUN git clone --filter=blob:none --no-checkout "https://${GITHUB_TOKEN}@github.com/OpenDRR/${repo}.git" is_dry_run || \ ( cd $repo && \ @@ -447,7 +447,7 @@ import_census_boundaries() { RUN run_ogr2ogr "Geometry_$i" done - for i in HexGrid_5km HexGrid_10km HexGrid_25km HexGrid_50km SAUID_HexGrid; do + for i in HexGrid_5km HexGrid_10km HexGrid_25km HexGrid_50km HexGrid_GlobalFabric SAUID_HexGrid SAUID_HexGrid_5km_intersect SAUID_HexGrid_10km_intersect SAUID_HexGrid_25km_intersect SAUID_HexGrid_50km_intersect SAUID_HexGrid_100km_intersect SAUID_HexGrid_GlobalFabric_intersect; do RUN run_ogr2ogr "hexbin_4326/$i" done @@ -579,6 +579,12 @@ generate_indicators() { RUN run_psql Create_MH_risk_sauid_prioritization_prereq_tables.sql RUN run_psql Create_MH_risk_sauid_prioritization_Canada.sql # RUN run_psql Create_MH_risk_sauid_ALL.sql + RUN run_psql Create_hexbin_physical_exposure_aggregation_area_proxy.sql + # RUN run_psql Create_hexbin_physical_exposure_hexbin_aggregation_centroid.sql + RUN run_psql Create_hexbin_MH_risk_sauid_prioritization_aggregation_area.sql + # RUN run_psql Create_hexbin_MH_risk_sauid_prioritization_aggregation_centroid.sql + RUN run_psql Create_hexbin_social_vulnerability_aggregation_area_proxy.sql + # RUN run_psql Create_hexbin_social_vulnerability_aggregation_centroid.sql } ############################################################################################ @@ -671,13 +677,13 @@ import_earthquake_scenarios() { RUN curl -H "Authorization: token ${GITHUB_TOKEN}" \ --retry 999 --retry-max-time 0 \ -o FINISHED.json \ - -L https://api.github.com/repos/OpenDRR/scenario-catalogue/contents/FINISHED + -L https://api.github.com/repos/OpenDRR/earthquake-scenarios/contents/FINISHED - # s_lossesbyasset_ACM6p5_Beaufort_r2_299_b.csv → ACM6p5_Beaufort - RUN mapfile -t EQSCENARIO_LIST < <(jq -r '.[].name | scan("(?<=s_lossesbyasset_).*(?=_r2)")' FINISHED.json) + # s_lossesbyasset_ACM6p5_Beaufort_r1_299_b.csv → ACM6p5_Beaufort + RUN mapfile -t EQSCENARIO_LIST < <(jq -r '.[].name | scan("(?<=s_lossesbyasset_).*(?=_r1)")' FINISHED.json) - # s_lossesbyasset_ACM6p5_Beaufort_r2_299_b.csv → ACM6p5_Beaufort_r2_299_b.csv - RUN mapfile -t EQSCENARIO_LIST_LONGFORM < <(jq -r '.[].name | scan("(?<=s_lossesbyasset_).*r2.*\\.csv")' FINISHED.json) + # s_lossesbyasset_ACM6p5_Beaufort_r1_299_b.csv → ACM6p5_Beaufort_r1_299_b.csv + RUN mapfile -t EQSCENARIO_LIST_LONGFORM < <(jq -r '.[].name | scan("(?<=s_lossesbyasset_).*r1.*\\.csv")' FINISHED.json) LOG "## Importing scenario outputs into PostGIS" for eqscenario in ${EQSCENARIO_LIST[*]}; do @@ -732,7 +738,7 @@ import_shakemap() { import_rupture_model() { LOG "## Importing Rupture Model" -RUN python3 DSRA_ruptures2postgres.py 
--dsraRuptureDir="https://github.com/OpenDRR/scenario-catalogue/tree/master/deterministic/ruptures" +RUN python3 DSRA_ruptures2postgres.py --dsraRuptureDir="https://github.com/OpenDRR/earthquake-scenarios/tree/master/ruptures" LOG "## Generating indicator views" for item in ${EQSCENARIO_LIST_LONGFORM[*]}; do @@ -777,66 +783,120 @@ export_to_elasticsearch() { sleep 10 done + LOG "## Create Kibana Space" + RUN curl -X POST -H "Content-Type: application/json" "${KIBANA_ENDPOINT}/api/spaces/space" -H "kbn-xsrf: true" -d '{"id": "gsc-cgc","name": "GSC-CGC","description" : "Geological Survey of Canada Private Space","color": "#aabbcc","initials": "G"}' + LOG "## Load Probabilistic Model Indicators" # shellcheck disable=SC2154 if [ "$loadPsraModels" = true ]; then LOG "Creating PSRA indices in Elasticsearch" - for PT in ${PT_LIST[*]}; do - RUN python3 psra_postgres2es.py --province="$PT" --dbview="all_indicators" --idField="building" - RUN python3 psra_postgres2es.py --province="$PT" --dbview="all_indicators" --idField="sauid" - RUN python3 hmaps_postgres2es.py --province="$PT" - RUN python3 uhs_postgres2es.py --province="$PT" - RUN python3 srcLoss_postgres2es.py --province="$PT" - done + RUN python3 psra_postgres2es.py + RUN python3 hmaps_postgres2es.py + RUN python3 uhs_postgres2es.py + RUN python3 srcLoss_postgres2es.py LOG "Creating PSRA Kibana Index Patterns" - RUN curl -X POST -H "securitytenant: global" -H "Content-Type: application/json" "${KIBANA_ENDPOINT}/api/saved_objects/index-pattern/psra*all_indicators_s" -H "kbn-xsrf: true" -d '{ "attributes": { "title":"psra*all_indicators_s"}}' - RUN curl -X POST -H "securitytenant: global" -H "Content-Type: application/json" "${KIBANA_ENDPOINT}/api/saved_objects/index-pattern/psra*all_indicators_b" -H "kbn-xsrf: true" -d '{ "attributes": { "title":"psra*all_indicators_b"}}' - RUN curl -X POST -H "securitytenant: global" -H "Content-Type: application/json" "${KIBANA_ENDPOINT}/api/saved_objects/index-pattern/psra_*_hmaps" -H "kbn-xsrf: true" -d '{ "attributes": { "title":"psra_*_hmaps"}}' - RUN curl -X POST -H "securitytenant: global" -H "Content-Type: application/json" "${KIBANA_ENDPOINT}/api/saved_objects/index-pattern/psra_*_uhs" -H "kbn-xsrf: true" -d '{ "attributes": { "title":"psra_*_uhs"}}' - RUN curl -X POST -H "securitytenant: global" -H "Content-Type: application/json" "${KIBANA_ENDPOINT}/api/saved_objects/index-pattern/psra_*_srcLoss" -H "kbn-xsrf: true" -d '{ "attributes": { "title":"psra_*_srcLoss"}}' + RUN curl -X POST -H "Content-Type: application/json" "${KIBANA_ENDPOINT}/s/gsc-cgc/api/saved_objects/index-pattern/opendrr_psra_indicators_s" -H "kbn-xsrf: true" -d '{ "attributes": { "title":"opendrr_psra_indicators_s"}}' + RUN curl -X POST -H "Content-Type: application/json" "${KIBANA_ENDPOINT}/s/gsc-cgc/api/saved_objects/index-pattern/opendrr_psra_indicators_b" -H "kbn-xsrf: true" -d '{ "attributes": { "title":"opendrr_psra_indicators_b"}}' + RUN curl -X POST -H "Content-Type: application/json" "${KIBANA_ENDPOINT}/s/gsc-cgc/api/saved_objects/index-pattern/opendrr_psra_hmaps" -H "kbn-xsrf: true" -d '{ "attributes": { "title":"opendrr_psra_hmaps"}}' + RUN curl -X POST -H "Content-Type: application/json" "${KIBANA_ENDPOINT}/s/gsc-cgc/api/saved_objects/index-pattern/opendrr_psra_uhs" -H "kbn-xsrf: true" -d '{ "attributes": { "title":"opendrr_psra_uhs"}}' + RUN curl -X POST -H "Content-Type: application/json" "${KIBANA_ENDPOINT}/s/gsc-cgc/api/saved_objects/index-pattern/opendrr_psra_srcLoss" -H "kbn-xsrf: true" -d '{ "attributes": { 
"title":"opendrr_psra_srcLoss"}}' fi # Load Deterministic Model Indicators # shellcheck disable=SC2154 if [[ "$loadDsraScenario" = true ]]; then - for eqscenario in ${EQSCENARIO_LIST[*]}; do - LOG "Creating Elasticsearch indexes for DSRA" - RUN python3 dsra_postgres2es.py --eqScenario="$eqscenario" --dbview="all_indicators" --idField="building" - RUN python3 dsra_postgres2es.py --eqScenario="$eqscenario" --dbview="all_indicators" --idField="sauid" - done +for eqscenario in ${EQSCENARIO_LIST[*]}; do + LOG "Creating Elasticsearch indexes for DSRA" + RUN python3 dsra_postgres2es.py --eqScenario="$eqscenario" --dbview="all_indicators" --idField="building" + RUN python3 dsra_postgres2es.py --eqScenario="$eqscenario" --dbview="all_indicators" --idField="sauid" + + # LOG "Creating DSRA Kibana Index Patterns" + # Need to develop saved object workflow for automated index patern generation + # RUN curl -X POST -H "Content-Type: application/json" "${KIBANA_ENDPOINT}/s/gsc-cgc/api/saved_objects/index-pattern/opendrr_dsra_${eqscenario}_all_indicators_s" -H "kbn-xsrf: true" -d "{ 'attributes': { 'title':'opendrr_dsra_${eqscenario}_all_indicators_s'}}" + # RUN curl -X POST -H "Content-Type: application/json" "${KIBANA_ENDPOINT}/s/gsc-cgc/api/saved_objects/index-pattern/opendrr_dsra_${eqscenario}_all_indicators_b" -H "kbn-xsrf: true" -d "{ 'attributes': { 'title':'opendrr_dsra_${eqscenario}_all_indicators_b'}}" +done fi # Load Hazard Threat Views # shellcheck disable=SC2154 - if [[ "$loadHazardThreat" = true ]]; then - # All Indicators - RUN python3 hazardThreat_postgres2es.py --type="all_indicators" --aggregation="sauid" --geometry=geom_poly --idField="Sauid" - fi + # 2021/09/21 DR - Keeping Hazard Threah and Risk Dynamics out of ES for the time being + # if [[ "$loadHazardThreat" = true ]]; then + # # All Indicators + # LOG "Creating Elasticsearch indexes for Hazard Threat" + # RUN python3 hazardThreat_postgres2es.py --type="all_indicators" --aggregation="sauid" --geometry=geom_poly --idField="Sauid" + + # LOG "Creating Hazard Threat Kibana Index Patterns" + # RUN curl -X POST -H "Content-Type: application/json" "${KIBANA_ENDPOINT}/s/gsc-cgc/api/saved_objects/index-pattern/opendrr_hazard_threat_all_indicators_s" -H "kbn-xsrf: true" -d '{ "attributes": { "title":"opendrr_hazard_threat_all_indicators_s"}}' + # fi # Load physical exposure indicators # shellcheck disable=SC2154 if [[ $loadPhysicalExposure = true ]]; then - RUN python3 exposure_postgres2es.py --type="all_indicators" --aggregation="building" --geometry=geom_point --idField="BldgID" - RUN python3 exposure_postgres2es.py --type="all_indicators" --aggregation="sauid" --geometry=geom_poly --idField="Sauid" + LOG "Creating Elasticsearch indexes for Physical Exposure" + RUN python3 exposure_postgres2es.py --aggregation="building" --geometry=geom_point + RUN python3 exposure_postgres2es.py --aggregation="sauid" --geometry=geom_poly + + LOG "Creating Exposure Kibana Index Patterns" + RUN curl -X POST -H "Content-Type: application/json" "${KIBANA_ENDPOINT}/s/gsc-cgc/api/saved_objects/index-pattern/opendrr_nhsl_physical_exposure_all_indicators_s" -H "kbn-xsrf: true" -d '{ "attributes": { "title":"opendrr_nhsl_physical_exposure_all_indicators_s"}}' + RUN curl -X POST -H "Content-Type: application/json" "${KIBANA_ENDPOINT}/s/gsc-cgc/api/saved_objects/index-pattern/opendrr_nhsl_physical_exposure_all_indicators_b" -H "kbn-xsrf: true" -d '{ "attributes": { "title":"opendrr_nhsl_physical_exposure_all_indicators_b"}}' fi # Load Risk Dynamics Views # 
shellcheck disable=SC2154 - if [[ $loadRiskDynamics = true ]]; then - RUN python3 riskDynamics_postgres2es.py --type="all_indicators" --aggregation="sauid" --geometry=geom_point --idField="ghslID" - fi + # 2021/09/21 DR - Keeping Hazard Threah and Risk Dynamics out of ES for the time being + # if [[ $loadRiskDynamics = true ]]; then + # LOG "Creating Elasticsearch indexes for Risk Dynamics" + # RUN python3 riskDynamics_postgres2es.py --type="all_indicators" --aggregation="sauid" --geometry=geom_point --idField="ghslID" + + # LOG "Creating Risk Dynamics Kibana Index Patterns" + # RUN curl -X POST -H "Content-Type: application/json" "${KIBANA_ENDPOINT}/s/gsc-cgc/api/saved_objects/index-pattern/opendrr_nhsl_risk_dynamics_all_indicators" -H "kbn-xsrf: true" -d '{ "attributes": { "title":"opendrr_nhsl_risk_dynamics_all_indicators"}}' + # fi # Load Social Fabric Views # shellcheck disable=SC2154 if [[ $loadSocialFabric = true ]]; then - RUN python3 socialFabric_postgres2es.py --type="all_indicators" --aggregation="sauid" --geometry=geom_poly --idField="Sauid" + LOG "Creating Elasticsearch indexes for Social Fabric" + RUN python3 socialFabric_postgres2es.py --aggregation="sauid" --geometry=geom_poly --sortfield="Sauid" + RUN python3 socialFabric_postgres2es.py --aggregation="hexgrid_5km" --geometry=geom --sortfield="gridid_5" + RUN python3 socialFabric_postgres2es.py --aggregation="hexgrid_10km" --geometry=geom --sortfield="gridid_10" + RUN python3 socialFabric_postgres2es.py --aggregation="hexgrid_25km" --geometry=geom --sortfield="gridid_25" + RUN python3 socialFabric_postgres2es.py --aggregation="hexgrid_50km" --geometry=geom --sortfield="gridid_50" + RUN python3 socialFabric_postgres2es.py --aggregation="hexgrid_100km" --geometry=geom --sortfield="gridid_100" + + LOG "Creating Social Fabric Kibana Index Patterns" + RUN curl -X POST -H "Content-Type: application/json" "${KIBANA_ENDPOINT}/s/gsc-cgc/api/saved_objects/index-pattern/opendrr_nhsl_social_fabric_all_indicators_s" -H "kbn-xsrf: true" -d '{ "attributes": { "title":"opendrr_nhsl_social_fabric_all_indicators_s"}}' + RUN curl -X POST -H "Content-Type: application/json" "${KIBANA_ENDPOINT}/s/gsc-cgc/api/saved_objects/index-pattern/opendrr_nhsl_social_fabric_all_indicators_hexgrid_5km" -H "kbn-xsrf: true" -d '{ "attributes": { "title":"opendrr_nhsl_social_fabric_all_indicators_hexgrid_5km"}}' + RUN curl -X POST -H "Content-Type: application/json" "${KIBANA_ENDPOINT}/s/gsc-cgc/api/saved_objects/index-pattern/opendrr_nhsl_social_fabric_all_indicators_hexgrid_10km" -H "kbn-xsrf: true" -d '{ "attributes": { "title":"opendrr_nhsl_social_fabric_all_indicators_hexgrid_10km"}}' + RUN curl -X POST -H "Content-Type: application/json" "${KIBANA_ENDPOINT}/s/gsc-cgc/api/saved_objects/index-pattern/opendrr_nhsl_social_fabric_all_indicators_hexgrid_25km" -H "kbn-xsrf: true" -d '{ "attributes": { "title":"opendrr_nhsl_social_fabric_all_indicators_hexgrid_25km"}}' + RUN curl -X POST -H "Content-Type: application/json" "${KIBANA_ENDPOINT}/s/gsc-cgc/api/saved_objects/index-pattern/opendrr_nhsl_social_fabric_all_indicators_hexgrid_50km" -H "kbn-xsrf: true" -d '{ "attributes": { "title":"opendrr_nhsl_social_fabric_all_indicators_hexgrid_50km"}}' + RUN curl -X POST -H "Content-Type: application/json" "${KIBANA_ENDPOINT}/s/gsc-cgc/api/saved_objects/index-pattern/opendrr_nhsl_social_fabric_all_indicators_hexgrid_100km" -H "kbn-xsrf: true" -d '{ "attributes": { "title":"opendrr_nhsl_social_fabric_all_indicators_hexgrid_100km"}}' + fi + + # Load Hexgrid Geometries + 
# shellcheck disable=SC2154 + if [[ $loadHexGrid = true ]]; then + LOG "Creating Elasticsearch indexes for Hexgrids" + RUN python3 hexgrid_5km_postgres2es.py + RUN python3 hexgrid_10km_postgres2es.py + RUN python3 hexgrid_25km_postgres2es.py + RUN python3 hexgrid_50km_postgres2es.py + RUN python3 hexgrid_100km_postgres2es.py + RUN python3 hexgrid_sauid_postgres2es.py + + LOG "Creating HexGrid Kibana Index Patterns" + RUN curl -X POST -H "Content-Type: application/json" "${KIBANA_ENDPOINT}/s/gsc-cgc/api/saved_objects/index-pattern/opendrr_hexgrid_5km" -H "kbn-xsrf: true" -d '{ "attributes": { "title":"opendrr_hexgrid_5km"}}' + RUN curl -X POST -H "Content-Type: application/json" "${KIBANA_ENDPOINT}/s/gsc-cgc/api/saved_objects/index-pattern/opendrr_hexgrid_10km" -H "kbn-xsrf: true" -d '{ "attributes": { "title":"opendrr_hexgrid_10km"}}' + RUN curl -X POST -H "Content-Type: application/json" "${KIBANA_ENDPOINT}/s/gsc-cgc/api/saved_objects/index-pattern/opendrr_hexgrid_25km" -H "kbn-xsrf: true" -d '{ "attributes": { "title":"opendrr_hexgrid_25km"}}' + RUN curl -X POST -H "Content-Type: application/json" "${KIBANA_ENDPOINT}/s/gsc-cgc/api/saved_objects/index-pattern/opendrr_hexgrid_50km" -H "kbn-xsrf: true" -d '{ "attributes": { "title":"opendrr_hexgrid_50km"}}' + RUN curl -X POST -H "Content-Type: application/json" "${KIBANA_ENDPOINT}/s/gsc-cgc/api/saved_objects/index-pattern/opendrr_hexgrid_100km" -H "kbn-xsrf: true" -d '{ "attributes": { "title":"opendrr_hexgrid_100km"}}' + RUN curl -X POST -H "Content-Type: application/json" "${KIBANA_ENDPOINT}/s/gsc-cgc/api/saved_objects/index-pattern/opendrr_sauid_hexgrid" -H "kbn-xsrf: true" -d '{ "attributes": { "title":"opendrr_sauid_hexgrid"}}' fi } load_kibana_saved_objects() { LOG "# Loading Kibana Saved Objects" - RUN curl -X POST -H "securitytenant: global" "${KIBANA_ENDPOINT}/api/saved_objects/_import" -H "kbn-xsrf: true" --form file=@kibanaSavedObjects.ndjson + RUN curl -X POST "${KIBANA_ENDPOINT}/s/gsc-cgc/api/saved_objects/_import" -H "kbn-xsrf: true" --form file=@kibanaSavedObjects.ndjson } diff --git a/python/dsraAllScenariosCduid_postgres2es.py b/python/dsraAllScenariosCduid_postgres2es.py new file mode 100644 index 00000000..a7195c89 --- /dev/null +++ b/python/dsraAllScenariosCduid_postgres2es.py @@ -0,0 +1,45 @@ +# ================================================================= +# !/bin/bash +# SPDX-License-Identifier: MIT +# +# Copyright (C) 2020-2021 Government of Canada +# +# Main Authors: Drew Rotheram +# Joost van Ulden +# ================================================================= + + +import utils + + +def main(): + table = utils.PostGISdataset( + utils.PostGISConnection(), + utils.ESConnection(settings={ + 'settings': { + 'number_of_shards': 1, + 'number_of_replicas': 0 + }, + 'mappings': { + 'properties': { + 'geometry': { + 'type': 'geo_shape' + } + } + } + }), + view="opendrr_dsra_all_scenarios_cduid", + sqlquerystring='SELECT *, ST_AsGeoJSON(geom) \ + FROM dsra.dsra_all_scenarios_cduid \ + ORDER BY dsra_all_scenarios_cduid."cduid" \ + LIMIT {limit} \ + OFFSET {offset}' + ) + + table.postgis2es() + + return + + +if __name__ == '__main__': + main() diff --git a/python/dsraAllScenariosCsduid_postgres2es.py b/python/dsraAllScenariosCsduid_postgres2es.py new file mode 100644 index 00000000..ba6735cf --- /dev/null +++ b/python/dsraAllScenariosCsduid_postgres2es.py @@ -0,0 +1,45 @@ +# ================================================================= +# !/bin/bash +# SPDX-License-Identifier: MIT +# +# Copyright (C) 2020-2021 
Government of Canada +# +# Main Authors: Drew Rotheram +# Joost van Ulden +# ================================================================= + + +import utils + + +def main(): + table = utils.PostGISdataset( + utils.PostGISConnection(), + utils.ESConnection(settings={ + 'settings': { + 'number_of_shards': 1, + 'number_of_replicas': 0 + }, + 'mappings': { + 'properties': { + 'geometry': { + 'type': 'geo_shape' + } + } + } + }), + view="opendrr_dsra_all_scenarios_csduid", + sqlquerystring='SELECT *, ST_AsGeoJSON(geom) \ + FROM dsra.dsra_all_scenarios_csduid \ + ORDER BY dsra_all_scenarios_csduid."csduid" \ + LIMIT {limit} \ + OFFSET {offset}' + ) + + table.postgis2es() + + return + + +if __name__ == '__main__': + main() diff --git a/python/dsraAllScenariosDauid_postgres2es.py b/python/dsraAllScenariosDauid_postgres2es.py new file mode 100644 index 00000000..8fa2ba1c --- /dev/null +++ b/python/dsraAllScenariosDauid_postgres2es.py @@ -0,0 +1,47 @@ +# ================================================================= +# !/bin/bash +# SPDX-License-Identifier: MIT +# +# Copyright (C) 2020-2021 Government of Canada +# +# Main Authors: Drew Rotheram +# Joost van Ulden +# ================================================================= + + +from utils import ESConnection +from utils import PostGISdataset +from utils import PostGISConnection + + +def main(): + table = PostGISdataset( + PostGISConnection(), + ESConnection(settings={ + 'settings': { + 'number_of_shards': 1, + 'number_of_replicas': 0 + }, + 'mappings': { + 'properties': { + 'geometry': { + 'type': 'geo_shape' + } + } + } + }), + view="opendrr_dsra_all_scenarios_dauid", + sqlquerystring='SELECT *, ST_AsGeoJSON(geom) \ + FROM dsra.dsra_all_scenarios_dauid \ + ORDER BY dsra_all_scenarios_dauid."dauid" \ + LIMIT {limit} \ + OFFSET {offset}' + ) + + table.postgis2es() + + return + + +if __name__ == '__main__': + main() diff --git a/python/dsraAllScenariosEruid_postgres2es.py b/python/dsraAllScenariosEruid_postgres2es.py new file mode 100644 index 00000000..10987581 --- /dev/null +++ b/python/dsraAllScenariosEruid_postgres2es.py @@ -0,0 +1,45 @@ +# ================================================================= +# !/bin/bash +# SPDX-License-Identifier: MIT +# +# Copyright (C) 2020-2021 Government of Canada +# +# Main Authors: Drew Rotheram +# Joost van Ulden +# ================================================================= + + +import utils + + +def main(): + eruidTable = utils.PostGISdataset( + utils.PostGISConnection(), + utils.ESConnection(settings={ + 'settings': { + 'number_of_shards': 1, + 'number_of_replicas': 0 + }, + 'mappings': { + 'properties': { + 'geometry': { + 'type': 'geo_shape' + } + } + } + }), + view="opendrr_dsra_all_scenarios_eruid", + sqlquerystring='SELECT *, ST_AsGeoJSON(geom) \ + FROM dsra.dsra_all_scenarios_eruid \ + ORDER BY dsra_all_scenarios_eruid."eruid" \ + LIMIT {limit} \ + OFFSET {offset}' + ) + + eruidTable.postgis2es() + + return + + +if __name__ == '__main__': + main() diff --git a/python/dsraAllScenariosSauid_postgres2es.py b/python/dsraAllScenariosSauid_postgres2es.py new file mode 100644 index 00000000..d3df2f34 --- /dev/null +++ b/python/dsraAllScenariosSauid_postgres2es.py @@ -0,0 +1,47 @@ +# ================================================================= +# !/bin/bash +# SPDX-License-Identifier: MIT +# +# Copyright (C) 2020-2021 Government of Canada +# +# Main Authors: Drew Rotheram +# Joost van Ulden +# ================================================================= + + +from utils import 
ESConnection +from utils import PostGISdataset +from utils import PostGISConnection + + +def main(): + table = PostGISdataset( + PostGISConnection(), + ESConnection(settings={ + 'settings': { + 'number_of_shards': 1, + 'number_of_replicas': 0 + }, + 'mappings': { + 'properties': { + 'geometry': { + 'type': 'geo_shape' + } + } + } + }), + view="opendrr_dsra_all_scenarios_sauid", + sqlquerystring='SELECT *, ST_AsGeoJSON(geom) \ + FROM dsra.dsra_all_scenarios_sauid \ + ORDER BY dsra_all_scenarios_sauid."sauid" \ + LIMIT {limit} \ + OFFSET {offset}' + ) + + table.postgis2es() + + return + + +if __name__ == '__main__': + main() diff --git a/python/dsra_postgres2es.py b/python/dsra_postgres2es.py index 97a7a715..0b07a68c 100644 --- a/python/dsra_postgres2es.py +++ b/python/dsra_postgres2es.py @@ -43,8 +43,8 @@ def main(): es = Elasticsearch([auth.get('es', 'es_endpoint')], http_auth=(auth.get('es', 'es_un'), auth.get('es', 'es_pw'))) - if es.indices.exists(view): - es.indices.delete(view) + if es.indices.exists("opendrr_" + view): + es.indices.delete("opendrr_" + view) if args.idField.lower() == 'sauid': id_field = 'Sauid' settings = { @@ -78,7 +78,7 @@ def main(): } } } - es.indices.create(index=view, body=settings, request_timeout=90) + es.indices.create(index="opendrr_" + view, body=settings, request_timeout=90) while True: if args.idField.lower() == 'sauid': @@ -158,7 +158,7 @@ def main(): default=decimal_default) d = json.loads(geojsonobject) helpers.bulk(es, - gendata(d, view, id_field), + gendata(d, "opendrr_" + view, id_field), raise_on_error=False) else: diff --git a/python/exposure_postgres2es.py b/python/exposure_postgres2es.py index 658bb674..6bfc8ba5 100644 --- a/python/exposure_postgres2es.py +++ b/python/exposure_postgres2es.py @@ -1,43 +1,16 @@ # ================================================================= +# !/bin/bash +# SPDX-License-Identifier: MIT # -# Authors: Tom Kralidis -# -# Copyright (c) 2020 Tom Kralidis -# -# Permission is hereby granted, free of charge, to any person -# obtaining a copy of this software and associated documentation -# files (the "Software"), to deal in the Software without -# restriction, including without limitation the rights to use, -# copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the -# Software is furnished to do so, subject to the following -# conditions: -# -# The above copyright notice and this permission notice shall be -# included in all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, -# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES -# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND -# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT -# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, -# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR -# OTHER DEALINGS IN THE SOFTWARE. 
+# Copyright (C) 2020-2021 Government of Canada # +# Main Authors: Drew Rotheram +# Joost van Ulden # ================================================================= -import json -import os -import sys -import psycopg2 -import configparser -import logging +import utils import argparse -from elasticsearch import Elasticsearch -from elasticsearch import helpers -from decimal import Decimal ''' Script to convert Physical Exposure Views to ElasticSearch Index @@ -52,144 +25,75 @@ #Main Function def main(): - logFileName = '{}.log'.format(os.path.splitext(sys.argv[0])[0]) - logging.basicConfig(level=logging.INFO, - format='%(asctime)s - %(levelname)s - %(message)s', - handlers=[logging.FileHandler(logFileName), - logging.StreamHandler()]) - tracer = logging.getLogger('elasticsearch') - tracer.setLevel(logging.ERROR) - tracer.addHandler(logging.FileHandler('{}.log'.format(os.path.splitext(sys.argv[0])[0]))) - auth = get_config_params('config.ini') args = parse_args() # index settings if args.geometry == "geom_poly": - settings = { - 'settings': { - 'number_of_shards': 1, - 'number_of_replicas': 0 - }, - 'mappings': { - 'properties': { - 'geometry': { - 'type': 'geo_shape' + table = utils.PostGISdataset( + utils.PostGISConnection(), + utils.ESConnection(settings={ + 'settings': { + 'number_of_shards': 1, + 'number_of_replicas': 0 + }, + 'mappings': { + 'properties': { + 'geometry': { + 'type': 'geo_shape' + } } } - } - } + }), + view="opendrr_nhsl_physical_exposure_all_indicators_{agg}".format(**{ + 'agg': args.aggregation[0].lower()}), + sqlquerystring='SELECT *, ST_AsGeoJSON(geom_poly) \ + FROM \ + results_nhsl_physical_exposure.nhsl_physical_exposure_all_indicators_{agg} \ + LIMIT {{limit}} \ + OFFSET {{offset}}'.format(**{ + 'agg': args.aggregation[0].lower()}) + ) elif args.geometry == "geom_point": - settings = { - 'settings': { - 'number_of_shards': 1, - 'number_of_replicas': 0 - }, - 'mappings': { - 'properties': { - 'geometry': { - 'properties': { - 'coordinates': { - 'type': 'geo_point' - } + table = utils.PostGISdataset( + utils.PostGISConnection(), + utils.ESConnection(settings={ + 'settings': { + 'number_of_shards': 1, + 'number_of_replicas': 0 + }, + 'mappings': { + 'properties': { + 'coordinates': { + 'type': 'geo_point' + }, + 'geometry': { + 'type': 'geo_shape' } } } - } - } - - view = "nhsl_physical_exposure_{type}_{aggregation}".format(**{ - 'type': args.type, - 'aggregation': args.aggregation[0].lower()}) - id_field = args.idField - - # es = Elasticsearch() - es = Elasticsearch([auth.get('es', 'es_endpoint')], - http_auth=(auth.get('es', 'es_un'), - auth.get('es', 'es_pw'))) - # create index - if es.indices.exists(view): - es.indices.delete(view) - es.indices.create(index=view, body=settings, request_timeout=90) - - sqlquerystring = 'SELECT *, ST_AsGeoJSON({geometry}) \ - FROM "results_nhsl_physical_exposure"."{view}"'.format(**{ - 'geometry': args.geometry, 'view': view}) + }), + view="opendrr_nhsl_physical_exposure_all_indicators_{agg}".format(**{ + 'agg': args.aggregation[0].lower()}), + sqlquerystring='SELECT *, ST_AsGeoJSON(geom_point) \ + FROM \ + results_nhsl_physical_exposure.nhsl_physical_exposure_all_indicators_{agg} \ + LIMIT {{limit}} \ + OFFSET {{offset}}'.format(**{ + 'agg': args.aggregation[0].lower()}) + ) + + table.postgis2es() - connection = None - try: - #Connect to the PostGIS database hosted on RDS - connection = psycopg2.connect(user = auth.get('rds', 'postgres_un'), - password = auth.get('rds', 'postgres_pw'), - host = auth.get('rds', 'postgres_host'), 
- port = auth.get('rds', 'postgres_port'), - database = auth.get('rds', 'postgres_db')) - #Query the entire view with the geometries in geojson format - cur = connection.cursor() - cur.execute(sqlquerystring) - rows = cur.fetchall() - columns = [name[0] for name in cur.description] - geomIndex = columns.index('st_asgeojson') - feature_collection = {'type': 'FeatureCollection', 'features': []} - - #Format the table into a geojson format for ES/Kibana consumption - i = 0 - for row in rows: - feature = { - 'type': 'Feature', - 'geometry': json.loads(row[geomIndex]), - 'properties': {}, - } - for index, column in enumerate(columns): - if column != "st_asgeojson": - value =row[index] - if isinstance(value, Decimal): - value = float(value) - feature['properties'][column] = value - feature_collection['features'].append(feature) - i+=1 - if i==10000: - geojsonobject = json.dumps(feature_collection, indent=2) - d = json.loads(geojsonobject) - helpers.bulk(es, gendata(d, view, id_field), raise_on_error=False, request_timeout=30) - feature_collection = {'type': 'FeatureCollection', 'features': []} - i=0 - geojsonobject = json.dumps(feature_collection, indent=2) - d = json.loads(geojsonobject) - helpers.bulk(es, gendata(d, view, id_field), raise_on_error=False, request_timeout=30) - feature_collection = {'type': 'FeatureCollection', 'features': []} - - except (Exception, psycopg2.Error) as error : - logging.error(error) - - finally: - if(connection): - # cursor.close() - connection.close() return -def gendata(data, view, id_field): - for item in data['features']: - yield { - "_index": view, - "_id": item['properties'][id_field], - "_source": item - } - -def get_config_params(args): - """ - Parse Input/Output columns from supplied *.ini file - """ - configParseObj = configparser.ConfigParser() - configParseObj.read(args) - return configParseObj def parse_args(): parser = argparse.ArgumentParser(description="load exposure PostGIS to ES") - parser.add_argument("--type", - type=str, - help="assets building(s) or people", - required=True) + # parser.add_argument("--type", + # type=str, + # help="assets building(s) or people", + # required=True) parser.add_argument("--aggregation", type=str, help="building or Sauid", @@ -198,10 +102,10 @@ def parse_args(): type=str, help="geom_point or geom_poly", required=True) - parser.add_argument("--idField", - type=str, - help="Field to use as Index ID. AssetID or Sauid", - required=True) + # parser.add_argument("--idField", + # type=str, + # help="Field to use as Index ID. 
AssetID or Sauid", + # required=True) args = parser.parse_args() return args diff --git a/python/hazardThreat_postgres2es.py b/python/hazardThreat_postgres2es.py index bdc09aa3..f59df93f 100644 --- a/python/hazardThreat_postgres2es.py +++ b/python/hazardThreat_postgres2es.py @@ -140,14 +140,16 @@ def main(): connection.close() # create index - if es.indices.exists(view): - es.indices.delete(view) + if es.indices.exists("opendrr_" + view): + es.indices.delete("opendrr_" + view) - es.indices.create(index=view, body=settings, request_timeout=90) + es.indices.create(index="opendrr_" + view, body=settings, request_timeout=90) d = json.loads(geojsonobject) - helpers.bulk(es, gendata(d, view, id_field), raise_on_error=False) + helpers.bulk(es, + gendata(d, "opendrr_" + view, id_field), + raise_on_error=False) return diff --git a/python/hexgrid_100km_postgres2es.py b/python/hexgrid_100km_postgres2es.py new file mode 100644 index 00000000..79b75a3f --- /dev/null +++ b/python/hexgrid_100km_postgres2es.py @@ -0,0 +1,45 @@ +# ================================================================= +# !/bin/bash +# SPDX-License-Identifier: MIT +# +# Copyright (C) 2020-2021 Government of Canada +# +# Main Authors: Drew Rotheram +# Joost van Ulden +# ================================================================= + + +import utils + + +def main(): + table = utils.PostGISdataset( + utils.PostGISConnection(), + utils.ESConnection(settings={ + 'settings': { + 'number_of_shards': 1, + 'number_of_replicas': 0 + }, + 'mappings': { + 'properties': { + 'geometry': { + 'type': 'geo_shape' + } + } + } + }), + view="opendrr_hexgrid_100km", + sqlquerystring='SELECT *, ST_AsGeoJSON(geom) \ + FROM boundaries."HexGrid_100km" \ + ORDER BY "HexGrid_100km"."gridid_100" \ + LIMIT {limit} \ + OFFSET {offset}' + ) + + table.postgis2es() + + return + + +if __name__ == '__main__': + main() diff --git a/python/hexgrid_10km_postgres2es.py b/python/hexgrid_10km_postgres2es.py new file mode 100644 index 00000000..eb60b65b --- /dev/null +++ b/python/hexgrid_10km_postgres2es.py @@ -0,0 +1,45 @@ +# ================================================================= +# !/bin/bash +# SPDX-License-Identifier: MIT +# +# Copyright (C) 2020-2021 Government of Canada +# +# Main Authors: Drew Rotheram +# Joost van Ulden +# ================================================================= + + +import utils + + +def main(): + table = utils.PostGISdataset( + utils.PostGISConnection(), + utils.ESConnection(settings={ + 'settings': { + 'number_of_shards': 1, + 'number_of_replicas': 0 + }, + 'mappings': { + 'properties': { + 'geometry': { + 'type': 'geo_shape' + } + } + } + }), + view="opendrr_hexgrid_10km", + sqlquerystring='SELECT *, ST_AsGeoJSON(geom) \ + FROM boundaries."HexGrid_10km" \ + ORDER BY "HexGrid_10km"."gridid_10" \ + LIMIT {limit} \ + OFFSET {offset}' + ) + + table.postgis2es() + + return + + +if __name__ == '__main__': + main() diff --git a/python/hexgrid_25km_postgres2es.py b/python/hexgrid_25km_postgres2es.py new file mode 100644 index 00000000..fbe38ed1 --- /dev/null +++ b/python/hexgrid_25km_postgres2es.py @@ -0,0 +1,45 @@ +# ================================================================= +# !/bin/bash +# SPDX-License-Identifier: MIT +# +# Copyright (C) 2020-2021 Government of Canada +# +# Main Authors: Drew Rotheram +# Joost van Ulden +# ================================================================= + + +import utils + + +def main(): + table = utils.PostGISdataset( + utils.PostGISConnection(), + utils.ESConnection(settings={ + 
'settings': { + 'number_of_shards': 1, + 'number_of_replicas': 0 + }, + 'mappings': { + 'properties': { + 'geometry': { + 'type': 'geo_shape' + } + } + } + }), + view="opendrr_hexgrid_25km", + sqlquerystring='SELECT *, ST_AsGeoJSON(geom) \ + FROM boundaries."HexGrid_25km" \ + ORDER BY "HexGrid_25km"."gridid_25" \ + LIMIT {limit} \ + OFFSET {offset}' + ) + + table.postgis2es() + + return + + +if __name__ == '__main__': + main() diff --git a/python/hexgrid_50km_postgres2es.py b/python/hexgrid_50km_postgres2es.py new file mode 100644 index 00000000..e5bfa971 --- /dev/null +++ b/python/hexgrid_50km_postgres2es.py @@ -0,0 +1,45 @@ +# ================================================================= +# !/bin/bash +# SPDX-License-Identifier: MIT +# +# Copyright (C) 2020-2021 Government of Canada +# +# Main Authors: Drew Rotheram +# Joost van Ulden +# ================================================================= + + +import utils + + +def main(): + table = utils.PostGISdataset( + utils.PostGISConnection(), + utils.ESConnection(settings={ + 'settings': { + 'number_of_shards': 1, + 'number_of_replicas': 0 + }, + 'mappings': { + 'properties': { + 'geometry': { + 'type': 'geo_shape' + } + } + } + }), + view="opendrr_hexgrid_50km", + sqlquerystring='SELECT *, ST_AsGeoJSON(geom) \ + FROM boundaries."HexGrid_50km" \ + ORDER BY "HexGrid_50km"."gridid_50" \ + LIMIT {limit} \ + OFFSET {offset}' + ) + + table.postgis2es() + + return + + +if __name__ == '__main__': + main() diff --git a/python/hexgrid_5km_postgres2es.py b/python/hexgrid_5km_postgres2es.py new file mode 100644 index 00000000..1a4609a2 --- /dev/null +++ b/python/hexgrid_5km_postgres2es.py @@ -0,0 +1,45 @@ +# ================================================================= +# !/bin/bash +# SPDX-License-Identifier: MIT +# +# Copyright (C) 2020-2021 Government of Canada +# +# Main Authors: Drew Rotheram +# Joost van Ulden +# ================================================================= + + +import utils + + +def main(): + table = utils.PostGISdataset( + utils.PostGISConnection(), + utils.ESConnection(settings={ + 'settings': { + 'number_of_shards': 1, + 'number_of_replicas': 0 + }, + 'mappings': { + 'properties': { + 'geometry': { + 'type': 'geo_shape' + } + } + } + }), + view="opendrr_hexgrid_5km", + sqlquerystring='SELECT *, ST_AsGeoJSON(geom) \ + FROM boundaries."HexGrid_5km" \ + ORDER BY "HexGrid_5km"."gridid_5" \ + LIMIT {limit} \ + OFFSET {offset}' + ) + + table.postgis2es() + + return + + +if __name__ == '__main__': + main() diff --git a/python/hexgrid_sauid_postgres2es.py b/python/hexgrid_sauid_postgres2es.py new file mode 100644 index 00000000..be8203ed --- /dev/null +++ b/python/hexgrid_sauid_postgres2es.py @@ -0,0 +1,45 @@ +# ================================================================= +# !/bin/bash +# SPDX-License-Identifier: MIT +# +# Copyright (C) 2020-2021 Government of Canada +# +# Main Authors: Drew Rotheram +# Joost van Ulden +# ================================================================= + + +import utils + + +def main(): + table = utils.PostGISdataset( + utils.PostGISConnection(), + utils.ESConnection(settings={ + 'settings': { + 'number_of_shards': 1, + 'number_of_replicas': 0 + }, + 'mappings': { + 'properties': { + 'geometry': { + 'type': 'geo_shape' + } + } + } + }), + view="opendrr_sauid_hexgrid", + sqlquerystring='SELECT *, ST_AsGeoJSON(geom) \ + FROM boundaries."SAUID_HexGrid" \ + ORDER BY "SAUID_HexGrid"."sauid" \ + LIMIT {limit} \ + OFFSET {offset}' + ) + + table.postgis2es() + + return + + +if 
__name__ == '__main__': + main() diff --git a/python/hmaps_postgres2es.py b/python/hmaps_postgres2es.py index e29861b7..fcc6d956 100644 --- a/python/hmaps_postgres2es.py +++ b/python/hmaps_postgres2es.py @@ -1,196 +1,43 @@ # ================================================================= +# !/bin/bash +# SPDX-License-Identifier: MIT # -# Authors: Drew Rotheram +# Copyright (C) 2020-2021 Government of Canada # +# Main Authors: Drew Rotheram +# Joost van Ulden # ================================================================= -import json -import os -import sys -import psycopg2 -import configparser -import logging -import argparse -import decimal -from elasticsearch import Elasticsearch -from elasticsearch import helpers +import utils -''' -Script to convert hmap indicator views to ElasticSearch Index -Can be run from the command line with mandatory arguments -Run this script with a command like: -python3 hmap_postgres2es.py --province=${PT} -''' - -# Main Function def main(): - logFileName = '{}.log'.format(os.path.splitext(sys.argv[0])[0]) - logging.basicConfig(level=logging.INFO, - format='%(asctime)s - %(levelname)s - %(message)s', - handlers=[logging.FileHandler(logFileName), - logging.StreamHandler()]) - auth = get_config_params('config.ini') - args = parse_args() - view = "psra_{province}_hmaps".format(**{ - 'province': args.province.lower()}) - limit = 10000 - offset = 0 - - # create index - es = Elasticsearch([auth.get('es', 'es_endpoint')], - http_auth=(auth.get('es', 'es_un'), - auth.get('es', 'es_pw'))) - if es.indices.exists(view): - es.indices.delete(view) - # if args.idField == 'sauid': - # id_field = 'Sauid' - # settings = { - # 'settings': { - # 'number_of_shards': 1, - # 'number_of_replicas': 0 - # }, - # 'mappings': { - # 'properties': { - # 'geometry': { - # 'type': 'geo_shape' - # } - # } - # } - # } - # elif args.idField == 'building': - # id_field = 'AssetID' - settings = { - 'settings': { - 'number_of_shards': 1, - 'number_of_replicas': 0 - }, - 'mappings': { - 'properties': { - 'coordinates': { - 'type': 'geo_point' - }, - 'geometry': { - 'type': 'geo_shape' + table = utils.PostGISdataset( + utils.PostGISConnection(), + utils.ESConnection(settings={ + 'settings': { + 'number_of_shards': 1, + 'number_of_replicas': 0 + }, + 'mappings': { + 'properties': { + 'geometry': { + 'type': 'geo_shape' + } } } - } - } - es.indices.create(index=view, body=settings, request_timeout=90) - - while True: - # if args.idField == 'sauid': - # id_field = 'Sauid' - # sqlquerystring = 'SELECT *, ST_AsGeoJSON(geom_poly) \ - # FROM results_hmap_{eqScenario}.{view} \ - # ORDER BY {view}."Sauid" \ - # LIMIT {limit} \ - # OFFSET {offset}'.format(**{'eqScenario': args.eqScenario, - # 'view': view, - # 'limit': limit, - # 'offset': offset}) - - # elif args.idField == 'building': - # id_field = 'AssetID' - sqlquerystring = 'SELECT *, ST_AsGeoJSON(geom) \ - FROM results_psra_{province}.{view} \ - ORDER BY {view}."geom" \ - LIMIT {limit} \ - OFFSET {offset}'.format(**{'province': args.province.lower(), - 'view': view, - 'limit': limit, - 'offset': offset}) - offset += limit - connection = None - try: - # Connect to the PostGIS database - connection = psycopg2.connect(user=auth.get('rds', - 'postgres_un'), - password=auth.get('rds', - 'postgres_pw'), - host=auth.get('rds', - 'postgres_host'), - port=auth.get('rds', - 'postgres_port'), - database=auth.get('rds', - 'postgres_db')) - # Query the entire view with the geometries in geojson format - cur = connection.cursor() - 
cur.execute(sqlquerystring) - rows = cur.fetchall() - if rows: - columns = [name[0] for name in cur.description] - geomIndex = columns.index('st_asgeojson') - feature_collection = {'type': 'FeatureCollection', - 'features': []} - - # Format table into a geojson format for ES/Kibana consumption - for row in rows: - coordinates = json.loads(row[geomIndex])['coordinates'] - feature = { - 'type': 'Feature', - 'geometry': json.loads(row[geomIndex]), - 'coordinates': coordinates, - 'properties': {}, - } - for index, column in enumerate(columns): - if column != "st_asgeojson": - value = row[index] - feature['properties'][column] = value - - feature_collection['features'].append(feature) - geojsonobject = json.dumps(feature_collection, - indent=2, - default=decimal_default) - d = json.loads(geojsonobject) - helpers.bulk(es, - gendata(d, view), - raise_on_error=False) - - else: - if(connection): - # cursor.close() - connection.close() - return - - except (Exception, psycopg2.Error) as error: - logging.error(error) - - -def gendata(data, view): - for item in data['features']: - yield { - "_index": view, - "_source": item - } - - -# Function to handle decimal encoder error -def decimal_default(obj): - if isinstance(obj, decimal.Decimal): - return float(obj) - raise TypeError - - -def get_config_params(args): - """ - Parse Input/Output columns from supplied *.ini file - """ - configParseObj = configparser.ConfigParser() - configParseObj.read(args) - return configParseObj - - -def parse_args(): - parser = argparse.ArgumentParser(description="script description") - parser.add_argument("--province", - type=str, - help="Two letters only", - required=True) - args = parser.parse_args() - return args - + }), + view="opendrr_psra_hmaps", + sqlquerystring='SELECT *, ST_AsGeoJSON(geom) \ + FROM results_psra_national."psra_hmaps" \ + ORDER BY "psra_hmaps"."geom" \ + LIMIT {limit} \ + OFFSET {offset}' + ) + table.postgis2es() + + return if __name__ == '__main__': main() diff --git a/python/psra_postgres2es.py b/python/psra_postgres2es.py index c3a1251c..50be3405 100644 --- a/python/psra_postgres2es.py +++ b/python/psra_postgres2es.py @@ -1,47 +1,19 @@ # ================================================================= - +# !/bin/bash +# SPDX-License-Identifier: MIT +# +# Copyright (C) 2020-2021 Government of Canada +# +# Main Authors: Drew Rotheram +# Joost van Ulden # ================================================================= -import json -import os -import sys -import psycopg2 -import configparser -import logging -import argparse -import decimal - -from elasticsearch import Elasticsearch -from elasticsearch import helpers - -''' -Script to convert PSRA indicator views to ElasticSearch Index -Can be run from the command line with mandatory arguments -Run this script with a command like: -python3 psra_postgres2es.py --province={PT} -''' - +import utils -# Main Function def main(): - logFileName = '{}.log'.format(os.path.splitext(sys.argv[0])[0]) - logging.basicConfig(level=logging.INFO, - format='%(asctime)s - %(levelname)s - %(message)s', - handlers=[logging.FileHandler(logFileName), - logging.StreamHandler()]) - auth = get_config_params('config.ini') - args = parse_args() - view = "psra_{province}_{dbview}_{idField}".format(**{ - 'province': args.province, - 'dbview': args.dbview, - 'idField': args.idField[0]}).lower() - if args.idField.lower() == 'sauid': - id_field = 'Sauid' - sqlquerystring = 'SELECT *, ST_AsGeoJSON(geom_poly) \ - FROM results_psra_{province}.{view}'.format(**{ - 'province': 
args.province, - 'view': view}) - settings = { + psraTable = utils.PostGISdataset( + utils.PostGISConnection(), + utils.ESConnection(settings={ 'settings': { 'number_of_shards': 1, 'number_of_replicas': 0 @@ -53,15 +25,20 @@ def main(): } } } - } - - elif args.idField.lower() == 'building': - id_field = 'AssetID' - sqlquerystring = 'SELECT *, ST_AsGeoJSON(geom_point) \ - FROM results_psra_{province}.{view}'.format(**{ - 'province': args.province, - 'view': view}) - settings = { + }), + view="opendrr_psra_indicators_s", + sqlquerystring='SELECT *, ST_AsGeoJSON(geom_poly) \ + FROM results_psra_national.psra_all_indicators_s \ + ORDER BY psra_all_indicators_s."Sauid" \ + LIMIT {limit} \ + OFFSET {offset}' + ) + + psraTable.postgis2es() + + psraTable = utils.PostGISdataset( + utils.PostGISConnection(), + utils.ESConnection(settings={ 'settings': { 'number_of_shards': 1, 'number_of_replicas': 0 @@ -76,123 +53,18 @@ def main(): } } } - } - - # es = Elasticsearch() - es = Elasticsearch([auth.get('es', 'es_endpoint')], - http_auth=(auth.get('es', 'es_un'), - auth.get('es', 'es_pw'))) - connection = None - try: - # Connect to the PostGIS database hosted on RDS - connection = psycopg2.connect(user=auth.get('rds', 'postgres_un'), - password=auth.get('rds', 'postgres_pw'), - host=auth.get('rds', 'postgres_host'), - port=auth.get('rds', 'postgres_port'), - database=auth.get('rds', 'postgres_db')) - # Query the entire view with the geometries in geojson format - cur = connection.cursor() - cur.execute(sqlquerystring) - rows = cur.fetchall() - columns = [name[0] for name in cur.description] - geomIndex = columns.index('st_asgeojson') - feature_collection = {'type': 'FeatureCollection', 'features': []} - - # Format the table into a geojson format for ES/Kibana consumption - for row in rows: - if args.idField.lower() == 'sauid': - feature = { - 'type': 'Feature', - 'geometry': json.loads(row[geomIndex]), - 'properties': {}, - } - for index, column in enumerate(columns): - if column != "st_asgeojson": - value = row[index] - feature['properties'][column] = value - - elif args.idField.lower() == 'building': - coordinates = json.loads(row[geomIndex])['coordinates'] - feature = { - 'type': 'Feature', - 'geometry': json.loads(row[geomIndex]), - 'coordinates': coordinates, - 'properties': {}, - } - for index, column in enumerate(columns): - if column != "st_asgeojson": - value = row[index] - feature['properties'][column] = value - - feature_collection['features'].append(feature) - geojsonobject = json.dumps(feature_collection, - indent=2, - default=decimal_default) - - except (Exception, psycopg2.Error) as error: - logging.error(error) - - finally: - if(connection): - # cursor.close() - connection.close() + }), + view="opendrr_psra_indicators_b", + sqlquerystring='SELECT *, ST_AsGeoJSON(geom_point) \ + FROM results_psra_national.psra_all_indicators_b \ + ORDER BY psra_all_indicators_b."AssetID" \ + LIMIT {limit} \ + OFFSET {offset}' + ) - # create index - if es.indices.exists(view): - es.indices.delete(view) - - es.indices.create(index=view, body=settings, request_timeout=90) - - d = json.loads(geojsonobject) - - helpers.bulk(es, gendata(d, view, id_field), raise_on_error=False) + psraTable.postgis2es() return - -def gendata(data, view, id_field): - for item in data['features']: - yield { - "_index": view, - "_id": item['properties'][id_field], - "_source": item - } - - -# Function to handle decimal encoder error -def decimal_default(obj): - if isinstance(obj, decimal.Decimal): - return float(obj) - raise 
TypeError - - -def get_config_params(args): - """ - Parse Input/Output columns from supplied *.ini file - """ - configParseObj = configparser.ConfigParser() - configParseObj.read(args) - return configParseObj - - -def parse_args(): - parser = argparse.ArgumentParser(description="script description") - parser.add_argument("--province", - type=str, - help="Two letters only", - required=True) - parser.add_argument("--dbview", - type=str, - help=" Thematic Database View", - required=True) - parser.add_argument("--idField", - type=str, - help="Field to use as ElasticSearch Index ID", - required=True) - args = parser.parse_args() - - return args - - if __name__ == '__main__': main() diff --git a/python/riskDynamics_postgres2es.py b/python/riskDynamics_postgres2es.py index 979d89ee..32c362c8 100644 --- a/python/riskDynamics_postgres2es.py +++ b/python/riskDynamics_postgres2es.py @@ -139,14 +139,18 @@ def main(): connection.close() # create index - if es.indices.exists(view): - es.indices.delete(view) + if es.indices.exists("opendrr_" + view): + es.indices.delete("opendrr_" + view) - es.indices.create(index=view, body=settings, request_timeout=90) + es.indices.create(index="opendrr_" + view, + body=settings, + request_timeout=90) d = json.loads(geojsonobject) - helpers.bulk(es, gendata(d, view, id_field), raise_on_error=False) + helpers.bulk(es, + gendata(d, "opendrr_" + view, id_field), + raise_on_error=False) return diff --git a/python/sauid_postgres2es.py b/python/sauid_postgres2es.py new file mode 100644 index 00000000..d386ce2e --- /dev/null +++ b/python/sauid_postgres2es.py @@ -0,0 +1,45 @@ +# ================================================================= +# !/bin/bash +# SPDX-License-Identifier: MIT +# +# Copyright (C) 2020-2021 Government of Canada +# +# Main Authors: Drew Rotheram +# Joost van Ulden +# ================================================================= + + +import utils + + +def main(): + table = utils.PostGISdataset( + utils.PostGISConnection(), + utils.ESConnection(settings={ + 'settings': { + 'number_of_shards': 1, + 'number_of_replicas': 0 + }, + 'mappings': { + 'properties': { + 'geometry': { + 'type': 'geo_shape' + } + } + } + }), + view="opendrr_geometry_sauid", + sqlquerystring='SELECT *, ST_AsGeoJSON(geom) \ + FROM boundaries."Geometry_SAUID" \ + ORDER BY "Geometry_SAUID"."OBJECTID" \ + LIMIT {limit} \ + OFFSET {offset}' + ) + + table.postgis2es() + + return + + +if __name__ == '__main__': + main() diff --git a/python/socialFabric_postgres2es.py b/python/socialFabric_postgres2es.py index 5ed2d2ae..5b1764f7 100644 --- a/python/socialFabric_postgres2es.py +++ b/python/socialFabric_postgres2es.py @@ -1,205 +1,120 @@ - - -import json -import os -import sys -import psycopg2 -import configparser -import logging +# ================================================================= +# !/bin/bash +# SPDX-License-Identifier: MIT +# +# Copyright (C) 2020-2021 Government of Canada +# +# Main Authors: Drew Rotheram +# Joost van Ulden +# ================================================================= + +import utils import argparse -import decimal -from elasticsearch import Elasticsearch -from elasticsearch import helpers ''' Script to convert Social Fabric Views to ElasticSearch Index Can be run from the command line with mandatory arguments Run this script with a command like: python3 socialFabric_postgres2es.py - --type="family_structure" + --type="all_indicators" --aggregation="sauid" --geometry=geom_poly --idField="Sauid" ''' - # Main Function def main(): - logFileName 
-    logging.basicConfig(level=logging.INFO,
-                        format='%(asctime)s - %(levelname)s - %(message)s',
-                        handlers=[logging.FileHandler(logFileName),
-                                  logging.StreamHandler()])
-    auth = get_config_params('config.ini')
     args = parse_args()
 
+    if args.aggregation.lower() == "sauid":
+        aggregation = args.aggregation[0].lower()
+    else:
+        aggregation = args.aggregation
+
+    # index settings
+    if args.geometry in ("geom_poly", "geom"):
+        table = utils.PostGISdataset(
+            utils.PostGISConnection(),
+            utils.ESConnection(settings={
+                'settings': {
+                    'number_of_shards': 1,
+                    'number_of_replicas': 0
+                },
+                'mappings': {
+                    'properties': {
+                        'geometry': {
+                            'type': 'geo_shape'
+                        }
+                    }
+                }
+            }),
+            view="opendrr_nhsl_social_fabric_all_indicators_{agg}".format(**{
+                'agg': aggregation}),
+            sqlquerystring='SELECT *, ST_AsGeoJSON({geom}) \
+                FROM \
+                results_nhsl_social_fabric.nhsl_social_fabric_all_indicators_{agg} \
+                ORDER BY "{sort_field}" \
+                LIMIT {{limit}} \
+                OFFSET {{offset}}'.format(**{
+                    'geom': args.geometry,
+                    'agg': aggregation,
+                    'sort_field': args.sortfield})
+        )
+
+    elif args.geometry == "geom_point":
+        table = utils.PostGISdataset(
+            utils.PostGISConnection(),
+            utils.ESConnection(settings={
+                'settings': {
+                    'number_of_shards': 1,
+                    'number_of_replicas': 0
+                },
+                'mappings': {
+                    'properties': {
+                        'coordinates': {
+                            'type': 'geo_point'
+                        },
+                        'geometry': {
+                            'type': 'geo_shape'
+                        }
+                    }
+                }
-    view = "nhsl_social_fabric_{type}_{aggregation}".format(**{
-        'type': args.type,
-        'aggregation': args.aggregation[0].lower()})
-
-    if args.idField.lower() == 'sauid':
-        id_field = 'Sauid'
-        sqlquerystring = 'SELECT *, ST_AsGeoJSON(geom_poly) \
-            FROM results_nhsl_social_fabric.{view}'.format(**{
-                'view': view})
-        settings = {
-            'settings': {
-                'number_of_shards': 1,
-                'number_of_replicas': 0
-            },
-            'mappings': {
-                'properties': {
-                    'geometry': {
-                        'type': 'geo_shape'
-                    },
-                    'geom_poly': {
-                        'type': 'geo_shape'
-                    }
-                }
-            }
-        }
-    elif args.idField.lower() == 'building':
-        id_field = 'AssetID'
-        sqlquerystring = 'SELECT *, ST_AsGeoJSON(geom_point) \
-            FROM results_nhsl_hazard_threat.{view}'.format(**{
-                'view': view})
-        settings = {
-            'settings': {
-                'number_of_shards': 1,
-                'number_of_replicas': 0
-            },
-            'mappings': {
-                'properties': {
-                    'coordinates': {
-                        'type': 'geo_point'
-                    },
-                    'geometry': {
-                        'type': 'geo_shape'
-                    }
-                }
-            }
-        }
-
-    # es=Elasticsearch()
-    es = Elasticsearch([auth.get('es', 'es_endpoint')],
-                       http_auth=(auth.get('es', 'es_un'),
-                                  auth.get('es', 'es_pw')))
-    connection = None
-    try:
-        # Connect to the PostGIS database hosted on RDS
-        connection = psycopg2.connect(user=auth.get('rds', 'postgres_un'),
-                                      password=auth.get('rds', 'postgres_pw'),
-                                      host=auth.get('rds', 'postgres_host'),
-                                      port=auth.get('rds', 'postgres_port'),
-                                      database=auth.get('rds', 'postgres_db'))
-        # Query the entire view with the geometries in geojson format
-        cur = connection.cursor()
-        cur.execute(sqlquerystring)
-        rows = cur.fetchall()
-        columns = [name[0] for name in cur.description]
-        geomIndex = columns.index('st_asgeojson')
-        feature_collection = {'type': 'FeatureCollection', 'features': []}
-
-        # Format the table into a geojson format for ES/Kibana consumption
-        for row in rows:
-            if args.idField.lower() == 'sauid':
-                feature = {
-                    'type': 'Feature',
-                    'geometry': json.loads(row[geomIndex]),
-                    'properties': {},
-                }
-                for index, column in enumerate(columns):
-                    if column != "st_asgeojson":
-                        value = row[index]
-                        feature['properties'][column] = value
-
-            elif args.idField.lower() == 'building':
-                coordinates = json.loads(row[geomIndex])['coordinates']
-                feature = {
-                    'type': 'Feature',
-                    'geometry': json.loads(row[geomIndex]),
-                    'coordinates': coordinates,
-                    'properties': {},
-                }
-                for index, column in enumerate(columns):
-                    if column != "st_asgeojson":
-                        value = row[index]
-                        feature['properties'][column] = value
-
-            feature_collection['features'].append(feature)
-        geojsonobject = json.dumps(feature_collection,
-                                   indent=2,
-                                   default=decimal_default)
-
-    except (Exception, psycopg2.Error) as error:
-        logging.error(error)
-
-    finally:
-        if(connection):
-            # cursor.close()
-            connection.close()
-
-    # create index
-    if es.indices.exists(view):
-        es.indices.delete(view)
-
-    es.indices.create(index=view, body=settings, request_timeout=90)
-
-    d = json.loads(geojsonobject)
-
-    helpers.bulk(es, gendata(d, view, id_field), raise_on_error=False)
+            }),
+            view="opendrr_nhsl_social_fabric_all_indicators_{agg}".format(**{
+                'agg': aggregation}),
+            sqlquerystring='SELECT *, ST_AsGeoJSON(geom_point) \
+                FROM \
+                results_nhsl_social_fabric.nhsl_social_fabric_all_indicators_{agg} \
+                ORDER BY "{sort_field}" \
+                LIMIT {{limit}} \
+                OFFSET {{offset}}'.format(**{
+                    'agg': aggregation,
+                    'sort_field': args.sortfield})
+        )
+
+    table.postgis2es()
 
     return
 
-
-def gendata(data, view, id_field):
-    for item in data['features']:
-        yield {
-            "_index": view,
-            "_id": item['properties'][id_field],
-            "_source": item
-        }
-
-
-# Function to handle decimal encoder error
-def decimal_default(obj):
-    if isinstance(obj, decimal.Decimal):
-        return float(obj)
-    raise TypeError
-
-
-def get_config_params(args):
-    """
-    Parse Input/Output columns from supplied *.ini file
-    """
-    configParseObj = configparser.ConfigParser()
-    configParseObj.read(args)
-    return configParseObj
-
-
 def parse_args():
-    parser = argparse.ArgumentParser(description="load data PostGIS to ES")
-    parser.add_argument("--type",
-                        type=str,
-                        help="Social Fabric layer (i.e. eq_threat_to_assets)",
-                        required=True)
+    parser = argparse.ArgumentParser(description="load social fabric PostGIS to ES")
     parser.add_argument("--aggregation",
                         type=str,
-                        help="building or Sauid",
+                        help="sauid or hexgrid_xxkm",
                         required=True)
     parser.add_argument("--geometry",
                         type=str,
                         help="geom_point or geom_poly",
                         required=True)
-    parser.add_argument("--idField",
+    parser.add_argument("--sortfield",
                         type=str,
-                        help="Field for ES Index ID. AssetID or Sauid",
+                        help="Sauid or gridid_100, gridid_25 etc.",
                         required=True)
     args = parser.parse_args()
 
     return args
 
-
 if __name__ == '__main__':
     main()
diff --git a/python/srcLoss_postgres2es.py b/python/srcLoss_postgres2es.py
index 6e0e7846..a3cdd93a 100644
--- a/python/srcLoss_postgres2es.py
+++ b/python/srcLoss_postgres2es.py
@@ -1,136 +1,51 @@
 # =================================================================
+# !/bin/bash
+# SPDX-License-Identifier: MIT
 #
-# Authors: Drew Rotheram
+# Copyright (C) 2020-2021 Government of Canada
 #
+# Main Authors: Drew Rotheram
+#               Joost van Ulden
 # =================================================================
 
-import json
-import os
-import sys
-import psycopg2
-import configparser
-import logging
+import utils
 import argparse
-import decimal
-from elasticsearch import Elasticsearch
-from elasticsearch import helpers
-
-'''
-Script to convert src_loss tables to ElasticSearch Index
-Can be run from the command line with mandatory arguments
-Run this script with a command like:
-python3 srcLoss_postgres2es.py --province={PT}
-'''
-
-
-# Main Function
 def main():
-    logFileName = '{}.log'.format(os.path.splitext(sys.argv[0])[0])
-    logging.basicConfig(level=logging.INFO,
-                        format='%(asctime)s - %(levelname)s - %(message)s',
-                        handlers=[logging.FileHandler(logFileName),
-                                  logging.StreamHandler()])
-    auth = get_config_params('config.ini')
     args = parse_args()
 
-    view = "psra_{province}_src_loss".format(**{
-        'province': args.province.lower()})
-    sqlquerystring = 'SELECT * \
-        FROM results_psra_{province}.{view}'.format(**{
-            'province': args.province.lower(),
-            'view': view})
-    settings = {
-        'settings': {
-            'number_of_shards': 1,
-            'number_of_replicas': 0
-        }
-    }
-
-    es = Elasticsearch([auth.get('es', 'es_endpoint')],
-                       http_auth=(auth.get('es', 'es_un'),
-                                  auth.get('es', 'es_pw')))
-    connection = None
-    try:
-        # Connect to the PostGIS database hosted on RDS
-        connection = psycopg2.connect(user=auth.get('rds', 'postgres_un'),
-                                      password=auth.get('rds', 'postgres_pw'),
-                                      host=auth.get('rds', 'postgres_host'),
-                                      port=auth.get('rds', 'postgres_port'),
-                                      database=auth.get('rds', 'postgres_db'))
-        # Query the entire view with the geometries in geojson format
-        cur = connection.cursor()
-        cur.execute(sqlquerystring)
-        rows = cur.fetchall()
-        columns = [name[0] for name in cur.description]
-        feature_collection = {'type': 'FeatureCollection', 'features': []}
-
-        # Format the table into a geojson format for ES/Kibana consumption
-        for row in rows:
-            feature = {
-                'type': 'Feature',
-                'properties': {},
-            }
-            for index, column in enumerate(columns):
-                value = row[index]
-                feature['properties'][column] = value
-
-            feature_collection['features'].append(feature)
-        geojsonobject = json.dumps(feature_collection,
-                                   indent=2,
-                                   default=decimal_default)
-
-    except (Exception, psycopg2.Error) as error:
-        logging.error(error)
-
-    finally:
-        if(connection):
-            connection.close()
-
-    # create index
-    if es.indices.exists(view):
-        es.indices.delete(view)
-
-    es.indices.create(index=view, body=settings, request_timeout=90)
+    table = utils.PostGISTable(
+        utils.PostGISConnection(),
+        utils.ESConnection(settings={
+            'settings': {
+                'number_of_shards': 1,
+                'number_of_replicas': 0
+            },
+            'mappings': {
+                'properties': {
+                    'coordinates': {
+                        'type': 'geo_point'
+                    },
+                    'geometry': {
+                        'type': 'geo_shape'
+                    }
+                }
+            }
+        }),
+        view="opendrr_psra_src_loss",
+        sqlquerystring='SELECT * \
+            FROM results_psra_national.psra_src_loss \
+            ORDER BY psra_src_loss."region" \
+            LIMIT {limit} \
+            OFFSET {offset}'
+    )
 
-    d = json.loads(geojsonobject)
-
-    helpers.bulk(es, gendata(d, view), raise_on_error=False)
+    table.postgis2es()
 
     return
 
-
-def gendata(data, view):
-    for item in data['features']:
-        yield {
-            "_index": view,
-            "_source": item
-        }
-
-
-# Function to handle decimal encoder error
-def decimal_default(obj):
-    if isinstance(obj, decimal.Decimal):
-        return float(obj)
-    raise TypeError
-
-
-def get_config_params(args):
-    """
-    Parse Input/Output columns from supplied *.ini file
-    """
-    configParseObj = configparser.ConfigParser()
-    configParseObj.read(args)
-    return configParseObj
-
-
 def parse_args():
     parser = argparse.ArgumentParser(description="script description")
-    parser.add_argument("--province",
-                        type=str,
-                        help="Two letters only",
-                        required=True)
     args = parser.parse_args()
-
     return args
diff --git a/python/uhs_postgres2es.py b/python/uhs_postgres2es.py
index 8edfbb35..bcae6743 100644
--- a/python/uhs_postgres2es.py
+++ b/python/uhs_postgres2es.py
@@ -1,164 +1,50 @@
 # =================================================================
+# !/bin/bash
+# SPDX-License-Identifier: MIT
 #
-# Authors: Drew Rotheram
+# Copyright (C) 2020-2021 Government of Canada
 #
+# Main Authors: Drew Rotheram
+#               Joost van Ulden
 # =================================================================
 
-import json
-import os
-import sys
-import psycopg2
-import configparser
-import logging
+import utils
 import argparse
-import decimal
-from elasticsearch import Elasticsearch
-from elasticsearch import helpers
-
-'''
-Script to convert uhs views to ElasticSearch Index
-Can be run from the command line with mandatory arguments
-Run this script with a command like:
-python3 uhs_postgres2es.py --province=${PT}
-'''
-
-
-# Main Function
 def main():
-    logFileName = '{}.log'.format(os.path.splitext(sys.argv[0])[0])
-    logging.basicConfig(level=logging.INFO,
-                        format='%(asctime)s - %(levelname)s - %(message)s',
-                        handlers=[logging.FileHandler(logFileName),
-                                  logging.StreamHandler()])
-    auth = get_config_params('config.ini')
     args = parse_args()
 
-    view = "psra_{province}_uhs".format(**{
-        'province': args.province.lower()})
-    limit = 10000
-    offset = 0
-
-    # create index
-    es = Elasticsearch([auth.get('es', 'es_endpoint')],
-                       http_auth=(auth.get('es', 'es_un'),
-                                  auth.get('es', 'es_pw')))
-    if es.indices.exists(view):
-        es.indices.delete(view)
-
-    # id_field = 'AssetID'
-    settings = {
-        'settings': {
-            'number_of_shards': 1,
-            'number_of_replicas': 0
-        },
-        'mappings': {
-            'properties': {
-                'coordinates': {
-                    'type': 'geo_point'
-                },
-                'geometry': {
-                    'type': 'geo_shape'
-                }
-            }
-        }
-    }
-    es.indices.create(index=view, body=settings, request_timeout=90)
-
-    while True:
+    table = utils.PostGISdataset(
+        utils.PostGISConnection(),
+        utils.ESConnection(settings={
+            'settings': {
+                'number_of_shards': 1,
+                'number_of_replicas': 0
+            },
+            'mappings': {
+                'properties': {
+                    'coordinates': {
+                        'type': 'geo_point'
+                    },
+                    'geometry': {
+                        'type': 'geo_shape'
+                    }
+                }
+            }
+        }),
+        view="opendrr_psra_uhs",
         sqlquerystring = 'SELECT *, ST_AsGeoJSON(geom) \
-            FROM results_psra_{province}.{view} \
-            ORDER BY {view}."geom" \
-            LIMIT {limit} \
-            OFFSET {offset}'.format(**{'province': args.province.lower(),
-                                       'view': view,
-                                       'limit': limit,
-                                       'offset': offset})
-        offset += limit
-        connection = None
-        try:
-            # Connect to the PostGIS database
-            connection = psycopg2.connect(user=auth.get('rds',
-                                                        'postgres_un'),
-                                          password=auth.get('rds',
-                                                            'postgres_pw'),
-                                          host=auth.get('rds',
-                                                        'postgres_host'),
-                                          port=auth.get('rds',
-                                                        'postgres_port'),
-                                          database=auth.get('rds',
-                                                            'postgres_db'))
-            # Query the entire view with the geometries in geojson format
-            cur = connection.cursor()
-            cur.execute(sqlquerystring)
-            rows = cur.fetchall()
-            if rows:
-                columns = [name[0] for name in cur.description]
-                geomIndex = columns.index('st_asgeojson')
-                feature_collection = {'type': 'FeatureCollection',
-                                      'features': []}
-                # Format table into a geojson format for ES/Kibana consumption
-                for row in rows:
-                    coordinates = json.loads(row[geomIndex])['coordinates']
-                    feature = {
-                        'type': 'Feature',
-                        'geometry': json.loads(row[geomIndex]),
-                        'coordinates': coordinates,
-                        'properties': {},
-                    }
-                    for index, column in enumerate(columns):
-                        if column != "st_asgeojson":
-                            value = row[index]
-                            feature['properties'][column] = value
-
-                    feature_collection['features'].append(feature)
-                geojsonobject = json.dumps(feature_collection,
-                                           indent=2,
-                                           default=decimal_default)
-                d = json.loads(geojsonobject)
-                helpers.bulk(es,
-                             gendata(d, view),
-                             raise_on_error=False)
-
-            else:
-                if(connection):
-                    connection.close()
-                return
-
-        except (Exception, psycopg2.Error) as error:
-            logging.error(error)
-
-
-def gendata(data, view):
-    for item in data['features']:
-        yield {
-            "_index": view,
-            # "_id": item['properties'][id_field],
-            "_source": item
-        }
-
-
-# Function to handle decimal encoder error
-def decimal_default(obj):
-    if isinstance(obj, decimal.Decimal):
-        return float(obj)
-    raise TypeError
-
+            FROM results_psra_national.psra_uhs \
+            ORDER BY psra_uhs."geom" \
+            LIMIT {limit} \
+            OFFSET {offset}'
+    )
 
-def get_config_params(args):
-    """
-    Parse Input/Output columns from supplied *.ini file
-    """
-    configParseObj = configparser.ConfigParser()
-    configParseObj.read(args)
-    return configParseObj
+    table.postgis2es()
+    return
 
 
 def parse_args():
     parser = argparse.ArgumentParser(description="script description")
-    parser.add_argument("--province",
-                        type=str,
-                        help="Two letters only",
-                        required=True)
     args = parser.parse_args()
 
     return args
diff --git a/python/utils.py b/python/utils.py
new file mode 100644
index 00000000..45ee1891
--- /dev/null
+++ b/python/utils.py
@@ -0,0 +1,253 @@
+# =================================================================
+# !/bin/bash
+# SPDX-License-Identifier: MIT
+#
+# Copyright (C) 2020-2021 Government of Canada
+#
+# Main Authors: Drew Rotheram
+#               Joost van Ulden
+# =================================================================
+
+import configparser
+import psycopg2
+import json
+import decimal
+
+from elasticsearch import Elasticsearch
+from elasticsearch import helpers
+
+
+class ESConnection:
+    def __init__(self, settings):
+        self._settings = settings
+        self._auth = get_config_params('config.ini')
+
+    def settings(self):
+        return self._settings
+
+    pass
+
+
+class PostGISConnection:
+    def __init__(self):
+        self._auth = get_config_params('config.ini')
+        self._pgConnection = psycopg2.connect(
+            user=self._auth.get('rds', 'postgres_un'),
+            password=self._auth.get('rds', 'postgres_pw'),
+            host=self._auth.get('rds', 'postgres_host'),
+            port=self._auth.get('rds', 'postgres_port'),
+            database=self._auth.get('rds', 'postgres_db'))
+
+    def auth(self):
+        return self._auth
+
+    def pgConnection(self):
+        return self._pgConnection
+
+    pass
+
+
+class PostGISdataset:
+    """A class to represent a dataset stored
+    in PostGIS with methods to connect to
+    a PostGIS database, query the table into
+    a geojson object and post that geojson
+    to an ElasticSearch instance
+    """
+    LIMIT = 10000
+    OFFSET = 0
+
+    def __init__(self, PostGISConnection,
+                 ESConnection, view, sqlquerystring):
+        self._pgConnection = PostGISConnection
+        self._esConnection = ESConnection
+        self._view = view
+        self._sqlquerystring = sqlquerystring
+        self._auth = get_config_params('config.ini')
+
+    def pgConnection(self):
+        return self._pgConnection
+
+    def esConnection(self):
+        return self._esConnection
+
+    def view(self):
+        return self._view
+
+    def auth(self):
+        return self._auth
+
+    def sqlquerystring(self):
+        return self._sqlquerystring
+
+    def getGeoJson(self, sqlquerystring, pgConnection):
+        cur = pgConnection.pgConnection().cursor()
+        cur.execute(sqlquerystring)
+        rows = cur.fetchall()
+        if rows:
+            columns = [name[0] for name in cur.description]
+            geomIndex = columns.index('st_asgeojson')
+            feature_collection = {'type': 'FeatureCollection',
+                                  'features': []}
+
+            # Format table into a geojson format for ES/Kibana consumption
+            for row in rows:
+                feature = {
+                    'type': 'Feature',
+                    'geometry': json.loads(row[geomIndex]),
+                    'properties': {},
+                }
+                for index, column in enumerate(columns):
+                    if column != "st_asgeojson":
+                        value = row[index]
+                        feature['properties'][column] = value
+                feature_collection['features'].append(feature)
+            geojsonobject = json.dumps(feature_collection,
+                                       indent=2,
+                                       default=decimal_default)
+            return geojsonobject
+        else:
+            return None
+
+    def initializeElasticSearchIndex(self, esConnection, auth, view):
+        es = Elasticsearch([auth.get('es', 'es_endpoint')],
+                           http_auth=(auth.get('es', 'es_un'),
+                                      auth.get('es', 'es_pw')))
+        if es.indices.exists(view):
+            es.indices.delete(view)
+        settings = esConnection.settings()
+        es.indices.create(index=view, body=settings, request_timeout=90)
+        return
+
+    def populateElasticSearchIndex(self,
+                                   esConnection,
+                                   geojsonobject,
+                                   auth,
+                                   view):
+        d = json.loads(geojsonobject)
+        es = Elasticsearch([auth.get('es', 'es_endpoint')],
+                           http_auth=(auth.get('es', 'es_un'),
+                                      auth.get('es', 'es_pw')),
+                           timeout=30,
+                           max_retries=10,
+                           retry_on_timeout=True)
+        helpers.bulk(es,
+                     gendata(d, view),
+                     raise_on_error=False)
+        return
+
+    def postgis2es(self):
+        self.initializeElasticSearchIndex(self.esConnection(),
+                                          self.auth(),
+                                          self.view())
+        sqlquerystring = self.sqlquerystring().format(
+            **{'limit': self.LIMIT,
+               'offset': self.OFFSET})
+        geojsonobject = self.getGeoJson(sqlquerystring, self.pgConnection())
+        while geojsonobject is not None:
+            print(sqlquerystring)
+            self.populateElasticSearchIndex(self.esConnection(),
+                                            geojsonobject,
+                                            self.auth(),
+                                            self.view())
+            self.OFFSET += self.LIMIT
+
+            sqlquerystring = self.sqlquerystring().format(
+                **{'limit': self.LIMIT,
+                   'offset': self.OFFSET})
+            geojsonobject = self.getGeoJson(sqlquerystring,
+                                            self.pgConnection())
+
+        return
+
+
+class PostGISPointDataset(PostGISdataset):
+
+    def getGeoJson(self, sqlquerystring, pgConnection):
+        cur = pgConnection.pgConnection().cursor()
+        cur.execute(sqlquerystring)
+        rows = cur.fetchall()
+        if rows:
+            columns = [name[0] for name in cur.description]
+            geomIndex = columns.index('st_asgeojson')
+            feature_collection = {'type': 'FeatureCollection',
+                                  'features': []}
+
+            # Format table into a geojson format for ES/Kibana consumption
+            for row in rows:
+                coordinates = json.loads(row[geomIndex])['coordinates']
+                feature = {
+                    'type': 'Feature',
+                    'geometry': json.loads(row[geomIndex]),
+                    'coordinates': coordinates,
+                    'properties': {},
+                }
+                for index, column in enumerate(columns):
+                    if column != "st_asgeojson":
+                        value = row[index]
+                        feature['properties'][column] = value
+                feature_collection['features'].append(feature)
+            geojsonobject = json.dumps(feature_collection,
+                                       indent=2,
+                                       default=decimal_default)
+            return geojsonobject
+        else:
+            return None
+
+
+class PostGISTable(PostGISdataset):
+
+    def getGeoJson(self, sqlquerystring, pgConnection):
+        cur = pgConnection.pgConnection().cursor()
+        cur.execute(sqlquerystring)
+        rows = cur.fetchall()
+        if rows:
+            columns = [name[0] for name in cur.description]
+            # geomIndex = columns.index('st_asgeojson')
+            feature_collection = {'type': 'FeatureCollection',
+                                  'features': []}
+
+            # Format table into a geojson format for ES/Kibana consumption
+            for row in rows:
+                # coordinates = json.loads(row[geomIndex])['coordinates']
+                feature = {
+                    'type': 'Feature',
+                    # 'geometry': json.loads(row[geomIndex]),
+                    # 'coordinates': coordinates,
+                    'properties': {},
+                }
+                for index, column in enumerate(columns):
+                    if column != "st_asgeojson":
+                        value = row[index]
+                        feature['properties'][column] = value
+                feature_collection['features'].append(feature)
+            geojsonobject = json.dumps(feature_collection,
+                                       indent=2,
+                                       default=decimal_default)
+            return geojsonobject
+        else:
+            return None
+
+
+def gendata(data, view):
+    for item in data['features']:
+        yield {
+            "_index": view,
+            "_source": item
+        }
+
+
+# Function to handle decimal encoder error
+def decimal_default(obj):
+    if isinstance(obj, decimal.Decimal):
+        return float(obj)
+    raise TypeError
+
+
+def get_config_params(args):
+    """
+    Parse Input/Output columns from supplied *.ini file
+    """
+    configParseObj = configparser.ConfigParser()
+    configParseObj.read(args)
+    return configParseObj
diff --git a/sample.env b/sample.env
index 76242d86..89a0c5d4 100644
--- a/sample.env
+++ b/sample.env
@@ -19,6 +19,7 @@ loadHazardThreat=false
 loadPhysicalExposure=true
 loadRiskDynamics=true
 loadSocialFabric=true
+loadHexGrid=true
 
 # For testing and debugging
 #ADD_DATA_DRY_RUN=true
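
Usage sketch for the new utils module: the patch defines three dataset classes, but no caller of PostGISPointDataset appears in this change set. A loader for a point-geometry layer would mirror sauid_postgres2es.py above, substituting PostGISPointDataset so that each indexed document also carries a top-level geo_point for Kibana map visualizations. The sketch below is illustrative only; the schema, table, sort column, and index name (example_schema.example_points, "OBJECTID", opendrr_example_points) are hypothetical placeholders, not objects created by this patch, and a config.ini with the usual rds/es sections is assumed to be present.

    # Hypothetical point-layer loader built on python/utils.py.
    # All table/view names are placeholders for illustration.
    import utils


    def main():
        table = utils.PostGISPointDataset(
            utils.PostGISConnection(),            # reads config.ini [rds]
            utils.ESConnection(settings={
                'settings': {
                    'number_of_shards': 1,
                    'number_of_replicas': 0
                },
                'mappings': {
                    'properties': {
                        'coordinates': {
                            'type': 'geo_point'   # filled from ST_AsGeoJSON coordinates
                        },
                        'geometry': {
                            'type': 'geo_shape'
                        }
                    }
                }
            }),
            view="opendrr_example_points",        # placeholder index name
            sqlquerystring='SELECT *, ST_AsGeoJSON(geom_point) \
                FROM example_schema.example_points \
                ORDER BY "OBJECTID" \
                LIMIT {limit} \
                OFFSET {offset}'
        )

        table.postgis2es()


    if __name__ == '__main__':
        main()

postgis2es() substitutes {limit} and {offset} into the query 10,000 rows at a time, bulk-indexing each page with the retry-on-timeout client until a page comes back empty, so a stable ORDER BY column is required for the pagination to be deterministic.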