Skip to content
This repository has been archived by the owner on Nov 7, 2024. It is now read-only.

Commit

Permalink
WIP proposed chained docker pattern
Browse files Browse the repository at this point in the history
  • Loading branch information
zachmullen committed Nov 14, 2018
1 parent e245076 commit b612dd2
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 5 deletions.
15 changes: 15 additions & 0 deletions girder_worker/docker/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
StdStreamWriter
)
from girder_worker.docker.transforms import (
ChainedResultTransform,
ContainerStdErr,
ContainerStdOut,
_TemporaryVolumeBase,
Expand Down Expand Up @@ -376,3 +377,17 @@ def docker_run(task, image, pull_image=True, entrypoint=None, container_args=Non
return _docker_run(
task, image, pull_image, entrypoint, container_args, volumes,
remove_container, **kwargs)


@app.task(base=DockerTask, bind=True)
def docker_run_chained(task, results, container_args=None, *args, **kwargs):
container_args = container_args or []
kwargs['container_args'] = [_maybe_transform_chain_result(a, results) for a in container_args]

return _docker_run(task, *args, **kwargs)


def _maybe_transform_chain_result(arg, results):
if isinstance(arg, ChainedResultTransform):
return arg.transform(results)
return arg
15 changes: 15 additions & 0 deletions girder_worker/docker/transforms/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -386,3 +386,18 @@ def transform(self, **kwargs):
)

return ChunkedTransferEncodingStreamWriter(self.url, self.headers)


class ChainedResultTransform(Transform):
"""
This is the base class for a set of transforms that chain one of the result
hook values from one docker task into the next docker task. This version of the
class passes the result hook value as-is rather than transforming it.
:param index: The index into the previous task's results list.
"""
def __init__(self, index):
self._index = index

def transform(self, results):
return results[self._index]
8 changes: 7 additions & 1 deletion girder_worker/docker/transforms/girder.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
GirderUploadToFolder,
GirderUploadToItem
)
from . import TemporaryVolume, Connect, NamedOutputPipe, _maybe_transform
from . import TemporaryVolume, Connect, NamedOutputPipe, _maybe_transform, ChainedResultTransform


class ProgressPipe(Transform):
Expand Down Expand Up @@ -298,3 +298,9 @@ def transform(self, *args, **kwargs):
def exception(self):
if self._upload_on_exception:
return self.transform()


class ChainedResultItem(ChainedResultTransform):
def transform(self, results):
item_id = super(ChainedResultItem, self).transform(results)
return GirderItemIdToVolume(item_id)
8 changes: 4 additions & 4 deletions tests/integration/integration_test_endpoints/server/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from girder.models.token import Token
from girder.constants import AccessType

from girder_worker.docker.tasks import docker_run
from girder_worker.docker.tasks import docker_run, docker_run_chained
from girder_worker.docker.transforms import (
HostStdOut,
NamedOutputPipe,
Expand All @@ -29,6 +29,7 @@
TemporaryVolume
)
from girder_worker.docker.transforms.girder import (
ChainedResultItem,
GirderFileIdToStream,
GirderUploadVolumePathToItem,
ProgressPipe,
Expand Down Expand Up @@ -113,11 +114,10 @@ def test_docker_run_multistep_workflow(self, file1, file2, outItem, finalOutItem
GirderUploadVolumePathToItem(outpath, outItem['_id'])
]
)
# Create some partial and chain into it?
step2 = docker_run.s(
step2 = docker_run_chained.s(
TEST_IMAGE, pull_image=True, container_args=[
'reverse',
step1.results[0], # TODO this doesn't work
ChainedResultItem(0),
finaloutpath
], girder_result_hooks=[
GirderUploadVolumePathToItem(finaloutpath, finalOutItem['_id'])
Expand Down

0 comments on commit b612dd2

Please sign in to comment.