From b612dd2d709f3dd7a56bb5f918c8ab9d43dc36ed Mon Sep 17 00:00:00 2001 From: Zach Mullen Date: Wed, 14 Nov 2018 13:57:47 -0500 Subject: [PATCH] WIP proposed chained docker pattern --- girder_worker/docker/tasks/__init__.py | 15 +++++++++++++++ girder_worker/docker/transforms/__init__.py | 15 +++++++++++++++ girder_worker/docker/transforms/girder.py | 8 +++++++- .../integration_test_endpoints/server/docker.py | 8 ++++---- 4 files changed, 41 insertions(+), 5 deletions(-) diff --git a/girder_worker/docker/tasks/__init__.py b/girder_worker/docker/tasks/__init__.py index 4594d29c..da20f384 100644 --- a/girder_worker/docker/tasks/__init__.py +++ b/girder_worker/docker/tasks/__init__.py @@ -21,6 +21,7 @@ StdStreamWriter ) from girder_worker.docker.transforms import ( + ChainedResultTransform, ContainerStdErr, ContainerStdOut, _TemporaryVolumeBase, @@ -376,3 +377,17 @@ def docker_run(task, image, pull_image=True, entrypoint=None, container_args=Non return _docker_run( task, image, pull_image, entrypoint, container_args, volumes, remove_container, **kwargs) + + +@app.task(base=DockerTask, bind=True) +def docker_run_chained(task, results, container_args=None, *args, **kwargs): + container_args = container_args or [] + kwargs['container_args'] = [_maybe_transform_chain_result(a, results) for a in container_args] + + return _docker_run(task, *args, **kwargs) + + +def _maybe_transform_chain_result(arg, results): + if isinstance(arg, ChainedResultTransform): + return arg.transform(results) + return arg diff --git a/girder_worker/docker/transforms/__init__.py b/girder_worker/docker/transforms/__init__.py index 3c4c3480..ec27c1a6 100644 --- a/girder_worker/docker/transforms/__init__.py +++ b/girder_worker/docker/transforms/__init__.py @@ -386,3 +386,18 @@ def transform(self, **kwargs): ) return ChunkedTransferEncodingStreamWriter(self.url, self.headers) + + +class ChainedResultTransform(Transform): + """ + This is the base class for a set of transforms that chain one of the result + hook values from one docker task into the next docker task. This version of the + class passes the result hook value as-is rather than transforming it. + + :param index: The index into the previous task's results list. + """ + def __init__(self, index): + self._index = index + + def transform(self, results): + return results[self._index] diff --git a/girder_worker/docker/transforms/girder.py b/girder_worker/docker/transforms/girder.py index bc509849..734c5096 100644 --- a/girder_worker/docker/transforms/girder.py +++ b/girder_worker/docker/transforms/girder.py @@ -8,7 +8,7 @@ GirderUploadToFolder, GirderUploadToItem ) -from . import TemporaryVolume, Connect, NamedOutputPipe, _maybe_transform +from . import TemporaryVolume, Connect, NamedOutputPipe, _maybe_transform, ChainedResultTransform class ProgressPipe(Transform): @@ -298,3 +298,9 @@ def transform(self, *args, **kwargs): def exception(self): if self._upload_on_exception: return self.transform() + + +class ChainedResultItem(ChainedResultTransform): + def transform(self, results): + item_id = super(ChainedResultItem, self).transform(results) + return GirderItemIdToVolume(item_id) diff --git a/tests/integration/integration_test_endpoints/server/docker.py b/tests/integration/integration_test_endpoints/server/docker.py index 04244a88..bc48792d 100644 --- a/tests/integration/integration_test_endpoints/server/docker.py +++ b/tests/integration/integration_test_endpoints/server/docker.py @@ -17,7 +17,7 @@ from girder.models.token import Token from girder.constants import AccessType -from girder_worker.docker.tasks import docker_run +from girder_worker.docker.tasks import docker_run, docker_run_chained from girder_worker.docker.transforms import ( HostStdOut, NamedOutputPipe, @@ -29,6 +29,7 @@ TemporaryVolume ) from girder_worker.docker.transforms.girder import ( + ChainedResultItem, GirderFileIdToStream, GirderUploadVolumePathToItem, ProgressPipe, @@ -113,11 +114,10 @@ def test_docker_run_multistep_workflow(self, file1, file2, outItem, finalOutItem GirderUploadVolumePathToItem(outpath, outItem['_id']) ] ) - # Create some partial and chain into it? - step2 = docker_run.s( + step2 = docker_run_chained.s( TEST_IMAGE, pull_image=True, container_args=[ 'reverse', - step1.results[0], # TODO this doesn't work + ChainedResultItem(0), finaloutpath ], girder_result_hooks=[ GirderUploadVolumePathToItem(finaloutpath, finalOutItem['_id'])