diff --git a/docker/main/ngen/Dockerfile b/docker/main/ngen/Dockerfile
index b100f6d40..6fa549f8e 100644
--- a/docker/main/ngen/Dockerfile
+++ b/docker/main/ngen/Dockerfile
@@ -133,6 +133,8 @@ RUN dnf update -y \
&& mkdir -p ${SSHDIR} \
# Create DMOD-specific directory structure \
&& mkdir -p /dmod && chown ${USER} /dmod \
+ && mkdir -p /dmod/local_volumes && chown ${USER} /dmod/local_volumes \
+ && mkdir -p /dmod/cluster_volumes && chown ${USER} /dmod/cluster_volumes \
&& mkdir -p /dmod/datasets && chown ${USER} /dmod/datasets \
#&& mkdir -p /dmod/shared_libs && chown ${USER} /dmod/shared_libs \
&& mkdir -p /dmod/lib && chown ${USER} /dmod/lib \
@@ -943,8 +945,29 @@ COPY --chown=${USER} --from=build_bmi_snow_17 /dmod/ /dmod/
COPY --chown=${USER} --from=build_customizations /dmod/ /dmod/
USER root
-# Update path and make sure dataset directory is there
-RUN echo "export PATH=${PATH}" >> /etc/profile \
+# TODO: (later) consider something like this in the future (at least optionally) when downloading the mc client below:
+# ARG MINIO_CLIENT_RELEASE="RELEASE.2024-07-08T20-59-24Z"
+# ...
+# https://dl.min.io/client/mc/release/linux-amd64/archive/mc.${MINIO_CLIENT_RELEASE}
+
+# Setup minio client; also update path and make sure dataset directory is there
+RUN OS_NAME=$(uname -s | tr '[:upper:]' '[:lower:]') \
+ && PLATFORM=$(uname -m | tr '[:upper:]' '[:lower:]') \
+ && case "${PLATFORM:?}" in \
+ aarch64) \
+ MINIO_HW_NAME=arm64 \
+ ;; \
+ x86_64) \
+ MINIO_HW_NAME=amd64 \
+ ;; \
+ *) \
+ MINIO_HW_NAME=${PLATFORM} \
+ ;; \
+ esac \
+ && curl -L -o /dmod/bin/mc https://dl.min.io/client/mc/release/${OS_NAME}-${MINIO_HW_NAME}/mc \
+ && chmod +x /dmod/bin/mc \
+ && mkdir /dmod/.mc \
+ && echo "export PATH=${PATH}" >> /etc/profile \
&& sed -i "s/PasswordAuthentication yes/#PasswordAuthentication yes/g" /etc/ssh/sshd_config \
&& sed -i "s/PermitRootLogin yes/PermitRootLogin no/g" /etc/ssh/sshd_config \
&& sed -i "s/#ClientAliveInterval.*/ClientAliveInterval 60/" /etc/ssh/sshd_config \
@@ -959,9 +982,9 @@ COPY --chown=${USER} --chmod=744 py_funcs.py /dmod/bin/py_funcs
ENV HYDRA_PROXY_RETRY_COUNT=5
-# Change permissions for entrypoint and make sure dataset volume mount parent directories exists
+# Change permissions for entrypoint and make sure data volume mount and dataset symlink parent directories exist
RUN chmod +x ${WORKDIR}/entrypoint.sh \
- && for d in ${DATASET_DIRECTORIES}; do mkdir -p /dmod/datasets/${d}; done \
+ && for p in datasets local_volumes cluster_volumes; do for d in ${DATASET_DIRECTORIES}; do mkdir -p /dmod/${p}/${d}; done; done \
&& pushd /dmod/bin \
# NOTE use of `ln -sf`. \
&& ( ( stat ngen-parallel && ln -sf ngen-parallel ngen ) || ( stat ngen-serial && ln -sf ngen-serial ngen ) ) \
diff --git a/docker/main/ngen/README.md b/docker/main/ngen/README.md
new file mode 100644
index 000000000..12ec8b27b
--- /dev/null
+++ b/docker/main/ngen/README.md
@@ -0,0 +1,100 @@
+# About
+
+TODO: More details about the image.
+
+TODO: Once implemented, details about available image variants.
+
+TODO: Once implemented, details about extending the image with additional modules.
+
+# Image `/dmod/` Directory
+
+## TL;DR - Directories for Configs
+
+Several important details to keep in mind if manually constructing configurations to upload into DMOD datasets:
+* Compiled BMI module libraries will be in `/dmod/shared_libs/`
+* Data used during job execution (e.g., forcings, configs) will be (or at least appear) under `/dmod/datasets/`
+ * Paths descend first by data type, then by dataset name
+ * E.g., `/dmod/datasets/config/vpu-01-bmi-config-dataset-01/NoahOWP_cat-9999.namelist`
+ * E.g., `/dmod/datasets/forcing/vpu-01-forcings-dataset-01/cat-1000.csv`
+
+## Directory Structure Details
+
+Several directories under `/dmod/` in the worker images' file system are important to the operation of job worker containers.
+
+## Static Directories
+
+Several important DMOD-specific directories in the worker images are static. They contain things either created or copied in during the image build. These are standard/general things, available in advance, that need to be in fixed, well-defined locations. Examples are the ngen executables, Python packages, and compiled BMI modules.
+
+### `/dmod/bin/`
+* Directory containing custom executables and scripts
+* Location for parallel and serial ngen executables, plus the `ngen` symlink that points to one of them (dependent on build config, but parallel by default)
+* Path appended to `PATH` in environment
+
+### `/dmod/bmi_module_data/`
+* Directory containing any necessary generic per-BMI-module data files
+ * `/dmod/bmi_module_data/noah_owp/parameters/`
+ * Nested directory containing parameters files for Noah-OWP-Modular
+ * `/dmod/bmi_module_data/lgar-c/data/data/`
+ * Nested directory for generic LGAR data files
+
+### `/dmod/shared_libs/`
+* Directory containing libraries built during intermediate steps of the image building process that will be needed for worker container execution
+* E.g., Compiled BMI module shared libraries, like `libcfebmi.so` for CFE
+* Path is appended to `LD_LIBRARY_PATH`
+
+### `/dmod/venv/`
+* Directory for Python virtual environment loaded by worker for execution
+
+## Dynamic Directories
+
+Other important DMOD-specific directories in the worker images are dynamic. There is a higher-level baseline directory structure that is created by the image build, but the nested contents, which are what is most important to job execution, are put into place when the job worker container is created. Examples of this are configs and forcings.
+
+### `/dmod/datasets/`
+* This contains the paths from which jobs should read their necessary data, and which config files should reference
+* Contains subdirectories for different dataset types
+ * `config`, `forcing`, `hydrofabric`, `observation`, `output` (e.g., `/dmod/datasets/forcing/`)
+* Each subdirectory may contain further "subdirectories" (really symlinks) containing different data needed for the current job
+ * E.g., `/dmod/datasets/config/vpu-01-bmi-config-dataset-01/`
+ * Has data from `vpu-01-bmi-config-dataset-01` dataset
+* Data subdirectories are actually symlinks to an analogous mounted path for either a [cluster volume](#dmodcluster_volumes) or [local volume](#dmodlocal_volumes)
+ * If a dataset can be mounted as a cluster volume and used directly by the job without local copying, the symlink will be to an analogous cluster volume
+ * e.g., `/dmod/datasets/config/real-cfg-ds-01/ -> /dmod/cluster_volumes/config/real-cfg-ds-01/`
+ * If data from a dataset needs to be local or preprocessed in some way by the worker before use, it will be prepared in a local volume, and a symlink here will point to that
+ * e.g., `/dmod/datasets/config/vpu-01-bmi-config-dataset-01/ -> /dmod/local_volumes/config/vpu-01-bmi-config-dataset-01/`
+
+
+### `/dmod/cluster_volumes/`
+* First level subdirectories correspond to DMOD dataset types, as with `/dmod/datasets/`
+ * e.g., `/dmod/cluster_volumes/config/`
+* Second-level subdirectories are mounted Docker cluster volumes that are, in some way or another, synced across all physical nodes of the deployment
+ * e.g., `/dmod/cluster_volumes/config/vpu-01-bmi-config-dataset-01/`
+* Automatic synchronization at the DMOD deployment level
+ * All workers for a job see exactly the same set of mounted volumes here
+ * All workers on all physical nodes see the same contents in each mounted volume directory
+ * Writes done on any worker to a file under a volume subdirectory are seen (essentially) immediately by workers on **all** physical nodes
+* Common scenario (typical case at the time of this writing) for these are DMOD dataset volumes
+ * A dataset is directly mounted as Docker volume via some specialized storage driver (e.g., `s3fs`)
+ * The contents of the dataset can then be interacted with by the worker as if they were a regular file system (even if they aren't)
+ * The mount directory name matches the DMOD dataset name
+ * E.g., the `vpu-01-bmi-config-dataset-01` dataset would be at `/dmod/cluster_volumes/config/vpu-01-bmi-config-dataset-01/`
+
+*** TODO: set up to link or do pre-processing/extraction/etc. as needed on startup ***
+*** TODO: have the indication be that a directory for the dataset already exists under `/dmod/datasets/` ***
+TODO: have this be for local Docker volumes that are just on individual hosts that we want to keep synced
+
+### `/dmod/local_volumes/`
+* First level subdirectories correspond to DMOD dataset types, as with `/dmod/datasets/`
+ * e.g., `/dmod/local_volumes/config/`
+* Second-level subdirectories are mounted Docker local volumes
+ * e.g., `/dmod/local_volumes/config/vpu-01-bmi-config-dataset-01/`
+* These local volumes are job-wide but host-specific
+ * All workers for a job see exactly the same set of mounted volumes here
+ * All workers on the same physical nodes are using the same local volume on that physical host
+ * Workers on different physical nodes are using different volumes
+  * This means some coordinated synchronization needs to be performed by a subset of workers
+* The use case is for any data that needs to be local on the container/host (generally for performance reasons) to be copied/extracted to a subdirectory here, typically from an analogous subdirectory under `/dmod/cluster_volumes/`
+ * This at least reduces copying somewhat by allowing workers on same node to share the same local volume
+
+TODO: make scheduler create these per dataset on the fly at job time and mount them in (and have something to clean them up, but maybe not right away)
+TODO: make one worker per physical host extract data when needed from archives in analogous cluster volumes
+
\ No newline at end of file
diff --git a/docker/main/ngen/ngen_cal_entrypoint.sh b/docker/main/ngen/ngen_cal_entrypoint.sh
index 2483f9855..d2b7d5cc9 100755
--- a/docker/main/ngen/ngen_cal_entrypoint.sh
+++ b/docker/main/ngen/ngen_cal_entrypoint.sh
@@ -39,6 +39,10 @@ while [ ${#} -gt 0 ]; do
declare -x CALIBRATION_CONFIG_BASENAME="${2:?}"
shift
;;
+ --primary-workers)
+ declare -x PRIMARY_WORKERS="${2:?}"
+ shift
+ ;;
esac
shift
done
@@ -48,9 +52,15 @@ declare -x JOB_OUTPUT_WRITE_DIR="/tmp/job_output"
# Get some universally applicable functions and constants
source ./funcs.sh
+init_script_mpi_vars
+
+if [ "$(whoami)" != "${MPI_USER:?MPI user not defined}" ]; then
+ # Run make_data_local Python functions to make necessary data local
+ # Called for every worker, but Python code will make sure only one worker per node makes a call that has effect
+ py_funcs make_data_local ${WORKER_INDEX:-0} ${PRIMARY_WORKERS:-0}
+fi
ngen_sanity_checks_and_derived_init
-init_script_mpi_vars
init_ngen_executable_paths
# Move to the output write directory
@@ -62,6 +72,7 @@ cd ${JOB_OUTPUT_WRITE_DIR}
#Needed for routing
if [ ! -e /dmod/datasets/linked_job_output ]; then
ln -s ${JOB_OUTPUT_WRITE_DIR} /dmod/datasets/linked_job_output
+ chown -R ${MPI_USER}:${MPI_USER} /dmod/datasets/linked_job_output
fi
start_calibration() {
@@ -106,25 +117,16 @@ if [ "${WORKER_INDEX:-0}" = "0" ]; then
if [ "$(whoami)" = "${MPI_USER:?MPI user not defined}" ]; then
# This will only have an effect when running with multiple MPI nodes, so its safe to have even in serial exec
trap close_remote_workers EXIT
- # Have "main" (potentially only) worker copy config files to output dataset for record keeping
- # TODO: (later) in ngen and ngen-cal entrypoints, consider adding controls for whether this is done or a simpler
- # TODO: 'cp' call, based on whether we write directly to output dataset dir or some other output write dir
- # Do a dry run first to sanity check directory and fail if needed before backgrounding process
- py_funcs tar_and_copy --dry-run --compress ${CONFIG_DATASET_DIR:?Config dataset directory not defined} config_dataset.tgz ${OUTPUT_DATASET_DIR:?}
- # Then actually run the archive and copy function in the background
- py_funcs tar_and_copy --compress ${CONFIG_DATASET_DIR:?} config_dataset.tgz ${OUTPUT_DATASET_DIR:?} &
- _CONFIG_COPY_PROC=$!
- # If there is partitioning, which implies multi-processing job ...
- if [ -n "${PARTITION_DATASET_DIR:-}" ]; then
- # Include partition config dataset too if appropriate
- cp -a ${PARTITION_DATASET_DIR}/. ${OUTPUT_DATASET_DIR:?}
- fi
# Run the same function to execute ngen_cal (it's config will handle whether MPI is used internally)
start_calibration
+ # Remember: all these things are done as the root user on worker 0
else
# Start SSHD on the main worker if have an MPI job
if [ -n "${PARTITION_DATASET_DIR:-}" ]; then
+ # Include partition config dataset too if appropriate, though for simplicity, just copy directly
+ cp -a ${PARTITION_DATASET_DIR}/. ${OUTPUT_DATASET_DIR:?}
+
echo "$(print_date) Starting SSH daemon on main worker"
/usr/sbin/sshd -D &
_SSH_D_PID="$!"
@@ -132,6 +134,20 @@ if [ "${WORKER_INDEX:-0}" = "0" ]; then
trap cleanup_sshuser_exit EXIT
fi
+ # Have "main" (potentially only) worker copy config files to output dataset for record keeping
+ # TODO: (later) in ngen and ngen-cal entrypoints, consider adding controls for whether this is done or a simpler
+ # TODO: 'cp' call, based on whether we write directly to output dataset dir or some other output write dir
+ # Do a dry run first to sanity check directory and fail if needed before backgrounding process
+ py_funcs tar_and_copy --dry-run --compress config_dataset.tgz ${CONFIG_DATASET_DIR:?Config dataset directory not defined} ${OUTPUT_DATASET_DIR:?}
+ _R_DRY=${?}
+ if [ ${_R_DRY} -ne 0 ]; then
+ echo "$(print_date) Job exec failed due to issue with copying configs to output (code ${_R_DRY})"
+ exit ${_R_DRY}
+ fi
+ # Then actually run the archive and copy function in the background
+ py_funcs tar_and_copy --compress config_dataset.tgz ${CONFIG_DATASET_DIR:?} ${OUTPUT_DATASET_DIR:?} &
+ _CONFIG_COPY_PROC=$!
+
# Make sure we run ngen/ngen-cal as our MPI_USER
echo "$(print_date) Running exec script as '${MPI_USER:?}'"
# Do this by just re-running this script with the same args, but as the other user
@@ -139,6 +155,19 @@ if [ "${WORKER_INDEX:-0}" = "0" ]; then
_EXEC_STRING="${0} ${@}"
su ${MPI_USER:?} --session-command "${_EXEC_STRING}"
#time su ${MPI_USER:?} --session-command "${_EXEC_STRING}"
+
+        # TODO: (later) in ngen and ngen-cal entrypoints, add controls for whether this is done based on whether we
+ # TODO: are writing directly to output dataset dir or some other output write dir; this will be
+ # TODO: important once netcdf output works
+ # Then wait at this point (if necessary) for our background config copy to avoid taxing things
+ echo "$(print_date) Waiting for compression and copying of configuration files to output dataset"
+ wait ${_CONFIG_COPY_PROC:-}
+ _R_CONFIG_COPY=${?}
+ if [ ${_R_CONFIG_COPY} -eq 0 ]; then
+ echo "$(print_date) Compression/copying of config data to output dataset complete"
+ else
+ echo "$(print_date) Copying of config data to output dataset exited with error code: ${_R_CONFIG_COPY}"
+ fi
fi
else
run_secondary_mpi_ssh_worker_node
diff --git a/docker/main/ngen/ngen_entrypoint.sh b/docker/main/ngen/ngen_entrypoint.sh
index 0278fd640..f6b67613a 100755
--- a/docker/main/ngen/ngen_entrypoint.sh
+++ b/docker/main/ngen/ngen_entrypoint.sh
@@ -35,6 +35,10 @@ while [ ${#} -gt 0 ]; do
declare -x WORKER_INDEX="${2:?}"
shift
;;
+ --primary-workers)
+ declare -x PRIMARY_WORKERS="${2:?}"
+ shift
+ ;;
esac
shift
done
@@ -44,9 +48,15 @@ declare -x JOB_OUTPUT_WRITE_DIR="/tmp/job_output"
# Get some universally applicable functions and constants
source /ngen/funcs.sh
+init_script_mpi_vars
+
+if [ "$(whoami)" != "${MPI_USER:?MPI user not defined}" ]; then
+ # Run make_data_local Python functions to make necessary data local
+ # Called for every worker, but Python code will make sure only one worker per node makes a call that has effect
+ py_funcs make_data_local ${WORKER_INDEX:-0} ${PRIMARY_WORKERS:-0}
+fi
ngen_sanity_checks_and_derived_init
-init_script_mpi_vars
init_ngen_executable_paths
# Move to the output write directory
@@ -58,59 +68,37 @@ cd ${JOB_OUTPUT_WRITE_DIR}
#Needed for routing
if [ ! -e /dmod/datasets/linked_job_output ]; then
ln -s ${JOB_OUTPUT_WRITE_DIR} /dmod/datasets/linked_job_output
+ chown -R ${MPI_USER}:${MPI_USER} /dmod/datasets/linked_job_output
fi
# We can allow worker index to not be supplied when executing serially
if [ "${WORKER_INDEX:-0}" = "0" ]; then
+    # For the nested/recursive call to this script as the MPI_USER ...
if [ "$(whoami)" = "${MPI_USER:?MPI user not defined}" ]; then
# This will only have an effect when running with multiple MPI nodes, so its safe to have even in serial exec
trap close_remote_workers EXIT
- # Have "main" (potentially only) worker copy config files to output dataset for record keeping
- # TODO: (later) in ngen and ngen-cal entrypoints, consider adding controls for whether this is done or a simpler
- # TODO: 'cp' call, based on whether we write directly to output dataset dir or some other output write dir
- # Do a dry run first to sanity check directory and fail if needed before backgrounding process
- py_funcs tar_and_copy --dry-run --compress ${CONFIG_DATASET_DIR:?Config dataset directory not defined} config_dataset.tgz ${OUTPUT_DATASET_DIR:?}
- # Then actually run the archive and copy function in the background
- py_funcs tar_and_copy --compress ${CONFIG_DATASET_DIR:?} config_dataset.tgz ${OUTPUT_DATASET_DIR:?} &
- _CONFIG_COPY_PROC=$!
+
# If there is partitioning, which implies multi-processing job ...
if [ -n "${PARTITION_DATASET_DIR:-}" ]; then
- # Include partition config dataset too if appropriate, though for simplicity, just copy directly
- cp -a ${PARTITION_DATASET_DIR}/. ${OUTPUT_DATASET_DIR:?}
- # Then run execution
+ # Execute the MPI ngen run
exec_main_worker_ngen_run
# TODO: (later) in ngen and ngen-cal entrypoints, add controls for whether this is done base on whether we
# TODO: are writing directly to output dataset dir or some other output write dir; this will be
# TODO: important once netcdf output works
- # Then gather output from all worker hosts
+ # Once done, gather output from all worker hosts
py_funcs gather_output ${MPI_HOST_STRING:?} ${JOB_OUTPUT_WRITE_DIR:?}
- # Then wait at this point (if necessary) for our background config copy to avoid taxing things
- echo "$(print_date) Waiting for compression and copying of configuration files to output dataset"
- wait ${_CONFIG_COPY_PROC}
- echo "$(print_date) Compression/copying of config data to output dataset complete"
- echo "$(print_date) Copying results to output dataset"
- py_funcs move_job_output --job_id ${JOB_ID:?} ${JOB_OUTPUT_WRITE_DIR} to_directory ${OUTPUT_DATASET_DIR:?}
- echo "$(print_date) Results copied to output dataset"
# Otherwise, we just have a serial job ...
else
- # Execute it first
exec_serial_ngen_run
-
- # TODO: (later) in ngen and ngen-cal entrypoints, add controls for whether this is done base on whether we
- # TODO: are writing directly to output dataset dir or some other output write dir; this will be
- # TODO: important once netcdf output works
- echo "$(print_date) Waiting for compression and copying of configuration files to output dataset"
- wait ${_CONFIG_COPY_PROC}
- echo "$(print_date) Compression/copying of config data to output dataset complete"
-
- echo "$(print_date) Copying results to output dataset"
- py_funcs move_job_output --job_id ${JOB_ID:?} ${JOB_OUTPUT_WRITE_DIR} to_directory ${OUTPUT_DATASET_DIR:?}
- echo "$(print_date) Results copied to output dataset"
fi
+ # Remember: all these things are done as the root user on worker 0
else
# Start SSHD on the main worker if have an MPI job
if [ -n "${PARTITION_DATASET_DIR:-}" ]; then
+ # Include partition config dataset too if appropriate, though for simplicity, just copy directly
+ cp -a ${PARTITION_DATASET_DIR}/. ${OUTPUT_DATASET_DIR:?}
+
echo "$(print_date) Starting SSH daemon on main worker"
/usr/sbin/sshd -D &
_SSH_D_PID="$!"
@@ -118,6 +106,20 @@ if [ "${WORKER_INDEX:-0}" = "0" ]; then
trap cleanup_sshuser_exit EXIT
fi
+ # Have "main" (potentially only) worker copy config files to output dataset for record keeping
+ # TODO: (later) in ngen and ngen-cal entrypoints, consider adding controls for whether this is done or a simpler
+ # TODO: 'cp' call, based on whether we write directly to output dataset dir or some other output write dir
+ # Do a dry run first to sanity check directory and fail if needed before backgrounding process
+ py_funcs tar_and_copy --dry-run --compress config_dataset.tgz ${CONFIG_DATASET_DIR:?Config dataset directory not defined} ${OUTPUT_DATASET_DIR:?}
+ _R_DRY=${?}
+ if [ ${_R_DRY} -ne 0 ]; then
+ echo "$(print_date) Job exec failed due to issue with copying configs to output (code ${_R_DRY})"
+ exit ${_R_DRY}
+ fi
+ # Then actually run the archive and copy function in the background
+ py_funcs tar_and_copy --compress config_dataset.tgz ${CONFIG_DATASET_DIR:?} ${OUTPUT_DATASET_DIR:?} &
+ _CONFIG_COPY_PROC=$!
+
# Make sure we run the model as our MPI_USER
echo "$(print_date) Running exec script as '${MPI_USER:?}'"
# Do this by just re-running this script with the same args, but as the other user
@@ -125,6 +127,28 @@ if [ "${WORKER_INDEX:-0}" = "0" ]; then
_EXEC_STRING="${0} ${@}"
su ${MPI_USER:?} --session-command "${_EXEC_STRING}"
#time su ${MPI_USER:?} --session-command "${_EXEC_STRING}"
+
+        # TODO: (later) in ngen and ngen-cal entrypoints, add controls for whether this is done based on whether we
+ # TODO: are writing directly to output dataset dir or some other output write dir; this will be
+ # TODO: important once netcdf output works
+ # Then wait at this point (if necessary) for our background config copy to avoid taxing things
+ echo "$(print_date) Waiting for compression and copying of configuration files to output dataset"
+ wait ${_CONFIG_COPY_PROC:-}
+ _R_CONFIG_COPY=${?}
+ if [ ${_R_CONFIG_COPY} -eq 0 ]; then
+ echo "$(print_date) Compression/copying of config data to output dataset complete"
+ else
+ echo "$(print_date) Copying of config data to output dataset exited with error code: ${_R_CONFIG_COPY}"
+ fi
+
+ echo "$(print_date) Copying results to output dataset"
+ py_funcs move_job_output --job_id ${JOB_ID:?} ${JOB_OUTPUT_WRITE_DIR} to_directory ${OUTPUT_DATASET_DIR:?}
+ _R_RESULT_COPY=${?}
+ if [ ${_R_RESULT_COPY} -eq 0 ]; then
+ echo "$(print_date) Results copied to output dataset"
+ else
+ echo "$(print_date) Error copying to output dataset directory '${OUTPUT_DATASET_DIR}' (error code: ${_R_RESULT_COPY})"
+ fi
fi
else
run_secondary_mpi_ssh_worker_node
diff --git a/docker/main/ngen/py_funcs.py b/docker/main/ngen/py_funcs.py
index 676d7fbc0..4b7547712 100644
--- a/docker/main/ngen/py_funcs.py
+++ b/docker/main/ngen/py_funcs.py
@@ -1,16 +1,26 @@
#!/usr/bin/env python3
import argparse
+import json
import logging
import os
import shutil
import tarfile
+import concurrent.futures
+import subprocess
from datetime import datetime
from enum import Enum
from pathlib import Path
from subprocess import Popen
-from typing import Dict, List, Literal, Optional
+from typing import Dict, List, Literal, Optional, Set, Tuple
+
+
+MINIO_ALIAS_NAME = "minio"
+
+
+def get_dmod_date_str_pattern() -> str:
+ return '%Y-%m-%d,%H:%M:%S'
class ArchiveStrategy(Enum):
@@ -56,6 +66,15 @@ def _subparse_move_to_directory(parent_subparser_container):
sub_cmd_parser.add_argument("dest_dir", type=Path, help="Destination directory to which to move the output")
+def _parse_for_make_data_local(parent_subparsers_container):
+    # A parser for the 'make_data_local' param itself, underneath the parent 'command' subparsers container
+ desc = "If a primary worker, copy/extract to make dataset data locally available on physical node"
+ helper_cmd_parser = parent_subparsers_container.add_parser('make_data_local', description=desc)
+ helper_cmd_parser.add_argument('worker_index', type=int, help='The index of this particular worker.')
+ helper_cmd_parser.add_argument('primary_workers', type=lambda s: {int(i) for i in s.split(',')},
+ help='Comma-delimited string of primary worker indices.')
+
+
def _parse_for_move_job_output(parent_subparsers_container):
# A parser for the 'tar_and_copy' param itself, underneath the parent 'command' subparsers container
desc = "Move output data files produced by a job to another location, typically to put them into a DMOD dataset."
@@ -92,10 +111,96 @@ def _parse_args() -> argparse.Namespace:
_parse_for_tar_and_copy(parent_subparsers_container=subparsers)
_parse_for_gather_output(parent_subparsers_container=subparsers)
_parse_for_move_job_output(parent_subparsers_container=subparsers)
+ _parse_for_make_data_local(parent_subparsers_container=subparsers)
return parser.parse_args()
+def _get_serial_dataset_dict(serialized_ds_file: Path) -> dict:
+ with serialized_ds_file.open() as s_file:
+ return json.loads(s_file.read())
+
+
+def _make_dataset_dir_local(local_data_dir: Path, do_optimized_object_store_copy: bool):
+ """
+ Make the data in corresponding remotely-backed dataset directory local by placing in a local directory.
+
+ Make a local, optimized copy of data from a dataset, where the data is also locally accessible/mounted but actually
+ stored elsewhere, making it less optimal for use by the worker.
+
+ Function examines the serialized dataset file of the source directory and, if already present (indicating this dest
+ directory has been set up before), the analogous serialized dataset file in the destination directory. First, if
+ there is `dest_dir` version of this file, and if it has the same ``last_updated`` value, the function considers the
+ `dest_dir` contents to already be in sync with the `src_dir` simply returns. Second, it examines whether archiving
+ was used for the entire dataset, and then either extracts or copies data to the local volume as appropriate.
+
+ Note that the function does alter the name of the serialized dataset file on the `dest_dir` side, primarily as an
+    indication that this is meant as a local copy of data, but not a full-fledged DMOD dataset. It also allows for a
+ final deliberate step of renaming (or copying with a different name) this file, which ensures the checked
+ ``last_updated`` value on the `dest_dir` side will have not been updated before a successful sync of the actual data
+ was completed.
+
+ Parameters
+ ----------
+ local_data_dir
+ Storage directory, locally available on this worker's host node, in which to copy/extract data.
+ do_optimized_object_store_copy
+ Whether to do an optimized copy for object store dataset data using the MinIO client (via a subprocess call).
+ """
+    # TODO: (later) eventually source several details of this from other parts of the code
+
+ dataset_vol_dir = get_cluster_volumes_root_directory().joinpath(local_data_dir.parent.name).joinpath(local_data_dir.name)
+
+ local_serial_file = local_data_dir.joinpath(".ds_serial_state.json")
+ dataset_serial_file = dataset_vol_dir.joinpath(f"{dataset_vol_dir.name}_serialized.json")
+
+ # Both should exist
+ if not dataset_vol_dir.is_dir():
+ raise RuntimeError(f"Can't make data local from dataset mount path '{dataset_vol_dir!s}': not a directory")
+ elif not local_data_dir.is_dir():
+ raise RuntimeError(f"Can't make data from '{dataset_vol_dir!s}' local: '{local_data_dir!s}' is not a directory")
+ # Also, dataset dir should not be empty
+ elif len([f for f in dataset_vol_dir.glob("*")]) == 0:
+ raise RuntimeError(f"Can't make data local from '{dataset_vol_dir!s}' local because it is empty")
+
+ serial_ds_dict = _get_serial_dataset_dict(dataset_serial_file)
+
+ # If dest_dir is not brand new and has something in it, check to make sure it isn't already as it needs to be
+ if local_serial_file.exists():
+ prev_ds_dict = _get_serial_dataset_dict(local_serial_file)
+ current_last_updated = datetime.strptime(serial_ds_dict["last_updated"], get_dmod_date_str_pattern())
+ prev_last_updated = datetime.strptime(prev_ds_dict["last_updated"], get_dmod_date_str_pattern())
+ if prev_last_updated == current_last_updated:
+ logging.info(f"'{local_data_dir!s}' already shows most recent 'last_updated'; skipping redundant copy")
+ return
+
+ # Determine if need to extract
+ if serial_ds_dict.get("data_archiving", False):
+ # Identify and extract archive
+ src_archive_file = [f for f in dataset_vol_dir.glob(f"{dataset_vol_dir.name}_archived*")][0]
+ archive_file = local_data_dir.joinpath(src_archive_file.name)
+ shutil.copy2(src_archive_file, archive_file)
+ shutil.unpack_archive(archive_file, local_data_dir)
+ archive_file.unlink(missing_ok=True)
+ # Also manually copy serialized state file (do last)
+        shutil.copy2(dataset_serial_file, local_data_dir.joinpath(dataset_serial_file.name))
+ # Need to optimize by using minio client directly here when dealing with OBJECT_STORE dataset, or will take 10x time
+ # TODO: (later) this is a bit of a hack, though a necessary one; find a way to integrate more elegantly
+ elif do_optimized_object_store_copy and serial_ds_dict["type"] == "OBJECT_STORE":
+ alias_src_path = f"{MINIO_ALIAS_NAME}/{local_data_dir.name}/"
+ logging.info(f"Copying data from '{alias_src_path}' to '{local_data_dir}'")
+        subprocess.run(["mc", "--config-dir", "/dmod/.mc", "cp", "-r", alias_src_path, f"{local_data_dir}/."],
+                       stdout=subprocess.DEVNULL,
+                       stderr=subprocess.DEVNULL, check=True)
+ logging.info(f"Local copying from '{alias_src_path}' complete")
+ else:
+ # Otherwise copy contents
+        shutil.copytree(dataset_vol_dir, local_data_dir, dirs_exist_ok=True)
+ # Rename the copied serialized state file in the copy as needed
+ # But do this last to confirm directory contents are never more up-to-date with last_updated than expected
+ local_data_dir.joinpath(dataset_serial_file.name).rename(local_serial_file)
+
+
def _move_to_directory(source_dir: Path, dest_dir: Path, archive_name: Optional[str] = None):
"""
Move source data files from their initial directory to a different directory, potentially combining into an archive.
@@ -116,14 +221,22 @@ def _move_to_directory(source_dir: Path, dest_dir: Path, archive_name: Optional[
raise ValueError(f"{get_date_str()} Can't move job output to non-directory path {dest_dir!s}!")
if archive_name:
- logging.info("Archiving output files to output dataset")
+ logging.info(f"Archiving output files to output dataset directory '{dest_dir!s}'")
tar_and_copy(source=source_dir, dest=dest_dir, archive_name=archive_name)
else:
- logging.info("Moving output file(s) to output dataset")
+        logging.info(f"Moving output file(s) to output dataset directory '{dest_dir!s}'")
for p in source_dir.glob("*"):
shutil.move(p, dest_dir)
+def _parse_docker_secret(secret_name: str) -> str:
+ return Path("/run/secrets", secret_name).read_text().strip()
+
+
+def _parse_object_store_secrets() -> Tuple[str, str]:
+ return _parse_docker_secret('object_store_exec_user_name'), _parse_docker_secret('object_store_exec_user_passwd')
+
+
def gather_output(mpi_host_names: List[str], output_write_dir: Path):
"""
Using subprocesses, gather output from remote MPI hosts and collect in the analogous directory on this host.
@@ -151,6 +264,187 @@ def gather_output(mpi_host_names: List[str], output_write_dir: Path):
f"{error_in_bytes.decode()}")
+def get_data_exec_root_directory() -> Path:
+    """
+    Get the root directory path to use for reading and writing dataset data during job execution.
+
+    Returns
+    -------
+    Path
+        The root directory path to use for reading and writing dataset data during job execution.
+    """
+    # Populated with per-dataset symlinks (into cluster/local volume dirs) by link_to_data_exec_root()
+    return Path("/dmod/datasets")
+
+
+def get_cluster_volumes_root_directory() -> Path:
+    """
+    Get the root directory for cluster volumes (i.e., backed by dataset directly, synced cluster-wide) on this worker.
+
+    Returns
+    -------
+    Path
+        The root directory for cluster volumes on this worker.
+    """
+    # Must match the /dmod/cluster_volumes directory created in the worker image's Dockerfile
+    return Path("/dmod/cluster_volumes")
+
+
+def get_expected_data_category_subdirs() -> Set[str]:
+    """
+    Get names of expected subdirectories for dataset categories underneath directories like local or cluster volumes.
+
+    Returns
+    -------
+    Set[str]
+        Names of expected subdirectories for dataset categories underneath directories like local or cluster volumes.
+    """
+    # NOTE: these correspond to lower-cased DataCategory names used in the mount target paths set up by the scheduler
+    return {"config", "forcing", "hydrofabric", "observation", "output"}
+
+
+def get_local_volumes_root_directory() -> Path:
+    """
+    Get root directory for local volumes (i.e., local to physical node, shared by all the node's workers) on this worker.
+
+    Returns
+    -------
+    Path
+        The root directory for local volumes on this worker.
+    """
+    # Must match the /dmod/local_volumes directory created in the worker image's Dockerfile
+    return Path("/dmod/local_volumes")
+
+
+def link_to_data_exec_root(exceptions: Optional[List[Path]] = None):
+ """
+ Create symlinks into the data exec root for any dataset subdirectories in, e.g., cluster or local data mounts.
+
+ Function iterates through data mount roots for data (e.g., local volumes, cluster volumes) according to priority of
+ their use (i.e., local volume data should be used before an analogous cluster volume copy). If nothing for that
+ dataset category and name exists under the data exec root (from ::function:`get_data_exec_root_directory`), then
+ a symlink is created.
+
+ Exceptions to the regular prioritization can be provided. By default (or whenever ``execptions`` is ``None``),
+ all ``output`` category/directory datasets will have their symlinks backed by cluster volumes over local volumes. To
+ avoid any such exceptions and strictly follow the general priority rules, an empty list can be explicitly passed.
+
+ Parameters
+ ----------
+ exceptions
+ An optional list of exceptions to the general setup rules to create a symlink in the data exec root before
+ anything else.
+
+ """
+ if exceptions is None:
+ exceptions = [d for d in get_cluster_volumes_root_directory().joinpath('output').glob('*') if d.is_dir()]
+
+ for d in exceptions:
+ data_exec_analog = get_data_exec_root_directory().joinpath(d.parent.name).joinpath(d.name)
+ if data_exec_analog.exists():
+ if data_exec_analog.is_symlink():
+ logging.warning(f"Overwriting previous symlink at '{data_exec_analog!s}' pointing to '{data_exec_analog.readlink()!s}")
+ else:
+ logging.warning(f"Overwriting previous contents at '{data_exec_analog}' with new symlink")
+ else:
+ logging.info(f"Creating dataset symlink with source '{d!s}'")
+ os.symlink(d, data_exec_analog)
+ logging.info(f"Symlink created at dest '{data_exec_analog!s}'")
+
+ # Note that order here is important; prioritize local data if it is there
+ data_symlink_sources = [get_local_volumes_root_directory(), get_cluster_volumes_root_directory()]
+ for dir in data_symlink_sources:
+ # At this level, order isn't as important
+ for category_subdir in get_expected_data_category_subdirs():
+ for dataset_dir in [d for d in dir.joinpath(category_subdir).glob('*') if d.is_dir()]:
+ data_exec_analog = get_data_exec_root_directory().joinpath(category_subdir).joinpath(dataset_dir.name)
+ if not data_exec_analog.exists():
+ logging.info(f"Creating dataset symlink with source '{dataset_dir!s}'")
+ os.symlink(dataset_dir, data_exec_analog)
+ logging.info(f"Symlink created at dest '{data_exec_analog!s}'")
+
+
+def make_data_local(worker_index: int, primary_workers: Set[int], **kwargs):
+ """
+ Make data local for each local volume mount that exists, but only if this worker is a primary.
+
+ Copy or extract data from mounted volumes/directories directly backed by DMOD datasets (i.e., "cluster volumes") to
+ corresponding directories local to the physical node (i.e. "local volumes"), for any such local directories found to
+ exist. An important distinction is that a local volume is local to the physical node and not the worker itself, and
+ thus is shared by all workers on that node. As such, return immediately without performing any actions if this
+ worker is not considered a "primary" worker, so that only one worker per node manipulates data.
+
+ Function (for a primary worker) iterates through the local volume subdirectory paths to see if any local volumes
+ were set up when the worker was created. For any that are found, the function ensures data from the corresponding,
+ dataset-backed cluster volume directory is replicated in the local volume directory.
+
+ Parameters
+ ----------
+ worker_index
+ This worker's index.
+ primary_workers
+ Indices of designated primary workers
+ kwargs
+ Other ignored keyword args.
+
+ See Also
+ --------
+ _make_dataset_dir_local
+ """
+
+ # Every work does need to do this, though
+ link_to_data_exec_root()
+
+ if worker_index not in primary_workers:
+ return
+
+ cluster_vol_dir = get_cluster_volumes_root_directory()
+ local_vol_dir = get_local_volumes_root_directory()
+ expected_subdirs = get_expected_data_category_subdirs()
+
+ if not cluster_vol_dir.exists():
+ raise RuntimeError(f"Can't make data local: cluster volume root '{cluster_vol_dir!s}' does not exist")
+ if not cluster_vol_dir.is_dir():
+ raise RuntimeError(f"Can't make data local: cluster volume root '{cluster_vol_dir!s}' is not a directory")
+ if not local_vol_dir.exists():
+ raise RuntimeError(f"Can't make data local: local volume root '{local_vol_dir!s}' does not exist")
+ if not local_vol_dir.is_dir():
+ raise RuntimeError(f"Can't make data local: local volume root '{local_vol_dir!s}' is not a directory")
+
+ try:
+ obj_store_access_key, obj_store_secret_key = _parse_object_store_secrets()
+ logging.info(f"Executing test run of minio client to see if alias '{MINIO_ALIAS_NAME}' already exists")
+ mc_ls_result = subprocess.run(["mc", "--config-dir", "/dmod/.mc", "alias", "ls", MINIO_ALIAS_NAME])
+ logging.debug(f"Return code from alias check was {mc_ls_result.returncode!s}")
+ if mc_ls_result.returncode != 0:
+ logging.info(f"Creating new minio alias '{MINIO_ALIAS_NAME}'")
+ # TODO: (later) need to set value for obj_store_url better than just hardcoding it
+ obj_store_url = "http://minio-proxy:9000"
+ subprocess.run(["mc", "--config-dir", "/dmod/.mc", "alias", "set", MINIO_ALIAS_NAME, obj_store_url,
+ obj_store_access_key, obj_store_secret_key])
+ logging.info(f"Now rechecking minio client test for '{MINIO_ALIAS_NAME}' alias")
+ mc_ls_result_2 = subprocess.run(["mc", "--config-dir", "/dmod/.mc", "alias", "ls", MINIO_ALIAS_NAME])
+ if mc_ls_result_2.returncode != 0:
+ raise RuntimeError(f"Could not successfully create minio alias '{MINIO_ALIAS_NAME}'")
+ do_optimized_object_store_copy = True
+ except RuntimeError as e:
+ raise e
+ except Exception as e:
+ logging.warning(f"Unable to parse secrets for optimized MinIO local data copying: {e!s}")
+ do_optimized_object_store_copy = False
+
+ # Use some multi-threading here since this is IO heavy
+ logging.info(f"Performing local data copying using multiple threads")
+ futures = set()
+ with concurrent.futures.ThreadPoolExecutor(max_workers=5) as pool:
+ for type_dir in (td for td in local_vol_dir.glob("*") if td.is_dir()):
+ if not cluster_vol_dir.joinpath(type_dir.name).is_dir():
+ raise RuntimeError(f"Directory '{type_dir!s}' does not have analog in '{cluster_vol_dir!s}'")
+ if type_dir.name not in expected_subdirs:
+ logging.warning(f"Found unexpected (but matching) local volume data type subdirectory {type_dir.name}")
+ for local_ds_dir in (d for d in type_dir.glob("*") if d.is_dir()):
+ futures.add(pool.submit(_make_dataset_dir_local, local_ds_dir, do_optimized_object_store_copy))
+ for future in futures:
+ future.result()
+ logging.info(f"Local data copying complete")
+
+
def get_date_str() -> str:
"""
Get the current date and time as a string with format ``%Y-%m-%d,%H:%M:%S``
@@ -159,7 +453,7 @@ def get_date_str() -> str:
-------
The current date and time as a string.
"""
- return datetime.now().strftime('%Y-%m-%d,%H:%M:%S')
+ return datetime.now().strftime(get_dmod_date_str_pattern())
def move_job_output(output_directory: Path, move_action: str, archiving: ArchiveStrategy = ArchiveStrategy.DYNAMIC,
@@ -214,7 +508,9 @@ def move_job_output(output_directory: Path, move_action: str, archiving: Archive
archive_name = None
if move_action == "to_directory":
- _move_to_directory(source_dir=output_directory, dest_dir=kwargs["dest_dir"], archive_name=archive_name)
+ dest_dir = kwargs["dest_dir"]
+ logging.info(f"Moving output from '{output_directory!s}' to '{dest_dir!s}'")
+ _move_to_directory(source_dir=output_directory, dest_dir=dest_dir, archive_name=archive_name)
else:
raise RuntimeError(f"{get_date_str()} Invalid CLI move action {move_action}")
@@ -246,7 +542,7 @@ def process_mpi_hosts_string(hosts_string: str, hosts_sep: str = ",", host_detai
return results
-def tar_and_copy(source: Path, dest: Path, archive_name: str, do_dry_run: bool = False, do_compress: bool = False):
+def tar_and_copy(source: Path, dest: Path, archive_name: str, do_dry_run: bool = False, do_compress: bool = False, **kwargs):
"""
Make a tar archive from the contents of a directory, and place this in a specified destination.
@@ -262,6 +558,8 @@ def tar_and_copy(source: Path, dest: Path, archive_name: str, do_dry_run: bool =
Whether to only perform a dry run to check paths, with no archiving/moving/copying.
do_compress
Whether to compress the created archive with gzip compression.
+ kwargs
+ Other unused keyword args.
Raises
-------
@@ -294,12 +592,17 @@ def tar_and_copy(source: Path, dest: Path, archive_name: str, do_dry_run: bool =
return
tar_mode_args = "w:gz" if do_compress else "w"
+ logging.info(f"Creating archive file '{archive_create_path!s}'")
with tarfile.open(archive_create_path, tar_mode_args) as tar:
for p in source.glob("*"):
+ logging.debug(f"Adding '{p!s}' to archive")
tar.add(p, arcname=p.name)
if archive_create_path != final_archive_path:
+ logging.info(f"Moving archive to final location at '{final_archive_path!s}'")
shutil.move(archive_create_path, final_archive_path)
+ else:
+ logging.info(f"Archive creation complete and at final location")
def main():
@@ -314,6 +617,8 @@ def main():
gather_output(mpi_host_names=[h for h in mpi_host_to_nproc_map], output_write_dir=args.output_write_dir)
elif args.command == 'move_job_output':
move_job_output(**(vars(args)))
+ elif args.command == 'make_data_local':
+ make_data_local(**(vars(args)))
else:
raise RuntimeError(f"Command arg '{args.command}' doesn't match a command supported by module's main function")
diff --git a/python/lib/core/dmod/core/_version.py b/python/lib/core/dmod/core/_version.py
index 2f15b8cd3..e45337122 100644
--- a/python/lib/core/dmod/core/_version.py
+++ b/python/lib/core/dmod/core/_version.py
@@ -1 +1 @@
-__version__ = '0.20.0'
+__version__ = '0.21.0'
diff --git a/python/lib/core/dmod/core/meta_data.py b/python/lib/core/dmod/core/meta_data.py
index d67552d26..16a9151aa 100644
--- a/python/lib/core/dmod/core/meta_data.py
+++ b/python/lib/core/dmod/core/meta_data.py
@@ -1406,8 +1406,14 @@ class DataRequirement(Serializable):
"""
category: DataCategory
domain: DataDomain
+ # TODO: (later/future) define another type ("FulfillmentDetails" maybe) tracking all three of these; make that
+ # entire object optional here, but the attributes within it non-optional (though confirm this doesn't break things)
fulfilled_access_at: Optional[str] = Field(description="The location at which the fulfilling dataset for this requirement is accessible, if the dataset known.")
fulfilled_by: Optional[str] = Field(description="The name of the dataset that will fulfill this, if it is known.")
+ needs_data_local: Optional[bool] = Field(None, description="Whether this requirement will be fulfilled in a way "
+ "that requires the data to be copied or extracted "
+ "locally (or to a local volume) for the worker to use "
+ "it.")
is_input: bool = Field(description="Whether this represents required input data, as opposed to a requirement for storing output data.")
size: Optional[int]
diff --git a/python/lib/scheduler/dmod/scheduler/scheduler.py b/python/lib/scheduler/dmod/scheduler/scheduler.py
index a1a2b2ed2..f0301dbea 100644
--- a/python/lib/scheduler/dmod/scheduler/scheduler.py
+++ b/python/lib/scheduler/dmod/scheduler/scheduler.py
@@ -364,7 +364,7 @@ def _ds_names_helper(cls, job: 'Job', worker_index: int, category: DataCategory,
return list(dataset_names)
# TODO (later): once we get to dynamic/custom images (i.e., for arbitrary BMI modules), make sure this still works
- def _generate_docker_cmd_args(self, job: 'Job', worker_index: int) -> List[str]:
+ def _generate_docker_cmd_args(self, job: 'Job', worker_index: int, primary_workers: Dict[str, int]) -> List[str]:
"""
Create the Docker "CMD" arguments list to be used to start all services that will perform this job.
@@ -379,6 +379,8 @@ def _generate_docker_cmd_args(self, job: 'Job', worker_index: int) -> List[str]:
The job to have worker Docker services started, with those services needing "CMD" arguments generated.
worker_index : int
The particular worker service index in question, which will have a specific set of data requirements.
+ primary_workers : Dict[str, int]
+ Mapping of Docker node hostname to the primary job worker index on that node, for all nodes used by the job.
Returns
-------
@@ -411,7 +413,8 @@ def _generate_docker_cmd_args(self, job: 'Job', worker_index: int) -> List[str]:
}
if isinstance(job.model_request, AbstractNgenRequest):
- docker_cmd_arg_map.update(self._generate_nextgen_job_docker_cmd_args(job, worker_index))
+ docker_cmd_arg_map.update(self._generate_nextgen_job_docker_cmd_args(job=job, worker_index=worker_index,
+ primary_workers=primary_workers))
# Finally, convert the args map to a list, with each "flag"/key immediately preceding its value
args_as_list = []
@@ -421,7 +424,8 @@ def _generate_docker_cmd_args(self, job: 'Job', worker_index: int) -> List[str]:
return args_as_list
- def _generate_nextgen_job_docker_cmd_args(self, job: 'Job', worker_index: int) -> Dict[str, str]:
+ def _generate_nextgen_job_docker_cmd_args(self, job: 'Job', worker_index: int,
+ primary_workers: Dict[str, int]) -> Dict[str, str]:
"""
Prepare the specific Docker CMD arg applicable to Nextgen-based jobs
@@ -436,6 +440,8 @@ def _generate_nextgen_job_docker_cmd_args(self, job: 'Job', worker_index: int) -
The job to have worker Docker services started, with those services needing "CMD" arguments generated.
worker_index : int
The particular worker service index in question, which will have a specific set of data requirements.
+ primary_workers : Dict[str, int]
+ Mapping of Docker node hostname to the primary job worker index on that node, for all nodes used by the job.
Returns
-------
@@ -456,6 +462,7 @@ def _generate_nextgen_job_docker_cmd_args(self, job: 'Job', worker_index: int) -
"--hydrofabric-dataset": self._ds_names_helper(job, worker_index, DataCategory.HYDROFABRIC, max_count=1)[0],
"--config-dataset": self._ds_names_helper(job, worker_index, DataCategory.CONFIG, max_count=1,
data_format=DataFormat.NGEN_JOB_COMPOSITE_CONFIG)[0],
+ "--primary-workers": ",".join(str(v) for _, v in primary_workers.items())
}
if job.cpu_count > 1:
@@ -714,14 +721,43 @@ def start_job(self, job: 'Job') -> Tuple[bool, tuple]:
secrets = [self.get_secret_reference(secret_name) for secret_name in
['object_store_exec_user_name', 'object_store_exec_user_passwd']]
+ # Decide (somewhat trivially) which worker per-host will be considered "primary"
+ # In particular, this node will handle processing any data that needs to be local on the host
+ # Also, always make worker 0 primary for its host
+ per_node_primary_workers = {job.allocations[0].hostname: 0}
+ for alloc_index in range(1, num_allocations):
+ if (hostname := job.allocations[alloc_index].hostname) not in per_node_primary_workers:
+ per_node_primary_workers[hostname] = alloc_index
+
for alloc_index in range(num_allocations):
alloc = job.allocations[alloc_index]
constraints_str = f"node.hostname == {alloc.hostname}"
constraints = list(constraints_str.split("/"))
- pattern = '{}:/dmod/datasets/{}/{}:rw'
- mounts = [pattern.format(r.fulfilled_access_at, r.category.name.lower(), r.fulfilled_by) for r in
- job.worker_data_requirements[alloc_index] if r.fulfilled_access_at is not None]
+ mounts = []
+ for req in job.worker_data_requirements[alloc_index]:
+ # TODO: (later) not sure, but this seems like it may be problematic condition that requires exception
+ if req.fulfilled_access_at is None:
+ logging.warning(f"{req.__class__.__name__} fulfilled by '{req.fulfilled_by}' for '{job.job_id!s}' "
+ f"has no value for `fulfilled_access_at`; skipping this {req.__class__.__name__} "
+ f"when assembling Docker service mounts.")
+ continue
+ cluster_volume = req.fulfilled_access_at
+ category_subdir = req.category.name.lower()
+ dataset_name = req.fulfilled_by
+ mounts.append(f"{cluster_volume}:/dmod/cluster_volumes/{category_subdir}/{dataset_name}")
+
+ # Allow this to not have been set (for now at least), but log a warning
+ if req.needs_data_local is None:
+ logging.warning(f"{req.__class__.__name__} fulfilled by '{req.fulfilled_by}' for '{job.job_id!s}' "
+ f"does not indicate explicitly whether the required data needs to be available "
+ f"locally during job execution.")
+ # For requirements that need local data, mount a local Docker volume for them
+ elif req.needs_data_local:
+ # TODO: (later/future) make sure something has checked to see that space is available on the nodes
+ local_volume = f"{dataset_name}_local_vol"
+ mounts.append(f"{local_volume}:/dmod/local_volumes/{category_subdir}/{dataset_name}")
+
#mounts.append('/local/model_as_a_service/docker_host_volumes/forcing_local:/dmod/datasets/forcing_local:rw')
# Introduce a way to inject data access directly via env config, to potentially bypass things for testing
bind_mount_from_env = getenv('DMOD_JOB_WORKER_HOST_MOUNT')
@@ -748,9 +784,10 @@ def start_job(self, job: 'Job') -> Tuple[bool, tuple]:
service_params.capabilities_to_add = ['SYS_ADMIN']
#TODO check for proper service creation, return False if doesn't work
- service = self.create_service(serviceParams=service_params, idx=alloc_index,
- docker_cmd_args=self._generate_docker_cmd_args(job, alloc_index))
-
+ cmd_args = self._generate_docker_cmd_args(job=job, worker_index=alloc_index,
+ primary_workers=per_node_primary_workers)
+ service = self.create_service(serviceParams=service_params, idx=alloc_index, docker_cmd_args=cmd_args)
+
service_per_allocation.append(service)
logging.info("\n")
diff --git a/python/lib/scheduler/dmod/test/scheduler_test_utils.py b/python/lib/scheduler/dmod/test/scheduler_test_utils.py
index 091bba9eb..84ab31ebe 100644
--- a/python/lib/scheduler/dmod/test/scheduler_test_utils.py
+++ b/python/lib/scheduler/dmod/test/scheduler_test_utils.py
@@ -86,7 +86,7 @@ def mock_job(model: str = 'nwm', cpus: int = 4, mem: int = 500000, strategy: str
data_domain = DataDomain(data_format=DataFormat.NGEN_CSV_OUTPUT,
discrete_restrictions=[DiscreteRestriction(variable='id', values=[])])
output_requirement = DataRequirement(domain=data_domain, is_input=False, category=DataCategory.OUTPUT,
- fulfilled_by=dataset_name)
+ fulfilled_by=dataset_name, needs_data_local=False)
else:
raise(ValueError("Unsupported mock model {}".format(model)))
diff --git a/python/lib/scheduler/dmod/test/test_job.py b/python/lib/scheduler/dmod/test/test_job.py
index 698f01507..613a242e1 100644
--- a/python/lib/scheduler/dmod/test/test_job.py
+++ b/python/lib/scheduler/dmod/test/test_job.py
@@ -200,7 +200,9 @@ def test_factory_init_from_deserialized_json_2_b(self):
index_val = 0
for req in base_job.data_requirements:
+ req.fulfilled_access_at = 'imaginary-dataset-{}'.format(index_val)
req.fulfilled_by = 'imaginary-dataset-{}'.format(index_val)
+ req.needs_data_local = False
index_val += 1
for f in [req.fulfilled_by for req in base_job.data_requirements]:
diff --git a/python/lib/scheduler/pyproject.toml b/python/lib/scheduler/pyproject.toml
index a03b8cadc..61b8eec31 100644
--- a/python/lib/scheduler/pyproject.toml
+++ b/python/lib/scheduler/pyproject.toml
@@ -17,7 +17,7 @@ dependencies = [
"dmod.communication>=0.22.0",
"dmod.modeldata>=0.7.1",
"dmod.redis>=0.1.0",
- "dmod.core>=0.17.0",
+ "dmod.core>=0.20.0",
"cryptography",
"uri",
"pyyaml",
diff --git a/python/services/dataservice/dmod/dataservice/_version.py b/python/services/dataservice/dmod/dataservice/_version.py
index 2c7bffbf8..2d7893e3d 100644
--- a/python/services/dataservice/dmod/dataservice/_version.py
+++ b/python/services/dataservice/dmod/dataservice/_version.py
@@ -1 +1 @@
-__version__ = '0.12.0'
+__version__ = '0.13.0'
diff --git a/python/services/dataservice/dmod/dataservice/data_derive_util.py b/python/services/dataservice/dmod/dataservice/data_derive_util.py
index 094284b86..437992f33 100644
--- a/python/services/dataservice/dmod/dataservice/data_derive_util.py
+++ b/python/services/dataservice/dmod/dataservice/data_derive_util.py
@@ -125,11 +125,12 @@ def __init__(self, dataset_manager_collection: DatasetManagerCollection, noah_ow
def _apply_dataset_to_requirement(self, dataset: Dataset, requirement: DataRequirement, job: Job):
"""
- Set ::attribute:`DataRequirement.fulfilled_access_at` and ::attribute:`DataRequirement.fulfilled_by`.
+ Set attributes that indicate details of how this dataset fulfills this requirement.
- Update the provided requirement's ::attribute:`DataRequirement.fulfilled_access_at` and
- ::attribute:`DataRequirement.fulfilled_by` attributes to associate the requirement with the provided dataset.
- The dataset is assume to have already been determined as satisfactory to fulfill the given requirement.
+ Update the provided requirement's ::attribute:`DataRequirement.fulfilled_access_at`,
+ ::attribute:`DataRequirement.fulfilled_by`, and ::attribute:`DataRequirement.needs_data_local` attributes to
+ associate the requirement with the provided dataset. The dataset is assumed to have already been determined as
+ satisfactory to fulfill the given requirement.
Parameters
----------
@@ -152,6 +153,9 @@ def _apply_dataset_to_requirement(self, dataset: Dataset, requirement: DataRequi
#################################################################################
requirement.fulfilled_access_at = self._determine_access_location(dataset, job)
#################################################################################
+
+ # Identify datasets that need data locally for job exec, and set needs_data_local to True or False
+ requirement.needs_data_local = self._managers.would_requirement_need_local_data(dataset)
requirement.fulfilled_by = dataset.name
def _build_bmi_auto_generator(self,
diff --git a/python/services/dataservice/dmod/dataservice/dataset_manager_collection.py b/python/services/dataservice/dmod/dataservice/dataset_manager_collection.py
index 6e7c6c35b..782f92627 100644
--- a/python/services/dataservice/dmod/dataservice/dataset_manager_collection.py
+++ b/python/services/dataservice/dmod/dataservice/dataset_manager_collection.py
@@ -17,6 +17,33 @@ class DatasetManagerCollection:
default_factory=dict, init=False
)
+ @classmethod
+ def would_requirement_need_local_data(cls, dataset: Dataset) -> bool:
+ """
+ Check whether a ::class:`DataRequirement` fulfilled by this dataset needs local data for job execution.
+
+ Check whether a hypothetical ::class:`DataRequirement` fulfilled by this dataset needs data cached locally for
+ execution of the requirement's parent job.
+
+ Parameters
+ ----------
+ dataset
+ The dataset fulfilling a hypothetical data requirement for a job.
+
+ Returns
+ -------
+ bool
+ Whether a ::class:`DataRequirement` fulfilled by this dataset needs data cached locally for job execution.
+ """
+ # Always require local data when something is archived
+ if dataset.is_data_archived:
+ return True
+ # Also require local data for anything in the object store
+ elif dataset.dataset_type == DatasetType.OBJECT_STORE:
+ return True
+ else:
+ return False
+
def __hash__(self) -> int:
return id(self)
diff --git a/python/services/dataservice/dmod/dataservice/service.py b/python/services/dataservice/dmod/dataservice/service.py
index 08a68a5e0..1a2ad4bfa 100644
--- a/python/services/dataservice/dmod/dataservice/service.py
+++ b/python/services/dataservice/dmod/dataservice/service.py
@@ -790,6 +790,12 @@ def _create_output_datasets(self, job: Job):
dataset_type = DatasetType.OBJECT_STORE
mgr = self._managers.manager(dataset_type)
+ # If possible, decide whether the output will need to be locally written
+ if dataset_type == DatasetType.OBJECT_STORE:
+ write_output_locally_first = True
+ else:
+ write_output_locally_first = None
+
data_format = job.model_request.output_formats[i]
# If we are writing to an object store, lots of CSV files will kill us, so switch to archived variant
if dataset_type == DatasetType.OBJECT_STORE and data_format == DataFormat.NGEN_CSV_OUTPUT:
@@ -803,7 +809,8 @@ def _create_output_datasets(self, job: Job):
discrete_restrictions=[id_restrict]))
# TODO: (later) in the future, whether the job is running via Docker needs to be checked
# TODO: also, whatever is done here needs to align with what is done within perform_checks_for_job, when
- # setting the fulfilled_access_at for the DataRequirement
+ # setting the fulfilled_access_at for the DataRequirement (and with _determine_access_location functions
+ # in python/services/dataservice/dmod/dataservice/data_derive_util.py )
is_job_run_in_docker = True
if is_job_run_in_docker:
output_access_at = dataset.docker_mount
@@ -812,7 +819,8 @@ def _create_output_datasets(self, job: Job):
raise DmodRuntimeError(msg.format(dataset.__class__.__name__, job.job_id))
# Create a data requirement for the job, fulfilled by the new dataset
requirement = DataRequirement(domain=dataset.data_domain, is_input=False, category=DataCategory.OUTPUT,
- fulfilled_by=dataset.name, fulfilled_access_at=output_access_at)
+ fulfilled_by=dataset.name, fulfilled_access_at=output_access_at,
+ needs_data_local=write_output_locally_first)
job.data_requirements.append(requirement)
async def perform_checks_for_job(self, job: Job) -> bool:
@@ -861,13 +869,16 @@ async def perform_checks_for_job(self, job: Job) -> bool:
job_ds_user.link_to_dataset(dataset=dataset)
# TODO: (later) in the future, whether the job is running via Docker needs to be checked
# TODO: also, whatever is done here needs to align with what is done within _create_output_dataset,
- # when creating the output data DataRequirement
+ # when creating the output data DataRequirement, and with _determine_access_location function
+ # in python/services/dataservice/dmod/dataservice/data_derive_util.py
is_job_run_in_docker = True
if is_job_run_in_docker:
requirement.fulfilled_access_at = dataset.docker_mount
else:
msg = "Could not determine proper access location for dataset of type {} by non-Docker job {}."
raise DmodRuntimeError(msg.format(dataset.__class__.__name__, job.job_id))
+ # Identify datasets that need data locally for job exec, and set needs_data_local to True or False
+ requirement.needs_data_local = DatasetManagerCollection.would_requirement_need_local_data(dataset)
requirement.fulfilled_by = dataset.name
return True
except Exception as e:
diff --git a/python/services/dataservice/pyproject.toml b/python/services/dataservice/pyproject.toml
index acde4194a..735513d91 100644
--- a/python/services/dataservice/pyproject.toml
+++ b/python/services/dataservice/pyproject.toml
@@ -9,9 +9,9 @@ authors = [
{ name = "Austin Raney", email = "austin.raney@noaa.gov" },
]
dependencies = [
- "dmod.core>=0.19.0",
+ "dmod.core>=0.21.0",
"dmod.communication>=0.21.0",
- "dmod.scheduler>=0.12.2",
+ "dmod.scheduler>=0.14.0",
"dmod.modeldata>=0.13.0",
"redis",
"pydantic[dotenv]>=1.10.8,~=1.10",
diff --git a/python/services/partitionerservice/dmod/partitionerservice/_version.py b/python/services/partitionerservice/dmod/partitionerservice/_version.py
index d93b5b242..0404d8103 100644
--- a/python/services/partitionerservice/dmod/partitionerservice/_version.py
+++ b/python/services/partitionerservice/dmod/partitionerservice/_version.py
@@ -1 +1 @@
-__version__ = '0.2.3'
+__version__ = '0.3.0'
diff --git a/python/services/partitionerservice/dmod/partitionerservice/service.py b/python/services/partitionerservice/dmod/partitionerservice/service.py
index 3d4435997..4dc465c92 100644
--- a/python/services/partitionerservice/dmod/partitionerservice/service.py
+++ b/python/services/partitionerservice/dmod/partitionerservice/service.py
@@ -327,7 +327,11 @@ async def _find_partition_dataset(self, job: Job) -> DatasetManagementResponse:
if response.success:
logging.info("Existing partition dataset for {} found: {}".format(job.job_id, response.dataset_name))
for r in reqs:
+ # TODO: (later) fix this not being set
+ #r.fulfilled_access_at =
r.fulfilled_by = response.dataset_name
+ # For the moment at least, we shouldn't need to worry about IO speed and copying the config locally
+ r.needs_data_local = False
else:
logging.info("No existing partition dataset for {} was found: ".format(job.job_id))
return response
@@ -415,8 +419,12 @@ async def _generate_partition_config_dataset(self, job: Job) -> bool:
# If good, save the partition dataset data_id as a data requirement for the job.
data_id_restrict = DiscreteRestriction(variable=StandardDatasetIndex.DATA_ID, values=[part_dataset_data_id])
domain = DataDomain(data_format=DataFormat.NGEN_PARTITION_CONFIG, discrete_restrictions=[data_id_restrict])
+ # TODO: (later) fix this not being set properly
+ #fulfilled_access_at =
+ fulfilled_access_at = None
requirement = DataRequirement(domain=domain, is_input=True, category=DataCategory.CONFIG,
- fulfilled_by=part_dataset_name)
+ fulfilled_by=part_dataset_name, fulfilled_access_at=fulfilled_access_at,
+ needs_data_local=False)
job.data_requirements.append(requirement)
else:
logging.error("Partition config dataset generation for {} failed".format(job.job_id))
diff --git a/python/services/partitionerservice/pyproject.toml b/python/services/partitionerservice/pyproject.toml
index 459490b9f..097d0c16e 100644
--- a/python/services/partitionerservice/pyproject.toml
+++ b/python/services/partitionerservice/pyproject.toml
@@ -9,10 +9,10 @@ authors = [
{ name = "Austin Raney", email = "austin.raney@noaa.gov" },
]
dependencies = [
- "dmod.core>=0.1.0",
+ "dmod.core>=0.21.0",
"dmod.communication>=0.7.1",
"dmod.modeldata>=0.7.1",
- "dmod.scheduler>=0.12.2",
+ "dmod.scheduler>=0.14.0",
"dmod.externalrequests>=0.3.0",
]
readme = "README.md"
diff --git a/python/services/requestservice/pyproject.toml b/python/services/requestservice/pyproject.toml
index 7e22792eb..edb80e6f1 100644
--- a/python/services/requestservice/pyproject.toml
+++ b/python/services/requestservice/pyproject.toml
@@ -12,7 +12,7 @@ authors = [
]
dependencies = [
"websockets",
- "dmod.core>=0.19.0",
+ "dmod.core>=0.21.0",
"dmod.communication>=0.22.0",
"dmod.access>=0.2.0",
"dmod.externalrequests>=0.6.0",
diff --git a/python/services/schedulerservice/pyproject.toml b/python/services/schedulerservice/pyproject.toml
index 5a8c16c6d..0013ebdc1 100644
--- a/python/services/schedulerservice/pyproject.toml
+++ b/python/services/schedulerservice/pyproject.toml
@@ -10,7 +10,7 @@ authors = [
{ name = "Austin Raney", email = "austin.raney@noaa.gov" },
]
dependencies = [
- "dmod.core>=0.17.0",
+ "dmod.core>=0.21.0",
"dmod.communication>=0.22.0",
"dmod.scheduler>=0.14.0",
]
diff --git a/scripts/prep_fast_dev_update.sh b/scripts/prep_fast_dev_update.sh
index 79200e47c..c1cc8fb1c 100755
--- a/scripts/prep_fast_dev_update.sh
+++ b/scripts/prep_fast_dev_update.sh
@@ -139,19 +139,23 @@ while [ ${#} -gt 0 ]; do
shift
done
+# Always stop GUI when GUI update flag is set, regardless of whether set for safe running or to deploy
+if [ -n "${DO_GUI:-}" ]; then
+ if ${CONTROL_SCRIPT} ${GUI_STACK_NAME} check > /dev/null; then
+ ${CONTROL_SCRIPT} ${GUI_STACK_NAME} stop
+ STOPPED_GUI_FOR_REBUILD="true"
+ sleep 1
+ fi
+fi
+
# Make sure nothing is running if it doesn't need to be, bailing or stopping it as appropriate
if [ -n "${RUN_SAFE:-}" ]; then
if ${CONTROL_SCRIPT} ${PRIMARY_STACK_NAME} check > /dev/null; then
>&2 echo "Error: option for safe mode active and found primary '${PRIMARY_STACK_NAME}' stack running; exiting."
exit 1
fi
+# If deploying, and stack is running, make sure to stop it
elif [ -n "${DO_DEPLOY:-}" ]; then
- if ${CONTROL_SCRIPT} ${GUI_STACK_NAME} check > /dev/null; then
- ${CONTROL_SCRIPT} ${GUI_STACK_NAME} stop
- STOPPED_GUI_FOR_REBUILD="true"
- sleep 1
- fi
-
if ${CONTROL_SCRIPT} ${PRIMARY_STACK_NAME} check > /dev/null; then
${CONTROL_SCRIPT} ${PRIMARY_STACK_NAME} stop
echo "Waiting for services to stop ..."
@@ -159,15 +163,6 @@ elif [ -n "${DO_DEPLOY:-}" ]; then
fi
fi
-if [ -n "${DO_GUI:-}" ]; then
- if [ -z "${STOPPED_GUI_FOR_REBUILD:-}" ]; then
- if ${CONTROL_SCRIPT} ${GUI_STACK_NAME} check > /dev/null; then
- ${CONTROL_SCRIPT} ${GUI_STACK_NAME} stop
- STOPPED_GUI_FOR_REBUILD="true"
- fi
- fi
-fi
-
# Prepare a Docker volume for the dmod Python packages, removing any existing
# Do in background so other things can be done
try_clean_volume &
@@ -178,16 +173,17 @@ if [ -n "${JUST_REMOVE_VOLUME:-}" ]; then
exit
fi
-# Build updated py-sources image; if requested, build everything, but by default, just build the last image
+# Build and push updated py-sources image; if requested, build everything, but by default, just build the last image
if [ -n "${DO_FULL_BUILD:-}" ]; then
- ${CONTROL_SCRIPT} ${PY_PACKAGES_STACK_NAME} build
+ ${CONTROL_SCRIPT} ${PY_PACKAGES_STACK_NAME} build push
else
${CONTROL_SCRIPT} --build-args "${PY_PACKAGES_LAST_SERVICE_NAME}" ${PY_PACKAGES_STACK_NAME} build
+ ${CONTROL_SCRIPT} ${PY_PACKAGES_STACK_NAME} push
fi
-if [ -n "${STOPPED_GUI_FOR_REBUILD:-}${DO_GUI:-}" ]; then
+if [ -n "${DO_GUI:-}" ]; then
echo "Rebuilding nwm_gui stack app service image"
- ${CONTROL_SCRIPT} ${GUI_STACK_NAME} build &
+ ${CONTROL_SCRIPT} ${GUI_STACK_NAME} build push &
_REBUILD_GUI_IMAGES_PID=$!
fi