Skip to content

Commit

Permalink
Add executor labels to log and metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
isra17 committed Oct 31, 2023
1 parent e6974d8 commit d7837f0
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 0 deletions.
1 change: 1 addition & 0 deletions src/saturn_engine/worker/services/loggers/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ def executable_message_data(
) -> dict[str, t.Any]:
labels = xmsg.queue.definition.labels.copy()
return pipeline_message_data(xmsg.message, verbose=verbose) | {
"executor": {"name": xmsg.queue.definition.executor},
"job": {"name": xmsg.queue.name},
"input": xmsg.queue.definition.input.name,
"labels": labels,
Expand Down
1 change: 1 addition & 0 deletions src/saturn_engine/worker/services/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

def executable_params(xmsg: ExecutableMessage) -> dict:
return {
"saturn.executor.name": xmsg.queue.definition.executor,
"saturn.job.name": xmsg.queue.definition.name,
"pipeline": xmsg.message.info.name,
} | {f"saturn.job.labels.{k}": v for k, v in xmsg.queue.definition.labels.items()}
Expand Down
2 changes: 2 additions & 0 deletions tests/worker/services/test_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ async def test_logger_message_executed(
assert r.message == "Executing message"
assert r.data == {
"input": "fake-topic",
"executor": {"name": "default"},
"job": {"name": "fake-queue"},
"message": {
"id": "m1",
Expand All @@ -83,6 +84,7 @@ async def test_logger_message_executed(
assert r.message == "Executed message"
assert r.data == {
"input": "fake-topic",
"executor": {"name": "default"},
"job": {
"name": "fake-queue",
},
Expand Down
10 changes: 10 additions & 0 deletions tests/worker/services/test_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ async def test_message_metrics(
) -> None:
data = Mock()
data.queue.definition.name = "test-job"
data.queue.definition.executor = "exec"
data.queue.definition.labels = {"k": "v"}
data.message.info.name = "test.fake.pipeline"
pipeline_params = {
"pipeline": data.message.info.name,
"saturn.executor.name": "exec",
"saturn.job.name": "test-job",
"saturn.job.labels.k": "v",
}
Expand Down Expand Up @@ -54,10 +56,12 @@ async def test_metrics_message_executed(
) -> None:
data = Mock()
data.queue.definition.name = "test-job"
data.queue.definition.executor = "exec"
data.queue.definition.labels = {"k": "v"}
data.message.info.name = "test.fake.pipeline"
pipeline_params = {
"pipeline": data.message.info.name,
"saturn.executor.name": "exec",
"saturn.job.name": "test-job",
"saturn.job.labels.k": "v",
}
Expand Down Expand Up @@ -128,10 +132,12 @@ async def test_metrics_message_execute_failed(
) -> None:
data = Mock()
data.queue.definition.name = "test-job"
data.queue.definition.executor = "exec"
data.queue.definition.labels = {"k": "v"}
data.message.info.name = "test.fake.pipeline"
pipeline_params = {
"pipeline": data.message.info.name,
"saturn.executor.name": "exec",
"saturn.job.name": "test-job",
"saturn.job.labels.k": "v",
}
Expand Down Expand Up @@ -176,13 +182,15 @@ async def test_metrics_message_published(
services_manager: ServicesManager, metrics_capture: MetricsCapture
) -> None:
data = Mock()
data.xmsg.queue.definition.executor = "exec"
data.xmsg.queue.definition.name = "test-job"
data.xmsg.queue.definition.labels = {"k": "v"}
data.xmsg.message.info.name = "test.fake.pipeline"
data.topic.name = "test.fake.topic"
params = {
"pipeline": "test.fake.pipeline",
"topic": "test.fake.topic",
"saturn.executor.name": "exec",
"saturn.job.name": "test-job",
"saturn.job.labels.k": "v",
}
Expand Down Expand Up @@ -213,13 +221,15 @@ async def test_metrics_message_publish_failed(
services_manager: ServicesManager, metrics_capture: MetricsCapture
) -> None:
data = Mock()
data.xmsg.queue.definition.executor = "exec"
data.xmsg.queue.definition.name = "test-job"
data.xmsg.queue.definition.labels = {"k": "v"}
data.xmsg.message.info.name = "test.fake.pipeline"
data.topic.name = "test.fake.topic"
params = {
"pipeline": "test.fake.pipeline",
"topic": "test.fake.topic",
"saturn.executor.name": "exec",
"saturn.job.name": "test-job",
"saturn.job.labels.k": "v",
}
Expand Down
1 change: 1 addition & 0 deletions tests/worker/test_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ async def test_broker_dummy(

# Test metrics
pipeline_params = {
"saturn.executor.name": "e1",
"saturn.job.name": "j1",
"pipeline": "tests.worker.test_broker.pipeline",
"saturn.job.labels.owner": "team-saturn",
Expand Down

0 comments on commit d7837f0

Please sign in to comment.