diff --git a/src/saturn_engine/worker/worker_manager.py b/src/saturn_engine/worker/worker_manager.py index 695f559e..92b0a57b 100644 --- a/src/saturn_engine/worker/worker_manager.py +++ b/src/saturn_engine/worker/worker_manager.py @@ -26,9 +26,12 @@ def __init__( ) -> None: self.sessionmaker = sessionmaker self.worker_id = config.c.worker_id - self.context = WorkerManagerContext(config=config.c.worker_manager) self.max_assigned_items = config.c.worker_manager.work_items_per_worker + self.context = WorkerManagerContext(config=config.c.worker_manager) + with self.sessionmaker() as session: + self.context.reset_static_definition(session=session) + async def init_db(self) -> None: return await asyncio.get_event_loop().run_in_executor( None, diff --git a/src/saturn_engine/worker_manager/context.py b/src/saturn_engine/worker_manager/context.py index 7eec578b..cb974055 100644 --- a/src/saturn_engine/worker_manager/context.py +++ b/src/saturn_engine/worker_manager/context.py @@ -1,6 +1,5 @@ -from functools import cached_property - from saturn_engine.config import WorkerManagerConfig +from saturn_engine.utils.sqlalchemy import AnySession from saturn_engine.worker_manager.config.declarative import filter_with_jobs_selector from saturn_engine.worker_manager.config.declarative import load_definitions_from_paths @@ -15,22 +14,35 @@ class WorkerManagerContext: def __init__(self, config: WorkerManagerConfig) -> None: self.config: WorkerManagerConfig = config + self._static_definitions: StaticDefinitions | None - @cached_property + @property def static_definitions(self) -> StaticDefinitions: - """ - Static definitions contain objects defined in a declarative configuration: - - Inventories - - Topics - - Jobs - - JobDefinitions - """ - definitions = load_definitions_from_paths( - self.config.static_definitions_directories + assert ( + self._static_definitions + ), "Static definitions need to be set before usage" + return self._static_definitions + + def reset_static_definition(self, session: AnySession) -> None: + self._static_definitions = _load_static_definition( + session=session, config=self.config + ) + + +def _load_static_definition( + config: WorkerManagerConfig, session: AnySession +) -> StaticDefinitions: + """ + Static definitions contain objects defined in a declarative configuration: + - Inventories + - Topics + - Jobs + - JobDefinitions + """ + definitions = load_definitions_from_paths(config.static_definitions_directories) + if config.static_definitions_jobs_selector: + definitions = filter_with_jobs_selector( + definitions=definitions, + selector=config.static_definitions_jobs_selector, ) - if self.config.static_definitions_jobs_selector: - definitions = filter_with_jobs_selector( - definitions=definitions, - selector=self.config.static_definitions_jobs_selector, - ) - return definitions + return definitions diff --git a/src/saturn_engine/worker_manager/server.py b/src/saturn_engine/worker_manager/server.py index f9b9e5d0..83f7b6a8 100644 --- a/src/saturn_engine/worker_manager/server.py +++ b/src/saturn_engine/worker_manager/server.py @@ -56,6 +56,7 @@ def init_all(app: Optional[SaturnApp] = None) -> None: with app.app_context(): create_all() with session_scope() as session: + current_app.saturn.reset_static_definition(session=session) sync_jobs( static_definitions=current_app.saturn.static_definitions, session=session, diff --git a/tests/worker_manager/conftest.py b/tests/worker_manager/conftest.py index cb2bf588..73e7e143 100644 --- a/tests/worker_manager/conftest.py +++ b/tests/worker_manager/conftest.py @@ -103,7 +103,7 @@ def client( "TESTING": True, }, ) - app.saturn.static_definitions = static_definitions + app.saturn._static_definitions = static_definitions with app.app_context(): Base.metadata.drop_all(bind=database.engine()) Base.metadata.create_all(bind=database.engine())