diff --git a/RELEASING.md b/RELEASING.md index 669749102..8db802535 100644 --- a/RELEASING.md +++ b/RELEASING.md @@ -112,4 +112,62 @@ git push are properly added to the `main` branch. 6. Create a GitHub release from the tag. See [GitHub documentation](https://docs.github.com/en/repositories/releasing-projects-on-github/managing-releases-in-a-repository#creating-a-release) - for instructions. \ No newline at end of file + for instructions. + +### DEB/RPM Packaging Workflow + +#### Pre-requisites + +Before building the packages, ensure that the release itself, either the alpha +or prod versions, is published on PyPI. + +Additionally, VPN needs to be enabled for the build page. + +#### Build Process + +In the future, building the DEB/RPM packages will be a simple one-step button +click of the green **Build** button on the Globus Compute Agent +[build page here](https://builds.globus.org/jenkins/job/BuildGlobusComputeAgentPackages/build?delay=0sec). + +As a temporary workaround, we need to add a few lines to manually set some +env variables in our [JenkinsFile](https://github.com/globus/globus-compute/blob/743fa1e398fd40a00efb5880c55e3fa6e47392fc/compute_endpoint/packaging/JenkinsFile#L24) before triggering the build, as detailed below. + +1. Git checkout both the current release branch that was recently pushed to + PyPI, ie. ``v2.23.0`` or ``v2.25.0a0`` and the ``build_for_stable`` branch +2. Rebase ``build_for_stable`` on the release branch which should result in + adding the following ~6 lines: + + ... + env.BRANCH_NAME = scmVars.GIT_BRANCH.replaceFirst(/^.*origin\//, "") + + env.TAG_NAME = sh(returnStdout: true, script: "git tag --contains | head -1").trim() + env.SOURCE_STASH_NAME = "${UUID.randomUUID()}" + echo "env.BRANCH_NAME = ${env.BRANCH_NAME}" + sh "git clean -fdx" + + + // temporary hack to build for stable + + sh "git checkout build_for_stable" + + env.TAG_NAME = "v2.23.0" + + env.DEFAULT_BRANCH = "build_for_stable" + + + dir("compute_endpoint/packaging/") { + ... + +3. Change the ``env.TAG_NAME`` above to the current production release version + * Note that ``env.TAG_NAME`` determines whether the build is sent to + the ``unstable`` repo or also to the ``testing`` and ``stable`` ones. + * Example of unstable repo: + * https://downloads.globus.org/globus-connect-server/unstable/rpm/el/9/x86_64/ + * Example of stable repo: + * https://downloads.globus.org/globus-connect-server/stable/rpm/el/9/x86_64/ + * The logic of whether a release is stable is determined by whether the + package version of Globus Compute Endpoint set in ``version.py`` or + ``setup.py`` matches ``env.TAG_NAME`` above. If they are unequal, then + [publishResults.groovy line 85](https://github.com/globusonline/gcs-build-scripts/blob/168617a0ccbb0aee7b3bee04ee67940bbe2a80f6/vars/publishResults.groovy#L85) + will be (``tag`` : v2.23.0) != (``stable_tag`` : v2.23.0a0), where + stable_tag is constructed from the package version of an alpha release. +4. Commit and push your ``build_for_stable`` branch +5. (Access on VPN) Click the [build button here](https://builds.globus.org/jenkins/job/BuildGlobusComputeAgentPackages/build?delay=0sec) +6. Wait 20-30 minutes and confirm that the [build is green](https://builds.globus.org/jenkins/job/BuildGlobusComputeAgentPackages/) +7. For production release, we will have finished the build before the GCS + team, and can notify them that our build is complete. They then will + publish all packages when they finish their builds, which includes ours. diff --git a/compute_endpoint/packaging/JenkinsFile b/compute_endpoint/packaging/JenkinsFile index d46f598d4..144310bf2 100644 --- a/compute_endpoint/packaging/JenkinsFile +++ b/compute_endpoint/packaging/JenkinsFile @@ -22,8 +22,10 @@ pipeline { script { def scmVars = checkout scm env.BRANCH_NAME = scmVars.GIT_BRANCH.replaceFirst(/^.*origin\//, "") + env.TAG_NAME = sh(returnStdout: true, script: "git tag --contains | head -1").trim() env.SOURCE_STASH_NAME = "${UUID.randomUUID()}" echo "env.BRANCH_NAME = ${env.BRANCH_NAME}" + echo "env.DEFAULT_BRANCH = ${env.DEFAULT_BRANCH}" sh "git clean -fdx" dir("compute_endpoint/packaging/") { diff --git a/compute_endpoint/tests/conftest.py b/compute_endpoint/tests/conftest.py index f58d8bace..2b2707dd6 100644 --- a/compute_endpoint/tests/conftest.py +++ b/compute_endpoint/tests/conftest.py @@ -12,11 +12,14 @@ import globus_sdk import pytest import responses +from globus_compute_common import messagepack from globus_compute_endpoint import engines from globus_compute_endpoint.engines.base import GlobusComputeEngineBase from globus_compute_sdk.sdk.web_client import WebClient +from globus_compute_sdk.serialize import ComputeSerializer from parsl.launchers import SimpleLauncher from parsl.providers import LocalProvider +from tests.utils import ez_pack_function @pytest.fixture(autouse=True) @@ -192,3 +195,31 @@ def _wrapped(*args, **kwargs): pass return _wrapped + + +@pytest.fixture +def task_uuid() -> uuid.UUID: + return uuid.uuid4() + + +@pytest.fixture +def container_uuid() -> uuid.UUID: + return uuid.uuid4() + + +@pytest.fixture(scope="module") +def serde(): + return ComputeSerializer() + + +@pytest.fixture +def ez_pack_task(serde, task_uuid, container_uuid): + def _pack_it(fn, *a, **k) -> bytes: + task_body = ez_pack_function(serde, fn, a, k) + return messagepack.pack( + messagepack.message_types.Task( + task_id=task_uuid, container_id=container_uuid, task_buffer=task_body + ) + ) + + return _pack_it diff --git a/compute_endpoint/tests/integration/endpoint/endpoint/test_gcengine_shell_functions.py b/compute_endpoint/tests/integration/endpoint/endpoint/test_gcengine_shell_functions.py index 27f8ec286..c45a7be4e 100644 --- a/compute_endpoint/tests/integration/endpoint/endpoint/test_gcengine_shell_functions.py +++ b/compute_endpoint/tests/integration/endpoint/endpoint/test_gcengine_shell_functions.py @@ -1,31 +1,22 @@ -import uuid - import pytest from globus_compute_common import messagepack from globus_compute_endpoint.engines import GlobusComputeEngine from globus_compute_sdk.sdk.shell_function import ShellFunction -from globus_compute_sdk.serialize import ComputeSerializer -from tests.utils import ez_pack_function -def test_shell_function(engine_runner, tmp_path): +def test_shell_function(engine_runner, tmp_path, task_uuid, serde, ez_pack_task): """Test running ShellFunction with GCE: Happy path""" engine = engine_runner(GlobusComputeEngine) - task_id = uuid.uuid1() - serializer = ComputeSerializer() shell_func = ShellFunction("pwd") - task_body = ez_pack_function(serializer, shell_func, (), {}) - task_message = messagepack.pack( - messagepack.message_types.Task(task_id=task_id, task_buffer=task_body) - ) - future = engine.submit(task_id, task_message, resource_specification={}) + task_bytes = ez_pack_task(shell_func) + future = engine.submit(task_uuid, task_bytes, resource_specification={}) packed_result = future.result() result = messagepack.unpack(packed_result) - assert result.task_id == task_id + assert result.task_id == task_uuid assert result.error_details is None - result_obj = serializer.deserialize(result.data) + result_obj = serde.deserialize(result.data) assert "pwd" == result_obj.cmd assert result_obj.returncode == 0 @@ -42,46 +33,38 @@ def test_shell_function(engine_runner, tmp_path): ("touch foo; ./foo", "Permission denied", 126), ], ) -def test_fail_shell_function(engine_runner, tmp_path, cmd, error_str, returncode): +def test_fail_shell_function( + engine_runner, cmd, error_str, returncode, serde, task_uuid, ez_pack_task +): """Test running ShellFunction with GCE: Failure path""" engine = engine_runner(GlobusComputeEngine, run_in_sandbox=True) - task_id = uuid.uuid1() - serializer = ComputeSerializer() shell_func = ShellFunction(cmd, walltime=0.1) - task_body = ez_pack_function(serializer, shell_func, (), {}) - task_message = messagepack.pack( - messagepack.message_types.Task(task_id=task_id, task_buffer=task_body) - ) - future = engine.submit(task_id, task_message, resource_specification={}) + task_bytes = ez_pack_task(shell_func) + future = engine.submit(task_uuid, task_bytes, resource_specification={}) packed_result = future.result() result = messagepack.unpack(packed_result) - assert result.task_id == task_id + assert result.task_id == task_uuid assert not result.error_details - result_obj = serializer.deserialize(result.data) + result_obj = serde.deserialize(result.data) assert error_str in result_obj.stderr assert result_obj.returncode == returncode -def test_no_sandbox(engine_runner, tmp_path): +def test_no_sandbox(engine_runner, task_uuid, serde, ez_pack_task): """Test running ShellFunction without sandbox""" engine = engine_runner(GlobusComputeEngine, run_in_sandbox=False) - task_id = uuid.uuid1() - serializer = ComputeSerializer() shell_func = ShellFunction("pwd") - task_body = ez_pack_function(serializer, shell_func, (), {}) - task_message = messagepack.pack( - messagepack.message_types.Task(task_id=task_id, task_buffer=task_body) - ) - future = engine.submit(task_id, task_message, resource_specification={}) + task_bytes = ez_pack_task(shell_func) + future = engine.submit(task_uuid, task_bytes, resource_specification={}) packed_result = future.result() result = messagepack.unpack(packed_result) - assert result.task_id == task_id + assert result.task_id == task_uuid assert result.error_details is None - result_obj = serializer.deserialize(result.data) + result_obj = serde.deserialize(result.data) assert "pwd" == result_obj.cmd assert result_obj.returncode == 0 diff --git a/compute_endpoint/tests/integration/endpoint/executors/high_throughput/test_htex_regression.py b/compute_endpoint/tests/integration/endpoint/executors/high_throughput/test_htex_regression.py index 990e0ceca..1c7a8823f 100644 --- a/compute_endpoint/tests/integration/endpoint/executors/high_throughput/test_htex_regression.py +++ b/compute_endpoint/tests/integration/endpoint/executors/high_throughput/test_htex_regression.py @@ -5,8 +5,7 @@ import pytest from globus_compute_common import messagepack from globus_compute_endpoint.engines import HighThroughputEngine -from globus_compute_sdk.serialize import ComputeSerializer -from tests.utils import double, ez_pack_function +from tests.utils import double @pytest.fixture @@ -24,20 +23,13 @@ def engine(tmp_path): engine.shutdown() -def test_engine_submit(engine): +def test_engine_submit(engine, serde, task_uuid, ez_pack_task): q = engine.results_passthrough - task_id = uuid.uuid1() - serializer = ComputeSerializer() task_arg = random.randint(1, 1000) - task_body = ez_pack_function(serializer, double, (task_arg,), {}) - task_message = messagepack.pack( - messagepack.message_types.Task( - task_id=task_id, container_id=uuid.uuid1(), task_buffer=task_body - ) - ) + task_bytes = ez_pack_task(double, task_arg) resource_spec = {} future = engine.submit( - str(task_id), task_message, resource_specification=resource_spec + str(task_uuid), task_bytes, resource_specification=resource_spec ) packed_result = future.result() @@ -45,7 +37,7 @@ def test_engine_submit(engine): assert isinstance(packed_result, bytes) result = messagepack.unpack(packed_result) assert isinstance(result, messagepack.message_types.Result) - assert result.task_id == task_id + assert result.task_id == task_uuid # Confirm that the same result got back though the queue for _i in range(10): @@ -63,8 +55,8 @@ def test_engine_submit(engine): packed_result == packed_result_q ), "Result from passthrough_q and future should match" - assert result.task_id == task_id - final_result = serializer.deserialize(result.data) + assert result.task_id == task_uuid + final_result = serde.deserialize(result.data) expected = task_arg * 2 assert final_result == expected, f"Expected {expected}, but got: {final_result}" break diff --git a/compute_endpoint/tests/integration/endpoint/executors/test_gcengine_retries.py b/compute_endpoint/tests/integration/endpoint/executors/test_gcengine_retries.py index 2ab618183..a1dc84681 100644 --- a/compute_endpoint/tests/integration/endpoint/executors/test_gcengine_retries.py +++ b/compute_endpoint/tests/integration/endpoint/executors/test_gcengine_retries.py @@ -8,10 +8,9 @@ import pytest from globus_compute_common import messagepack from globus_compute_endpoint.engines import GlobusComputeEngine -from globus_compute_sdk.serialize import ComputeSerializer from parsl.executors.high_throughput.interchange import ManagerLost from parsl.providers import LocalProvider -from tests.utils import ez_pack_function, slow_double +from tests.utils import slow_double class MockHTEX: @@ -54,59 +53,41 @@ def mock_gce(tmp_path): yield engine -def test_success_after_1_fail(mock_gce, tmp_path): +def test_success_after_1_fail(mock_gce, serde, ez_pack_task): engine = mock_gce engine.max_retries_on_system_failure = 2 - queue = engine.results_passthrough + q = engine.results_passthrough task_id = uuid.uuid1() - serializer = ComputeSerializer() num = random.randint(1, 10000) + task_bytes = ez_pack_task(slow_double, num, 0.2) # Set the failure count on the mock executor to force failure engine.executor.fail_count = 1 + engine.submit(task_id, task_bytes, resource_specification={}) - task_body = ez_pack_function( - serializer, - slow_double, - ( - num, - 0.2, - ), - {}, - ) - task_message = messagepack.pack( - messagepack.message_types.Task(task_id=task_id, task_buffer=task_body) - ) - - engine.submit(task_id, task_message, resource_specification={}) - - packed_result = queue.get() + packed_result = q.get() assert isinstance(packed_result, dict) result = messagepack.unpack(packed_result["message"]) assert result.task_id == task_id - assert serializer.deserialize(result.data) == 2 * num + assert serde.deserialize(result.data) == 2 * num -def test_repeated_fail(mock_gce, tmp_path): +def test_repeated_fail(mock_gce, ez_pack_task): fail_count = 2 engine = mock_gce engine.max_retries_on_system_failure = fail_count - queue = engine.results_passthrough + q = engine.results_passthrough task_id = uuid.uuid1() - serializer = ComputeSerializer() # Set executor to continue failures beyond retry limit engine.executor.fail_count = fail_count + 1 - task_body = ez_pack_function(serializer, slow_double, (5,), {}) - task_message = messagepack.pack( - messagepack.message_types.Task(task_id=task_id, task_buffer=task_body) - ) + task_bytes = ez_pack_task(slow_double, 5) - engine.submit(task_id, task_message, resource_specification={}) + engine.submit(task_id, task_bytes, resource_specification={}) - packed_result_q = queue.get(10) + packed_result_q = q.get() result = messagepack.unpack(packed_result_q["message"]) assert isinstance(result, messagepack.message_types.Result) assert result.task_id == task_id diff --git a/compute_endpoint/tests/integration/endpoint/executors/test_mpiengine.py b/compute_endpoint/tests/integration/endpoint/executors/test_mpiengine.py index 17820e476..e689b3964 100644 --- a/compute_endpoint/tests/integration/endpoint/executors/test_mpiengine.py +++ b/compute_endpoint/tests/integration/endpoint/executors/test_mpiengine.py @@ -1,68 +1,55 @@ import concurrent.futures import random -import uuid import pytest from globus_compute_common import messagepack from globus_compute_endpoint.engines import GlobusMPIEngine from globus_compute_sdk.sdk.mpi_function import MPIFunction from globus_compute_sdk.sdk.shell_function import ShellResult -from globus_compute_sdk.serialize import ComputeSerializer from parsl.executors.high_throughput.mpi_prefix_composer import ( InvalidResourceSpecification, ) -from tests.utils import ez_pack_function, get_env_vars +from tests.utils import get_env_vars -def test_mpi_function(engine_runner, nodeslist): +def test_mpi_function(engine_runner, nodeslist, serde, task_uuid, ez_pack_task): """Test for the right cmd being generated""" engine = engine_runner(GlobusMPIEngine) - task_id = uuid.uuid1() - serializer = ComputeSerializer() num_nodes = random.randint(1, len(nodeslist)) resource_spec = {"num_nodes": num_nodes, "num_ranks": random.randint(1, 4)} mpi_func = MPIFunction("pwd") - task_body = ez_pack_function(serializer, mpi_func, (), {}) - task_message = messagepack.pack( - messagepack.message_types.Task(task_id=task_id, task_buffer=task_body) - ) - future = engine.submit(task_id, task_message, resource_specification=resource_spec) + task_bytes = ez_pack_task(mpi_func) + future = engine.submit(task_uuid, task_bytes, resource_specification=resource_spec) packed_result = future.result(timeout=10) result = messagepack.unpack(packed_result) assert isinstance(result, messagepack.message_types.Result) - assert result.task_id == task_id + assert result.task_id == task_uuid assert result.error_details is None - result = serializer.deserialize(result.data) + result = serde.deserialize(result.data) assert isinstance(result, ShellResult) assert result.cmd == "$PARSL_MPI_PREFIX pwd" -def test_env_vars(engine_runner, nodeslist, tmp_path): +def test_env_vars(engine_runner, nodeslist, serde, task_uuid, ez_pack_task): engine = engine_runner(GlobusMPIEngine) - task_id = uuid.uuid1() - serializer = ComputeSerializer() - task_body = ez_pack_function(serializer, get_env_vars, (), {}) - - task_message = messagepack.pack( - messagepack.message_types.Task(task_id=task_id, task_buffer=task_body) - ) + task_bytes = ez_pack_task(get_env_vars) num_nodes = random.randint(1, len(nodeslist)) future = engine.submit( - task_id, - task_message, + task_uuid, + task_bytes, resource_specification={"num_nodes": num_nodes, "num_ranks": 2}, ) packed_result = future.result(timeout=10) result = messagepack.unpack(packed_result) assert isinstance(result, messagepack.message_types.Result) - assert result.task_id == task_id + assert result.task_id == task_uuid assert result.error_details is None - inner_result = serializer.deserialize(result.data) + inner_result = serde.deserialize(result.data) assert inner_result["PARSL_NUM_NODES"] == str(num_nodes) assert inner_result["PARSL_MPI_PREFIX"].startswith("mpiexec") diff --git a/compute_endpoint/tests/unit/test_engines.py b/compute_endpoint/tests/unit/test_engines.py index cec3250d9..2ef427027 100644 --- a/compute_endpoint/tests/unit/test_engines.py +++ b/compute_endpoint/tests/unit/test_engines.py @@ -4,7 +4,6 @@ import random import time import typing as t -import uuid from queue import Queue from unittest import mock @@ -22,57 +21,26 @@ ThreadPoolEngine, ) from globus_compute_endpoint.engines.base import GlobusComputeEngineBase -from globus_compute_sdk.serialize import ComputeSerializer from parsl import HighThroughputExecutor from parsl.executors.high_throughput.interchange import ManagerLost from parsl.providers import KubernetesProvider -from pytest_mock import MockFixture -from tests.utils import double, ez_pack_function, kill_manager +from tests.utils import double, kill_manager logger = logging.getLogger(__name__) -@pytest.fixture(scope="module") -def serde(): - return ComputeSerializer() - - -@pytest.fixture -def task_uuid() -> uuid.UUID: - return uuid.uuid4() - - -@pytest.fixture -def container_uuid() -> uuid.UUID: - return uuid.uuid4() - - -@pytest.fixture -def ez_pack_task(serde, task_uuid, container_uuid): - def _pack_it(fn, *a, **k) -> bytes: - task_body = ez_pack_function(serde, fn, a, k) - return messagepack.pack( - messagepack.message_types.Task( - task_id=task_uuid, container_id=container_uuid, task_buffer=task_body - ) - ) - - return _pack_it - - -def test_result_message_packing(serde): +def test_result_message_packing(serde, task_uuid): exec_start = TaskTransition( timestamp=time.time_ns(), state=TaskState.EXEC_START, actor=ActorName.WORKER ) - task_id = uuid.uuid1() result = random.randint(0, 1000) exec_end = TaskTransition( timestamp=time.time_ns(), state=TaskState.EXEC_END, actor=ActorName.WORKER ) result_message = dict( - task_id=task_id, + task_id=task_uuid, data=serde.serialize(result), task_statuses=[exec_start, exec_end], ) @@ -84,9 +52,8 @@ def test_result_message_packing(serde): unpacked = messagepack.unpack(packed_result) assert isinstance(unpacked, messagepack.message_types.Result) - # assert unpacked. - logger.warning(f"Type of unpacked : {unpacked}") - assert unpacked.task_id == task_id + + assert unpacked.task_id == task_uuid assert serde.deserialize(unpacked.data) == result @@ -227,32 +194,31 @@ def test_gcengine_pass_through_to_executor(randomstring): assert kwargs == k -def test_gcengine_start_pass_through_to_executor(tmp_path: pathlib.Path): +def test_gcengine_start_pass_through_to_executor(tmp_path: pathlib.Path, endpoint_uuid): mock_ex = mock.Mock( status_polling_interval=5, launch_cmd="foo-bar", interchange_launch_cmd="foo-bar", ) - run_dir = tmp_path scripts_dir = str(tmp_path / "submit_scripts") with mock.patch.object(GlobusComputeEngine, "_ExecutorClass", mock.Mock): engine = GlobusComputeEngine(executor=mock_ex) - assert mock_ex.run_dir != run_dir + assert mock_ex.run_dir != tmp_path assert mock_ex.provider.script_dir != scripts_dir - engine.start(endpoint_id=uuid.uuid4(), run_dir=run_dir, results_passthrough=Queue()) + engine.start( + endpoint_id=endpoint_uuid, run_dir=tmp_path, results_passthrough=Queue() + ) engine.shutdown() - assert mock_ex.run_dir == run_dir + assert mock_ex.run_dir == tmp_path assert mock_ex.provider.script_dir == scripts_dir -def test_gcengine_start_provider_without_channel( - mocker: MockFixture, tmp_path: pathlib.Path -): +def test_gcengine_start_provider_without_channel(tmp_path: pathlib.Path, endpoint_uuid): mock_executor = mock.Mock(spec=HighThroughputExecutor) mock_executor.status_polling_interval = 5 mock_executor.provider = mock.Mock(spec=KubernetesProvider) @@ -263,7 +229,7 @@ def test_gcengine_start_provider_without_channel( engine = GlobusComputeEngine(executor=mock_executor) engine.start( - endpoint_id=uuid.uuid4(), run_dir=tmp_path, results_passthrough=Queue() + endpoint_id=endpoint_uuid, run_dir=tmp_path, results_passthrough=Queue() ) engine.shutdown() @@ -295,16 +261,14 @@ def test_gcengine_executor_exception_passthrough(randomstring): assert exc_text in str(gce.executor_exception) -def test_gcengine_bad_state_futures_failed_immediately(randomstring): +def test_gcengine_bad_state_futures_failed_immediately(randomstring, task_uuid): gce = GlobusComputeEngine() exc_text = randomstring() gce.executor.set_bad_state_and_fail_all(ZeroDivisionError(exc_text)) taskb = b"some packed task bytes" futs = [ - gce.submit( - task_id=str(uuid.uuid4()), packed_task=taskb, resource_specification={} - ) + gce.submit(task_id=str(task_uuid), packed_task=taskb, resource_specification={}) for _ in range(5) ] @@ -312,13 +276,12 @@ def test_gcengine_bad_state_futures_failed_immediately(randomstring): assert all(exc_text in str(f.exception()) for f in futs) -def test_gcengine_exception_report_from_bad_state(): +def test_gcengine_exception_report_from_bad_state(task_uuid): gce = GlobusComputeEngine() gce.executor.set_bad_state_and_fail_all(ZeroDivisionError()) - task_id = uuid.uuid4() gce.submit( - task_id=str(task_id), resource_specification={}, packed_task=b"MOCK_PACKED_TASK" + task_id=str(task_uuid), resource_specification={}, packed_task=b"MOCK_TASK" ) result = None @@ -329,7 +292,7 @@ def test_gcengine_exception_report_from_bad_state(): if isinstance(result, messagepack.message_types.Result): break - assert result.task_id == task_id + assert result.task_id == task_uuid assert result.error_details.code == "RemoteExecutionError" assert "ZeroDivisionError" in result.data diff --git a/compute_endpoint/tests/unit/test_execute_task.py b/compute_endpoint/tests/unit/test_execute_task.py index 5164c7bf9..8eb944268 100644 --- a/compute_endpoint/tests/unit/test_execute_task.py +++ b/compute_endpoint/tests/unit/test_execute_task.py @@ -1,11 +1,8 @@ import logging -import uuid from unittest import mock from globus_compute_common import messagepack from globus_compute_endpoint.engines.helper import execute_task -from globus_compute_sdk.serialize import ComputeSerializer -from tests.utils import ez_pack_function logger = logging.getLogger(__name__) @@ -16,20 +13,12 @@ def divide(x, y): return x / y -def test_execute_task(): - serializer = ComputeSerializer() - ep_id = uuid.uuid1() - task_id = uuid.uuid1() - input, output = (10, 2), 5 - task_body = ez_pack_function(serializer, divide, input, {}) +def test_execute_task(endpoint_uuid, serde, task_uuid, ez_pack_task): + inp, outp = (10, 2), 5 - task_message = messagepack.pack( - messagepack.message_types.Task( - task_id=task_id, container_id=uuid.uuid1(), task_buffer=task_body - ) - ) + task_bytes = ez_pack_task(divide, *inp) - packed_result = execute_task(task_id, task_message, ep_id) + packed_result = execute_task(task_uuid, task_bytes, endpoint_uuid) assert isinstance(packed_result, bytes) result = messagepack.unpack(packed_result) @@ -39,31 +28,14 @@ def test_execute_task(): assert "python_version" in result.details assert "dill_version" in result.details assert "endpoint_id" in result.details - assert serializer.deserialize(result.data) == output + assert serde.deserialize(result.data) == outp -def test_execute_task_with_exception(): - serializer = ComputeSerializer() - ep_id = uuid.uuid1() - task_id = uuid.uuid1() - task_body = ez_pack_function( - serializer, - divide, - ( - 10, - 0, - ), - {}, - ) - - task_message = messagepack.pack( - messagepack.message_types.Task( - task_id=task_id, container_id=uuid.uuid1(), task_buffer=task_body - ) - ) +def test_execute_task_with_exception(endpoint_uuid, task_uuid, ez_pack_task): + task_bytes = ez_pack_task(divide, 10, 0) with mock.patch(f"{_MOCK_BASE}log") as mock_log: - packed_result = execute_task(task_id, task_message, ep_id) + packed_result = execute_task(task_uuid, task_bytes, endpoint_uuid) assert mock_log.exception.called a, _k = mock_log.exception.call_args diff --git a/compute_endpoint/tests/unit/test_htex.py b/compute_endpoint/tests/unit/test_htex.py index 6c8644588..f32857b84 100644 --- a/compute_endpoint/tests/unit/test_htex.py +++ b/compute_endpoint/tests/unit/test_htex.py @@ -38,22 +38,15 @@ def htex(): @pytest.mark.skip("Skip until HTEX has been fixed up") -def test_htex_submit_raw(htex): +def test_htex_submit_raw(htex, serde, task_uuid, ez_pack_task): """Testing the HighThroughputExecutor/Engine""" engine = htex q = engine.results_passthrough - task_id = uuid.uuid1() - serializer = ComputeSerializer() - task_body = ez_pack_function(serializer, double, (3,), {}) - task_message = messagepack.pack( - messagepack.message_types.Task( - task_id=task_id, container_id=uuid.uuid1(), task_buffer=task_body - ) - ) + task_bytes = ez_pack_task(double, 3) # HTEX doesn't give you a future back - engine.submit_raw(task_message) + engine.submit_raw(task_bytes) # Confirm that the same result got back though the queue for _i in range(3): @@ -68,30 +61,22 @@ def test_htex_submit_raw(htex): continue # At this point the message should be the result - assert result.task_id == task_id + assert result.task_id == task_uuid - final_result = serializer.deserialize(result.data) + final_result = serde.deserialize(result.data) assert final_result == 6, f"Expected 6, but got: {final_result}" break @pytest.mark.skip("Skip until HTEX has been fixed up") -def test_htex_submit_raw_exception(htex): +def test_htex_submit_raw_exception(htex, task_uuid, ez_pack_task): """Testing the HighThroughputExecutor/Engine with a remote side exception""" engine = htex q = engine.results_passthrough - task_id = uuid.uuid1() - serializer = ComputeSerializer() - task_body = ez_pack_function(serializer, div_zero, (3,), {}) - task_message = messagepack.pack( - messagepack.message_types.Task( - task_id=task_id, container_id=uuid.uuid1(), task_buffer=task_body - ) - ) - + task_bytes = ez_pack_task(div_zero, 3) # HTEX doesn't give you a future back - engine.submit_raw(task_message) + engine.submit_raw(task_bytes) # Confirm that the same result got back though the queue for _i in range(3): @@ -106,28 +91,21 @@ def test_htex_submit_raw_exception(htex): continue # At this point the message should be the result - assert result.task_id == task_id + assert result.task_id == task_uuid assert result.error_details break @pytest.mark.skip("Skip until HTEX has been fixed up") -def test_htex_manager_lost(htex): +def test_htex_manager_lost(htex, task_uuid, ez_pack_task): """Testing the HighThroughputExecutor/Engine""" engine = htex q = engine.results_passthrough - task_id = uuid.uuid1() - serializer = ComputeSerializer() - task_body = ez_pack_function(serializer, kill_manager, (), {}) - task_message = messagepack.pack( - messagepack.message_types.Task( - task_id=task_id, container_id=uuid.uuid1(), task_buffer=task_body - ) - ) + task_bytes = ez_pack_task(kill_manager) # HTEX doesn't give you a future back - engine.submit_raw(task_message) + engine.submit_raw(task_bytes) # Confirm that the same result got back though the queue for _i in range(10): @@ -143,7 +121,7 @@ def test_htex_manager_lost(htex): continue # At this point the message should be the result - assert result.task_id == task_id + assert result.task_id == task_uuid assert result.error_details.code == "RemoteExecutionError" assert "ManagerLost" in result.data @@ -151,15 +129,14 @@ def test_htex_manager_lost(htex): def test_engine_submit_container_location( - mocker: MockFixture, htex: HighThroughputEngine + mocker: MockFixture, htex: HighThroughputEngine, serde: ComputeSerializer ): engine = htex task_id = uuid.uuid4() container_id = uuid.uuid4() container_type = "singularity" container_loc = "/path/to/container" - serializer = ComputeSerializer() - task_body = ez_pack_function(serializer, double, (10,), {}) + task_body = ez_pack_function(serde, double, (10,), {}) task_message = messagepack.pack( messagepack.message_types.Task( task_id=task_id, diff --git a/compute_endpoint/tests/unit/test_worker.py b/compute_endpoint/tests/unit/test_worker.py index eb01288b0..74a0e950e 100644 --- a/compute_endpoint/tests/unit/test_worker.py +++ b/compute_endpoint/tests/unit/test_worker.py @@ -25,13 +25,14 @@ def large_result(size): return bytearray(size) -def ez_pack_function(serializer, func, args, kwargs): - serialized_func = serializer.serialize(func) - serialized_args = serializer.serialize(args) - serialized_kwargs = serializer.serialize(kwargs) - return serializer.pack_buffers( - [serialized_func, serialized_args, serialized_kwargs] - ) +def sleeper(t: float): + import time + + now = start = time.monotonic() + while now - start < t: + time.sleep(0.0001) + now = time.monotonic() + return True @pytest.fixture(autouse=True) @@ -123,55 +124,36 @@ def test_execute_failing_function(test_worker): ) -def test_execute_function_exceeding_result_size_limit(test_worker): +def test_execute_function_exceeding_result_size_limit( + test_worker, endpoint_uuid, task_uuid, ez_pack_task +): return_size = 10 - task_id = uuid.uuid1() - ep_id = uuid.uuid1() - - payload = ez_pack_function(test_worker.serializer, large_result, (return_size,), {}) - task_body = messagepack.pack( - messagepack.message_types.Task(task_id=task_id, task_buffer=payload) - ) + task_bytes = ez_pack_task(large_result, return_size) with mock.patch("globus_compute_endpoint.engines.helper.log") as mock_log: s_result = execute_task( - task_id, task_body, ep_id, result_size_limit=return_size - 2 + task_uuid, task_bytes, endpoint_uuid, result_size_limit=return_size - 2 ) result = messagepack.unpack(s_result) assert isinstance(result, messagepack.message_types.Result) assert result.error_details - assert result.task_id == task_id + assert result.task_id == task_uuid assert result.error_details assert result.error_details.code == "MaxResultSizeExceeded" assert mock_log.exception.called -def sleeper(t): - import time - - now = start = time.monotonic() - while now - start < t: - time.sleep(0.0001) - now = time.monotonic() - return True - - -def test_app_timeout(test_worker): - task_id = uuid.uuid1() - ep_id = uuid.uuid1() - task_body = ez_pack_function(test_worker.serializer, sleeper, (1,), {}) - task_body = messagepack.pack( - messagepack.message_types.Task(task_id=task_id, task_buffer=task_body) - ) +def test_app_timeout(test_worker, endpoint_uuid, task_uuid, ez_pack_task): + task_bytes = ez_pack_task(sleeper, 1) with mock.patch("globus_compute_endpoint.engines.helper.log") as mock_log: with mock.patch.dict(os.environ, {"GC_TASK_TIMEOUT": "0.01"}): - packed_result = execute_task(task_id, task_body, ep_id) + packed_result = execute_task(task_uuid, task_bytes, endpoint_uuid) result = messagepack.unpack(packed_result) assert isinstance(result, messagepack.message_types.Result) - assert result.task_id == task_id + assert result.task_id == task_uuid assert "AppTimeout" in result.data assert mock_log.exception.called diff --git a/compute_endpoint/tests/unit/test_working_dir.py b/compute_endpoint/tests/unit/test_working_dir.py index 471dbaab1..b67f8a8da 100644 --- a/compute_endpoint/tests/unit/test_working_dir.py +++ b/compute_endpoint/tests/unit/test_working_dir.py @@ -6,9 +6,7 @@ from globus_compute_common import messagepack from globus_compute_endpoint.engines.globus_compute import GlobusComputeEngine from globus_compute_endpoint.engines.helper import execute_task -from globus_compute_sdk.serialize import ComputeSerializer from parsl.executors import HighThroughputExecutor -from tests.utils import ez_pack_function @pytest.fixture() @@ -59,7 +57,7 @@ def test_absolute_working_dir(tmp_path): assert gce.working_dir == "/absolute/path" -def test_submit_pass(tmp_path): +def test_submit_pass(tmp_path, task_uuid): """Test absolute path for working_dir""" gce = GlobusComputeEngine( address="127.0.0.1", @@ -69,7 +67,7 @@ def test_submit_pass(tmp_path): gce.start(endpoint_id=uuid.uuid4(), run_dir=tmp_path) gce.submit( - task_id=uuid.uuid4(), + task_id=task_uuid, packed_task=b"PACKED_TASK", resource_specification={}, ) @@ -84,92 +82,65 @@ def test_submit_pass(tmp_path): assert flag, "Call args parsing failed, did not find run_dir" -def test_execute_task_working_dir(tmp_path, reset_cwd): - - task_id = uuid.uuid4() - serializer = ComputeSerializer() - task_body = ez_pack_function(serializer, get_cwd, (), {}) - task_message = messagepack.pack( - messagepack.message_types.Task( - task_id=task_id, container_id=uuid.uuid1(), task_buffer=task_body - ) - ) - - assert os.getcwd() != tmp_path.name - - packed_result = execute_task( - task_id, - task_message, - uuid.uuid4(), - run_dir=tmp_path, - ) +def test_execute_task_working_dir( + tmp_path, reset_cwd, serde, endpoint_uuid, task_uuid, ez_pack_task +): + assert os.getcwd() != str(tmp_path) + task_bytes = ez_pack_task(get_cwd) + packed_result = execute_task(task_uuid, task_bytes, endpoint_uuid, run_dir=tmp_path) message = messagepack.unpack(packed_result) - assert message.task_id == task_id + assert message.task_id == task_uuid assert message.data - result = serializer.deserialize(message.data) - assert result == tmp_path.__fspath__() - assert os.getcwd() == tmp_path.__fspath__() + + result = serde.deserialize(message.data) + assert result == os.fspath(tmp_path) + assert os.getcwd() == os.fspath(tmp_path) -def test_non_existent_relative_working_dir(tmp_path, reset_cwd): +def test_non_existent_relative_working_dir( + tmp_path, reset_cwd, serde, endpoint_uuid, task_uuid, ez_pack_task +): """This tests for execute_task creating a non-existent working dir when a relative path is specified to the CWD""" - task_id = uuid.uuid4() os.chdir(tmp_path) target_dir = f"{uuid.uuid4()}" - abs_target_dir = os.path.abspath(target_dir) - serializer = ComputeSerializer() - task_body = ez_pack_function(serializer, get_cwd, (), {}) - task_message = messagepack.pack( - messagepack.message_types.Task( - task_id=task_id, container_id=uuid.uuid1(), task_buffer=task_body - ) - ) - assert os.getcwd() != target_dir + abs_target_dir = os.path.abspath(target_dir) + + task_bytes = ez_pack_task(get_cwd) packed_result = execute_task( - task_id, - task_message, - uuid.uuid4(), + task_uuid, + task_bytes, + endpoint_uuid, run_dir=target_dir, ) message = messagepack.unpack(packed_result) - assert message.task_id == task_id + assert message.task_id == task_uuid assert message.data - result = serializer.deserialize(message.data) + result = serde.deserialize(message.data) assert result == abs_target_dir -def test_sandbox(tmp_path, reset_cwd): - - task_id = uuid.uuid4() +def test_sandbox(tmp_path, reset_cwd, serde, endpoint_uuid, task_uuid, ez_pack_task): os.chdir(tmp_path) - serializer = ComputeSerializer() - task_body = ez_pack_function(serializer, get_cwd, (), {}) - task_message = messagepack.pack( - messagepack.message_types.Task( - task_id=task_id, container_id=uuid.uuid1(), task_buffer=task_body - ) - ) - - assert os.getcwd() != tmp_path + task_bytes = ez_pack_task(get_cwd) packed_result = execute_task( - task_id, - task_message, - uuid.uuid4(), + task_uuid, + task_bytes, + endpoint_uuid, run_dir=tmp_path, run_in_sandbox=True, ) message = messagepack.unpack(packed_result) - assert message.task_id == task_id + assert message.task_id == task_uuid assert message.data - result = serializer.deserialize(message.data) + result = serde.deserialize(message.data) - assert result == os.path.join(tmp_path, str(task_id)) + assert result == os.path.join(tmp_path, str(task_uuid))