diff --git a/.gitignore b/.gitignore index 6d9b786..5c18a1a 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,5 @@ .Ruserdata .env .DS_Store +*parameters.json +github_pat.txt diff --git a/.lintr b/.lintr new file mode 100644 index 0000000..52ff41d --- /dev/null +++ b/.lintr @@ -0,0 +1,4 @@ +linters: linters_with_defaults( + line_length_linter = NULL, + object_length_linter = NULL + ) diff --git a/ACI/.gitignore b/ACI/.gitignore new file mode 100644 index 0000000..14badde --- /dev/null +++ b/ACI/.gitignore @@ -0,0 +1,3 @@ +inputs/ +outputs/ +dataprep_inputs/ diff --git a/ACI/Dockerfile.ACI b/ACI/Dockerfile.ACI new file mode 100644 index 0000000..a7d7d34 --- /dev/null +++ b/ACI/Dockerfile.ACI @@ -0,0 +1,57 @@ +FROM rocker/tidyverse:4.3.1 + +# install system dependencies for R packages +RUN apt-get update && apt-get install --no-install-recommends -y \ + curl=7.81.* \ + git=1:2.34.* \ + gnupg=2.2.* \ + libcurl4-openssl-dev=7.81.* \ + libfontconfig1-dev=2.13.* \ + libfreetype6-dev=2.11.* \ + libfribidi-dev=1.0.* \ + libgit2-dev=1.1.* \ + libharfbuzz-dev=2.7.* \ + libicu-dev=70.1-* \ + libjpeg-dev=8c-* \ + libpng-dev=1.6.* \ + libssl-dev=3.0.* \ + libtiff-dev=4.3.* \ + libxml2-dev=2.9.* \ + make=4.3-* \ + pandoc=2.9.2.* \ + zlib1g-dev=1:1.2.* \ + && rm -rf /var/lib/apt/lists/* + +RUN curl -fsSL -o /tmp/google-chrome.deb https://dl.google.com/linux/direct/google-chrome-stable_current_amd64.deb \ + && apt-get update \ + && DEBIAN_FRONTEND='noninteractive' apt-get install --no-install-recommends -y /tmp/google-chrome.deb \ + && rm /tmp/google-chrome.deb \ + && rm -rf /var/lib/apt/lists/* + +WORKDIR /workflow.data.preparation + +# set frozen CRAN repo and other R options() +COPY ./ACI/Rprofile.site "/usr/local/lib/R/etc/Rprofile.site" + +# Install R dependencies +COPY DESCRIPTION DESCRIPTION + +# install pak, find dependencises from DESCRIPTION, and install them. +RUN --mount=type=secret,id=github_pat \ + Rscript -e "\ + Sys.setenv(GITHUB_PAT = readLines('/run/secrets/github_pat')); \ + install.packages('pak'); \ + deps <- pak::local_deps(root = '.'); \ + pkg_deps <- deps[!deps[['direct']], 'ref']; \ + cat(pkg_deps); \ + pak::pak(pkg_deps); \ + Sys.unsetenv('GITHUB_PAT'); \ + " + +COPY ./run_pacta_data_preparation.R run_pacta_data_preparation.R +COPY ./config.yml config.yml +COPY ./ACI/copy_raw_data.R copy_raw_data.R + +COPY ./ACI/copy_files_and_run_data_prep.sh /usr/local/bin/copy_files_and_run_data_prep + +CMD ["copy_files_and_run_data_prep"] diff --git a/ACI/RProfile.site b/ACI/RProfile.site new file mode 100644 index 0000000..852d498 --- /dev/null +++ b/ACI/RProfile.site @@ -0,0 +1,9 @@ +options( + pkg.sysreqs = FALSE, + pkg.sysreqs_db_update = FALSE, + pkg.sysreqs_update = FALSE, + readr.show_progress = FALSE, + repos = c( + CRAN = "https://packagemanager.posit.co/cran/__linux__/jammy/2023-10-30" + ) +) diff --git a/ACI/azure-deploy.json b/ACI/azure-deploy.json new file mode 100644 index 0000000..bd06cd8 --- /dev/null +++ b/ACI/azure-deploy.json @@ -0,0 +1,186 @@ +{ + "$schema": "https://schema.management.azure.com/schemas/2019-04-01/deploymentTemplate.json#", + "contentVersion": "0.0.0.5", + "parameters": { + "location": { + "type": "string", + "defaultValue": "[resourceGroup().location]", + "metadata": { + "description": "Location for all resources." + } + }, + "identity": { + "type": "string", + "metadata": { + "description": "The ID of the user assigned identity to use for the container group." 
+      }
+    },
+    "containerGroupName": {
+      "type": "string",
+      "metadata": {
+        "description": "The name of the container group."
+      }
+    },
+    "restartPolicy": {
+      "type": "string",
+      "defaultValue": "OnFailure",
+      "allowedValues": [
+        "Always",
+        "Never",
+        "OnFailure"
+      ],
+      "metadata": {
+        "description": "The behavior of Azure runtime if container has stopped."
+      }
+    },
+    "rawdata-storageaccountkey": {
+      "type": "securestring",
+      "metadata": {
+        "description": "The storage account key for the rawdata storage account."
+      }
+    },
+    "dataprepoutputs-storageaccountkey": {
+      "type": "securestring",
+      "metadata": {
+        "description": "The storage account key for the data prep outputs storage account."
+      }
+    },
+    "starttime": {
+      "type": "string",
+      "defaultValue": "[utcNow()]",
+      "metadata": {
+        "description": "The time to start the container group."
+      }
+    }
+  },
+  "variables": {
+    "azurecontainerregistry": "transitionmonitordockerregistry.azurecr.io"
+  },
+  "functions": [],
+  "resources": [
+    {
+      "type": "Microsoft.ContainerInstance/containerGroups",
+      "apiVersion": "2021-09-01",
+      "name": "[parameters('containerGroupName')]",
+      "location": "[parameters('location')]",
+      "identity": {
+        "type": "UserAssigned",
+        "userAssignedIdentities": {
+          "[parameters('identity')]": {}
+        }
+      },
+      "metadata": {
+        "data-prep environmentVariables description": {
+          "DEPLOY_START_TIME": "The time the container was deployed.",
+          "R_CONFIG_ACTIVE": "The active config for the container.",
+          "R_CONFIG_FILE": "The config file for the container.",
+          "LOG_LEVEL": "The log level for the container. See {rlog} docs."
+        }
+      },
+      "properties": {
+        "containers": [
+          {
+            "name": "data-prep",
+            "properties": {
+              "image": "[concat(variables('azurecontainerregistry'),'/workflow.data.preparation_aci:latest')]",
+              "ports": [],
+              "resources": {
+                "requests": {
+                  "cpu": 1,
+                  "memoryInGB": 32,
+                  "gpu": {
+                    "count": 1,
+                    "sku": "V100"
+                  }
+                }
+              },
+              "environmentVariables": [
+                {
+                  "name": "DEPLOY_START_TIME",
+                  "value": "[parameters('starttime')]"
+                },
+                {
+                  "name": "R_CONFIG_ACTIVE",
+                  "value": "2022Q4_CICD"
+                },
+                {
+                  "name": "R_CONFIG_FILE",
+                  "value": "/workflow.data.preparation/config.yml"
+                },
+                {
+                  "name": "LOG_LEVEL",
+                  "value": "TRACE"
+                }
+              ],
+              "volumeMounts": [
+                {
+                  "name": "factset-extracted",
+                  "mountPath": "/mnt/factset-extracted/"
+                },
+                {
+                  "name": "rawdatavolume",
+                  "mountPath": "/mnt/rawdata/"
+                },
+                {
+                  "name": "dataprepinputsvolume",
+                  "mountPath": "/mnt/dataprep_inputs"
+                },
+                {
+                  "name": "outputsvolume",
+                  "mountPath": "/mnt/outputs/"
+                }
+              ]
+            }
+          }
+        ],
+        "imageRegistryCredentials": [
+          {
+            "server": "[variables('azurecontainerregistry')]",
+            "identity": "[parameters('identity')]"
+          }
+        ],
+        "restartPolicy": "[parameters('restartPolicy')]",
+        "osType": "Linux",
+        "volumes": [
+          {
+            "name": "factset-extracted",
+            "azureFile": {
+              "shareName": "factset-extracted",
+              "readOnly": true,
+              "storageAccountName": "pactarawdata",
+              "storageAccountKey": "[parameters('rawdata-storageaccountkey')]"
+            }
+          },
+          {
+            "name": "rawdatavolume",
+            "azureFile": {
+              "shareName": "rawdata",
+              "readOnly": true,
+              "storageAccountName": "pactarawdata",
+              "storageAccountKey": "[parameters('rawdata-storageaccountkey')]"
+            }
+          },
+          {
+            "name": "dataprepinputsvolume",
+            "azureFile": {
+              "shareName": "dataprep-inputs",
+              "readOnly": false,
+              "storageAccountName": "pactarawdata",
+              "storageAccountKey": "[parameters('rawdata-storageaccountkey')]"
+            }
+          },
+          {
+            "name": "outputsvolume",
+            "azureFile": {
+              "shareName": "data-prep-outputs",
+              "readOnly": false,
+
"storageAccountName": "pactadata", + "storageAccountKey": "[parameters('dataprepoutputs-storageaccountkey')]" + } + } + ] + } + } + ], + "outputs": {} +} diff --git a/ACI/copy_files_and_run_data_prep.sh b/ACI/copy_files_and_run_data_prep.sh new file mode 100755 index 0000000..bcc07d7 --- /dev/null +++ b/ACI/copy_files_and_run_data_prep.sh @@ -0,0 +1,16 @@ +#! /bin/sh +set -e + +# check memory available +free -m | cat + +inputs_dir="/mnt/dataprep_inputs" + +# copy raw data, then run normal data prep script +Rscript /workflow.data.preparation/copy_raw_data.R 2>&1 | \ + tee "$inputs_dir/$DEPLOY_START_TIME-copy.log" + +Rscript /workflow.data.preparation/run_pacta_data_preparation.R 2>&1 | \ + tee "$inputs_dir/$DEPLOY_START_TIME-prep.log" + +exit 0 diff --git a/ACI/copy_raw_data.R b/ACI/copy_raw_data.R new file mode 100644 index 0000000..160f871 --- /dev/null +++ b/ACI/copy_raw_data.R @@ -0,0 +1,137 @@ +logger::log_threshold(Sys.getenv("LOG_LEVEL", "INFO")) +logger::log_formatter(logger::formatter_glue) + +# Check value and format of $DEPLOY_START_TIME +deploy_start_time <- Sys.getenv("DEPLOY_START_TIME", "") +time_pattern <- "^[[:digit:]]{8}T[[:digit:]]{6}Z$" +if (grepl(x = deploy_start_time, pattern = time_pattern)) { + logger::log_debug("DEPLOY_START_TIME: ", deploy_start_time) + logger::log_trace("DEPLOY_START_TIME format is correct. ({time_pattern})") +} else if (nchar(deploy_start_time) == 0L) { + logger::log_error( + "Environment variable $DEPLOY_START_TIME not set or is empty" + ) + stop("Environment variable DEPLOY_START_TIME not set") +} else { + logger::log_warn(" + Environment variable $DEPLOY_START_TIME is not in the expected format. \\ + Expected format: '{time_pattern}'. \\ + Actual value: '{deploy_start_time}'. \\ + This variable is used to ensure consistency in accessing datasets. 
\\ + ") +} + +logger::log_info("Loading config: ", Sys.getenv("R_CONFIG_ACTIVE", "default")) +cfg <- config::get() +logger::log_trace("Config loaded.") + +masterdata_path <- file.path( + cfg[["raw_data_path"]], + "AssetImpact", + "Masterdata", + cfg[["pacta_financial_timestamp"]] +) +logger::log_trace("masterdata_path: ", masterdata_path) + +masterdata_debt <- file.path( + masterdata_path, + cfg[["masterdata_debt_filename"]] +) +logger::log_trace("masterdata_debt file: ", masterdata_debt) + +masterdata_ownership <- file.path( + masterdata_path, + cfg[["masterdata_ownership_filename"]] +) +logger::log_trace("masterdata_ownership file: ", masterdata_ownership) + +ar_fs_bridge <- file.path( + cfg[["raw_data_path"]], + "AssetImpact", + "FactSet_Bridge", + cfg[["ar_company_id__factset_entity_id_filename"]] +) +logger::log_trace("ar_fs_bridge file: ", ar_fs_bridge) + +factset_files <- list.files( + path = file.path( + cfg[["factset-extracted_path"]], + cfg[["factset_dataset"]] + ), + include.dirs = FALSE, + full.names = TRUE +) +logger::log_trace("factset_file: {factset_files}") + +files_to_copy <- c( + masterdata_debt, + masterdata_ownership, + ar_fs_bridge, + factset_files +) + +missing_files <- !file.exists(files_to_copy) +if (any(missing_files)) { + logger::log_error("The following files are missing:") + logger::log_error("{files_to_copy[missing_files]}") + stop("Please ensure the config points to existing files.") +} + +if (dir.exists(cfg[["data_prep_inputs_path"]])) { + logger::log_warn("data_prep_inputs_path already exists") +} else { + logger::log_debug( + "Creating data_prep_inputs_path: {cfg[['data_prep_inputs_path']]}}" + ) + dir.create(cfg[["data_prep_inputs_path"]]) +} +logger::log_info( + "copying files to data_prep_inputs_path: {cfg[['data_prep_inputs_path']]}}" +) + +logger::log_info("Copying files") +for (source_file in files_to_copy) { + + destination_file <- file.path( + cfg[["data_prep_inputs_path"]], + basename(source_file) + ) + if (file.exists(destination_file)) { + logger::log_warn( + "Destination file already exists: {destination_file}." + ) + } + logger::log_debug("Copying: {source_file} -> {destination_file}") + copy_success <- file.copy( + from = source_file, + to = destination_file, + overwrite = FALSE + ) + if (copy_success) { + logger::log_trace("Copy success") + } else { + logger::log_error("Failed to copy {source_file} to {destination_file}") + stop("File copy error") + } + + source_md5 <- digest::digest( + object = source_file, + algo = "md5", + file = TRUE + ) + destination_md5 <- digest::digest( + object = destination_file, + algo = "md5", + file = TRUE + ) + if (identical(source_md5, destination_md5)) { + logger::log_trace("MD5 match: {unique(source_md5, destination_md5)}") + } else { + logger::log_error(sprintf("MD5 mismatch for %s", basename(source_file))) + logger::log_error("Source MD5: {source_md5} {source_file}") + logger::log_error("Destination MD5: {destination_md5} {destination_file}") + stop("MD5 mismatch.") + } + +} +logger::log_info("Done copying files") diff --git a/ACI/docker-compose.yml b/ACI/docker-compose.yml new file mode 100644 index 0000000..da45c48 --- /dev/null +++ b/ACI/docker-compose.yml @@ -0,0 +1,29 @@ +version: "3.2" + +services: + workflow.data.preparation_aci: + build: + context: .. 
+ dockerfile: ACI/Dockerfile.ACI + secrets: + - github_pat + environment: + LOG_LEVEL: TRACE + R_CONFIG_ACTIVE: 2022Q4_CICD + volumes: + - type: bind + source: ./dataprep_inputs + target: /mnt/dataprep_inputs + - type: bind + source: ./inputs/factset-extracted + target: /mnt/factset-extracted + - type: bind + source: ./inputs/rawdata + target: /mnt/rawdata + - type: bind + source: ./outputs + target: /mnt/outputs + +secrets: + github_pat: + file: ./github_pat.txt diff --git a/ACI/example-azure-deploy.parameters.json b/ACI/example-azure-deploy.parameters.json new file mode 100644 index 0000000..17016e0 --- /dev/null +++ b/ACI/example-azure-deploy.parameters.json @@ -0,0 +1,52 @@ +{ + "$schema": "https://schema.management.azure.com/schemas/2019-04-01/deploymentParameters.json#", + "contentVersion": "1.0.0.0", + "parameters": { + "identity": { + "value": "/subscriptions//resourcegroups//providers/Microsoft.ManagedIdentity/userAssignedIdentities/" + }, + "serviceprincipal": { + "value": "" + }, + "rawdata-storageaccountkey": { + "reference": { + "keyVault": { + "id": "/subscriptions//resourceGroups//providers/Microsoft.KeyVault/vaults/" + }, + "secretName": "rawdata-storageaccountkey" + } + }, + "dataprepinputs-storageaccountkey": { + "reference": { + "keyVault": { + "id": "/subscriptions//resourceGroups//providers/Microsoft.KeyVault/vaults/" + }, + "secretName": "dataprepinputs-storageaccountkey" + } + }, + "dataprepoutputs-storageaccountkey": { + "reference": { + "keyVault": { + "id": "/subscriptions//resourceGroups//providers/Microsoft.KeyVault/vaults/" + }, + "secretName": "dataprepoutputs-storageaccountkey" + } + }, + "factset-database-user": { + "reference": { + "keyVault": { + "id": "/subscriptions//resourceGroups//providers/Microsoft.KeyVault/vaults/" + }, + "secretName": "factset-database-user" + } + }, + "factset-database-password": { + "reference": { + "keyVault": { + "id": "/subscriptions//resourceGroups//providers/Microsoft.KeyVault/vaults/" + }, + "secretName": "factset-database-password" + } + } + } +} diff --git a/DESCRIPTION b/DESCRIPTION new file mode 100644 index 0000000..53460e8 --- /dev/null +++ b/DESCRIPTION @@ -0,0 +1,32 @@ +Package: workflow.data.preparation +Title: What the Package Does (One Line, Title Case) +Version: 0.0.0.9000 +Authors@R: + person("First", "Last", , "first.last@example.com", role = c("aut", "cre"), + comment = c(ORCID = "YOUR-ORCID-ID")) +Description: What the package does (one paragraph). +License: `use_mit_license()`, `use_gpl3_license()` or friends to pick a + license +Encoding: UTF-8 +Roxygen: list(markdown = TRUE) +RoxygenNote: 7.2.3 +Imports: + DBI, + RSQLite, + config, + digest, + dplyr, + glue, + logger, + pacta.data.preparation, + pacta.data.scraping, + pacta.scenario.preparation, + readr, + rlang, + stats, + stringr, + tidyr +Remotes: + RMI-PACTA/pacta.data.preparation, + RMI-PACTA/pacta.data.scraping, + RMI-PACTA/pacta.scenario.preparation diff --git a/README.md b/README.md index 8399005..f4dd990 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,7 @@ # workflow.data.preparation Running the workflow requires a file `.env` to exist in the root directory, that looks like... + ``` sh HOST_INPUTS_PATH=/PATH/TO/AR_YYYYQQ HOST_OUTPUTS_PATH=/PATH/TO/YYYYQQ_pacta_analysis_inputs_YYYY-MM-DD/YYYYQQ @@ -23,3 +24,103 @@ R_CONFIG_ACTIVE=YYYYQQ Run `docker-compose up` from the root directory, and docker will build the image (if necessary), and then run the data.prep process given the specified options in the .env file. 
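+
+For example, the whole workflow can be run with a single command from the repository root (a minimal sketch; the image, mounts, and config values come from your local `.env` and `docker-compose.yml`):
+
+```sh
+# builds the image if needed, then runs data prep using the options in .env
+docker-compose up
+```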
Use `docker-compose build --no-cache` to force a rebuild of the Docker image.
+
+## Docker image for Azure Container Instance
+
+`Dockerfile.ACI` is intended to be built and run as an Azure Container Instance.
+
+Please note that this Dockerfile is intended to be built using [buildkit](https://docs.docker.com/build/buildkit/), since it relies on passing secrets.
+
+To build this image, create a file containing the _value_ of the GitHub PAT (with access to the necessary repos), and build using buildkit.
+
+Up-to-date installations of docker on macOS and Windows likely already have buildkit enabled.
+You can check your docker configuration for `buildkit: true`.
+If it is not enabled on your system, you can either run the builds with `DOCKER_BUILDKIT=1 docker build`, or replace the `docker build` commands below with `docker buildx build` (either works).
+
+If your installed docker engine version (found by running `docker version`) is > 20.10.0, then the secret can be read from your local `GITHUB_PAT` envvar (it must be `export`ed).
+
+```sh
+
+# must be built with buildkit
+# run from repo root
+docker build \
+  --secret id=github_pat,env=GITHUB_PAT \
+  --progress=plain \
+  --tag workflow.data.preparation_aci \
+  -f ACI/Dockerfile.ACI .
+
+```
+
+For older docker versions that support buildkit, you can write the _value_ of the token to a file and specify the absolute path to that file instead.
+
+```sh
+# Note that the path to the secret file must be an absolute path,
+# or use $(pwd) if in the working dir
+
+# must be built with buildkit
+# run from repo root
+docker build \
+  --secret id=github_pat,src=$(pwd)/ACI/github_pat.txt \
+  --progress=plain \
+  --tag transitionmonitordockerregistry.azurecr.io/workflow.data.preparation_aci \
+  -f ACI/Dockerfile.ACI .
+
+```
+
+The image then needs to be pushed to a registry for use with `azure-deploy.json`.
+
+### Deploy process
+
+#### Prerequisites
+
+[Containers ARM Schema](https://learn.microsoft.com/en-us/azure/templates/microsoft.containerinstance/containergroups?pivots=deployment-language-arm-template#resource-format)
+
+[secrets](https://learn.microsoft.com/en-us/azure/container-apps/manage-secrets?tabs=azure-portal)
+
+- Azure Key Vault: the deploy process reads secrets from an Azure Key Vault. The essential values referenced in the ARM template are:
+  - Storage Account Key for raw data storage (`rawdata-storageAccountKey`)
+  - Storage Account Key for "input" data storage (`dataprepinputs-storageAccountKey`)
+  - Storage Account Key for "output" data storage (`dataprepoutputs-storageAccountKey`)
+  - Username for FactSet database (`factset-database-user`)
+  - Password for FactSet database (`factset-database-password`)
+Note that the storage account keys are passed as parameters via `azure-deploy.parameters.json`, while the database credentials are used by the application itself and are __freely readable__ when accessing the container (via `exec`, for example).
+
+To get the storage keys:
+
+```sh
+# replace these values with the storage account name and resource group appropriate to your deployment
+ACI_PERS_STORAGE_ACCOUNT_NAME="pactadata"
+ACI_PERS_RESOURCE_GROUP="pacta-data"
+
+STORAGE_KEY=$(az storage account keys list --resource-group "$ACI_PERS_RESOURCE_GROUP" --account-name "$ACI_PERS_STORAGE_ACCOUNT_NAME" --query "[0].value" --output tsv)
+echo "$STORAGE_KEY"
+```
+
+#### Deploy
+
+```sh
+# change this value as needed.
+RESOURCEGROUP="myResourceGroup" + +# run from repo root +az deployment group create --resource-group "$RESOURCEGROUP" --template-file ACI/azure-deploy.json --parameters @ACI/azure-deploy.parameters.json + +``` + +### Helpful tips + +To attach to the container and execute commands interactively (for debugging) + +```sh + +az container exec --resource-group "$RESOURCEGROUP" --name "" --container-name "data-prep" --exec-command "/bin/bash" + +``` + +To start a long-running process (to allow for attaching and debugging), add this to `properties` for the container: + +```json + "command": [ + "tail", "-f", "/dev/null" + ] +``` diff --git a/config.yml b/config.yml index 2ea2947..01e4ca6 100644 --- a/config.yml +++ b/config.yml @@ -1,3 +1,4 @@ +--- default: data_prep_inputs_path: "/inputs" data_prep_outputs_path: "/outputs" @@ -25,6 +26,7 @@ default: scenario_geographies_list: ["Global", "NonOECD", "OECD"] global_aggregate_scenario_sources_list: ["ETP2020", "GECO2021", "IPR2021", "ISF2021", "WEO2021"] global_aggregate_sector_list: ["Power"] + create_tar: true 2021Q4: @@ -46,6 +48,7 @@ default: scenario_geographies_list: ["Global", "NonOECD", "OECD"] global_aggregate_scenario_sources_list: ["ETP2020", "GECO2021", "IPR2021", "ISF2021", "WEO2021"] global_aggregate_sector_list: ["Power"] + create_tar: true 2021Q4_dev_vm: inherits: 2021Q4 @@ -80,6 +83,7 @@ default: scenario_geographies_list: ["Global", "NonOECD", "OECD"] global_aggregate_scenario_sources_list: ["ETP2020", "GECO2021", "IPR2021", "ISF2021", "WEO2021"] global_aggregate_sector_list: ["Power"] + create_tar: true 2022Q4: dbname: "fds_20230705" @@ -94,3 +98,12 @@ default: scenario_sources_list: ["GECO2022", "IPR2021", "ISF2021", "WEO2022"] scenario_raw_data_to_include: ["geco_2022", "ipr_2021", "isf_2021", "weo_2022"] global_aggregate_scenario_sources_list: ["WEO2022"] + +2022Q4_CICD: + inherits: 2022Q4 + raw_data_path: "/mnt/rawdata" + data_prep_inputs_path: !expr file.path("/mnt", "dataprep_inputs", Sys.getenv("DEPLOY_START_TIME")) + data_prep_outputs_path: !expr file.path("/mnt", "outputs", Sys.getenv("DEPLOY_START_TIME")) + factset-extracted_path: "/mnt/factset-extracted" + factset_dataset: "factset-pacta_timestamp-20221231T000000Z_pulled-20231221T195325Z" + update_factset: false diff --git a/run_pacta_data_preparation.R b/run_pacta_data_preparation.R index b01e3f5..12bf0c3 100644 --- a/run_pacta_data_preparation.R +++ b/run_pacta_data_preparation.R @@ -3,6 +3,7 @@ logger::log_formatter(logger::formatter_glue) # necessary packages ----------------------------------------------------------- +logger::log_debug("Loading necessary packages.") suppressPackageStartupMessages({ library(pacta.data.preparation) library(pacta.data.scraping) @@ -15,19 +16,25 @@ suppressPackageStartupMessages({ library(stringr) library(tidyr) }) - +logger::log_trace("Necessary packages loaded.") # config ----------------------------------------------------------------------- +# if any essential envvars are missing, read the .env file. +# These should be set already as part of an ACI deployment. 
+logger::log_debug("Reading .env file.") readRenviron(".env") +logger::log_debug("Loading config.") config <- config::get( - file = "config.yml", + file = Sys.getenv("R_CONFIG_FILE", "config.yml"), config = Sys.getenv("R_CONFIG_ACTIVE"), use_parent = FALSE ) +logger::log_trace("Config loaded.") +logger::log_debug("Setting config values as R objects.") data_prep_inputs_path <- config$data_prep_inputs_path data_prep_outputs_path <- config$data_prep_outputs_path masterdata_ownership_filename <- config$masterdata_ownership_filename @@ -57,17 +64,31 @@ tech_exclude <- config$tech_exclude scenario_geographies_list <- config$scenario_geographies_list global_aggregate_scenario_sources_list <- config$global_aggregate_scenario_sources_list global_aggregate_sector_list <- config$global_aggregate_sector_list - +create_tar <- config$create_tar +logger::log_trace("Config values set as R objects.") + +#ensure data_prep_outputs_path exists +logger::log_debug("Checking data prep outputs path.") +if (dir.exists(data_prep_outputs_path)) { + logger::log_warn("Data prep outputs path already exists.") +} else { + logger::log_debug("Creating data prep outputs path.") + dir.create(data_prep_outputs_path) +} +logger::log_info("Data prep outputs path: {data_prep_outputs_path}") # input filepaths -------------------------------------------------------------- +logger::log_debug("Setting input filepaths.") masterdata_ownership_path <- file.path(data_prep_inputs_path, masterdata_ownership_filename) +logger::log_trace("Masterdata ownership path: {masterdata_ownership_path}") masterdata_debt_path <- file.path(data_prep_inputs_path, masterdata_debt_filename) +logger::log_trace("Masterdata debt path: {masterdata_debt_path}") ar_company_id__factset_entity_id_path <- file.path(data_prep_inputs_path, ar_company_id__factset_entity_id_filename) - +logger::log_trace("AR company ID to FactSet entity ID path: {ar_company_id__factset_entity_id_path}") # pre-flight filepaths --------------------------------------------------------- @@ -81,50 +102,70 @@ factset_fund_data_path <- file.path(data_prep_inputs_path, "factset_fund_data.rd factset_isin_to_fund_table_path <- file.path(data_prep_inputs_path, "factset_isin_to_fund_table.rds") factset_iss_emissions_data_path <- file.path(data_prep_inputs_path, "factset_iss_emissions.rds") - # computed options ------------------------------------------------------------- relevant_years <- sort( unique( - market_share_target_reference_year:(market_share_target_reference_year + time_horizon) + seq( + from = market_share_target_reference_year, + to = (market_share_target_reference_year + time_horizon), + by = 1L + ) ) ) logger::log_info( "Full time horizon set to: {paste0(relevant_years, collapse = ', ')}." ) +logger::log_debug("Getting scenario data.") +logger::log_trace("Scenario data to include: {scenario_raw_data_to_include}") scenario_raw_data_to_include <- lapply(scenario_raw_data_to_include, get, envir = asNamespace("pacta.scenario.preparation")) - +logger::log_trace("Scenario data retrieved.") # check that everything is ready to go ----------------------------------------- +logger::log_debug("Checking that AI files exist.") stopifnot(file.exists(masterdata_ownership_path)) stopifnot(file.exists(masterdata_debt_path)) stopifnot(file.exists(ar_company_id__factset_entity_id_path)) +logger::log_trace("AI files exist.") -if (!update_currencies) { +if (update_currencies) { + logger::log_debug( + "update_currencies is TRUE. Skipping preflight check for currency file." 
+  )
+} else {
+  logger::log_debug("Checking that currencies file exists.")
   stopifnot(file.exists(currencies_data_path))
+  logger::log_trace("Currencies file exists.")
 }
-if (!update_factset) {
+if (update_factset) {
+  logger::log_debug(
+    "update_factset is TRUE. Skipping preflight check for FactSet files."
+  )
+} else {
+  logger::log_debug("Checking that FactSet files exist.")
   stopifnot(file.exists(factset_financial_data_path))
   stopifnot(file.exists(factset_entity_info_path))
   stopifnot(file.exists(factset_entity_financing_data_path))
   stopifnot(file.exists(factset_fund_data_path))
   stopifnot(file.exists(factset_isin_to_fund_table_path))
   stopifnot(file.exists(factset_iss_emissions_data_path))
+  logger::log_trace("FactSet files exist.")
 }
-
 # pre-flight -------------------------------------------------------------------
 logger::log_info("Fetching pre-flight data.")
-
 logger::log_info("Preparing scenario data.")
+logger::log_debug("Binding raw scenario data.")
 scenario_raw_data <- bind_rows(scenario_raw_data_to_include)
+logger::log_trace("Raw scenario data bound.")
 # scenario values will be linearly interpolated for each group below
+logger::log_debug("Setting interpolation groups.")
 interpolation_groups <- c(
   "source",
   "scenario",
@@ -134,30 +175,52 @@ interpolation_groups <- c(
   "indicator",
   "units"
 )
+logger::log_trace("Interpolation groups set: {interpolation_groups}")
+logger::log_debug("
+  Preparing and writing scenario raw data to intermediary file: \\
+  \"{scenarios_analysis_input_path}\".
+")
 scenario_raw_data %>%
   pacta.scenario.preparation::interpolate_yearly(!!!rlang::syms(interpolation_groups)) %>%
   filter(.data$year >= .env$market_share_target_reference_year) %>%
   pacta.scenario.preparation::add_market_share_columns(reference_year = market_share_target_reference_year) %>%
   pacta.scenario.preparation::format_p4i(green_techs) %>%
   write_csv(scenarios_analysis_input_path, na = "")
+logger::log_trace(
+  "Scenario raw data written: \"{scenarios_analysis_input_path}\"."
+)
+logger::log_debug("
+  Preparing and writing scenario regions to intermediary file: \\
+  \"{scenario_regions_path}\".
+")
 pacta.scenario.preparation::scenario_regions %>%
   write_csv(scenario_regions_path, na = "")
+logger::log_trace(
+  "Scenario regions written: \"{scenario_regions_path}\"."
+)
 # web scraping -----------------------------------------------------------------
 if (update_currencies) {
-  logger::log_info("Fetching currency data.")
+  logger::log_info("Fetching and writing currency data to intermediary file: \\
+    \"{currencies_data_path}\".
+  ")
   pacta.data.scraping::get_currency_exchange_rates(
     quarter = imf_quarter_timestamp
   ) %>%
     saveRDS(currencies_data_path)
+  logger::log_trace(
+    "Currency data written: \"{currencies_data_path}\"."
+  )
+} else {
+  logger::log_info("Skipping currency data update.")
 }
 logger::log_info("Scraping index regions.")
 index_regions <- pacta.data.scraping::get_index_regions()
-
+logger::log_trace("Index regions scraped.")
 # pull factset data ------------------------------------------------------------
@@ -224,13 +287,18 @@ if (update_factset) {
 logger::log_info("Pre-flight data prepared.")
-
 # intermediary files -----------------------------------------------------------
 logger::log_info("Preparing scenario data.")
+logger::log_debug("
+  Reading scenario regions from intermediary file: \\
+  \"{scenario_regions_path}\".
+") scenario_regions <- readr::read_csv(scenario_regions_path, na = "", show_col_types = FALSE) +logger::log_trace("Scenario regions read.") +logger::log_debug("preparing factset_issue_code_bridge.") factset_issue_code_bridge <- pacta.data.preparation::factset_issue_code_bridge %>% select(issue_type_code, asset_type) %>% @@ -243,23 +311,31 @@ factset_issue_code_bridge <- TRUE ~ "Others" ) ) +logger::log_trace("factset_issue_code_bridge prepared.") +logger::log_debug("preparing factset_industry_map_bridge.") factset_industry_map_bridge <- pacta.data.preparation::factset_industry_map_bridge %>% select(factset_industry_code, pacta_sector) +logger::log_trace("factset_industry_map_bridge prepared.") # scenarios_analysisinput_inputs +logger::log_debug("Reading raw scenario data from intermediary file: \\ + \"{scenarios_analysis_input_path}\". +") scenario_raw <- readr::read_csv(scenarios_analysis_input_path, show_col_types = FALSE) +logger::log_trace("Raw scenario data read.") # filter for relevant scenario data +logger::log_debug("Filtering raw scenario data and joining geography bridge.") scenarios_long <- scenario_raw %>% inner_join( pacta.scenario.preparation::scenario_source_pacta_geography_bridge, by = c( scenario_source = "source", scenario_geography = "scenario_geography_source" - ) - ) %>% + ) + ) %>% select(-"scenario_geography") %>% rename(scenario_geography = "scenario_geography_pacta") %>% filter( @@ -270,16 +346,16 @@ scenarios_long <- scenario_raw %>% c(.env$relevant_years, .env$market_share_target_reference_year + 10) ) ) +logger::log_trace("Raw scenario data filtered and geography bridge joined.") logger::log_info("Scenario data prepared.") - # currency data output --------------------------------------------------------- -logger::log_info("Saving file: \"currencies.rds\".") +logger::log_info("Exporting file: \"currencies.rds\".") readRDS(currencies_data_path) %>% saveRDS(file.path(data_prep_outputs_path, "currencies.rds")) - +logger::log_debug("Currency data exported.") # financial data output -------------------------------------------------------- @@ -287,52 +363,62 @@ logger::log_info("Preparing financial data.") # read raw FactSet financial data, filter to unique rows, merge AR company_id, # merge PACTA sectors from AR data -logger::log_info("Formatting and saving file: \"financial_data.rds\".") +logger::log_info("Formatting and exporting file: \"financial_data.rds\".") readRDS(factset_financial_data_path) %>% pacta.data.preparation::prepare_financial_data(factset_issue_code_bridge) %>% saveRDS(file.path(data_prep_outputs_path, "financial_data.rds")) +logger::log_debug("Financial data exported.") -logger::log_info("Formatting and saving file: \"entity_financing.rds\".") +logger::log_info("Formatting and exporting file: \"entity_financing.rds\".") readRDS(factset_entity_financing_data_path) %>% saveRDS(file.path(data_prep_outputs_path, "entity_financing.rds")) +logger::log_debug("Entity financing data exported.") -logger::log_info("Formatting and saving file: \"entity_info.rds\".") +logger::log_debug("Reading AR company ID to FactSet entity ID mapping.") factset_entity_id__ar_company_id <- readr::read_csv(ar_company_id__factset_entity_id_path, col_types = "c") %>% select( factset_entity_id = "factset_id", ar_company_id = "company_id" ) +logger::log_trace("AR company ID to FactSet entity ID mapping read.") +logger::log_info("Formatting and exporting file: \"entity_info.rds\".") readRDS(factset_entity_info_path) %>% 
pacta.data.preparation::prepare_entity_info(factset_entity_id__ar_company_id) %>% saveRDS(file.path(data_prep_outputs_path, "entity_info.rds")) +logger::log_debug("Entity info data exported.") logger::log_info("Financial data prepared.") - # ABCD data output ------------------------------------------------------------- -logger::log_info("Preparing ABCD.") +logger::log_info("Preparing Asset Based Company Data (ABCD).") +logger::log_debug("Reading entity info.") entity_info <- readRDS(file.path(data_prep_outputs_path, "entity_info.rds")) +logger::log_trace("Entity info read.") +logger::log_debug("Preparing AR company ID to country of domicile mapping.") ar_company_id__country_of_domicile <- entity_info %>% select("ar_company_id", "country_of_domicile") %>% filter(!is.na(.data$ar_company_id)) %>% distinct() +logger::log_trace("AR company ID to country of domicile mapping prepared.") +logger::log_debug("Preparing AR company ID to credit parent mapping.") ar_company_id__credit_parent_ar_company_id <- entity_info %>% select("ar_company_id", "credit_parent_ar_company_id") %>% filter(!is.na(.data$ar_company_id)) %>% distinct() +logger::log_trace("AR company ID to credit parent mapping prepared.") +logger::log_trace("removing entity_info to clear memory.") rm(entity_info) - logger::log_info( - "Formatting and saving file: \"masterdata_ownership_datastore.rds\"." + "Formatting and exporting file: \"masterdata_ownership_datastore.rds\"." ) readr::read_csv(masterdata_ownership_path, na = "", show_col_types = FALSE) %>% pacta.data.preparation::prepare_masterdata( @@ -341,20 +427,23 @@ readr::read_csv(masterdata_ownership_path, na = "", show_col_types = FALSE) %>% zero_emission_factor_techs ) %>% saveRDS(file.path(data_prep_outputs_path, "masterdata_ownership_datastore.rds")) +logger::log_debug("Masterdata ownership exported.") - -logger::log_info( - "Formatting and saving file: \"masterdata_debt_datastore.rds\"." -) - +logger::log_debug("Reading masterdata debt.") masterdata_debt <- readr::read_csv(masterdata_debt_path, na = "", show_col_types = FALSE) +logger::log_trace("Masterdata debt read.") +logger::log_debug("Preparing AR company ID to creditor company ID mapping.") company_id__creditor_company_id <- masterdata_debt %>% select("company_id", "creditor_company_id") %>% distinct() %>% mutate(across(.cols = dplyr::everything(), .fns = as.character)) +logger::log_trace("AR company ID to creditor company ID mapping prepared.") +logger::log_info( + "Formatting and saving file: \"masterdata_debt_datastore.rds\"." 
+) masterdata_debt %>% pacta.data.preparation::prepare_masterdata( ar_company_id__country_of_domicile, @@ -377,35 +466,42 @@ masterdata_debt %>% .groups = "drop" ) %>% saveRDS(file.path(data_prep_outputs_path, "masterdata_debt_datastore.rds")) +logger::log_debug("Masterdata debt exported.") +logger::log_trace("removing objects to clear memory.") rm(masterdata_debt) rm(company_id__creditor_company_id) - rm(ar_company_id__country_of_domicile) rm(ar_company_id__credit_parent_ar_company_id) logger::log_info("ABCD prepared.") - # abcd_flags ------------------------------------------------------------------- logger::log_info("Preparing ABCD flags.") + +logger::log_debug("Reading financial data.") financial_data <- readRDS(file.path(data_prep_outputs_path, "financial_data.rds")) +logger::log_trace("Financial data read.") +logger::log_debug("Reading entity info.") entity_info <- readRDS(file.path(data_prep_outputs_path, "entity_info.rds")) +logger::log_trace("Entity info read.") +logger::log_debug("Preparing AR company ID to FactSet entity ID mapping.") factset_entity_id__ar_company_id <- entity_info %>% select(factset_entity_id, ar_company_id) %>% filter(!is.na(ar_company_id)) +logger::log_trace("AR company ID to FactSet entity ID mapping prepared.") +logger::log_debug("Preparing FactSet entity ID to security sector mapping.") factset_entity_id__security_mapped_sector <- entity_info %>% select(factset_entity_id, security_mapped_sector) +logger::log_trace("FactSet entity ID to security sector mapping prepared.") - -logger::log_info("Formatting and saving file: \"abcd_flags_equity.rds\".") - +logger::log_debug("Preparing AR Ownership company ID to sector mapping.") ar_company_id__sectors_with_assets__ownership <- readRDS(file.path(data_prep_outputs_path, "masterdata_ownership_datastore.rds")) %>% filter(year %in% relevant_years) %>% @@ -413,7 +509,9 @@ ar_company_id__sectors_with_assets__ownership <- distinct() %>% group_by(ar_company_id) %>% summarise(sectors_with_assets = paste(unique(ald_sector), collapse = " + ")) +logger::log_trace("AR ownership company ID to sector mapping prepared.") +logger::log_info("Formatting and exporting file: \"abcd_flags_equity.rds\".") financial_data %>% left_join(factset_entity_id__ar_company_id, by = "factset_entity_id") %>% left_join(factset_entity_id__security_mapped_sector, by = "factset_entity_id") %>% @@ -427,10 +525,9 @@ financial_data %>% sectors_with_assets ) %>% saveRDS(file.path(data_prep_outputs_path, "abcd_flags_equity.rds")) +logger::log_debug("Equity ABCD flags exported.") - -logger::log_info("Formatting and saving file: \"abcd_flags_bonds.rds\".") - +logger::log_debug("Preparing AR Debt company ID to sector mapping.") ar_company_id__sectors_with_assets__debt <- readRDS(file.path(data_prep_outputs_path, "masterdata_debt_datastore.rds")) %>% filter(year %in% relevant_years) %>% @@ -438,7 +535,9 @@ ar_company_id__sectors_with_assets__debt <- distinct() %>% group_by(ar_company_id) %>% summarise(sectors_with_assets = paste(unique(ald_sector), collapse = " + ")) +logger::log_trace("AR debt company ID to sector mapping prepared.") +logger::log_info("Formatting and exporting file: \"abcd_flags_bonds.rds\".") financial_data %>% left_join(factset_entity_id__ar_company_id, by = "factset_entity_id") %>% left_join(factset_entity_id__security_mapped_sector, by = "factset_entity_id") %>% @@ -461,29 +560,37 @@ financial_data %>% ) %>% ungroup() %>% saveRDS(file.path(data_prep_outputs_path, "abcd_flags_bonds.rds")) +logger::log_debug("Bonds ABCD flags exported.") - 
+logger::log_trace("removing objects to clear memory.") rm(financial_data) rm(entity_info) rm(factset_entity_id__ar_company_id) rm(factset_entity_id__security_mapped_sector) logger::log_info("ABCD flags prepared.") - # fund data output ------------------------------------------------------------- logger::log_info("Preparing fund data.") +logger::log_debug("Reading fund data.") fund_data <- readRDS(factset_fund_data_path) +logger::log_trace("Fund data read.") +logger::log_debug(" + Filtering fund data to include funds with reported holdings appoximately \\ + equal to reported market value + ") # remove funds above the threshold fund_data <- fund_data %>% group_by(factset_fund_id, fund_reported_mv) %>% filter((fund_reported_mv[[1]] - sum(holding_reported_mv)) / fund_reported_mv[[1]] > -1e-5) %>% ungroup() +logger::log_trace("Fund data filtered.") # build MISSINGWEIGHT for under and over +logger::log_debug("Building MISSINGWEIGHT for under and over.") fund_missing_mv <- fund_data %>% group_by(factset_fund_id, fund_reported_mv) %>% @@ -494,20 +601,20 @@ fund_missing_mv <- ) %>% ungroup() %>% filter(holding_reported_mv != 0) +logger::log_trace("MISSINGWEIGHT built.") +logger::log_info("Preparing and exporting file: \"fund_data.rds\".") fund_data %>% bind_rows(fund_missing_mv) %>% saveRDS(file.path(data_prep_outputs_path, "fund_data.rds")) +logger::log_debug("Fund data exported.") - -logger::log_info("Saving file: \"total_fund_list.rds\".") +logger::log_info("Preparing and exporting file: \"total_fund_list.rds\".") fund_data %>% select(factset_fund_id) %>% distinct() %>% saveRDS(file.path(data_prep_outputs_path, "total_fund_list.rds")) - - -logger::log_info("Saving file: \"isin_to_fund_table.rds\".") +logger::log_debug("Total fund list exported.") isin_to_fund_table <- readRDS(factset_isin_to_fund_table_path) @@ -531,20 +638,24 @@ isin_to_fund_table <- ungroup() %>% select(-n, -has_fund_data) +logger::log_info("Exporting file: \"isin_to_fund_table.rds\".") isin_to_fund_table %>% saveRDS(file.path(data_prep_outputs_path, "isin_to_fund_table.rds")) +logger::log_debug("ISIN to fund table exported.") +logger::log_info("Fund data prepared.") +logger::log_trace("removing objects to clear memory.") rm(fund_data) rm(isin_to_fund_table) -logger::log_info("Fund data prepared.") - - # emission data output --------------------------------------------------------- +logger::log_debug("Reading currencies data.") currencies <- readRDS(file.path(data_prep_outputs_path, "currencies.rds")) +logger::log_trace("Currencies data read.") +logger::log_debug("Preparing ISS company emissions data.") iss_company_emissions <- readRDS(factset_iss_emissions_data_path) %>% group_by(factset_entity_id) %>% @@ -553,11 +664,9 @@ iss_company_emissions <- .groups = "drop" ) %>% mutate(icc_total_emissions_units = "tCO2e") # units are defined in the ISS/FactSet documentation (see #144) +logger::log_trace("ISS company emissions data prepared.") -logger::log_info( - "Formatting and saving file: \"iss_entity_emission_intensities.rds\"." -) - +logger::log_debug("Preparing ISS entity emission intensities.") iss_entity_emission_intensities <- readRDS(factset_entity_financing_data_path) %>% left_join(currencies, by = "currency") %>% @@ -590,19 +699,24 @@ iss_entity_emission_intensities <- ff_debt, units = paste0(icc_total_emissions_units, " / ", "$ USD") ) +logger::log_trace("ISS entity emission intensities prepared.") +logger::log_info( + "Formatting and exporting file: \"iss_entity_emission_intensities.rds\"." 
+) saveRDS( select(iss_entity_emission_intensities, -c("ff_mkt_val", "ff_debt")), file.path(data_prep_outputs_path, "iss_entity_emission_intensities.rds") ) +logger::log_debug("ISS entity emission intensities exported.") +logger::log_debug("Reading entity info.") +factset_entity_info <- readRDS(factset_entity_info_path) +logger::log_trace("Entity info read.") logger::log_info( - "Formatting and saving file: \"iss_average_sector_emission_intensities.rds\"." + "Formatting and exporting file: \"iss_average_sector_emission_intensities.rds\"." ) - -factset_entity_info <- readRDS(factset_entity_info_path) - iss_entity_emission_intensities %>% inner_join(factset_entity_info, by = "factset_entity_id") %>% group_by(sector_code, factset_sector_desc, units) %>% @@ -621,8 +735,9 @@ iss_entity_emission_intensities %>% ) %>% ungroup() %>% saveRDS(file.path(data_prep_outputs_path, "iss_average_sector_emission_intensities.rds")) +logger::log_debug("ISS average sector emission intensities exported.") - +logger::log_trace("removing objects to clear memory.") rm(currencies) rm(iss_company_emissions) rm(iss_entity_emission_intensities) @@ -630,19 +745,27 @@ rm(factset_entity_info) logger::log_info("Emissions data prepared.") - # combined ABCD and scenarios output ------------------------------------------- logger::log_info("Preparing combined ABCD scenario output.") +logger::log_debug("Reading masterdata ownership, filtering to relevant years.") masterdata_ownership_datastore <- readRDS(file.path(data_prep_outputs_path, "masterdata_ownership_datastore.rds")) %>% filter(year %in% relevant_years) +logger::log_trace("Masterdata ownership read and filtered.") +logger::log_debug("Preparing individual equity scenario ABCD files.") for (scenario_source in unique(scenarios_long$scenario_source)) { + logger::log_debug( + "Preparing equity ABCD scenario output for source: \"{scenario_source}\"." + ) filename <- paste0("equity_abcd_scenario_", scenario_source, ".rds") + logger::log_trace("Filtering scenario data: \"{scenario_source}\".") scenarios_long_source <- filter(scenarios_long, .data$scenario_source == .env$scenario_source) - logger::log_info("Formatting and saving file: \"{filename}\".") + logger::log_info( + "Formatting and exporting scenario ABCD file: \"{filename}\"." 
+ ) pacta.data.preparation::dataprep_abcd_scen_connection( abcd_data = masterdata_ownership_datastore, scenario_data = scenarios_long_source, @@ -658,7 +781,11 @@ for (scenario_source in unique(scenarios_long$scenario_source)) { index_regions = index_regions ) %>% saveRDS(file.path(data_prep_outputs_path, filename)) + logger::log_debug( + "equity ABCD scenario output for source: \"{scenario_source}\" exported" + ) } +logger::log_debug("Individual equity scenario ABCD files prepared.") logger::log_info("Formatting and saving file: \"equity_abcd_scenario.rds\".") list.files( @@ -669,16 +796,24 @@ list.files( lapply(readRDS) %>% bind_rows() %>% saveRDS(file.path(data_prep_outputs_path, "equity_abcd_scenario.rds")) +logger::log_debug("Equity ABCD scenario output prepared.") - +logger::log_debug("Reading masterdata debt, filtering to relevant years.") masterdata_debt_datastore <- readRDS(file.path(data_prep_outputs_path, "masterdata_debt_datastore.rds")) %>% filter(year %in% relevant_years) +logger::log_trace("Masterdata debt read and filtered.") +logger::log_debug("Preparing individual bonds scenario ABCD files.") for (scenario_source in unique(scenarios_long$scenario_source)) { + logger::log_debug( + "Preparing bonds ABCD scenario output for source: \"{scenario_source}\"." + ) filename <- paste0("bonds_abcd_scenario_", scenario_source, ".rds") scenarios_long_source <- filter(scenarios_long, .data$scenario_source == .env$scenario_source) - logger::log_info("Formatting and saving file: \"{filename}\".") + logger::log_info( + "Formatting and exporting scenario ABCD file: \"{filename}\"." + ) pacta.data.preparation::dataprep_abcd_scen_connection( abcd_data = masterdata_debt_datastore, scenario_data = scenarios_long_source, @@ -694,6 +829,9 @@ for (scenario_source in unique(scenarios_long$scenario_source)) { index_regions = index_regions ) %>% saveRDS(file.path(data_prep_outputs_path, filename)) + logger::log_debug( + "Bonds ABCD scenario output for source: \"{scenario_source}\" exported" + ) } logger::log_info("Formatting and saving file: \"bonds_abcd_scenario.rds\".") @@ -705,25 +843,32 @@ list.files( lapply(readRDS) %>% bind_rows() %>% saveRDS(file.path(data_prep_outputs_path, "bonds_abcd_scenario.rds")) +logger::log_debug("Bonds ABCD scenario output prepared.") logger::log_info("Combined ABCD scenario output prepared.") - # export SQLite versions of relevant files ------------------------------------- if (export_sqlite_files) { + logger::log_info("Exporting SQLite versions of relevant files.") + # entity_info logger::log_info("Formatting and saving file: \"entity_info.sqlite\".") + logger::log_debug("Reading entity info.") entity_info <- readRDS(file.path(data_prep_outputs_path, "entity_info.rds")) + logger::log_trace("Entity info read.") + logger::log_debug("Establishing SQLite connection.") con <- DBI::dbConnect( drv = RSQLite::SQLite(), dbname = file.path(data_prep_outputs_path, "entity_info.sqlite") ) RSQLite::sqliteSetBusyHandler(con, 3000L) + logger::log_trace("SQLite connection established.") + logger::log_debug("Writing entity info to SQLite file.") dplyr::copy_to( dest = con, df = entity_info, @@ -732,7 +877,9 @@ if (export_sqlite_files) { temporary = FALSE, indexes = list("factset_entity_id") ) + logger::log_trace("Entity info written to SQLite file.") + logger::log_debug("Closing SQLite connection and freeing memory") DBI::dbDisconnect(con) rm(entity_info) @@ -740,16 +887,20 @@ if (export_sqlite_files) { logger::log_info( "Formatting and saving file: 
\"equity_abcd_scenario.sqlite\"." ) - + logger::log_debug("Reading equity ABCD scenario data.") equity_abcd_scenario <- readRDS(file.path(data_prep_outputs_path, "equity_abcd_scenario.rds")) + logger::log_trace("Equity ABCD scenario data read.") + logger::log_debug("Establishing SQLite connection.") con <- DBI::dbConnect( drv = RSQLite::SQLite(), dbname = file.path(data_prep_outputs_path, "equity_abcd_scenario.sqlite") ) RSQLite::sqliteSetBusyHandler(con, 3000L) + logger::log_trace("SQLite connection established.") + logger::log_debug("Writing equity ABCD scenario data to SQLite file.") dplyr::copy_to( dest = con, df = equity_abcd_scenario, @@ -764,7 +915,9 @@ if (export_sqlite_files) { "ald_sector" ) ) + logger::log_trace("Equity ABCD scenario data written to SQLite file.") + logger::log_debug("Closing SQLite connection and freeing memory") DBI::dbDisconnect(con) rm(equity_abcd_scenario) @@ -773,15 +926,20 @@ if (export_sqlite_files) { "Formatting and saving file: \"bonds_abcd_scenario.sqlite\"." ) + logger::log_debug("Reading bonds ABCD scenario data.") bonds_abcd_scenario <- readRDS(file.path(data_prep_outputs_path, "bonds_abcd_scenario.rds")) + logger::log_trace("Bonds ABCD scenario data read.") + logger::log_debug("Establishing SQLite connection.") con <- DBI::dbConnect( drv = RSQLite::SQLite(), dbname = file.path(data_prep_outputs_path, "bonds_abcd_scenario.sqlite") ) RSQLite::sqliteSetBusyHandler(con, 3000L) + logger::log_trace("SQLite connection established.") + logger::log_debug("Writing bonds ABCD scenario data to SQLite file.") dplyr::copy_to( dest = con, df = bonds_abcd_scenario, @@ -796,23 +954,30 @@ if (export_sqlite_files) { "ald_sector" ) ) + logger::log_trace("Bonds ABCD scenario data written to SQLite file.") + logger::log_debug("Closing SQLite connection and freeing memory") DBI::dbDisconnect(con) rm(bonds_abcd_scenario) +} else { + logger::log_info("Skipping SQLite file export.") } - # manifests of input and output file ------------------------------------------- logger::log_info("Formatting and saving file: \"manifest.json\".") +# get the last update date of the ent_entity_affiliates table +logger::log_debug("Reading ent_entity_affiliates last update.") ent_entity_affiliates_last_update <- readRDS(factset_entity_info_path) %>% filter(!is.na(ent_entity_affiliates_last_update)) %>% pull(ent_entity_affiliates_last_update) %>% unique() +logger::log_trace("ent_entity_affiliates last update read.") # include PACTA packages NEWS.md test in the parameters to export +logger::log_debug("Reading NEWS.md files from relevant PACTA packages.") pacta_packages <- c("pacta.data.preparation", "pacta.scenario.preparation") package_news <- vapply( @@ -824,7 +989,9 @@ package_news <- FUN.VALUE = list(1), USE.NAMES = TRUE ) +logger::log_trace("NEWS.md files read.") +logger::log_debug("Preparing metadata parameters.") parameters <- list( input_filepaths = list( @@ -875,28 +1042,55 @@ parameters <- update_factset = update_factset, package_news = package_news ) +logger::log_trace("Metadata parameters prepared.") +logger::log_debug("Writing manifest file.") pacta.data.preparation::write_manifest( path = file.path(data_prep_outputs_path, "manifest.json"), parameters = parameters, data_prep_inputs_path = data_prep_inputs_path, data_prep_outputs_path = data_prep_outputs_path ) +logger::log_trace("Manifest file written.") - -# copy in NEWs.md files from relevant PACTA packages --------------------------- +# copy in NEWS.md files from relevant PACTA packages --------------------------- 
logger::log_info("Copying NEWS.md files from relevant PACTA packages.") - # `pacta_packages` defined above to add NEWS text to manifest for (pkg_name in pacta_packages) { + logger::log_debug("Copying NEWS.md file from package: \"{pkg_name}\".") file.copy( system.file("NEWS.md", package = pkg_name), to = file.path(data_prep_outputs_path, paste0(pkg_name, "-NEWS.md")) ) + logger::log_trace("NEWS.md file copied.") } - # ------------------------------------------------------------------------------ +# Create tar file if requested +if (create_tar) { + logger::log_info("Creating tar file.") + tar_file_path <- file.path( + data_prep_outputs_path, + paste0(basename(data_prep_outputs_path), ".tar.gz") + ) + logger::log_trace("Tar file path: \"{tar_file_path}\".") + system2( + command = "tar", + args = c( + "--create", + "--exclude-backups", + "--exclude-vcs", + "--gzip", + "--verbose", + "-C", dirname(data_prep_outputs_path), + paste0("--file=", tar_file_path), + basename(data_prep_outputs_path) + ) + ) + logger::log_info("Tar file created at ", tar_file_path) +} + + logger::log_info("PACTA Data Preparation Complete.")