Skip to content

Commit

Permalink
Create a factory to load static definition, separate the creation flow
Browse files Browse the repository at this point in the history
  • Loading branch information
Olivier Michaud committed Oct 16, 2024
1 parent cdf69ea commit 1fc703c
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 20 deletions.
5 changes: 4 additions & 1 deletion src/saturn_engine/worker/worker_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,12 @@ def __init__(
) -> None:
self.sessionmaker = sessionmaker
self.worker_id = config.c.worker_id
self.context = WorkerManagerContext(config=config.c.worker_manager)
self.max_assigned_items = config.c.worker_manager.work_items_per_worker

self.context = WorkerManagerContext(config=config.c.worker_manager)
with self.sessionmaker() as session:
self.context.reset_static_definition(session=session)

async def init_db(self) -> None:
return await asyncio.get_event_loop().run_in_executor(
None,
Expand Down
48 changes: 30 additions & 18 deletions src/saturn_engine/worker_manager/context.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from functools import cached_property

from saturn_engine.config import WorkerManagerConfig
from saturn_engine.utils.sqlalchemy import AnySession
from saturn_engine.worker_manager.config.declarative import filter_with_jobs_selector
from saturn_engine.worker_manager.config.declarative import load_definitions_from_paths

Expand All @@ -15,22 +14,35 @@ class WorkerManagerContext:

def __init__(self, config: WorkerManagerConfig) -> None:
self.config: WorkerManagerConfig = config
self._static_definitions: StaticDefinitions | None

@cached_property
@property
def static_definitions(self) -> StaticDefinitions:
"""
Static definitions contain objects defined in a declarative configuration:
- Inventories
- Topics
- Jobs
- JobDefinitions
"""
definitions = load_definitions_from_paths(
self.config.static_definitions_directories
assert (
self._static_definitions
), "Static definitions need to be set before usage"
return self._static_definitions

def reset_static_definition(self, session: AnySession) -> None:
self._static_definitions = _load_static_definition(
session=session, config=self.config
)


def _load_static_definition(
config: WorkerManagerConfig, session: AnySession
) -> StaticDefinitions:
"""
Static definitions contain objects defined in a declarative configuration:
- Inventories
- Topics
- Jobs
- JobDefinitions
"""
definitions = load_definitions_from_paths(config.static_definitions_directories)
if config.static_definitions_jobs_selector:
definitions = filter_with_jobs_selector(
definitions=definitions,
selector=config.static_definitions_jobs_selector,
)
if self.config.static_definitions_jobs_selector:
definitions = filter_with_jobs_selector(
definitions=definitions,
selector=self.config.static_definitions_jobs_selector,
)
return definitions
return definitions
1 change: 1 addition & 0 deletions src/saturn_engine/worker_manager/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ def init_all(app: Optional[SaturnApp] = None) -> None:
with app.app_context():
create_all()
with session_scope() as session:
current_app.saturn.reset_static_definition(session=session)
sync_jobs(
static_definitions=current_app.saturn.static_definitions,
session=session,
Expand Down
2 changes: 1 addition & 1 deletion tests/worker_manager/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ def client(
"TESTING": True,
},
)
app.saturn.static_definitions = static_definitions
app.saturn._static_definitions = static_definitions
with app.app_context():
Base.metadata.drop_all(bind=database.engine())
Base.metadata.create_all(bind=database.engine())
Expand Down

0 comments on commit 1fc703c

Please sign in to comment.