From fff7e4b3206ad58b40cd1c8196b488327e52c336 Mon Sep 17 00:00:00 2001 From: Damon Bayer Date: Mon, 18 Nov 2024 22:42:24 +0000 Subject: [PATCH 01/17] add model_runs as additional hierarchy --- pipelines/forecast_state.py | 4 ++-- pipelines/prep_data.py | 14 ++++++++------ 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/pipelines/forecast_state.py b/pipelines/forecast_state.py index 5ba898c9..21f365b1 100644 --- a/pipelines/forecast_state.py +++ b/pipelines/forecast_state.py @@ -180,7 +180,7 @@ def main( model_batch_dir = Path(output_data_dir, model_batch_dir_name) - model_run_dir = Path(model_batch_dir, state) + model_run_dir = Path(model_batch_dir, "model_runs", state) os.makedirs(model_run_dir, exist_ok=True) @@ -198,7 +198,7 @@ def main( first_training_date=first_training_date, last_training_date=last_training_date, param_estimates=param_estimates, - model_batch_dir=model_batch_dir, + model_run_dir=model_run_dir, logger=logger, ) logger.info("Data preparation complete.") diff --git a/pipelines/prep_data.py b/pipelines/prep_data.py index b7810b76..ed815856 100644 --- a/pipelines/prep_data.py +++ b/pipelines/prep_data.py @@ -219,7 +219,7 @@ def process_and_save_state( first_training_date: datetime.date, last_training_date: datetime.date, param_estimates: pl.LazyFrame, - model_batch_dir: Path, + model_run_dir: Path, logger: Logger = None, facility_level_nssp_data: pl.LazyFrame = None, state_level_nssp_data: pl.LazyFrame = None, @@ -333,13 +333,15 @@ def process_and_save_state( "right_truncation_offset": right_truncation_offset, } - state_dir = os.path.join(model_batch_dir, state_abb) - os.makedirs(state_dir, exist_ok=True) + os.makedirs(model_run_dir, exist_ok=True) - logger.info(f"Saving {state_abb} to {state_dir}") - data_to_save.write_csv(Path(state_dir, "data.csv")) + if logger is not None: + logger.info(f"Saving {state_abb} to {model_run_dir}") + data_to_save.write_csv(Path(model_run_dir, "data.csv")) - with open(Path(state_dir, 
"data_for_model_fit.json"), "w") as json_file: + with open( + Path(model_run_dir, "data_for_model_fit.json"), "w" + ) as json_file: json.dump(data_for_model_fit, json_file) return None From 88e117e3de5235c2f1d966994662758b4670d12d Mon Sep 17 00:00:00 2001 From: Damon Bayer Date: Tue, 19 Nov 2024 00:11:04 +0000 Subject: [PATCH 02/17] fix default tag --- pipelines/batch/setup_test_prod_job.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/batch/setup_test_prod_job.py b/pipelines/batch/setup_test_prod_job.py index c406c90b..987139e4 100644 --- a/pipelines/batch/setup_test_prod_job.py +++ b/pipelines/batch/setup_test_prod_job.py @@ -18,7 +18,7 @@ description="Test production pipeline on small subset of locations" ) parser.add_argument( - "tag", + "--tag", type=str, help="The tag name to use for the container image version", default=Path(Repository(os.getcwd()).head.name).stem, From dd92acc608ae6550ab079b48f9174a217d34d0b5 Mon Sep 17 00:00:00 2001 From: Damon Bayer Date: Tue, 19 Nov 2024 18:04:29 +0000 Subject: [PATCH 03/17] fix path in postprocess_state_forecast.R --- pipelines/postprocess_state_forecast.R | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/pipelines/postprocess_state_forecast.R b/pipelines/postprocess_state_forecast.R index 747cca34..69de4616 100644 --- a/pipelines/postprocess_state_forecast.R +++ b/pipelines/postprocess_state_forecast.R @@ -273,10 +273,15 @@ model_run_dir <- path(argv$model_run_dir) base_dir <- path_dir(model_run_dir) -disease_name_raw <- base_dir |> - path_file() |> +# replace this with functionality from hewr +disease_name_raw <- model_run_dir |> + path_split() |> + pluck(1) |> + tail(3) |> + head(1) |> str_extract("^.+(?=_r_)") + disease_name_nssp <- unname(disease_name_nssp_map[disease_name_raw]) disease_name_pretty <- unname(disease_name_formatter[disease_name_raw]) From 776af6c3bfbf7f70611793aedfdb3fd49dd3eb14 Mon Sep 17 00:00:00 2001 From: Damon Bayer Date: Tue, 19 Nov 2024 
18:11:41 +0000 Subject: [PATCH 04/17] remove unused base_dir definition --- pipelines/postprocess_state_forecast.R | 2 -- 1 file changed, 2 deletions(-) diff --git a/pipelines/postprocess_state_forecast.R b/pipelines/postprocess_state_forecast.R index 69de4616..fc2c4567 100644 --- a/pipelines/postprocess_state_forecast.R +++ b/pipelines/postprocess_state_forecast.R @@ -271,8 +271,6 @@ p <- arg_parser("Generate forecast figures") |> argv <- parse_args(p) model_run_dir <- path(argv$model_run_dir) -base_dir <- path_dir(model_run_dir) - # replace this with functionality from hewr disease_name_raw <- model_run_dir |> path_split() |> From 7a9306a4b466b1b9ca6bbb6b3f3d570042c79567 Mon Sep 17 00:00:00 2001 From: Damon Bayer Date: Tue, 19 Nov 2024 18:12:32 +0000 Subject: [PATCH 05/17] correct_path in timeseries_forecasts --- pipelines/timeseries_forecasts.R | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/pipelines/timeseries_forecasts.R b/pipelines/timeseries_forecasts.R index 50f92290..2edfb378 100644 --- a/pipelines/timeseries_forecasts.R +++ b/pipelines/timeseries_forecasts.R @@ -250,10 +250,12 @@ disease_name_nssp_map <- c( "influenza" = "Influenza" ) -base_dir <- path_dir(model_run_dir) - -disease_name_raw <- base_dir |> - path_file() |> +# replace this with functionality from hewr +disease_name_raw <- model_run_dir |> + path_split() |> + pluck(1) |> + tail(3) |> + head(1) |> str_extract("^.+(?=_r_)") disease_name_nssp <- unname(disease_name_nssp_map[disease_name_raw]) From 915284e2886e78e3e199e53d8c8b8eb0ca7c41bc Mon Sep 17 00:00:00 2001 From: Samuel Brand <48288458+SamuelBrand1@users.noreply.github.com> Date: Tue, 19 Nov 2024 15:44:55 +0000 Subject: [PATCH 06/17] Issue 137: unify argument patterns (#138) --- pipelines/build_model.py | 2 +- pipelines/fit_model.py | 2 +- pipelines/generate_predictive.py | 2 +- pipelines/iteration_helpers/loop_fit.sh | 2 +- pipelines/iteration_helpers/loop_generate_predictive.sh | 2 +- 
pipelines/iteration_helpers/loop_postprocess.sh | 2 +- pipelines/score_forecast.R | 2 +- 7 files changed, 7 insertions(+), 7 deletions(-) diff --git a/pipelines/build_model.py b/pipelines/build_model.py index 5450cf46..148bddbd 100644 --- a/pipelines/build_model.py +++ b/pipelines/build_model.py @@ -48,7 +48,7 @@ def build_model_from_dir(model_dir): - 1 ) - priors = runpy.run_path(prior_path) + priors = runpy.run_path(str(prior_path)) right_truncation_offset = model_data["right_truncation_offset"] diff --git a/pipelines/fit_model.py b/pipelines/fit_model.py index cdeb7250..9ecefd2a 100644 --- a/pipelines/fit_model.py +++ b/pipelines/fit_model.py @@ -54,7 +54,7 @@ def fit_and_save_model( description="Fit the hospital-only wastewater model." ) parser.add_argument( - "--model_run_dir", + "model_run_dir", type=Path, help=( "Path to a directory containing model fitting data. " diff --git a/pipelines/generate_predictive.py b/pipelines/generate_predictive.py index d16e4733..5a0d82d3 100644 --- a/pipelines/generate_predictive.py +++ b/pipelines/generate_predictive.py @@ -53,7 +53,7 @@ def generate_and_save_predictions( description=("Do posterior prediction from a pyrenew-hew fit.") ) parser.add_argument( - "model-run-dir", + "model_run_dir", type=Path, help=( "Path to a directory containing the model fitting data " diff --git a/pipelines/iteration_helpers/loop_fit.sh b/pipelines/iteration_helpers/loop_fit.sh index 4ff7f1cf..837d5cf0 100755 --- a/pipelines/iteration_helpers/loop_fit.sh +++ b/pipelines/iteration_helpers/loop_fit.sh @@ -13,5 +13,5 @@ BASE_DIR="$1" for SUBDIR in "$BASE_DIR"/*/; do # Run the Python script with the current subdirectory as the model_dir argument echo "$SUBDIR" - python fit_model.py --model_run_dir "$SUBDIR" + python fit_model.py "$SUBDIR" done diff --git a/pipelines/iteration_helpers/loop_generate_predictive.sh b/pipelines/iteration_helpers/loop_generate_predictive.sh index 7c0aac22..b2510d21 100755 --- 
a/pipelines/iteration_helpers/loop_generate_predictive.sh +++ b/pipelines/iteration_helpers/loop_generate_predictive.sh @@ -14,5 +14,5 @@ BASE_DIR="$1" for SUBDIR in "$BASE_DIR"/*/; do # Run the Python script with the current subdirectory as the model_dir argument echo "$SUBDIR" - python generate_predictive.py --model_dir "$SUBDIR" --n_forecast_points 28 + python generate_predictive.py "$SUBDIR" --n-forecast-points 28 done diff --git a/pipelines/iteration_helpers/loop_postprocess.sh b/pipelines/iteration_helpers/loop_postprocess.sh index 7a25d032..215d1a00 100755 --- a/pipelines/iteration_helpers/loop_postprocess.sh +++ b/pipelines/iteration_helpers/loop_postprocess.sh @@ -13,5 +13,5 @@ BASE_DIR="$1" for SUBDIR in "$BASE_DIR"/*/; do # Run the R script with the current subdirectory as the model_dir argument echo "$SUBDIR" - Rscript postprocess_state_forecast.R --model-run-dir "$SUBDIR" + Rscript postprocess_state_forecast.R "$SUBDIR" done diff --git a/pipelines/score_forecast.R b/pipelines/score_forecast.R index 9bf90e6f..62afd9a9 100644 --- a/pipelines/score_forecast.R +++ b/pipelines/score_forecast.R @@ -236,7 +236,7 @@ read_and_score_location <- function(model_run_dir, # Create a parser p <- arg_parser("Score a single location forecast") |> add_argument( - "model-run-dir", + "model_run_dir", help = "Directory containing the model data and output." 
) From ac4d25cd55478c10b7d7b52917c356e8f0d5fb2a Mon Sep 17 00:00:00 2001 From: Damon Bayer Date: Tue, 19 Nov 2024 13:13:30 -0600 Subject: [PATCH 07/17] add check to all subprocess commands (#143) --- pipelines/forecast_state.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/pipelines/forecast_state.py b/pipelines/forecast_state.py index 21f365b1..f09f258f 100644 --- a/pipelines/forecast_state.py +++ b/pipelines/forecast_state.py @@ -29,7 +29,8 @@ def baseline_forecasts( f"{n_forecast_days}", "--n-samples", f"{n_samples}", - ] + ], + check=True, ) return None @@ -40,7 +41,8 @@ def convert_inferencedata_to_parquet(model_run_dir: Path) -> None: "Rscript", "pipelines/convert_inferencedata_to_parquet.R", f"{model_run_dir}", - ] + ], + check=True, ) return None @@ -51,7 +53,8 @@ def postprocess_forecast(model_run_dir: Path) -> None: "Rscript", "pipelines/postprocess_state_forecast.R", f"{model_run_dir}", - ] + ], + check=True, ) return None @@ -62,7 +65,8 @@ def score_forecast(model_run_dir: Path) -> None: "Rscript", "pipelines/score_forecast.R", f"{model_run_dir}", - ] + ], + check=True, ) return None From 546046314a3a1a2e0134869a602aa9b9234c10bc Mon Sep 17 00:00:00 2001 From: "Dylan H. 
Morris" Date: Tue, 19 Nov 2024 14:52:56 -0500 Subject: [PATCH 08/17] Organize helper functions / utilities (#141) --- hewr/DESCRIPTION | 4 + hewr/NAMESPACE | 6 + hewr/R/directory_utils.R | 127 +++++++++++++++ hewr/R/parse_path.R | 63 -------- hewr/R/to_epiweekly_quantile_table.R | 155 ++++++++++++++++++ hewr/man/disease_map_lower.Rd | 18 +++ hewr/man/get_all_model_batch_dirs.Rd | 24 +++ hewr/man/parse_model_batch_dir_path.Rd | 23 +++ hewr/man/parse_model_run_dir_path.Rd | 22 +++ hewr/man/to_epiweekly_quantile_table.Rd | 24 +++ hewr/man/to_epiweekly_quantiles.Rd | 30 ++++ hewr/tests/testthat.R | 12 ++ hewr/tests/testthat/test_directory_utils.R | 177 +++++++++++++++++++++ pipelines/collate_plots.py | 28 ++-- pipelines/collate_score_tables.R | 33 +--- pipelines/create_hubverse_table.R | 159 +++--------------- pipelines/utils.py | 41 +++++ 17 files changed, 702 insertions(+), 244 deletions(-) create mode 100644 hewr/R/directory_utils.R delete mode 100644 hewr/R/parse_path.R create mode 100644 hewr/R/to_epiweekly_quantile_table.R create mode 100644 hewr/man/disease_map_lower.Rd create mode 100644 hewr/man/get_all_model_batch_dirs.Rd create mode 100644 hewr/man/parse_model_batch_dir_path.Rd create mode 100644 hewr/man/parse_model_run_dir_path.Rd create mode 100644 hewr/man/to_epiweekly_quantile_table.Rd create mode 100644 hewr/man/to_epiweekly_quantiles.Rd create mode 100644 hewr/tests/testthat.R create mode 100644 hewr/tests/testthat/test_directory_utils.R diff --git a/hewr/DESCRIPTION b/hewr/DESCRIPTION index e961c674..b8d64bc8 100644 --- a/hewr/DESCRIPTION +++ b/hewr/DESCRIPTION @@ -39,3 +39,7 @@ Imports: urca Remotes: https://github.com/cdcgov/forecasttools +Suggests: + testthat (>= 3.0.0), + withr +Config/testthat/edition: 3 diff --git a/hewr/NAMESPACE b/hewr/NAMESPACE index e651b944..a8b620d1 100644 --- a/hewr/NAMESPACE +++ b/hewr/NAMESPACE @@ -1 +1,7 @@ # Generated by roxygen2: do not edit by hand + +export(get_all_model_batch_dirs) 
+export(parse_model_batch_dir_path) +export(parse_model_run_dir_path) +export(to_epiweekly_quantile_table) +export(to_epiweekly_quantiles) diff --git a/hewr/R/directory_utils.R b/hewr/R/directory_utils.R new file mode 100644 index 00000000..b8bf8cae --- /dev/null +++ b/hewr/R/directory_utils.R @@ -0,0 +1,127 @@ +#' Utilities for handling and parsing directory names +#' based on pyrenew-hew pipeline conventions. + +disease_map_lower <- c( + "covid-19" = "COVID-19", + "influenza" = "Influenza" +) + +#' Parse model batch directory name. +#' +#' Parse the name of a model batch directory +#' (i.e. a directory representing a single +#' report date and disease pair, but potentially +#' with fits for multiple locations), returning +#' a named list of quantities of interest. +#' +#' @param model_batch_dir_path Path to the model batch +#' directory to parse. Will parse only the basename. +#' @return A list of quantities: `disease`, `report_date`, +#' `first_training_date`, and `last_training_date`. +#' @export +parse_model_batch_dir_path <- function(model_batch_dir_path) { + pattern <- "(.+)_r_(.+)_f_(.+)_t_(.+)" + model_batch_dir_name <- fs::path_file(model_batch_dir_path) + matches <- stringr::str_match( + model_batch_dir_name, + pattern + ) + + if (any(is.na(matches))) { + stop( + "Invalid format for model batch directory name; ", + "could not parse. Expected ", + "'_r__f__t_", + "'." + ) + } + + result <- list( + disease = disease_map_lower[matches[2]] |> unname(), + # disease_map_lower + # is a named vector + # but we want disease + # just to be a string + report_date = lubridate::ymd(matches[3], quiet = TRUE), + first_training_date = lubridate::ymd(matches[4], quiet = TRUE), + last_training_date = lubridate::ymd(matches[5], quiet = TRUE) + ) + + if (any(is.na(result))) { + stop( + "Could not parse extracted disease and/or date ", + "values expected 'disease' to be one of 'covid-19' ", + "and 'influenza' and all dates to be valid dates in ", + "YYYY-MM-DD format. 
Got: ", + glue::glue( + "disease: {matches[2]}, ", + "report_date: {matches[3]}, ", + "first_training_date: {matches[4]}, ", + "last_training_date: {matches[5]}." + ) + ) + } + + return(result) +} + +#' Parse model run directory path. +#' +#' Parse path to a model run directory +#' (i.e. a directory representing a run for a +#' particular location, disease, and reference +#' date, and extract key quantities of interest. +#' +#' @param model_run_dir_path Path to parse. +#' @return A list of parsed attributes: +#' `location`, `disease`, `report_date`, +#' `first_training_date`, and `last_training_date`. +#' +#' @export +parse_model_run_dir_path <- function(model_run_dir_path) { + batch_dir <- fs::path_dir(model_run_dir_path) |> + fs::path_file() + location <- fs::path_file(model_run_dir_path) + + return(c( + location = location, + parse_model_batch_dir_path(batch_dir) + )) +} + + +#' Get forecast directories. +#' +#' Get all the subdirectories within a parent directory +#' that match the pattern for a forecast run for a +#' given disease and optionally a given report date. +#' +#' @param dir_of_batch_dirs Directory in which to look for +#' "model batch" directories, each of which represents an +#' individual forecast date / pathogen / dataset combination. +#' @param diseases Names of the diseases to match, as a vector of strings, +#' or a single disease as a string. +#' @return A vector of paths to the forecast subdirectories. 
+#' @export +get_all_model_batch_dirs <- function(dir_of_batch_dirs, + diseases) { + # disease names are lowercase by convention + match_patterns <- stringr::str_c(tolower(diseases), + "_r", + collapse = "|" + ) + + dirs <- tibble::tibble( + dir_path = fs::dir_ls( + dir_of_batch_dirs, + type = "directory" + ) + ) |> + dplyr::filter(stringr::str_starts( + fs::path_file(dir_path), + match_patterns + )) |> + dplyr::pull(dir_path) + + return(dirs) +} diff --git a/hewr/R/parse_path.R b/hewr/R/parse_path.R deleted file mode 100644 index 0629214c..00000000 --- a/hewr/R/parse_path.R +++ /dev/null @@ -1,63 +0,0 @@ -disease_map_lower <- list( - "covid-19" = "COVID-19", - "influenza" = "Influenza" -) - -#' Parse the name of a model batch directory -#' (i.e. a directory representing a single -#' report date and disease pair, but potentially -#' with fits for multiple locations), returning -#' a named list of quantities of interest. -#' -#' @param model_batch_dir_name Name of the model batch -#' directory (not the full path to it, just the directory -#' base name) to parse. -#' @return A list of quantities: `disease`, `report_date`, -#' `first_training_date`, and `last_training_date`. -#' @export -parse_model_batch_dir <- function(model_batch_dir_name) { - pattern <- "(.+)_r_(.+)_f_(.+)_t_(.+)" - - matches <- stringr::str_match( - model_batch_dir_name, - pattern - ) - - if (is.na(matches[1])) { - stop( - "Invalid format for model batch directory name; ", - "could not parse. Expected ", - "'_r__f__t_", - "'." - ) - } - - return(list( - disease = disease_map_lower[[matches[2]]], - report_date = lubridate::ymd(matches[3]), - first_training_date = lubridate::ymd(matches[4]), - last_training_date = lubridate::ymd(matches[5]) - )) -} - -#' Parse path to a model run directory -#' (i.e. a directory representing a run for a -#' particular location, disease, and reference -#' date, and extract key quantities of interest. -#' -#' @param model_run_dir_path Path to parse. 
-#' @return A list of parsed attributes: -#' `location`, `disease`, `report_date`, -#' `first_training_date`, and `last_training_date`. -#' -#' @export -parse_model_run_dir <- function(model_run_dir_path) { - batch_dir <- fs::path_dir(model_run_dir_path) |> - fs::path_file() - location <- fs::path_file(model_run_dir_path) - - return(c( - list(location = location), - parse_model_batch_dir(batch_dir) - )) -} diff --git a/hewr/R/to_epiweekly_quantile_table.R b/hewr/R/to_epiweekly_quantile_table.R new file mode 100644 index 00000000..9e9149f4 --- /dev/null +++ b/hewr/R/to_epiweekly_quantile_table.R @@ -0,0 +1,155 @@ +#' Read in daily forecast draws from a model run directory +#' and output a set of epiweekly quantiles, as a +#' [`tibble`][tibble::tibble()]. +#' +#' @param model_run_dir Path to a directory containing +#' forecast draws to process, whose basename is the forecasted +#' location. +#' @param report_date Report date for which to generate epiweekly quantiles. +#' @param max_lookback_days How many days before the report date +#' to look back when generating epiweekly quantiles (determines how +#' many negative epiweekly forecast horizons (i.e. nowcast/backcast) +#' quantiles will be generated). +#' @return A [`tibble`][tibble::tibble()] of quantiles.
+#' @export +to_epiweekly_quantiles <- function(model_run_dir, + report_date, + max_lookback_days) { + message(glue::glue("Processing {model_run_dir}...")) + draws_path <- fs::path(model_run_dir, + "forecast_samples", + ext = "parquet" + ) + location <- fs::path_file(model_run_dir) + + draws <- arrow::read_parquet(draws_path) |> + dplyr::filter(.data$date >= lubridate::ymd(!!report_date) - + lubridate::days(!!max_lookback_days)) + + if (nrow(draws) < 1) { + return(NULL) + } + + epiweekly_disease_draws <- draws |> + dplyr::filter( + disease == "Disease" + ) |> + forecasttools::daily_to_epiweekly( + date_col = "date", + value_col = ".value", + id_cols = ".draw", + weekly_value_name = "epiweekly_disease", + strict = TRUE + ) + + epiweekly_total_draws <- draws |> + dplyr::filter(.data$disease == "Other") |> + forecasttools::daily_to_epiweekly( + date_col = "date", + value_col = ".value", + id_cols = ".draw", + weekly_value_name = "epiweekly_total", + strict = TRUE + ) + + epiweekly_prop_draws <- dplyr::inner_join( + epiweekly_disease_draws, + epiweekly_total_draws, + by = c( + "epiweek", + "epiyear", + ".draw" + ) + ) |> + dplyr::mutate( + "epiweekly_proportion" = + .data$epiweekly_disease / .data$epiweekly_total + ) + + + epiweekly_quantiles <- epiweekly_prop_draws |> + forecasttools::trajectories_to_quantiles( + timepoint_cols = c("epiweek", "epiyear"), + value_col = "epiweekly_proportion" + ) |> + dplyr::mutate( + "location" = !!location + ) + + message(glue::glue("Done processing {model_run_dir}")) + return(epiweekly_quantiles) +} + +#' Create an epiweekly hubverse-format forecast quantile table +#' from a model batch directory containing forecasts +#' for multiple locations as daily MCMC draws. +#' +#' @param model_batch_dir Model batch directory containing +#' the individual location forecast directories +#' ("model run directories") to process. Name should be in the format +#' `{disease}_r_{reference_date}_f_{first_data_date}_t_{last_data_date}`. 
+#' @param exclude Locations to exclude, if any, as a list of strings. +#' Default `NULL` (exclude nothing). +#' +#' @export +to_epiweekly_quantile_table <- function(model_batch_dir, + exclude = NULL) { + locations_to_process <- fs::dir_ls(model_batch_dir, + type = "directory" + ) + + if (!is.null(exclude)) { + locations_to_process <- locations_to_process[ + !(fs::path_file(locations_to_process) %in% exclude) + ] + } + + batch_params <- hewr::parse_model_batch_dir_path( + model_batch_dir + ) + report_date <- batch_params$report_date + disease <- batch_params$disease + disease_abbr <- dplyr::case_when( + disease == "Influenza" ~ "flu", + disease == "COVID-19" ~ "covid", + TRUE ~ disease + ) + + report_epiweek <- lubridate::epiweek(report_date) + report_epiyear <- lubridate::epiyear(report_date) + report_epiweek_end <- forecasttools::epiweek_to_date( + report_epiweek, + report_epiyear, + day_of_week = 7 + ) + + hubverse_table <- purrr::map( + locations_to_process, + \(x) { + to_epiweekly_quantiles( + x, + report_date = report_date, + max_lookback_days = 8 + ) + } + ## ensures we get the full -1 horizon but do not + ## waste time quantilizing draws that will not be + ## included in the final table. 
+ ) |> + dplyr::bind_rows() |> + forecasttools::get_hubverse_table( + report_epiweek_end, + target_name = + glue::glue("wk inc {disease_abbr} prop ed visits") + ) |> + dplyr::arrange( + .data$target, + .data$output_type, + .data$location, + .data$reference_date, + .data$horizon, + .data$output_type_id + ) + + return(hubverse_table) +} diff --git a/hewr/man/disease_map_lower.Rd b/hewr/man/disease_map_lower.Rd new file mode 100644 index 00000000..dbf5de27 --- /dev/null +++ b/hewr/man/disease_map_lower.Rd @@ -0,0 +1,18 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/directory_utils.R +\docType{data} +\name{disease_map_lower} +\alias{disease_map_lower} +\title{Utilities for handling and parsing directory names +based on pyrenew-hew pipeline conventions.} +\format{ +An object of class \code{list} of length 2. +} +\usage{ +disease_map_lower +} +\description{ +Utilities for handling and parsing directory names +based on pyrenew-hew pipeline conventions. +} +\keyword{datasets} diff --git a/hewr/man/get_all_model_batch_dirs.Rd b/hewr/man/get_all_model_batch_dirs.Rd new file mode 100644 index 00000000..4997d907 --- /dev/null +++ b/hewr/man/get_all_model_batch_dirs.Rd @@ -0,0 +1,24 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/directory_utils.R +\name{get_all_model_batch_dirs} +\alias{get_all_model_batch_dirs} +\title{Get forecast directories.} +\usage{ +get_all_model_batch_dirs(dir_of_batch_dirs, diseases) +} +\arguments{ +\item{dir_of_batch_dirs}{Directory in which to look for +"model batch" directories, each of which represents an +individual forecast date / pathogen / dataset combination.} + +\item{diseases}{Names of the diseases to match, as a vector of strings, +or a single disease as a string.} +} +\value{ +A vector of paths to the forecast subdirectories. 
+} +\description{ +Get all the subdirectories within a parent directory +that match the pattern for a forecast run for a +given disease and optionally a given report date. +} diff --git a/hewr/man/parse_model_batch_dir_path.Rd b/hewr/man/parse_model_batch_dir_path.Rd new file mode 100644 index 00000000..a247b597 --- /dev/null +++ b/hewr/man/parse_model_batch_dir_path.Rd @@ -0,0 +1,23 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/directory_utils.R +\name{parse_model_batch_dir_path} +\alias{parse_model_batch_dir_path} +\title{Parse model batch directory name.} +\usage{ +parse_model_batch_dir_path(model_batch_dir_path) +} +\arguments{ +\item{model_batch_dir_path}{Path to the model batch +directory to parse. Will parse only the basename.} +} +\value{ +A list of quantities: \code{disease}, \code{report_date}, +\code{first_training_date}, and \code{last_training_date}. +} +\description{ +Parse the name of a model batch directory +(i.e. a directory representing a single +report date and disease pair, but potentially +with fits for multiple locations), returning +a named list of quantities of interest. +} diff --git a/hewr/man/parse_model_run_dir_path.Rd b/hewr/man/parse_model_run_dir_path.Rd new file mode 100644 index 00000000..4795c6a3 --- /dev/null +++ b/hewr/man/parse_model_run_dir_path.Rd @@ -0,0 +1,22 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/directory_utils.R +\name{parse_model_run_dir_path} +\alias{parse_model_run_dir_path} +\title{Parse model run directory path.} +\usage{ +parse_model_run_dir_path(model_run_dir_path) +} +\arguments{ +\item{model_run_dir_path}{Path to parse.} +} +\value{ +A list of parsed attributes: +\code{location}, \code{disease}, \code{report_date}, +\code{first_training_date}, and \code{last_training_date}. +} +\description{ +Parse path to a model run directory +(i.e. 
a directory representing a run for a +particular location, disease, and reference +date, and extract key quantities of interest. +} diff --git a/hewr/man/to_epiweekly_quantile_table.Rd b/hewr/man/to_epiweekly_quantile_table.Rd new file mode 100644 index 00000000..56d017c1 --- /dev/null +++ b/hewr/man/to_epiweekly_quantile_table.Rd @@ -0,0 +1,24 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/to_epiweekly_quantile_table.R +\name{to_epiweekly_quantile_table} +\alias{to_epiweekly_quantile_table} +\title{Create an epiweekly hubverse-format forecast quantile table +from a model batch directory containing forecasts +for multiple locations as daily MCMC draws.} +\usage{ +to_epiweekly_quantile_table(model_batch_dir, exclude = NULL) +} +\arguments{ +\item{model_batch_dir}{Model batch directory containing +the individual location forecast directories +("model run directories") to process. Name should be in the format +\verb{\{disease\}_r_\{reference_date\}_f_\{first_data_date\}_t_\{last_data_date\}}.} + +\item{exclude}{Locations to exclude, if any, as a list of strings. +Default \code{NULL} (exclude nothing).} +} +\description{ +Create an epiweekly hubverse-format forecast quantile table +from a model batch directory containing forecasts +for multiple locations as daily MCMC draws. 
+} diff --git a/hewr/man/to_epiweekly_quantiles.Rd b/hewr/man/to_epiweekly_quantiles.Rd new file mode 100644 index 00000000..47738ad5 --- /dev/null +++ b/hewr/man/to_epiweekly_quantiles.Rd @@ -0,0 +1,30 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/to_epiweekly_quantile_table.R +\name{to_epiweekly_quantiles} +\alias{to_epiweekly_quantiles} +\title{Read in daily forecast draws from a model run directory +and output a set of epiweekly quantiles, as a +\code{\link[tibble:tibble]{tibble}}.} +\usage{ +to_epiweekly_quantiles(model_run_dir, report_date, max_lookback_days) +} +\arguments{ +\item{model_run_dir}{Path to a directory containing +forecast draws to process, whose basename is the forecasted +location.} + +\item{report_date}{Report date for which to generate epiweekly quantiles.} + +\item{max_lookback_days}{How many days before the report date +to look back when generating epiweekly quantiles (determines how +many negative epiweekly forecast horizons (i.e. nowcast/backcast) +quantiles will be generated).} +} +\value{ +A \code{\link[tibble:tibble]{tibble}} of quantiles. +} +\description{ +Read in daily forecast draws from a model run directory +and output a set of epiweekly quantiles, as a +\code{\link[tibble:tibble]{tibble}}. +} diff --git a/hewr/tests/testthat.R b/hewr/tests/testthat.R new file mode 100644 index 00000000..588af7fd --- /dev/null +++ b/hewr/tests/testthat.R @@ -0,0 +1,12 @@ +# This file is part of the standard setup for testthat. +# It is recommended that you do not modify it. +# +# Where should you do additional test configuration?
+# Learn more about the roles of various files in: +# * https://r-pkgs.org/testing-design.html#sec-tests-files-overview +# * https://testthat.r-lib.org/articles/special-files.html + +library(testthat) +library(hewr) + +test_check("hewr") diff --git a/hewr/tests/testthat/test_directory_utils.R b/hewr/tests/testthat/test_directory_utils.R new file mode 100644 index 00000000..17f30e37 --- /dev/null +++ b/hewr/tests/testthat/test_directory_utils.R @@ -0,0 +1,177 @@ +valid_model_batch_dirs <- list( + list( + dirname = "covid-19_r_2024-02-03_f_2021-04-01_t_2024-01-23", + expected = list( + disease = "COVID-19", + report_date = lubridate::ymd("2024-02-03"), + first_training_date = lubridate::ymd("2021-04-1"), + last_training_date = lubridate::ymd("2024-01-23") + ) + ), + list( + dirname = "influenza_r_2022-12-11_f_2021-02-05_t_2027-12-30", + expected = list( + disease = "Influenza", + report_date = lubridate::ymd("2022-12-11"), + first_training_date = lubridate::ymd("2021-02-5"), + last_training_date = lubridate::ymd("2027-12-30") + ) + ) +) + +invalid_model_batch_dirs <- c( + "qcovid-19_r_2024-02-03_f_2021-04-01_t_2024-01-23", + "influenza_r_2022-12-33_f_2021-02-05_t_2027-12-30" +) + +to_valid_run_dir <- function(valid_batch_dir_entry, location) { + x <- valid_batch_dir_entry + x$dirpath <- fs::path(x$dirname, location) + x$expected <- c( + location = location, + x$expected + ) + return(x) +} + +valid_model_run_dirs <- c( + lapply( + valid_model_batch_dirs, to_valid_run_dir, + location = "ME" + ), + lapply( + valid_model_batch_dirs, to_valid_run_dir, + location = "US" + ) +) + + +test_that("parse_model_batch_dir_path() works as expected.", { + for (valid_pair in valid_model_batch_dirs) { + ## should work with base dirnames that are valid + expect_equal( + parse_model_batch_dir_path(valid_pair$dirname), + valid_pair$expected + ) + + ## should work identically with a full path rather + ## than just base dir + also_valid <- fs::path("this", "is", "a", "test", 
valid_pair$dirname) + expect_equal( + parse_model_batch_dir_path(also_valid), + valid_pair$expected + ) + + ## should error if the terminal directory is not + ## what is to be parsed + not_valid <- fs::path(valid_pair$dirname, "test") + expect_error( + { + parse_model_batch_dir_path(not_valid) + }, + regex = "Invalid format for model batch directory name" + ) + } + + ## should error if entries cannot be parsed as what is expected + + for (invalid_entry in invalid_model_batch_dirs) { + expect_error( + { + parse_model_batch_dir_path(invalid_entry) + }, + regex = "Could not parse extracted disease and/or date values" + ) + } +}) + +test_that("parse_model_run_dir_path() works as expected.", { + for (valid_pair in valid_model_run_dirs) { + expect_equal( + parse_model_run_dir_path(valid_pair$dirpath), + valid_pair$expected + ) + + ## should work identically with a longer path + expect_equal( + parse_model_run_dir_path(fs::path( + "this", "is", "a", "test", + valid_pair$dirpath + )), + valid_pair$expected + ) + + ## should fail if there is additional terminal pathing + expect_error( + { + parse_model_run_dir_path(fs::path(valid_pair$dirpath, "test")) + }, + regex = "Invalid format for model batch directory name" + ) + } +}) + +test_that("get_all_model_batch_dirs() returns expected output.", { + withr::with_tempdir({ + ## create some directories + valid_covid <- c( + "covid-19_r_2024-02-01_f_2021-01-01_t_2024-01-31", + "covid-19_r" + ) + valid_flu <- c( + "influenza_r_2022-11-12_f_2022-11-01_t_2022_11_10", + "influenza_r" + ) + valid_dirs <- c(valid_flu, valid_covid) + + invalid_dirs <- c( + "this_is_not_valid", + "covid19_r", + "covid-19-r", + "influenza-r", + "influnza_r", + "covid-19", + "influenza" + ) + + invalid_files <- c( + "covid-19_r.txt", + "influenza_r.txt" + ) + fs::dir_create(c(valid_dirs, invalid_dirs)) + fs::file_create(invalid_files) + expected_all_files <- c( + valid_dirs, + invalid_dirs, + invalid_files + ) + + result_all <- fs::dir_ls(".") |> 
fs::path_file() + + result_valid <- get_all_model_batch_dirs( + ".", + c("COVID-19", "Influenza") + ) + + result_valid_alt <- get_all_model_batch_dirs( + ".", + c("Influenza", "COVID-19") + ) + + result_valid_covid <- get_all_model_batch_dirs( + ".", + "COVID-19" + ) + + result_valid_flu <- get_all_model_batch_dirs( + ".", + "Influenza" + ) + + expect_setequal(result_all, expected_all_files) + expect_setequal(result_valid, c(valid_flu, valid_covid)) + expect_setequal(result_valid_alt, c(valid_flu, valid_covid)) + expect_setequal(result_valid_covid, valid_covid) + expect_setequal(result_valid_flu, valid_flu) + }) +}) diff --git a/pipelines/collate_plots.py b/pipelines/collate_plots.py index 3fa1e4f0..39248f15 100644 --- a/pipelines/collate_plots.py +++ b/pipelines/collate_plots.py @@ -208,21 +208,28 @@ def collate_from_all_subdirs( def main( - model_base_dir: str | Path, + dir_of_forecast_dirs: str | Path, single_forecast_dir: str | Path, target_filenames: list[str], - disease: str, + disease: str = None, ) -> None: - if not ((model_base_dir is None) ^ (single_forecast_dir is None)): + if not ((dir_of_forecast_dirs is None) ^ (single_forecast_dir is None)): raise ValueError( "Must provide exactly one of " - "'--model-base-dir' (to process multiple " + "'dir_of_forecast_dirs' (to process multiple " "groups of forecasts) or " - "'--single-forecast-dir' " + "'single_forecast_dir' " "(to process a single set of forecasts" ) - elif model_base_dir is not None: - collate_from_all_subdirs(model_base_dir, disease, target_filenames) + elif dir_of_forecast_dirs is not None: + if disease is None: + raise ValueError( + "'disease' must not be None when collating plots " + "from multiple forecast subdirectories" + ) + collate_from_all_subdirs( + dir_of_forecast_dirs, disease, target_filenames + ) elif single_forecast_dir is not None: process_dir(single_forecast_dir, target_filenames) return None @@ -233,7 +240,7 @@ def main( ) parser.add_argument( - "--model-base-dir", + 
"--dir-of-forecast-dirs", type=Path, help=( "Base directory containing subdirectories that represent " @@ -251,7 +258,10 @@ def main( ) parser.add_argument( - "disease", type=str, help="Name of the disease for which to collate plots" + "--disease", + type=str, + help="Name of the disease for which to collate plots.", + default=None, ) parser.add_argument( diff --git a/pipelines/collate_score_tables.R b/pipelines/collate_score_tables.R index 70e0f9da..5c0370af 100644 --- a/pipelines/collate_score_tables.R +++ b/pipelines/collate_score_tables.R @@ -12,41 +12,12 @@ purrr::walk(script_packages, \(pkg) { }) -#' Get all the subdirectories within a parent directory -#' that match the pattern for a forecast run for a -#' given disease and optionally a given report date. -#' -#' @param parent_dir Directory in which to look for forecast subdirectories. -#' @param diseases Names of the diseases to match, as a vector of strings, -#' or a single disease as a string. -#' @return A vector of paths to the forecast subdirectories. 
-get_all_forecast_dirs <- function(dir_of_forecast_date_dirs, - diseases) { - # disease names are lowercase by convention - match_patterns <- str_c(tolower(diseases), "_r", collapse = "|") - - dirs <- tibble::tibble( - dir_path = fs::dir_ls( - dir_of_forecast_date_dirs, - type = "directory" - ) - ) |> - dplyr::filter(str_starts( - fs::path_file(dir_path), - match_patterns - )) |> - dplyr::pull(dir_path) - - return(dirs) -} - - process_loc_date_score_table <- function(model_run_dir) { table_path <- fs::path(model_run_dir, "score_table", ext = "rds" ) - parsed <- hewr::parse_model_run_dir(model_run_dir) + parsed <- hewr::parse_model_run_dir_path(model_run_dir) if (!(fs::file_exists(table_path))) { warning(glue::glue( @@ -137,7 +108,7 @@ collate_scores_for_date <- function(model_run_dir, collate_all_score_tables <- function(model_base_dir, disease, score_file_save_path = NULL) { - date_dirs_to_process <- get_all_forecast_dirs( + date_dirs_to_process <- hewr::get_all_model_batch_dirs( model_base_dir, diseases = disease ) diff --git a/pipelines/create_hubverse_table.R b/pipelines/create_hubverse_table.R index a706cd66..db724c39 100644 --- a/pipelines/create_hubverse_table.R +++ b/pipelines/create_hubverse_table.R @@ -1,144 +1,21 @@ -draws_to_quantiles <- function(forecast_dir, - report_date, - max_lookback_days) { - message(glue::glue("Processing {forecast_dir}...")) - draws_path <- fs::path(forecast_dir, - "forecast_samples", - ext = "parquet" - ) - location <- fs::path_file(forecast_dir) - - draws <- arrow::read_parquet(draws_path) |> - dplyr::filter(date >= lubridate::ymd(report_date) - - lubridate::days(max_lookback_days)) - - if (nrow(draws) < 1) { - return(NULL) - } - - epiweekly_disease_draws <- draws |> - dplyr::filter( - disease == "Disease" - ) |> - forecasttools::daily_to_epiweekly( - date_col = "date", - value_col = ".value", - id_cols = ".draw", - weekly_value_name = "epiweekly_disease", - strict = TRUE - ) - - epiweekly_total_draws <- draws |> - 
dplyr::filter(disease == "Other") |> - forecasttools::daily_to_epiweekly( - date_col = "date", - value_col = ".value", - id_cols = ".draw", - weekly_value_name = "epiweekly_total", - strict = TRUE - ) - - epiweekly_prop_draws <- dplyr::inner_join( - epiweekly_disease_draws, - epiweekly_total_draws, - by = c( - "epiweek", - "epiyear", - ".draw" - ) - ) |> - dplyr::mutate( - epiweekly_proportion = - epiweekly_disease / epiweekly_total - ) - - - epiweekly_quantiles <- epiweekly_prop_draws |> - forecasttools::trajectories_to_quantiles( - timepoint_cols = c("epiweek", "epiyear"), - value_col = "epiweekly_proportion" - ) |> - dplyr::mutate( - location = !!location - ) - - message(glue::glue("Done processing {forecast_dir}")) - return(epiweekly_quantiles) -} - -create_hubverse_table <- function(model_run_dir, - exclude = NULL) { - locations_to_process <- fs::dir_ls(model_run_dir, - type = "directory" - ) - - if (!is.null(exclude)) { - locations_to_process <- locations_to_process[ - !(fs::path_file(locations_to_process) %in% exclude) - ] - } - - report_date <- stringr::str_match( - model_run_dir, - "r_(([0-9]|-)+)_f" - )[2] - - report_epiweek <- lubridate::epiweek(report_date) - report_epiyear <- lubridate::epiyear(report_date) - report_epiweek_end <- forecasttools::epiweek_to_date( - report_epiweek, - report_epiyear, - day_of_week = 7 - ) - - disease <- dplyr::case_when( - stringr::str_starts( - fs::path_file(model_run_dir), - "covid-19" - ) ~ "covid", - stringr::str_starts( - fs::path_file(model_run_dir), - "influenza" - ) ~ "flu", - TRUE ~ NA - ) - - hubverse_table <- purrr::map( - locations_to_process, - \(x) { - draws_to_quantiles( - x, - report_date = report_date, - max_lookback_days = 8 - ) - } - ## ensures we get the full -1 horizon but do not - ## waste time quantilizing draws that will not be - ## included in the final table. 
- ) |> - dplyr::bind_rows() |> - forecasttools::get_hubverse_table( - report_epiweek_end, - target_name = - glue::glue("wk inc {disease} prop ed visits") - ) |> - dplyr::arrange( - target, - output_type, - location, - reference_date, - horizon, - output_type_id - ) - - return(hubverse_table) -} - - -main <- function(model_run_dir, +#!/usr/bin/env Rscript + + +#' Create a hubverse table from model output, using +#' utilities from `hewr`. +#' +#' @param model_batch_dir Model batch directory from which +#' to create a hubverse table +#' @param output_path path to save the table as a tsv +#' @param exclude Locations to exclude, as a vector of strings. +#' @return Nothing, saving the table as a side effect. +main <- function(model_batch_dir, output_path, exclude = NULL) { - create_hubverse_table(model_run_dir, exclude = exclude) |> + hewr::to_epiweekly_quantile_table( + model_batch_dir, + exclude = exclude + ) |> readr::write_tsv(output_path) } @@ -147,7 +24,7 @@ p <- argparser::arg_parser( "Create a hubverse table from location specific forecast draws." 
) |> argparser::add_argument( - "model_run_dir", + "model_batch_dir", help = paste0( "Directory containing subdirectories that represent ", "individual forecast locations, with a directory name ", @@ -166,7 +43,7 @@ p <- argparser::arg_parser( argv <- argparser::parse_args(p) main( - argv$model_run_dir, + argv$model_batch_dir, argv$output_path, stringr::str_split_1(argv$exclude, " ") ) diff --git a/pipelines/utils.py b/pipelines/utils.py index 9a15a777..c18215cc 100644 --- a/pipelines/utils.py +++ b/pipelines/utils.py @@ -5,9 +5,12 @@ import datetime import os +import re from collections.abc import MutableSequence from pathlib import Path +disease_map_lower_ = {"influenza": "Influenza", "covid-19": "COVID-19"} + def ensure_listlike(x): """ @@ -36,6 +39,44 @@ def ensure_listlike(x): return x if isinstance(x, MutableSequence) else [x] +def parse_model_batch_dir_name(model_batch_dir_name): + """ + Parse the name of a model batch directory, + returning a dictionary of parsed values. + + Parameters + ---------- + model_batch_dir_name + Model batch directory name to parse. + + Returns + ------- + dict + A dictionary with keys 'disease', 'report_date', + 'first_training_date', and 'last_training_date'. 
+ """ + regex_match = re.match(r"(.+)_r_(.+)_f_(.+)_t_(.+)", model_batch_dir_name) + if regex_match: + disease, report_date, first_training_date, last_training_date = ( + regex_match.groups() + ) + else: + raise ValueError( + "Invalid model batch directory name format: " + f"{model_batch_dir_name}" + ) + return dict( + disease=disease_map_lower_[disease], + report_date=datetime.strptime(report_date, "%Y-%m-%d").date(), + first_training_date=datetime.strptime( + first_training_date, "%Y-%m-%d" + ).date(), + last_training_date=datetime.strptime( + last_training_date, "%Y-%m-%d" + ).date(), + ) + + def get_all_forecast_dirs( parent_dir: Path | str, diseases: str | list[str], From 9634b33f5283f5709bd5c1d4d1fc5e96b92b1f35 Mon Sep 17 00:00:00 2001 From: Damon Bayer Date: Tue, 19 Nov 2024 20:43:37 +0000 Subject: [PATCH 09/17] provide missing namespace --- pipelines/timeseries_forecasts.R | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pipelines/timeseries_forecasts.R b/pipelines/timeseries_forecasts.R index 2edfb378..9e915054 100644 --- a/pipelines/timeseries_forecasts.R +++ b/pipelines/timeseries_forecasts.R @@ -252,11 +252,11 @@ disease_name_nssp_map <- c( # replace this with functionality from hewr disease_name_raw <- model_run_dir |> - path_split() |> - pluck(1) |> + fs::path_split() |> + purrr::pluck(1) |> tail(3) |> head(1) |> - str_extract("^.+(?=_r_)") + stringr::str_extract("^.+(?=_r_)") disease_name_nssp <- unname(disease_name_nssp_map[disease_name_raw]) From eae81c4d741f9fdbbcef5c11d134cd256df980c8 Mon Sep 17 00:00:00 2001 From: Damon Bayer Date: Tue, 19 Nov 2024 15:04:17 -0600 Subject: [PATCH 10/17] merge main into model_runs_2 (#146) * Issue 137: unify argument patterns (#138) * add check to all subprocess commands (#143) * Organize helper functions / utilities (#141) --------- Co-authored-by: Samuel Brand <48288458+SamuelBrand1@users.noreply.github.com> Co-authored-by: Dylan H. 
Morris From 877ff117b959dd4ccc1b7007d48fe39a2e57d9e2 Mon Sep 17 00:00:00 2001 From: Damon Bayer Date: Tue, 19 Nov 2024 21:14:19 +0000 Subject: [PATCH 11/17] load required packages --- pipelines/timeseries_forecasts.R | 29 ++++++++++++++++------------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/pipelines/timeseries_forecasts.R b/pipelines/timeseries_forecasts.R index 9e915054..7d9319b2 100644 --- a/pipelines/timeseries_forecasts.R +++ b/pipelines/timeseries_forecasts.R @@ -11,11 +11,14 @@ script_packages <- c( "arrow", "glue", "epipredict", - "epiprocess" + "epiprocess", + "purrr", + "rlang", + "glue" ) ## load in packages without messages -purrr::walk(script_packages, \(pkg) { +walk(script_packages, \(pkg) { suppressPackageStartupMessages( library(pkg, character.only = TRUE) ) @@ -29,12 +32,12 @@ to_prop_forecast <- function(forecast_disease_count, other_count_col = "other_ed_visits", output_col = "prop_disease_ed_visits") { - result <- dplyr::inner_join( + result <- inner_join( forecast_disease_count, forecast_other_count, by = c(".draw", "date") ) |> - dplyr::mutate( + mutate( !!output_col := .data[[disease_count_col]] / (.data[[disease_count_col]] + @@ -68,9 +71,9 @@ fit_and_forecast <- function(data, n_samples = 2000, target_col = "ed_visits", output_col = "other_ed_visits") { - forecast_horizon <- glue::glue("{n_forecast_days} days") - target_sym <- rlang::sym(target_col) - output_sym <- rlang::sym(output_col) + forecast_horizon <- glue("{n_forecast_days} days") + target_sym <- sym(target_col) + output_sym <- sym(output_col) max_visits <- data |> pull(!!target_sym) |> @@ -200,7 +203,7 @@ main <- function(model_run_dir, n_forecast_days = 28, n_samples = 2000) { aheads = 1:n_forecast_days ) - to_save <- tibble::tribble( + to_save <- tribble( ~basename, ~value, "other_ed_visits_forecast", forecast_other, "baseline_ts_count_ed_visits_forecast", baseline_ts_count, @@ -208,13 +211,13 @@ main <- function(model_run_dir, n_forecast_days = 28, 
n_samples = 2000) { "baseline_cdc_count_ed_visits_forecast", baseline_cdc_count, "baseline_cdc_prop_ed_visits_forecast", baseline_cdc_prop ) |> - dplyr::mutate(save_path = path( + mutate(save_path = path( !!model_run_dir, basename, ext = "parquet" )) - purrr::walk2( + walk2( to_save$value, to_save$save_path, write_parquet @@ -252,11 +255,11 @@ disease_name_nssp_map <- c( # replace this with functionality from hewr disease_name_raw <- model_run_dir |> - fs::path_split() |> - purrr::pluck(1) |> + path_split() |> + pluck(1) |> tail(3) |> head(1) |> - stringr::str_extract("^.+(?=_r_)") + str_extract("^.+(?=_r_)") disease_name_nssp <- unname(disease_name_nssp_map[disease_name_raw]) From 980ad06936b0b127d022eda7a2199a1f7f87e744 Mon Sep 17 00:00:00 2001 From: Damon Bayer Date: Tue, 19 Nov 2024 22:27:21 +0000 Subject: [PATCH 12/17] more namespace fixes --- pipelines/timeseries_forecasts.R | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/timeseries_forecasts.R b/pipelines/timeseries_forecasts.R index 7d9319b2..4cc59a72 100644 --- a/pipelines/timeseries_forecasts.R +++ b/pipelines/timeseries_forecasts.R @@ -18,7 +18,7 @@ script_packages <- c( ) ## load in packages without messages -walk(script_packages, \(pkg) { +purrr::walk(script_packages, \(pkg) { suppressPackageStartupMessages( library(pkg, character.only = TRUE) ) From 3f8139b920dbba75c425e6851be835b47d22f050 Mon Sep 17 00:00:00 2001 From: Damon Bayer Date: Wed, 20 Nov 2024 18:51:53 +0000 Subject: [PATCH 13/17] use hewr functionality --- hewr/R/directory_utils.R | 5 ++++- hewr/tests/testthat/test_directory_utils.R | 2 +- pipelines/postprocess_state_forecast.R | 21 +++++---------------- pipelines/timeseries_forecasts.R | 13 +++---------- 4 files changed, 13 insertions(+), 28 deletions(-) diff --git a/hewr/R/directory_utils.R b/hewr/R/directory_utils.R index ba1e6549..d31b8fa0 100644 --- a/hewr/R/directory_utils.R +++ b/hewr/R/directory_utils.R @@ -79,8 +79,11 @@ parse_model_batch_dir_path <- 
function(model_batch_dir_path) { #' #' @export parse_model_run_dir_path <- function(model_run_dir_path) { - batch_dir <- fs::path_dir(model_run_dir_path) |> + batch_dir <- model_run_dir_path |> + fs::path_dir() |> + fs::path_dir() |> fs::path_file() + location <- fs::path_file(model_run_dir_path) return(c( diff --git a/hewr/tests/testthat/test_directory_utils.R b/hewr/tests/testthat/test_directory_utils.R index 17f30e37..5ae9c1fa 100644 --- a/hewr/tests/testthat/test_directory_utils.R +++ b/hewr/tests/testthat/test_directory_utils.R @@ -26,7 +26,7 @@ invalid_model_batch_dirs <- c( to_valid_run_dir <- function(valid_batch_dir_entry, location) { x <- valid_batch_dir_entry - x$dirpath <- fs::path(x$dirname, location) + x$dirpath <- fs::path(x$dirname, "model_runs", location) x$expected <- c( location = location, x$expected diff --git a/pipelines/postprocess_state_forecast.R b/pipelines/postprocess_state_forecast.R index fc2c4567..2e023322 100644 --- a/pipelines/postprocess_state_forecast.R +++ b/pipelines/postprocess_state_forecast.R @@ -13,7 +13,8 @@ script_packages <- c( "tidyr", "readr", "here", - "forcats" + "forcats", + "hewr" ) ## load in packages without messages @@ -255,12 +256,6 @@ postprocess_state_forecast <- function(model_run_dir) { theme_set(theme_minimal_grid()) -disease_name_formatter <- c("covid-19" = "COVID-19", "influenza" = "Flu") -disease_name_nssp_map <- c( - "covid-19" = "COVID-19", - "influenza" = "Influenza" -) - # Create a parser p <- arg_parser("Generate forecast figures") |> add_argument( @@ -271,16 +266,10 @@ p <- arg_parser("Generate forecast figures") |> argv <- parse_args(p) model_run_dir <- path(argv$model_run_dir) -# replace this with functionality from hewr -disease_name_raw <- model_run_dir |> - path_split() |> - pluck(1) |> - tail(3) |> - head(1) |> - str_extract("^.+(?=_r_)") +disease_name_nssp <- parse_model_run_dir_path(model_run_dir)$disease -disease_name_nssp <- unname(disease_name_nssp_map[disease_name_raw]) 
-disease_name_pretty <- unname(disease_name_formatter[disease_name_raw]) +disease_name_formatter <- c("COVID-19" = "COVID-19", "Influenza" = "Flu") +disease_name_pretty <- unname(disease_name_formatter[disease_name_nssp]) postprocess_state_forecast(model_run_dir) diff --git a/pipelines/timeseries_forecasts.R b/pipelines/timeseries_forecasts.R index 4cc59a72..8c8e1c0a 100644 --- a/pipelines/timeseries_forecasts.R +++ b/pipelines/timeseries_forecasts.R @@ -14,7 +14,8 @@ script_packages <- c( "epiprocess", "purrr", "rlang", - "glue" + "glue", + "hewr" ) ## load in packages without messages @@ -253,14 +254,6 @@ disease_name_nssp_map <- c( "influenza" = "Influenza" ) -# replace this with functionality from hewr -disease_name_raw <- model_run_dir |> - path_split() |> - pluck(1) |> - tail(3) |> - head(1) |> - str_extract("^.+(?=_r_)") - -disease_name_nssp <- unname(disease_name_nssp_map[disease_name_raw]) +disease_name_nssp <- parse_model_run_dir_path(model_run_dir)$disease main(model_run_dir, n_forecast_days, n_samples) From 26cf4aeca1bc715891c306579edbc5aee4c88675 Mon Sep 17 00:00:00 2001 From: Damon Bayer Date: Wed, 20 Nov 2024 19:29:57 +0000 Subject: [PATCH 14/17] first collate plots changes --- pipelines/collate_plots.py | 50 ++++++++++++++++++++++++++++++-------- 1 file changed, 40 insertions(+), 10 deletions(-) diff --git a/pipelines/collate_plots.py b/pipelines/collate_plots.py index 39248f15..174927e5 100644 --- a/pipelines/collate_plots.py +++ b/pipelines/collate_plots.py @@ -39,6 +39,7 @@ def merge_pdfs_and_save( def merge_pdfs_from_subdirs( base_dir: str | Path, file_name: str, + save_dir: str | Path = None, output_file_name: str = None, subdirs_only: list[str] = None, subdir_pattern="*", @@ -59,6 +60,11 @@ def merge_pdfs_from_subdirs( Name of the files to merge. Must be an exact match. + save_dir + Directory in which to save the merged PDF. + If ``None``, use ``base_dir``. + Default ``None``. 
+ output_file_name Name for the merged PDF file, which will be saved within ``base_dir``. If ``None``, @@ -84,6 +90,10 @@ def merge_pdfs_from_subdirs( ------- None """ + + if save_dir is None: + save_dir = base_dir + subdirs = [ f.name for f in Path(base_dir).glob(subdir_pattern) if f.is_dir() ] @@ -101,14 +111,15 @@ def merge_pdfs_from_subdirs( output_file_name = file_name if len(to_merge) > 0: - merge_pdfs_and_save(to_merge, Path(base_dir, output_file_name)) + merge_pdfs_and_save(to_merge, Path(save_dir, output_file_name)) return None def process_dir( - dir_path: Path | str, + base_dir: Path | str, target_filenames: str | list[str], + save_dir: Path | str = None, file_prefix: str = "", subdirs_only: list[str] = None, ) -> None: @@ -119,14 +130,17 @@ def process_dir( Parameters ---------- - dir_path - Path to the base directory, in which the merged - PDFs will be saved. + base_dir + Path to the base directory in which to look target_filenames One or more PDFs filenames to look for in the subdirectories and merge. + save_dir + Directory in which to save the merged PDFs. + If ``None``, use ``dir_path``. Default ``None``. + file_prefix Prefix to append to the names in `target_filenames` when naming the merged files. @@ -136,17 +150,24 @@ def process_dir( named subdirectories. If ``None``, look in all subdirectories of ``base_dir``. Default ``None``. 
""" + if save_dir is None: + save_dir = base_dir + for file_name in ensure_listlike(target_filenames): merge_pdfs_from_subdirs( - dir_path, + base_dir, file_name, + save_dir, output_file_name=file_prefix + file_name, subdirs_only=subdirs_only, ) def collate_from_all_subdirs( - model_base_dir: str | Path, disease: str, target_filenames: str | list[str] + model_base_dir: str | Path, + disease: str, + target_filenames: str | list[str], + save_dir: str | Path = None, ) -> None: """ Collate target plots for a given disease @@ -156,8 +177,7 @@ def collate_from_all_subdirs( ---------- model_base_dir Path to the base directory in whose subdirectories - the script will look for PDFs to merge and in which - the merged PDFs will be saved. + the script will look for PDFs to merge. disease Name of the target disease. Merged PDFs will be named @@ -167,10 +187,17 @@ def collate_from_all_subdirs( One or more PDFs filenames to look for in the subdirectories and merge. + save_dir + Directory in which to save the merged PDFs. + If ``None``, use ``model_base_dir``. Default ``None``. + Returns ------- None """ + if save_dir is None: + save_dir = model_base_dir + logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -188,6 +215,7 @@ def collate_from_all_subdirs( process_dir( dir_path=Path(model_base_dir, f_dir), target_filenames=target_filenames, + save_dir=save_dir, ) logger.info("Done collating across locations by date.") @@ -197,11 +225,13 @@ def collate_from_all_subdirs( # for multiple diseases. 
logger.info("Collating plots from forecast date directories...") process_dir( - dir_path=model_base_dir, + base_dir=model_base_dir, target_filenames=target_filenames, + save_dir=save_dir, file_prefix=f"{disease}_", subdirs_only=forecast_dirs, ) + logger.info("Done collating plots from forecast date directories.") return None From 51e43d7193b4ae8901d20f010af7a5a2dfc6adae Mon Sep 17 00:00:00 2001 From: Damon Bayer Date: Wed, 20 Nov 2024 19:33:41 +0000 Subject: [PATCH 15/17] use parent dir for saving by default --- pipelines/collate_plots.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/pipelines/collate_plots.py b/pipelines/collate_plots.py index 174927e5..1f1c25a0 100644 --- a/pipelines/collate_plots.py +++ b/pipelines/collate_plots.py @@ -62,7 +62,7 @@ def merge_pdfs_from_subdirs( save_dir Directory in which to save the merged PDF. - If ``None``, use ``base_dir``. + If ``None``, use the parent directory of ``base_dir``. Default ``None``. output_file_name @@ -92,7 +92,7 @@ def merge_pdfs_from_subdirs( """ if save_dir is None: - save_dir = base_dir + save_dir = Path(base_dir).parent subdirs = [ f.name for f in Path(base_dir).glob(subdir_pattern) if f.is_dir() @@ -139,7 +139,7 @@ def process_dir( save_dir Directory in which to save the merged PDFs. - If ``None``, use ``dir_path``. Default ``None``. + If ``None``, use the parent directory of ``base_dir``. Default ``None``. file_prefix Prefix to append to the names in `target_filenames` @@ -151,7 +151,7 @@ def process_dir( subdirectories of ``base_dir``. Default ``None``. """ if save_dir is None: - save_dir = base_dir + save_dir = Path(base_dir).parent for file_name in ensure_listlike(target_filenames): merge_pdfs_from_subdirs( @@ -189,14 +189,14 @@ def collate_from_all_subdirs( save_dir Directory in which to save the merged PDFs. - If ``None``, use ``model_base_dir``. Default ``None``. + If ``None``, use the parent directory of ``model_base_dir``. Default ``None``. 
Returns ------- None """ if save_dir is None: - save_dir = model_base_dir + save_dir = Path(model_base_dir).parent logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -213,7 +213,7 @@ def collate_from_all_subdirs( for f_dir in forecast_dirs: logger.info(f"Collating plots from {f_dir}") process_dir( - dir_path=Path(model_base_dir, f_dir), + base_dir=Path(model_base_dir, f_dir), target_filenames=target_filenames, save_dir=save_dir, ) From 2afe9c0608ea276f82651da1b5bc2c8d90c7731d Mon Sep 17 00:00:00 2001 From: Damon Bayer Date: Wed, 20 Nov 2024 19:49:07 +0000 Subject: [PATCH 16/17] put figures in figures dir --- pipelines/collate_plots.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/pipelines/collate_plots.py b/pipelines/collate_plots.py index 1f1c25a0..8e4b39bf 100644 --- a/pipelines/collate_plots.py +++ b/pipelines/collate_plots.py @@ -62,7 +62,7 @@ def merge_pdfs_from_subdirs( save_dir Directory in which to save the merged PDF. - If ``None``, use the parent directory of ``base_dir``. + If ``None``, use a "figures" directory in the parent directory of ``base_dir``. Default ``None``. output_file_name @@ -92,7 +92,10 @@ def merge_pdfs_from_subdirs( """ if save_dir is None: - save_dir = Path(base_dir).parent + save_dir = Path(base_dir).parent / "figures" + + if not os.path.exists(save_dir): + os.makedirs(save_dir) subdirs = [ f.name for f in Path(base_dir).glob(subdir_pattern) if f.is_dir() @@ -139,7 +142,7 @@ def process_dir( save_dir Directory in which to save the merged PDFs. - If ``None``, use the parent directory of ``base_dir``. Default ``None``. + If ``None``, use a "figures" directory in the parent directory of ``base_dir``. Default ``None``. file_prefix Prefix to append to the names in `target_filenames` @@ -151,7 +154,7 @@ def process_dir( subdirectories of ``base_dir``. Default ``None``. 
""" if save_dir is None: - save_dir = Path(base_dir).parent + save_dir = Path(base_dir).parent / "figures" for file_name in ensure_listlike(target_filenames): merge_pdfs_from_subdirs( @@ -189,14 +192,14 @@ def collate_from_all_subdirs( save_dir Directory in which to save the merged PDFs. - If ``None``, use the parent directory of ``model_base_dir``. Default ``None``. + If ``None``, use a "figures" directory in the parent directory of ``model_base_dir``. Default ``None``. Returns ------- None """ if save_dir is None: - save_dir = Path(model_base_dir).parent + save_dir = Path(model_base_dir).parent / "figures" logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) From a23945bc70b60e45db72e255e41ae1fcf9493307 Mon Sep 17 00:00:00 2001 From: Damon Bayer Date: Wed, 20 Nov 2024 19:54:21 +0000 Subject: [PATCH 17/17] update score tables collation --- pipelines/collate_score_tables.R | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/collate_score_tables.R b/pipelines/collate_score_tables.R index 5c0370af..9a6aaf3c 100644 --- a/pipelines/collate_score_tables.R +++ b/pipelines/collate_score_tables.R @@ -83,7 +83,7 @@ collate_scores_for_date <- function(model_run_dir, score_file_ext = "rds", save = FALSE) { message(glue::glue("Processing scores from {model_run_dir}...")) - locations_to_process <- fs::dir_ls(model_run_dir, + locations_to_process <- fs::dir_ls(model_run_dir, "model_runs", type = "directory" ) date_score_table <- purrr::map(