Skip to content

Commit

Permalink
Add original job definition name to pipeline metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
Emile-Filteau committed Oct 17, 2024
1 parent cdf69ea commit dc96a9e
Show file tree
Hide file tree
Showing 8 changed files with 30 additions and 2 deletions.
1 change: 1 addition & 0 deletions src/saturn_engine/core/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ class QueueItem:
config: dict[str, Any] = field(default_factory=dict)
labels: dict[str, str] = field(default_factory=dict)
executor: str = "default"
job_definition_name: str = ""

def with_state(self, state: "QueueItemState") -> "QueueItemWithState":
return QueueItemWithState(**self.__dict__, state=state)
Expand Down
1 change: 1 addition & 0 deletions src/saturn_engine/worker/services/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

def executable_params(xmsg: ExecutableMessage) -> dict:
return {
"saturn.job_definition.name": xmsg.queue.definition.job_definition_name,
"saturn.executor.name": xmsg.queue.definition.executor,
"saturn.job.name": xmsg.queue.definition.name,
"pipeline": xmsg.message.info.name,
Expand Down
9 changes: 8 additions & 1 deletion src/saturn_engine/worker/services/usage_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

class PipelineName(t.NamedTuple):
executor: str
job_definition_name: str
name: str


Expand All @@ -38,6 +39,7 @@ class PipelineName(t.NamedTuple):

class PipelineUsage(t.NamedTuple):
executor: str
job_definition_name: str
name: str
usage: float

Expand Down Expand Up @@ -88,6 +90,7 @@ class StageState(t.Generic[T, U]):
def _name(xmsg: ExecutableMessage) -> PipelineName:
return PipelineName(
executor=xmsg.queue.definition.executor or "default",
job_definition_name=xmsg.queue.definition.job_definition_name or "",
name=xmsg.message.info.name,
)

Expand All @@ -104,7 +107,10 @@ def pop(self, xmsg: ExecutableMessage) -> U:
def collect(self, *, now: Nanoseconds) -> t.Iterator[PipelineUsage]:
for k, state in self.pipelines.items():
yield PipelineUsage(
executor=k.executor, name=k.name, usage=state.collect(now=now)
job_definition_name=k.job_definition_name,
executor=k.executor,
name=k.name,
usage=state.collect(now=now),
)


Expand Down Expand Up @@ -175,6 +181,7 @@ def collect_usage_metrics(
yield Observation(
pipeline.usage,
{
"job_definition_name": pipeline.job_definition_name,
"executor": pipeline.executor,
"pipeline": pipeline.name,
"state": stage_name,
Expand Down
1 change: 1 addition & 0 deletions src/saturn_engine/worker_manager/config/declarative_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ def to_core_objects(
labels=labels,
config=self.config,
executor=self.executor,
job_definition_name=name,
)


Expand Down
10 changes: 10 additions & 0 deletions tests/worker/services/test_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ def fake_published_message() -> Mock:
data = Mock()
data.xmsg.queue.definition.executor = "exec"
data.xmsg.queue.definition.name = "test-job"
data.xmsg.queue.definition.job_definition_name = "test-job-definition"
data.xmsg.queue.definition.labels = {"k": "v"}
data.xmsg.message.info.name = "test.fake.pipeline"
data.topic.name = "test.fake.topic"
Expand All @@ -30,10 +31,12 @@ async def test_message_metrics(
data = Mock()
data.queue.definition.name = "test-job"
data.queue.definition.executor = "exec"
data.queue.definition.job_definition_name = "test-job-definition"
data.queue.definition.labels = {"k": "v"}
data.message.info.name = "test.fake.pipeline"
pipeline_params = {
"pipeline": data.message.info.name,
"saturn.job_definition.name": "test-job-definition",
"saturn.executor.name": "exec",
"saturn.job.name": "test-job",
"saturn.job.labels.k": "v",
Expand Down Expand Up @@ -69,10 +72,12 @@ async def test_metrics_message_executed(
data = Mock()
data.queue.definition.name = "test-job"
data.queue.definition.executor = "exec"
data.queue.definition.job_definition_name = "test-job-definition"
data.queue.definition.labels = {"k": "v"}
data.message.info.name = "test.fake.pipeline"
pipeline_params = {
"pipeline": data.message.info.name,
"saturn.job_definition.name": "test-job-definition",
"saturn.executor.name": "exec",
"saturn.job.name": "test-job",
"saturn.job.labels.k": "v",
Expand Down Expand Up @@ -145,10 +150,12 @@ async def test_metrics_message_execute_failed(
data = Mock()
data.queue.definition.name = "test-job"
data.queue.definition.executor = "exec"
data.queue.definition.job_definition_name = "test-job-definition"
data.queue.definition.labels = {"k": "v"}
data.message.info.name = "test.fake.pipeline"
pipeline_params = {
"pipeline": data.message.info.name,
"saturn.job_definition.name": "test-job-definition",
"saturn.executor.name": "exec",
"saturn.job.name": "test-job",
"saturn.job.labels.k": "v",
Expand Down Expand Up @@ -198,6 +205,7 @@ async def test_metrics_message_published(
params = {
"pipeline": "test.fake.pipeline",
"topic": "test.fake.topic",
"saturn.job_definition.name": "test-job-definition",
"saturn.executor.name": "exec",
"saturn.job.name": "test-job",
"saturn.job.labels.k": "v",
Expand Down Expand Up @@ -233,6 +241,7 @@ async def test_metrics_message_publish_failed(
params = {
"pipeline": "test.fake.pipeline",
"topic": "test.fake.topic",
"saturn.job_definition.name": "test-job-definition",
"saturn.executor.name": "exec",
"saturn.job.name": "test-job",
"saturn.job.labels.k": "v",
Expand Down Expand Up @@ -268,6 +277,7 @@ async def test_metrics_topic_blocked(
params = {
"pipeline": "test.fake.pipeline",
"topic": "test.fake.topic",
"saturn.job_definition.name": "test-job-definition",
"saturn.executor.name": "exec",
"saturn.job.name": "test-job",
"saturn.job.labels.k": "v",
Expand Down
7 changes: 6 additions & 1 deletion tests/worker/services/test_usage_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,15 @@ async def test_message_metrics(
xmsg: ExecutableMessage = Mock()
xmsg.message.info.name = pipeline_name
xmsg.queue.definition.executor = "default"
xmsg.queue.definition.job_definition_name = "my-job-definition"
xmsgs.append(xmsg)

any_: t.Any = None
pipeline_params = {"pipeline": pipeline_name, "executor": "default"}
pipeline_params = {
"pipeline": pipeline_name,
"executor": "default",
"job_definition_name": "my-job-definition",
}
results = PipelineResults(outputs=[], resources=[])
metric = services_manager.services.cast_service(UsageMetrics)

Expand Down
2 changes: 2 additions & 0 deletions tests/worker/test_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ async def test_broker_dummy(
labels={"owner": "team-saturn"},
output={},
executor="e1",
job_definition_name="job-definition-name",
)
],
resources=[
Expand Down Expand Up @@ -159,6 +160,7 @@ async def test_broker_dummy(

# Test metrics
pipeline_params = {
"saturn.job_definition.name": "job-definition-name",
"saturn.executor.name": "e1",
"saturn.job.name": "j1",
"pipeline": "tests.worker.test_broker.pipeline",
Expand Down
1 change: 1 addition & 0 deletions tests/worker_manager/api/test_job_definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ def test_api_job_definitions_loaded_from_str(
},
"name": "test-job-definition",
"executor": "default",
"job_definition_name": "test-job-definition",
"output": {
"default": [
{
Expand Down

0 comments on commit dc96a9e

Please sign in to comment.