diff --git a/flask_app.py b/flask_app.py index 9b701f9b..6009f7a2 100644 --- a/flask_app.py +++ b/flask_app.py @@ -62,40 +62,40 @@ ########################################### -# # https://github.com/googleapis/python-ndb/issues/743#issuecomment-2067590945 -# # -# # fixes "RuntimeError: Key has already been set in this batch" errors due to -# # tasklets in pages.serve_feed -# from logging import error as log_error -# from sys import modules - -# from google.cloud.datastore_v1.types.entity import Key -# from google.cloud.ndb._cache import ( -# _GlobalCacheSetBatch, -# global_compare_and_swap, -# global_set_if_not_exists, -# global_watch, -# ) -# from google.cloud.ndb.tasklets import Future, Return, tasklet - -# GLOBAL_CACHE_KEY_PREFIX: bytes = modules["google.cloud.ndb._cache"]._PREFIX -# LOCKED_FOR_READ: bytes = modules["google.cloud.ndb._cache"]._LOCKED_FOR_READ -# LOCK_TIME: bytes = modules["google.cloud.ndb._cache"]._LOCK_TIME - - -# @tasklet -# def custom_global_lock_for_read(key: str, value: str): -# if value is not None: -# yield global_watch(key, value) -# lock_acquired = yield global_compare_and_swap( -# key, LOCKED_FOR_READ, expires=LOCK_TIME -# ) -# else: -# lock_acquired = yield global_set_if_not_exists( -# key, LOCKED_FOR_READ, expires=LOCK_TIME -# ) - -# if lock_acquired: -# raise Return(LOCKED_FOR_READ) - -# modules["google.cloud.ndb._cache"].global_lock_for_read = custom_global_lock_for_read +# https://github.com/googleapis/python-ndb/issues/743#issuecomment-2067590945 +# +# fixes "RuntimeError: Key has already been set in this batch" errors due to +# tasklets in pages.serve_feed +from logging import error as log_error +from sys import modules + +from google.cloud.datastore_v1.types.entity import Key +from google.cloud.ndb._cache import ( + _GlobalCacheSetBatch, + global_compare_and_swap, + global_set_if_not_exists, + global_watch, +) +from google.cloud.ndb.tasklets import Future, Return, tasklet + +GLOBAL_CACHE_KEY_PREFIX: bytes = modules["google.cloud.ndb._cache"]._PREFIX +LOCKED_FOR_READ: bytes = modules["google.cloud.ndb._cache"]._LOCKED_FOR_READ +LOCK_TIME: bytes = modules["google.cloud.ndb._cache"]._LOCK_TIME + + +@tasklet +def custom_global_lock_for_read(key: str, value: str): + if value is not None: + yield global_watch(key, value) + lock_acquired = yield global_compare_and_swap( + key, LOCKED_FOR_READ, expires=LOCK_TIME + ) + else: + lock_acquired = yield global_set_if_not_exists( + key, LOCKED_FOR_READ, expires=LOCK_TIME + ) + + if lock_acquired: + raise Return(LOCKED_FOR_READ) + +modules["google.cloud.ndb._cache"].global_lock_for_read = custom_global_lock_for_read diff --git a/pages.py b/pages.py index 2916bf19..3e37fb71 100644 --- a/pages.py +++ b/pages.py @@ -260,34 +260,33 @@ def serve_feed(*, objects, format, user, title, as_snippets=False, quiet=False): else: activities = [obj.as1 for obj in objects] - # TODO: bring back? - # # hydrate authors, actors, objects from stored Objects - # fields = 'author', 'actor', 'object' - # gets = [] - # for a in activities: - # for field in fields: - # val = as1.get_object(a, field) - # if val and val.keys() <= set(['id']): - # def hydrate(a, f): - # def maybe_set(future): - # if future.result() and future.result().as1: - # a[f] = future.result().as1 - # return maybe_set - - # # TODO: extract a Protocol class method out of User.profile_id, - # # then use that here instead. the catch is that we'd need to - # # determine Protocol for every id, which is expensive. - # # - # # same TODO is in models.fetch_objects - # id = val['id'] - # if id.startswith('did:'): - # id = f'at://{id}/app.bsky.actor.profile/self' - - # future = Object.get_by_id_async(id) - # future.add_done_callback(hydrate(a, field)) - # gets.append(future) - - # tasklets.wait_all(gets) + # hydrate authors, actors, objects from stored Objects + fields = 'author', 'actor', 'object' + gets = [] + for a in activities: + for field in fields: + val = as1.get_object(a, field) + if val and val.keys() <= set(['id']): + def hydrate(a, f): + def maybe_set(future): + if future.result() and future.result().as1: + a[f] = future.result().as1 + return maybe_set + + # TODO: extract a Protocol class method out of User.profile_id, + # then use that here instead. the catch is that we'd need to + # determine Protocol for every id, which is expensive. + # + # same TODO is in models.fetch_objects + id = val['id'] + if id.startswith('did:'): + id = f'at://{id}/app.bsky.actor.profile/self' + + future = Object.get_by_id_async(id) + future.add_done_callback(hydrate(a, field)) + gets.append(future) + + tasklets.wait_all(gets) actor = (user.obj.as1 if user.obj and user.obj.as1 else {'displayName': user.readable_id, 'url': user.web_url()}) diff --git a/tests/test_pages.py b/tests/test_pages.py index 92ab4343..15491527 100644 --- a/tests/test_pages.py +++ b/tests/test_pages.py @@ -488,7 +488,6 @@ def test_feed_html_empty(self): self.assert_equals(200, got.status_code) self.assert_equals([], microformats2.html_to_activities(got.text)) - @skip def test_feed_html(self): self.add_objects() @@ -525,7 +524,6 @@ def test_feed_atom_empty_g_user_without_obj(self): self.user.put() self.test_feed_atom_empty() - @skip def test_feed_atom(self): self.add_objects() got = self.client.get('/web/user.com/feed?format=atom') @@ -549,7 +547,6 @@ def test_feed_rss_empty(self): self.assert_equals(rss.CONTENT_TYPE, got.headers['Content-Type']) self.assert_equals([], rss.to_activities(got.text)) - @skip def test_feed_rss(self): self.add_objects() got = self.client.get('/web/user.com/feed?format=rss')