From e22f4bbdbc49066f1a5d9676ba6f68fed1141b28 Mon Sep 17 00:00:00 2001 From: Emile Filteau-Tessier Date: Tue, 15 Oct 2024 15:23:09 -0400 Subject: [PATCH] Add job definition to QueueItem labels --- src/saturn_engine/worker/services/usage_metrics.py | 11 +++++++++-- .../worker_manager/config/declarative_job.py | 7 ++++++- tests/worker/services/test_usage_metrics.py | 7 ++++++- tests/worker_manager/api/test_job_definitions.py | 5 ++++- 4 files changed, 25 insertions(+), 5 deletions(-) diff --git a/src/saturn_engine/worker/services/usage_metrics.py b/src/saturn_engine/worker/services/usage_metrics.py index 65adbf89..2b535b17 100644 --- a/src/saturn_engine/worker/services/usage_metrics.py +++ b/src/saturn_engine/worker/services/usage_metrics.py @@ -18,6 +18,7 @@ class PipelineName(t.NamedTuple): + labels: t.FrozenSet[tuple[str, str]] executor: str name: str @@ -38,6 +39,7 @@ class PipelineName(t.NamedTuple): class PipelineUsage(t.NamedTuple): executor: str + labels: dict[str, str] name: str usage: float @@ -88,6 +90,7 @@ class StageState(t.Generic[T, U]): def _name(xmsg: ExecutableMessage) -> PipelineName: return PipelineName( executor=xmsg.queue.definition.executor or "default", + labels=frozenset(xmsg.queue.definition.labels.items()), name=xmsg.message.info.name, ) @@ -104,7 +107,10 @@ def pop(self, xmsg: ExecutableMessage) -> U: def collect(self, *, now: Nanoseconds) -> t.Iterator[PipelineUsage]: for k, state in self.pipelines.items(): yield PipelineUsage( - executor=k.executor, name=k.name, usage=state.collect(now=now) + labels=dict(k.labels), + executor=k.executor, + name=k.name, + usage=state.collect(now=now), ) @@ -178,7 +184,8 @@ def collect_usage_metrics( "executor": pipeline.executor, "pipeline": pipeline.name, "state": stage_name, - }, + } + | {f"saturn.job.labels.{k}": v for k, v in pipeline.labels.items()}, ) async def on_message_polled(self, xmsg: ExecutableMessage) -> None: diff --git a/src/saturn_engine/worker_manager/config/declarative_job.py b/src/saturn_engine/worker_manager/config/declarative_job.py index ec5b8ba6..34dd09cc 100644 --- a/src/saturn_engine/worker_manager/config/declarative_job.py +++ b/src/saturn_engine/worker_manager/config/declarative_job.py @@ -96,7 +96,12 @@ def to_core_objects( info=self.pipeline.to_core_object(), args=dict(), ), - labels=labels, + labels=dict( + { + "job-definition-name": name, + }, + **labels, + ), config=self.config, executor=self.executor, ) diff --git a/tests/worker/services/test_usage_metrics.py b/tests/worker/services/test_usage_metrics.py index 354d7b5f..adec774e 100644 --- a/tests/worker/services/test_usage_metrics.py +++ b/tests/worker/services/test_usage_metrics.py @@ -26,10 +26,15 @@ async def test_message_metrics( xmsg: ExecutableMessage = Mock() xmsg.message.info.name = pipeline_name xmsg.queue.definition.executor = "default" + xmsg.queue.definition.labels = {"k": "v"} xmsgs.append(xmsg) any_: t.Any = None - pipeline_params = {"pipeline": pipeline_name, "executor": "default"} + pipeline_params = { + "pipeline": pipeline_name, + "executor": "default", + "saturn.job.labels.k": "v", + } results = PipelineResults(outputs=[], resources=[]) metric = services_manager.services.cast_service(UsageMetrics) diff --git a/tests/worker_manager/api/test_job_definitions.py b/tests/worker_manager/api/test_job_definitions.py index df79bd68..dc9f04fa 100644 --- a/tests/worker_manager/api/test_job_definitions.py +++ b/tests/worker_manager/api/test_job_definitions.py @@ -91,7 +91,10 @@ def test_api_job_definitions_loaded_from_str( "resources": {"api_key": "GithubApiKey"}, }, }, - "labels": {"owner": "team-saturn"}, + "labels": { + "owner": "team-saturn", + "job-definition-name": "test-job-definition", + }, "config": { "tracer": { "sampler": {