Skip to content

Commit

Permalink
Add job definition to QueueItem labels
Browse files Browse the repository at this point in the history
  • Loading branch information
Emile-Filteau committed Oct 18, 2024
1 parent 5be9dca commit e22f4bb
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 5 deletions.
11 changes: 9 additions & 2 deletions src/saturn_engine/worker/services/usage_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@


class PipelineName(t.NamedTuple):
labels: t.FrozenSet[tuple[str, str]]
executor: str
name: str

Expand All @@ -38,6 +39,7 @@ class PipelineName(t.NamedTuple):

class PipelineUsage(t.NamedTuple):
executor: str
labels: dict[str, str]
name: str
usage: float

Expand Down Expand Up @@ -88,6 +90,7 @@ class StageState(t.Generic[T, U]):
def _name(xmsg: ExecutableMessage) -> PipelineName:
return PipelineName(
executor=xmsg.queue.definition.executor or "default",
labels=frozenset(xmsg.queue.definition.labels.items()),
name=xmsg.message.info.name,
)

Expand All @@ -104,7 +107,10 @@ def pop(self, xmsg: ExecutableMessage) -> U:
def collect(self, *, now: Nanoseconds) -> t.Iterator[PipelineUsage]:
for k, state in self.pipelines.items():
yield PipelineUsage(
executor=k.executor, name=k.name, usage=state.collect(now=now)
labels=dict(k.labels),
executor=k.executor,
name=k.name,
usage=state.collect(now=now),
)


Expand Down Expand Up @@ -178,7 +184,8 @@ def collect_usage_metrics(
"executor": pipeline.executor,
"pipeline": pipeline.name,
"state": stage_name,
},
}
| {f"saturn.job.labels.{k}": v for k, v in pipeline.labels.items()},
)

async def on_message_polled(self, xmsg: ExecutableMessage) -> None:
Expand Down
7 changes: 6 additions & 1 deletion src/saturn_engine/worker_manager/config/declarative_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,12 @@ def to_core_objects(
info=self.pipeline.to_core_object(),
args=dict(),
),
labels=labels,
labels=dict(
{
"job-definition-name": name,
},
**labels,
),
config=self.config,
executor=self.executor,
)
Expand Down
7 changes: 6 additions & 1 deletion tests/worker/services/test_usage_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,15 @@ async def test_message_metrics(
xmsg: ExecutableMessage = Mock()
xmsg.message.info.name = pipeline_name
xmsg.queue.definition.executor = "default"
xmsg.queue.definition.labels = {"k": "v"}
xmsgs.append(xmsg)

any_: t.Any = None
pipeline_params = {"pipeline": pipeline_name, "executor": "default"}
pipeline_params = {
"pipeline": pipeline_name,
"executor": "default",
"saturn.job.labels.k": "v",
}
results = PipelineResults(outputs=[], resources=[])
metric = services_manager.services.cast_service(UsageMetrics)

Expand Down
5 changes: 4 additions & 1 deletion tests/worker_manager/api/test_job_definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,10 @@ def test_api_job_definitions_loaded_from_str(
"resources": {"api_key": "GithubApiKey"},
},
},
"labels": {"owner": "team-saturn"},
"labels": {
"owner": "team-saturn",
"job-definition-name": "test-job-definition",
},
"config": {
"tracer": {
"sampler": {
Expand Down

0 comments on commit e22f4bb

Please sign in to comment.