Skip to content

Commit

Permalink
Defer inventory update processing til cache is loaded
Browse files Browse the repository at this point in the history
  • Loading branch information
SaladDais committed Jan 18, 2024
1 parent a7825a8 commit c949576
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 1 deletion.
3 changes: 3 additions & 0 deletions hippolyzer/lib/client/inventory_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,3 +171,6 @@ def _handle_move_inventory_item(self, msg: Message):
if inventory_block["NewName"]:
node.name = str(inventory_block["NewName"])
node.parent_id = inventory_block['FolderID']

def process_aisv3_response(self, payload: dict):
pass
54 changes: 53 additions & 1 deletion hippolyzer/lib/proxy/inventory_manager.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,51 @@
import asyncio
import datetime as dt
import functools
import logging
from typing import *

from hippolyzer.lib.base.helpers import get_mtime
from hippolyzer.lib.base.helpers import get_mtime, create_logged_task
from hippolyzer.lib.client.inventory_manager import InventoryManager
from hippolyzer.lib.client.state import BaseClientSession
from hippolyzer.lib.proxy.viewer_settings import iter_viewer_cache_dirs


LOG = logging.getLogger(__name__)


class ProxyInventoryManager(InventoryManager):
def __init__(self, session: BaseClientSession):
# These handlers all need their processing deferred until the cache has been loaded.
# Since cache is loaded asynchronously, the viewer may get ahead of us due to parsing
# the cache faster and start requesting inventory details we can't do anything with yet.
self._handle_update_create_inventory_item = self._wrap_with_cache_defer(
self._handle_update_create_inventory_item
)
self._handle_remove_inventory_item = self._wrap_with_cache_defer(
self._handle_remove_inventory_item
)
self._handle_remove_inventory_folder = self._wrap_with_cache_defer(
self._handle_remove_inventory_folder
)
self._handle_bulk_update_inventory = self._wrap_with_cache_defer(
self._handle_bulk_update_inventory
)
self._handle_move_inventory_item = self._wrap_with_cache_defer(
self._handle_move_inventory_item
)
self.process_aisv3_response = self._wrap_with_cache_defer(
self.process_aisv3_response
)

# Base constructor after, because it registers handlers to specific methods, which need to
# be wrapped before we call they're registered. Handlers are registered by method reference,
# not by name!
super().__init__(session)
newest_cache = None
newest_timestamp = dt.datetime(year=1970, month=1, day=1, tzinfo=dt.timezone.utc)
# So consumers know when the inventory should be complete
self.cache_loaded: asyncio.Event = asyncio.Event()
self._cache_deferred_calls: List[Tuple[Callable[..., None], Tuple]] = []
# Look for the newest version of the cached inventory and use that.
# Not foolproof, but close enough if we're not sure what viewer is being used.
for cache_dir in iter_viewer_cache_dirs():
Expand All @@ -31,5 +63,25 @@ def __init__(self, session: BaseClientSession):
cache_load_fut = asyncio.ensure_future(asyncio.to_thread(self.load_cache, newest_cache))
# Meh. Don't care if it fails.
cache_load_fut.add_done_callback(lambda *args: self.cache_loaded.set())
create_logged_task(self._apply_deferred_after_loaded(), "Apply deferred inventory", LOG)
else:
self.cache_loaded.set()

async def _apply_deferred_after_loaded(self):
await self.cache_loaded.wait()
deferred_calls = self._cache_deferred_calls[:]
self._cache_deferred_calls.clear()
for func, args in deferred_calls:
try:
func(*args)
except:
LOG.exception("Failed to apply deferred inventory call")

def _wrap_with_cache_defer(self, func: Callable[..., None]):
@functools.wraps(func)
def wrapped(*inner_args):
if not self.cache_loaded.is_set():
self._cache_deferred_calls.append((func, inner_args))
else:
func(*inner_args)
return wrapped

0 comments on commit c949576

Please sign in to comment.