Skip to content

Commit

Permalink
replace docker python interface and initial addition of PBS container
Browse files Browse the repository at this point in the history
  • Loading branch information
gpetretto committed Dec 30, 2024
1 parent 41bf0b2 commit c68d65e
Show file tree
Hide file tree
Showing 6 changed files with 237 additions and 124 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ dependencies = [

[project.optional-dependencies]
dev = ["pre-commit>=3.0.0"]
tests = ["docker ~= 7.0", "pytest ~= 8.0", "pytest-cov >= 4,< 6", "pytest-durations ~= 1.3", "pytest-mock ~= 3.14"]
tests = ["docker ~= 7.0", "pytest ~= 8.0", "pytest-cov >= 4,< 6", "pytest-durations ~= 1.3", "pytest-mock ~= 3.14", "python-on-whales"]
docs = [
"autodoc_pydantic>=2.0.0",
"pydata-sphinx-theme",
Expand Down
261 changes: 138 additions & 123 deletions tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@
from functools import partialmethod
from pathlib import Path

import docker
import fabric
import pytest
from docker.models.containers import Container
from python_on_whales import DockerClient
from python_on_whales import docker as docker_pow


@pytest.fixture(autouse=True)
Expand Down Expand Up @@ -62,6 +62,12 @@ def sge_ssh_port():
return _get_free_port()


@pytest.fixture(scope="session")
def pbs_ssh_port():
"""The exposed local port for SSH connections to the queue container."""
return _get_free_port()


@pytest.fixture(scope="session")
def db_port():
"""The exposed local port for connections to the MongoDB stores."""
Expand All @@ -70,139 +76,134 @@ def db_port():

@pytest.fixture(scope="session")
def docker_client():
return docker.from_env()

return docker_pow

def build_and_launch_container(
docker_client: docker.client.DockerClient,
dockerfile: Path | None = None,
image_name: str | None = None,
ports: dict[str, int] | None = None,
buildargs: dict[str, str] | None = None,
):
"""Builds and/or launches a container, returning the container object.

Parameters
----------
docker_client: The local docker client.
dockerfile: An optional location of a dockerfile to build.
image_name: Either the tag to attach to the built image, or an image
name to pull from the web (may require authenticated docker client).
ports: A port specification to use for the launched container.
# @pytest.fixture(scope="session", autouse=True)
def bake_containers():
hcl_path = Path(__file__).parent.resolve() / "dockerfiles/docker-bake.hcl"
docker_pow.buildx.bake(
targets=["slurm", "sge"],
# targets=["slurm", "sge", "pbs"],
files=hcl_path,
set={"*.context": str(Path(__file__).parent.parent.parent.resolve())},
)

Yields
------
The launched container object, then stops the container after use.

"""
if dockerfile is not None:
print(f" * Building {image_name}")
_, logs = docker_client.images.build(
path=str(Path(__file__).parent.parent.parent.resolve()),
dockerfile=dockerfile,
buildargs=buildargs,
tag=image_name,
rm=True,
quiet=False,
)
@pytest.fixture(scope="session", autouse=True)
def compose_containers(slurm_ssh_port, sge_ssh_port, pbs_ssh_port, db_port):
compose_yaml = f"""
name: jobflow_remote_testing
services:
mongo:
image: mongo:7
ports:
- "{db_port}:27017"
restart: always
container_name: mongo_container
healthcheck:
test: ["CMD", "mongosh", "--eval", "db.runCommand('ping').ok"]
interval: 1s
timeout: 1s
retries: 28
start_period: 2s
jobflow_remote_testing_slurm:
image: jobflow-remote-testing-slurm:latest
container_name: jobflow_testing_slurm
ports:
- "{slurm_ssh_port}:22"
stdin_open: true
tty: true
healthcheck:
test: ["CMD", "bash", "-c", "</dev/tcp/localhost/22"]
interval: 1s
timeout: 1s
retries: 30
start_period: 2s
jobflow_remote_testing_sge:
image: jobflow-remote-testing-sge:latest
container_name: jobflow_testing_sge
ports:
- "{sge_ssh_port}:22"
stdin_open: true
tty: true
healthcheck:
test: ["CMD", "bash", "-c", "</dev/tcp/localhost/22"]
interval: 1s
timeout: 1s
retries: 30
start_period: 2s
"""
with tempfile.NamedTemporaryFile("wt", suffix="compose.yaml", delete=False) as f:
f.write(compose_yaml)
f.flush()

docker_client = DockerClient(compose_files=[f.name])
try:
print("\n * Launching compose...")

for step in logs:
if step.get("stream"):
print(step["stream"], end="")

try:
print(f"\n * Launching container for {image_name}...")
container = docker_client.containers.run(
image_name,
detach=True,
remove=False,
auto_remove=False,
tty=True,
ports=ports,
)
assert isinstance(container, Container)
print(" * Waiting for container to be ready...", end="")
max_retries = 30
while (retries := 0) < max_retries:
if container.status == "running":
print(f"\n{container.logs().decode()}\n")
print(f"\n * Container {container.id} launched.")
break
if container.status == "exited":
logs = f"\n{container.logs().decode()}\n"
compose_version = docker_client.compose.up(
detach=True,
)
print(" * Waiting for container to be ready...", end="")
max_retries = 30
retries = 0
while retries < max_retries:
containers = docker_client.compose.ps()

if all(
c.state.health and c.state.health.status == "healthy"
for c in containers
):
print(f"\n{docker_client.compose.logs()}\n")
print(f"\n * {compose_version} launched.")
break

exited = [c.id for c in containers if c.state.status == "exited"]
if any(exited):
logs = f"\n{docker_client.compose.logs()}\n"
pytest.fail(
f"Containers {', '.join(exited)} exited before being ready.\nFull logs: {logs}"
)

print(".", end="")
time.sleep(1)
retries += 1
else:
not_started = [
c.id
for c in containers
if not c.state.health or c.state.health.status != "healthy"
]
logs = f"\n{docker_client.compose.logs()}\n"
pytest.fail(
f"Container {container.name!r} ({container.image}) exited before being ready.\nFull logs: {logs}"
f"Containers {', '.join(not_started)} did not start in time. Full logs: {logs}"
)
print(".", end="")
time.sleep(1)
retries += 1
container.reload()
else:
logs = f"\n{container.logs().decode()}\n"
pytest.fail(
f"Container {container.name!r} ({container.image}) did not start in time. Full\nlogs: {logs}"
)

yield container
finally:
try:
print(f"\n * Stopping container {container.id}...")
try:
container.stop()
except (docker.errors.APIError, docker.errors.NotFound):
pass
try:
container.kill()
except (docker.errors.APIError, docker.errors.NotFound):
pass
yield docker_client
finally:
try:
container.remove()
except (docker.errors.APIError, docker.errors.NotFound):
pass
print(" * Done!")
except Exception as exc:
print(f" x Failed to stop container: {exc}")
print("\n * Stopping containers...")
try:
docker_client.compose.stop()
except Exception:
pass

try:
docker_client.compose.kill()
except Exception:
pass

@pytest.fixture(scope="session", autouse=True)
def slurm_container(docker_client, slurm_ssh_port):
"""Build and launch a container running Slurm and SSH, exposed on a random available
port."""
ports = {"22/tcp": slurm_ssh_port}
yield from build_and_launch_container(
docker_client,
Path("./tests/integration/dockerfiles/Dockerfile"),
"jobflow-remote-slurm:latest",
ports=ports,
buildargs={"QUEUE_SYSTEM": "slurm"},
)

try:
docker_client.compose.rm(volumes=True)
except Exception:
pass

@pytest.fixture(scope="session", autouse=True)
def sge_container(docker_client, sge_ssh_port):
"""Build and launch a container running SGE and SSH, exposed on a random available
port."""
ports = {"22/tcp": sge_ssh_port}
yield from build_and_launch_container(
docker_client,
Path("./tests/integration/dockerfiles/Dockerfile"),
"jobflow-remote-sge:latest",
ports=ports,
buildargs={"QUEUE_SYSTEM": "sge"},
)


@pytest.fixture(scope="session", autouse=True)
def mongo_container(docker_client, db_port):
"""Build and launch a container running MongoDB, exposed on a random available
port."""
ports = {"27017/tcp": db_port}
yield from build_and_launch_container(
docker_client,
dockerfile=None,
image_name="mongo:7",
ports=ports,
)
print(" * Done!")
except Exception as exc:
print(f" x Failed to stop container: {exc}")


@pytest.fixture(scope="session")
Expand All @@ -216,6 +217,7 @@ def write_tmp_settings(
store_database_name,
slurm_ssh_port,
sge_ssh_port,
pbs_ssh_port,
db_port,
):
"""Collects the various sub-configs and writes them to a temporary file in a
Expand Down Expand Up @@ -305,6 +307,19 @@ def write_tmp_settings(
pre_run="source /home/jobflow/.venv/bin/activate",
connect_kwargs={"allow_agent": False, "look_for_keys": False},
),
# "test_remote_pbs_worker": dict(
# type="remote",
# host="localhost",
# port=pbs_ssh_port,
# scheduler_type="pbs",
# work_dir="/home/jobflow/jfr",
# user="jobflow",
# password="jobflow",
# scheduler_username="jobflow",
# pre_run="source /home/jobflow/.venv/bin/activate",
# connect_kwargs={"allow_agent": False, "look_for_keys": False},
# resources={"walltime": "00:05:00", "select": "select=1:1"},
# ),
"test_remote_limited_worker": dict(
type="remote",
host="localhost",
Expand Down
Loading

0 comments on commit c68d65e

Please sign in to comment.