From 97897b2934c6830858e0dbdff164ff779ab75e30 Mon Sep 17 00:00:00 2001 From: Chris Harris Date: Fri, 25 Aug 2017 09:46:51 -0400 Subject: [PATCH 1/6] Add basic girder results backend --- .../plugins/girder_io/backend/__init__.py | 146 ++++++++++++++++++ 1 file changed, 146 insertions(+) create mode 100644 girder_worker/plugins/girder_io/backend/__init__.py diff --git a/girder_worker/plugins/girder_io/backend/__init__.py b/girder_worker/plugins/girder_io/backend/__init__.py new file mode 100644 index 00000000..87f2c001 --- /dev/null +++ b/girder_worker/plugins/girder_io/backend/__init__.py @@ -0,0 +1,146 @@ +from celery.backends.base import BaseBackend +from celery.exceptions import ImproperlyConfigured +from celery import states +from girder_client import GirderClient, HttpError +try: + from urllib.parse import parse_qsl, urlparse +except ImportError: + from urlparse import urlparse, parse_qsl +import json +from datetime import datetime +import pytz + + +class GirderBackend(BaseBackend): + """The Girder result backend.""" + + def __init__(self, app, url=None, + api_key=None, + token=None, + parent_id=None, + parent_type=None, **kwargs): + super(GirderBackend, self).__init__(app, url=url, **kwargs) + + + if self.url is None: + raise ImproperlyConfigured( + 'url for Girder server must be provided.') + + self._token = token + self._api_key = api_key + self._parent_id = parent_id + self._parent_type = parent_type + self._gclient = None + + parts = urlparse(url) + query_params = dict(parse_qsl(parts.query)) + print(query_params) + # Give precedence to parameter passed in + if self._token is None: + self._token = query_params.get('token') + + if self._api_key is None: + self._api_key = query_params.get('apiKey') + + if self._parent_id is None: + self._parent_id = query_params.get('parentId') + + if self._parent_type is None: + self._parent_type = query_params.get('parentType') + + self.url = '%s://%s:%s/api/v1' % (parts.scheme, parts.hostname, parts.port) + + if self._api_key is None and self._token is None: + raise ImproperlyConfigured( + 'An API key or token for Girder must be provided.') + + if self._parent_id is None or self._parent_type is None: + raise ImproperlyConfigured( + 'A parent resource must be provided.') + + @property + def _client(self): + if self._gclient is None: + self._gclient = GirderClient(apiUrl=self.url) + if self._api_key: + self._gclient.authenticate(apiKey=self._api_key) + else: + self._gclient.token = self._token + + return self._gclient + + def _store_result(self, task_id, result, state, + traceback=None, request=None, **kwargs): + + result_meta = { + 'status': state, + 'result': self.encode(result), + 'timestamp': datetime.utcnow().replace(tzinfo=pytz.UTC).isoformat(), + 'traceback': self.encode(traceback), + 'children': self.encode( + self.current_task_children(request), + ) + } + + if self._parent_type == 'folder': + # Change to one call then PR is merged into master + item = self._client.createItem(self._parent_id, task_id, reuseExisting=False) + self._client.addMetadataToItem(item['_id'], result_meta) + + return result + + def _forget(self, task_id): + items = list(self._client.listItem(self._parent_id, name=task_id, limit=1)) + if len(items) == 1: + item = items[0] + self._client.delete('item/%s' % item['_id']) + + + def _save_group(self, group_id, result): + print('save group') + group_meta = { + 'result': self.encode([i.id for i in result]), + 'timestamp': datetime.utcnow().replace(tzinfo=pytz.UTC).isoformat() + } + + if self._parent_type == 'folder': + # Change to one call then PR is merged into master + item = self._client.createItem(self._parent_id, group_id, reuseExisting=False) + self._client.addMetadataToItem(item['_id'], group_meta) + + return result + + def _delete_group(self, group_id): + self._forget(group_id) + + def _get_task_meta_for(self, task_id): + items = list(self._client.listItem(self._parent_id, name=task_id, limit=1)) + if len(items) == 1: + item = items[0] + meta = item['meta'] + + return self.meta_from_decoded({ + 'task_id': item['name'], + 'status': meta['status'], + 'result': self.decode(meta['result']), + 'timestamp': meta['timestamp'], + 'traceback': self.decode(meta['traceback']), + 'children': self.decode(meta['children']), + }) + + return {'status': states.PENDING, 'result': None} + + def _restore_group(self, group_id): + items = list(self._client.listItem(self._parent_id, name=group_id, limit=1)) + if len(items) == 1: + item = items[0] + meta = item['meta'] + + return { + 'task_id': item['name'], + 'timestamp': meta['timestamp'], + 'result': [ + self.app.AsyncResult(task) + for task in self.decode(meta['result']) + ], + } From 4b547863d0121712595547c11400b9940a53e373 Mon Sep 17 00:00:00 2001 From: Chris Harris Date: Fri, 25 Aug 2017 13:21:37 -0400 Subject: [PATCH 2/6] Add girder to backend aliases There may be a "better" way to do this --- girder_worker/plugins/girder_io/__init__.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/girder_worker/plugins/girder_io/__init__.py b/girder_worker/plugins/girder_io/__init__.py index b902cef4..595d2652 100644 --- a/girder_worker/plugins/girder_io/__init__.py +++ b/girder_worker/plugins/girder_io/__init__.py @@ -3,6 +3,8 @@ import os from girder_worker import config from six import StringIO +from celery.app import backends + # Make a sensible limit for metadata outputs MAX_METADATA_LENGTH = 4 * 1024 * 1024 # 4MB @@ -191,3 +193,4 @@ def load(params): from girder_worker.core import io io.register_fetch_handler('girder', fetch_handler) io.register_push_handler('girder', push_handler) + backends.BACKEND_ALIASES['girder'] = 'girder_worker.plugins.girder_io.backend:GirderBackend' From fd0e857ae52f2cd78af256c2aaf557f68b7e8bdd Mon Sep 17 00:00:00 2001 From: Chris Harris Date: Fri, 25 Aug 2017 13:23:14 -0400 Subject: [PATCH 3/6] Refactor to take in account running within Girder --- .../plugins/girder_io/backend/__init__.py | 192 ++++++++++++++---- 1 file changed, 158 insertions(+), 34 deletions(-) diff --git a/girder_worker/plugins/girder_io/backend/__init__.py b/girder_worker/plugins/girder_io/backend/__init__.py index 87f2c001..5c2b3604 100644 --- a/girder_worker/plugins/girder_io/backend/__init__.py +++ b/girder_worker/plugins/girder_io/backend/__init__.py @@ -21,10 +21,12 @@ def __init__(self, app, url=None, parent_type=None, **kwargs): super(GirderBackend, self).__init__(app, url=url, **kwargs) - - if self.url is None: - raise ImproperlyConfigured( - 'url for Girder server must be provided.') + self._girder = None + if not self._in_girder: + # If we are outside Girder we need a url + if self.url is None: + raise ImproperlyConfigured( + 'url for Girder server must be provided.') self._token = token self._api_key = api_key @@ -34,7 +36,7 @@ def __init__(self, app, url=None, parts = urlparse(url) query_params = dict(parse_qsl(parts.query)) - print(query_params) + # Give precedence to parameter passed in if self._token is None: self._token = query_params.get('token') @@ -58,6 +60,18 @@ def __init__(self, app, url=None, raise ImproperlyConfigured( 'A parent resource must be provided.') + @property + def _in_girder(self): + if self._girder is None: + self._girder = False + try: + from girder.utility.model_importer import ModelImporter + self._girder = True + except ImportError: + pass + + return self._girder + @property def _client(self): if self._gclient is None: @@ -72,6 +86,13 @@ def _client(self): def _store_result(self, task_id, result, state, traceback=None, request=None, **kwargs): + client = self._client + + # If the request has an url and token use these + if hasattr(request, 'girder_api_url') and hasattr(request, 'girder_client_token'): + client = GirderClient(apiUrl=request.girder_api_url) + client.token = request.girder_client_token + result_meta = { 'status': state, 'result': self.encode(result), @@ -89,58 +110,161 @@ def _store_result(self, task_id, result, state, return result - def _forget(self, task_id): + def _forget_girder(self, task_id): + from girder.utility.model_importer import ModelImporter + from girder.api.rest import getCurrentUser + from girder.constants import AccessType + folder = ModelImporter.model('folder').load( + id=self._parent_id, user=getCurrentUser(), level=AccessType.ADMIN) + + filters = { + 'name': task_id + } + items = list(ModelImporter.model('folder').childItems( + folder=folder, limit=1, filters=filters)) + + if len(items) == 1: + item = items[0] + ModelImporter.model('item').remove(item) + + def _forget_girder_client(self, task_id): items = list(self._client.listItem(self._parent_id, name=task_id, limit=1)) if len(items) == 1: item = items[0] self._client.delete('item/%s' % item['_id']) + def _forget(self, task_id): + if self._in_girder: + self._forget_girder(task_id) + else: + self._forget_girder_client(task_id) - def _save_group(self, group_id, result): - print('save group') - group_meta = { + def _group_meta(self, result): + return { 'result': self.encode([i.id for i in result]), 'timestamp': datetime.utcnow().replace(tzinfo=pytz.UTC).isoformat() } - if self._parent_type == 'folder': - # Change to one call then PR is merged into master - item = self._client.createItem(self._parent_id, group_id, reuseExisting=False) - self._client.addMetadataToItem(item['_id'], group_meta) + def _save_group_girder(self, group_id, result): + from girder.utility.model_importer import ModelImporter + from girder.api.rest import getCurrentUser + from girder.constants import AccessType + folder = ModelImporter.model('folder').load( + id=self._parent_id, user=getCurrentUser(), level=AccessType.WRITE) + + item_model = ModelImporter.model('item') + item = item_model.createItem(group_id, getCurrentUser(), + folder) + item_model.setMetadata(item, self._group_meta(result)) + + return result + + def _save_group_girder_client(self, group_id, result): + group_meta = self._group_meta(result) + # Change to one call then PR is merged into master + item = self._client.createItem(self._parent_id, group_id, reuseExisting=False) + self._client.addMetadataToItem(item['_id'], group_meta) return result + def _save_group(self, group_id, result): + if self._in_girder: + return self._save_group_girder(group_id, result) + else: + return self._save_group_girder_client(group_id, result) + def _delete_group(self, group_id): self._forget(group_id) - def _get_task_meta_for(self, task_id): + def _task_meta_from_item(self, item): + print(item) + meta = item['meta'] + + return self.meta_from_decoded({ + 'task_id': item['name'], + 'status': meta['status'], + 'result': self.decode(meta['result']), + 'timestamp': meta['timestamp'], + 'traceback': self.decode(meta['traceback']), + 'children': self.decode(meta['children']), + }) + + def _get_task_meta_for_girder(self, task_id): + from girder.utility.model_importer import ModelImporter + from girder.api.rest import getCurrentUser + from girder.constants import AccessType + folder = ModelImporter.model('folder').load( + id=self._parent_id, user=getCurrentUser(), level=AccessType.READ) + + filters = { + 'name': task_id + } + items = list(ModelImporter.model('folder').childItems( + folder=folder, limit=1, filters=filters)) + + if len(items) == 1: + item = items[0] + return self._task_meta_from_item(item) + + return {'status': states.PENDING, 'result': None} + + def _get_task_meta_for_girder_client(self, task_id): items = list(self._client.listItem(self._parent_id, name=task_id, limit=1)) if len(items) == 1: item = items[0] - meta = item['meta'] - return self.meta_from_decoded({ - 'task_id': item['name'], - 'status': meta['status'], - 'result': self.decode(meta['result']), - 'timestamp': meta['timestamp'], - 'traceback': self.decode(meta['traceback']), - 'children': self.decode(meta['children']), - }) + return self._task_meta_from_item(item) return {'status': states.PENDING, 'result': None} - def _restore_group(self, group_id): + def _get_task_meta_for(self, task_id): + if self._in_girder: + return self._get_task_meta_for_girder(task_id) + else: + return self._get_task_meta_for_girder_client(task_id) + + def _group_meta_from_item(self, item): + meta = item['meta'] + + return { + 'task_id': item['name'], + 'timestamp': meta['timestamp'], + 'result': [ + self.app.AsyncResult(task) + for task in self.decode(meta['result']) + ], + } + + def _restore_group_girder(self, group_id): + from girder.utility.model_importer import ModelImporter + from girder.api.rest import getCurrentUser + from girder.constants import AccessType + folder = ModelImporter.model('folder').load( + id=self._parent_id, user=getCurrentUser(), level=AccessType.READ) + + filters = { + 'name': group_id + } + items = list(ModelImporter.model('folder').childItems( + folder=folder, limit=1, filters=filters)) + + if len(items) == 1: + item = items[0] + return self._group_meta_from_item(item) + + return {'status': states.PENDING, 'result': None} + + def _restore_group_grider_client(self, group_id): items = list(self._client.listItem(self._parent_id, name=group_id, limit=1)) if len(items) == 1: item = items[0] - meta = item['meta'] - - return { - 'task_id': item['name'], - 'timestamp': meta['timestamp'], - 'result': [ - self.app.AsyncResult(task) - for task in self.decode(meta['result']) - ], - } + + return self._group_meta_from_item(item) + + return {'status': states.PENDING, 'result': None} + + def _restore_group(self, group_id): + if self._in_girder: + return self._restore_group_girder(group_id) + else: + return self._restore_group_girder_client(group_id) From 25ad4382af36406708ddc505ae5a8b2f9e939bfb Mon Sep 17 00:00:00 2001 From: Chris Harris Date: Fri, 25 Aug 2017 13:37:26 -0400 Subject: [PATCH 4/6] Don't encode the result Girder can take care of this --- girder_worker/plugins/girder_io/backend/__init__.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/girder_worker/plugins/girder_io/backend/__init__.py b/girder_worker/plugins/girder_io/backend/__init__.py index 5c2b3604..9dc44cb2 100644 --- a/girder_worker/plugins/girder_io/backend/__init__.py +++ b/girder_worker/plugins/girder_io/backend/__init__.py @@ -95,7 +95,7 @@ def _store_result(self, task_id, result, state, result_meta = { 'status': state, - 'result': self.encode(result), + 'result': result, 'timestamp': datetime.utcnow().replace(tzinfo=pytz.UTC).isoformat(), 'traceback': self.encode(traceback), 'children': self.encode( @@ -141,7 +141,7 @@ def _forget(self, task_id): def _group_meta(self, result): return { - 'result': self.encode([i.id for i in result]), + 'result': [i.id for i in result], 'timestamp': datetime.utcnow().replace(tzinfo=pytz.UTC).isoformat() } @@ -183,7 +183,7 @@ def _task_meta_from_item(self, item): return self.meta_from_decoded({ 'task_id': item['name'], 'status': meta['status'], - 'result': self.decode(meta['result']), + 'result': meta['result'], 'timestamp': meta['timestamp'], 'traceback': self.decode(meta['traceback']), 'children': self.decode(meta['children']), @@ -231,7 +231,7 @@ def _group_meta_from_item(self, item): 'timestamp': meta['timestamp'], 'result': [ self.app.AsyncResult(task) - for task in self.decode(meta['result']) + for task in meta['result'] ], } From a097e6b215a6b9d12276d4db2d316e8ff37ce6f5 Mon Sep 17 00:00:00 2001 From: Chris Harris Date: Fri, 25 Aug 2017 14:43:02 -0400 Subject: [PATCH 5/6] Relax configuration restrictions We can read some of it from the request --- .../plugins/girder_io/backend/__init__.py | 45 +++++++++---------- 1 file changed, 20 insertions(+), 25 deletions(-) diff --git a/girder_worker/plugins/girder_io/backend/__init__.py b/girder_worker/plugins/girder_io/backend/__init__.py index 9dc44cb2..8e3a895b 100644 --- a/girder_worker/plugins/girder_io/backend/__init__.py +++ b/girder_worker/plugins/girder_io/backend/__init__.py @@ -21,44 +21,39 @@ def __init__(self, app, url=None, parent_type=None, **kwargs): super(GirderBackend, self).__init__(app, url=url, **kwargs) - self._girder = None - if not self._in_girder: - # If we are outside Girder we need a url - if self.url is None: - raise ImproperlyConfigured( - 'url for Girder server must be provided.') - self._token = token self._api_key = api_key self._parent_id = parent_id self._parent_type = parent_type self._gclient = None + self._girder = None - parts = urlparse(url) - query_params = dict(parse_qsl(parts.query)) + if url is not None: + parts = urlparse(url) + query_params = dict(parse_qsl(parts.query)) - # Give precedence to parameter passed in - if self._token is None: - self._token = query_params.get('token') + # Give precedence to parameter passed in + if self._token is None: + self._token = query_params.get('token') - if self._api_key is None: - self._api_key = query_params.get('apiKey') + if self._api_key is None: + self._api_key = query_params.get('apiKey') - if self._parent_id is None: - self._parent_id = query_params.get('parentId') + if self._parent_id is None: + self._parent_id = query_params.get('parentId') - if self._parent_type is None: - self._parent_type = query_params.get('parentType') + if self._parent_type is None: + self._parent_type = query_params.get('parentType') - self.url = '%s://%s:%s/api/v1' % (parts.scheme, parts.hostname, parts.port) + self.url = '%s://%s:%s/api/v1' % (parts.scheme, parts.hostname, parts.port) - if self._api_key is None and self._token is None: - raise ImproperlyConfigured( - 'An API key or token for Girder must be provided.') + #if self._api_key is None and self._token is None: + # raise ImproperlyConfigured( + # 'An API key or token for Girder must be provided.') - if self._parent_id is None or self._parent_type is None: - raise ImproperlyConfigured( - 'A parent resource must be provided.') + #if self._parent_id is None or self._parent_type is None: + # raise ImproperlyConfigured( + # 'A parent resource must be provided.') @property def _in_girder(self): From 2f029df0e62fac926e80619db2c953233457315f Mon Sep 17 00:00:00 2001 From: Chris Harris Date: Fri, 25 Aug 2017 14:45:23 -0400 Subject: [PATCH 6/6] Make sure we use correct authenticated client --- girder_worker/plugins/girder_io/backend/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/girder_worker/plugins/girder_io/backend/__init__.py b/girder_worker/plugins/girder_io/backend/__init__.py index 8e3a895b..c3414ee5 100644 --- a/girder_worker/plugins/girder_io/backend/__init__.py +++ b/girder_worker/plugins/girder_io/backend/__init__.py @@ -100,8 +100,8 @@ def _store_result(self, task_id, result, state, if self._parent_type == 'folder': # Change to one call then PR is merged into master - item = self._client.createItem(self._parent_id, task_id, reuseExisting=False) - self._client.addMetadataToItem(item['_id'], result_meta) + item = client.createItem(self._parent_id, task_id, reuseExisting=False) + client.addMetadataToItem(item['_id'], result_meta) return result