diff --git a/src/saturn_engine/worker/services/usage_metrics.py b/src/saturn_engine/worker/services/usage_metrics.py index 65adbf89..2b535b17 100644 --- a/src/saturn_engine/worker/services/usage_metrics.py +++ b/src/saturn_engine/worker/services/usage_metrics.py @@ -18,6 +18,7 @@ class PipelineName(t.NamedTuple): + labels: t.FrozenSet[tuple[str, str]] executor: str name: str @@ -38,6 +39,7 @@ class PipelineName(t.NamedTuple): class PipelineUsage(t.NamedTuple): executor: str + labels: dict[str, str] name: str usage: float @@ -88,6 +90,7 @@ class StageState(t.Generic[T, U]): def _name(xmsg: ExecutableMessage) -> PipelineName: return PipelineName( executor=xmsg.queue.definition.executor or "default", + labels=frozenset(xmsg.queue.definition.labels.items()), name=xmsg.message.info.name, ) @@ -104,7 +107,10 @@ def pop(self, xmsg: ExecutableMessage) -> U: def collect(self, *, now: Nanoseconds) -> t.Iterator[PipelineUsage]: for k, state in self.pipelines.items(): yield PipelineUsage( - executor=k.executor, name=k.name, usage=state.collect(now=now) + labels=dict(k.labels), + executor=k.executor, + name=k.name, + usage=state.collect(now=now), ) @@ -178,7 +184,8 @@ def collect_usage_metrics( "executor": pipeline.executor, "pipeline": pipeline.name, "state": stage_name, - }, + } + | {f"saturn.job.labels.{k}": v for k, v in pipeline.labels.items()}, ) async def on_message_polled(self, xmsg: ExecutableMessage) -> None: diff --git a/src/saturn_engine/worker_manager/config/declarative_job.py b/src/saturn_engine/worker_manager/config/declarative_job.py index ec5b8ba6..039d8115 100644 --- a/src/saturn_engine/worker_manager/config/declarative_job.py +++ b/src/saturn_engine/worker_manager/config/declarative_job.py @@ -96,7 +96,12 @@ def to_core_objects( info=self.pipeline.to_core_object(), args=dict(), ), - labels=labels, + labels=dict( + { + "internal.job-definition-name": name, + }, + **labels, + ), config=self.config, executor=self.executor, ) diff --git a/tests/worker/services/test_usage_metrics.py b/tests/worker/services/test_usage_metrics.py index 354d7b5f..adec774e 100644 --- a/tests/worker/services/test_usage_metrics.py +++ b/tests/worker/services/test_usage_metrics.py @@ -26,10 +26,15 @@ async def test_message_metrics( xmsg: ExecutableMessage = Mock() xmsg.message.info.name = pipeline_name xmsg.queue.definition.executor = "default" + xmsg.queue.definition.labels = {"k": "v"} xmsgs.append(xmsg) any_: t.Any = None - pipeline_params = {"pipeline": pipeline_name, "executor": "default"} + pipeline_params = { + "pipeline": pipeline_name, + "executor": "default", + "saturn.job.labels.k": "v", + } results = PipelineResults(outputs=[], resources=[]) metric = services_manager.services.cast_service(UsageMetrics) diff --git a/tests/worker_manager/api/test_job_definitions.py b/tests/worker_manager/api/test_job_definitions.py index df79bd68..9210f165 100644 --- a/tests/worker_manager/api/test_job_definitions.py +++ b/tests/worker_manager/api/test_job_definitions.py @@ -91,7 +91,10 @@ def test_api_job_definitions_loaded_from_str( "resources": {"api_key": "GithubApiKey"}, }, }, - "labels": {"owner": "team-saturn"}, + "labels": { + "owner": "team-saturn", + "internal.job-definition-name": "test-job-definition", + }, "config": { "tracer": { "sampler": {