-
Notifications
You must be signed in to change notification settings - Fork 47
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Starting as an initial warning fix (`max_workers`), refactor tests to verify the Compute additions independently of Parsl -- Compute is not Parsl so let Parsl be responsible for its own testing. In so doing, reduce the time for each test and more strongly assert correctness via use of `randomstring`. Move updated tests to `tests/unit/`.
- Loading branch information
1 parent
59889bd
commit e8f51c6
Showing
2 changed files
with
70 additions
and
153 deletions.
There are no files selected for viewing
103 changes: 0 additions & 103 deletions
103
compute_endpoint/tests/integration/endpoint/endpoint/test_gce_container.py
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,71 +1,91 @@ | ||
import logging | ||
import random | ||
import typing as t | ||
import uuid | ||
from unittest import mock | ||
|
||
import pytest | ||
from globus_compute_endpoint.engines.globus_compute import GlobusComputeEngine | ||
|
||
|
||
def test_docker(tmp_path, uri="funcx/kube-endpoint:main-3.10"): | ||
gce = GlobusComputeEngine( | ||
address="127.0.0.1", container_type="docker", container_uri=uri | ||
_MOCK_BASE = "globus_compute_endpoint.engines.globus_compute." | ||
_LAUNCH_CMD_PREFIX = ( | ||
"globus-compute-endpoint python-exec" | ||
" parsl.executors.high_throughput.process_worker_pool" | ||
) | ||
|
||
|
||
@pytest.fixture | ||
def gce_factory(tmp_path, randomstring) -> t.Callable: | ||
def _kernel(**k): | ||
expect_uri = randomstring(length=random.randint(1, 20)) | ||
expect_opts = randomstring(length=random.randint(1, 20)) | ||
k = { | ||
"address": "127.0.0.1", | ||
"max_workers_per_node": 1, | ||
"label": "GCE_TEST", | ||
"container_uri": expect_uri, | ||
"container_cmd_options": expect_opts, | ||
**k, | ||
} | ||
with mock.patch(f"{_MOCK_BASE}ReportingThread"): | ||
gce = GlobusComputeEngine(**k) | ||
with mock.patch.object(gce.executor, "start"): | ||
gce.start(endpoint_id=uuid.uuid4(), run_dir=tmp_path) | ||
assert gce.executor.start.called | ||
|
||
# No cleanup necessary because executor not started | ||
return gce, expect_uri, expect_opts | ||
|
||
yield _kernel | ||
|
||
|
||
def test_docker(tmp_path, gce_factory): | ||
gce, exp_uri, exp_opts = gce_factory(container_type="docker") | ||
container_launch_cmd = gce.executor.launch_cmd | ||
expected = ( | ||
f"docker run {exp_opts} -v {tmp_path}:{tmp_path} -t" | ||
f" {exp_uri} {_LAUNCH_CMD_PREFIX}" | ||
) | ||
assert container_launch_cmd.startswith(expected) | ||
|
||
gce.executor.start = mock.MagicMock() | ||
gce.start(endpoint_id=uuid.uuid4(), run_dir=tmp_path) | ||
|
||
assert gce.executor.launch_cmd | ||
assert gce.executor.launch_cmd.startswith("docker run") | ||
assert uri in gce.executor.launch_cmd | ||
gce.executor.start.assert_called() | ||
# No cleanup necessary because HTEX was not started | ||
def test_apptainer(gce_factory): | ||
gce, exp_uri, exp_opts = gce_factory(container_type="apptainer") | ||
container_launch_cmd = gce.executor.launch_cmd | ||
expected = f"apptainer run {exp_opts} {exp_uri} {_LAUNCH_CMD_PREFIX}" | ||
assert container_launch_cmd.startswith(expected) | ||
|
||
|
||
def test_apptainer(tmp_path, uri="/tmp/kube-endpoint.sif"): | ||
gce = GlobusComputeEngine( | ||
address="127.0.0.1", container_type="apptainer", container_uri=uri | ||
) | ||
def test_singularity(gce_factory, randomstring): | ||
gce, exp_uri, exp_opts = gce_factory(container_type="singularity") | ||
container_launch_cmd = gce.executor.launch_cmd | ||
expected = f"singularity run {exp_opts} {exp_uri} {_LAUNCH_CMD_PREFIX}" | ||
assert container_launch_cmd.startswith(expected) | ||
|
||
gce.executor.start = mock.MagicMock() | ||
gce.start(endpoint_id=uuid.uuid4(), run_dir=tmp_path) | ||
|
||
assert gce.executor.launch_cmd | ||
assert gce.executor.launch_cmd.startswith("apptainer run") | ||
assert uri in gce.executor.launch_cmd | ||
gce.executor.start.assert_called() | ||
def test_custom_missing_options(tmp_path): | ||
with pytest.raises(AssertionError) as pyt_e: | ||
with mock.patch(f"{_MOCK_BASE}ReportingThread"): | ||
GlobusComputeEngine( | ||
address="127.0.0.1", | ||
max_workers_per_node=1, | ||
label="GCE_TEST", | ||
container_type="custom", | ||
).start(endpoint_id=uuid.uuid4(), run_dir=tmp_path) | ||
assert "container_cmd_options is required" in str(pyt_e.value) | ||
|
||
|
||
def test_custom(tmp_path): | ||
gce = GlobusComputeEngine( | ||
address="127.0.0.1", | ||
def test_custom(gce_factory, randomstring): | ||
exp_exec = randomstring() | ||
gce, *_ = gce_factory( | ||
container_type="custom", | ||
container_cmd_options=( | ||
"mycontainer -v {EXECUTOR_RUNDIR}:" | ||
"{EXECUTOR_RUNDIR} {EXECUTOR_LAUNCH_CMD}" | ||
), | ||
container_uri=None, # not necessary, but not used; "undo" test factory default | ||
container_cmd_options=f"{exp_exec} {{EXECUTOR_RUNDIR}} {{EXECUTOR_LAUNCH_CMD}}", | ||
) | ||
|
||
gce.executor.start = mock.MagicMock() | ||
gce.start(endpoint_id=uuid.uuid4(), run_dir=tmp_path) | ||
logging.warning(f"Got launch {gce.executor.launch_cmd}") | ||
|
||
assert gce.executor.launch_cmd | ||
assert gce.executor.launch_cmd.startswith("mycontainer") | ||
assert f"{tmp_path}:{tmp_path}" in gce.executor.launch_cmd | ||
assert "process_worker_pool" in gce.executor.launch_cmd | ||
gce.executor.start.assert_called() | ||
container_launch_cmd = gce.executor.launch_cmd | ||
expected = f"{exp_exec} {gce.run_dir} {_LAUNCH_CMD_PREFIX}" | ||
assert container_launch_cmd.startswith(expected) | ||
|
||
|
||
def test_bad_container(tmp_path): | ||
def test_bad_container(): | ||
with pytest.raises(AssertionError): | ||
GlobusComputeEngine(address="127.0.0.1", container_type="BAD") | ||
|
||
|
||
def test_no_container(tmp_path): | ||
gce = GlobusComputeEngine(address="127.0.0.1") | ||
original = gce.executor.launch_cmd | ||
|
||
gce.executor.start = mock.MagicMock() | ||
gce.start(endpoint_id=uuid.uuid4(), run_dir=tmp_path) | ||
|
||
assert gce.executor.launch_cmd == original |