From d7837f0b83187b4cb7e4d2e79d2580506d3141a5 Mon Sep 17 00:00:00 2001 From: isra17 Date: Tue, 31 Oct 2023 11:52:20 -0400 Subject: [PATCH] Add executor labels to log and metrics --- src/saturn_engine/worker/services/loggers/logger.py | 1 + src/saturn_engine/worker/services/metrics.py | 1 + tests/worker/services/test_logger.py | 2 ++ tests/worker/services/test_metrics.py | 10 ++++++++++ tests/worker/test_broker.py | 1 + 5 files changed, 15 insertions(+) diff --git a/src/saturn_engine/worker/services/loggers/logger.py b/src/saturn_engine/worker/services/loggers/logger.py index 7a7fcce3..01b5b23b 100644 --- a/src/saturn_engine/worker/services/loggers/logger.py +++ b/src/saturn_engine/worker/services/loggers/logger.py @@ -30,6 +30,7 @@ def executable_message_data( ) -> dict[str, t.Any]: labels = xmsg.queue.definition.labels.copy() return pipeline_message_data(xmsg.message, verbose=verbose) | { + "executor": {"name": xmsg.queue.definition.executor}, "job": {"name": xmsg.queue.name}, "input": xmsg.queue.definition.input.name, "labels": labels, diff --git a/src/saturn_engine/worker/services/metrics.py b/src/saturn_engine/worker/services/metrics.py index d8a7a3f1..19348110 100644 --- a/src/saturn_engine/worker/services/metrics.py +++ b/src/saturn_engine/worker/services/metrics.py @@ -15,6 +15,7 @@ def executable_params(xmsg: ExecutableMessage) -> dict: return { + "saturn.executor.name": xmsg.queue.definition.executor, "saturn.job.name": xmsg.queue.definition.name, "pipeline": xmsg.message.info.name, } | {f"saturn.job.labels.{k}": v for k, v in xmsg.queue.definition.labels.items()} diff --git a/tests/worker/services/test_logger.py b/tests/worker/services/test_logger.py index 49fc7532..b5533b07 100644 --- a/tests/worker/services/test_logger.py +++ b/tests/worker/services/test_logger.py @@ -63,6 +63,7 @@ async def test_logger_message_executed( assert r.message == "Executing message" assert r.data == { "input": "fake-topic", + "executor": {"name": "default"}, "job": {"name": "fake-queue"}, "message": { "id": "m1", @@ -83,6 +84,7 @@ async def test_logger_message_executed( assert r.message == "Executed message" assert r.data == { "input": "fake-topic", + "executor": {"name": "default"}, "job": { "name": "fake-queue", }, diff --git a/tests/worker/services/test_metrics.py b/tests/worker/services/test_metrics.py index 9846538d..0079ead0 100644 --- a/tests/worker/services/test_metrics.py +++ b/tests/worker/services/test_metrics.py @@ -17,10 +17,12 @@ async def test_message_metrics( ) -> None: data = Mock() data.queue.definition.name = "test-job" + data.queue.definition.executor = "exec" data.queue.definition.labels = {"k": "v"} data.message.info.name = "test.fake.pipeline" pipeline_params = { "pipeline": data.message.info.name, + "saturn.executor.name": "exec", "saturn.job.name": "test-job", "saturn.job.labels.k": "v", } @@ -54,10 +56,12 @@ async def test_metrics_message_executed( ) -> None: data = Mock() data.queue.definition.name = "test-job" + data.queue.definition.executor = "exec" data.queue.definition.labels = {"k": "v"} data.message.info.name = "test.fake.pipeline" pipeline_params = { "pipeline": data.message.info.name, + "saturn.executor.name": "exec", "saturn.job.name": "test-job", "saturn.job.labels.k": "v", } @@ -128,10 +132,12 @@ async def test_metrics_message_execute_failed( ) -> None: data = Mock() data.queue.definition.name = "test-job" + data.queue.definition.executor = "exec" data.queue.definition.labels = {"k": "v"} data.message.info.name = "test.fake.pipeline" pipeline_params = { "pipeline": data.message.info.name, + "saturn.executor.name": "exec", "saturn.job.name": "test-job", "saturn.job.labels.k": "v", } @@ -176,6 +182,7 @@ async def test_metrics_message_published( services_manager: ServicesManager, metrics_capture: MetricsCapture ) -> None: data = Mock() + data.xmsg.queue.definition.executor = "exec" data.xmsg.queue.definition.name = "test-job" data.xmsg.queue.definition.labels = {"k": "v"} data.xmsg.message.info.name = "test.fake.pipeline" @@ -183,6 +190,7 @@ async def test_metrics_message_published( params = { "pipeline": "test.fake.pipeline", "topic": "test.fake.topic", + "saturn.executor.name": "exec", "saturn.job.name": "test-job", "saturn.job.labels.k": "v", } @@ -213,6 +221,7 @@ async def test_metrics_message_publish_failed( services_manager: ServicesManager, metrics_capture: MetricsCapture ) -> None: data = Mock() + data.xmsg.queue.definition.executor = "exec" data.xmsg.queue.definition.name = "test-job" data.xmsg.queue.definition.labels = {"k": "v"} data.xmsg.message.info.name = "test.fake.pipeline" @@ -220,6 +229,7 @@ async def test_metrics_message_publish_failed( params = { "pipeline": "test.fake.pipeline", "topic": "test.fake.topic", + "saturn.executor.name": "exec", "saturn.job.name": "test-job", "saturn.job.labels.k": "v", } diff --git a/tests/worker/test_broker.py b/tests/worker/test_broker.py index c05aa076..e82f1c01 100644 --- a/tests/worker/test_broker.py +++ b/tests/worker/test_broker.py @@ -160,6 +160,7 @@ async def test_broker_dummy( # Test metrics pipeline_params = { + "saturn.executor.name": "e1", "saturn.job.name": "j1", "pipeline": "tests.worker.test_broker.pipeline", "saturn.job.labels.owner": "team-saturn",