From 11dc0b2cdcace37f077653a96d8086671a3771a2 Mon Sep 17 00:00:00 2001 From: isra17 Date: Thu, 30 Nov 2023 16:38:04 -0500 Subject: [PATCH] Add metrics about bytes transfer --- .../worker/executors/arq/executor.py | 29 +++++++++++++++++++ src/saturn_engine/worker/topics/rabbitmq.py | 22 ++++++++++++++ 2 files changed, 51 insertions(+) diff --git a/src/saturn_engine/worker/executors/arq/executor.py b/src/saturn_engine/worker/executors/arq/executor.py index a768e20a..e64764c6 100644 --- a/src/saturn_engine/worker/executors/arq/executor.py +++ b/src/saturn_engine/worker/executors/arq/executor.py @@ -2,6 +2,7 @@ import asyncio import dataclasses +import pickle import aioredis.errors from arq import create_pool @@ -9,6 +10,7 @@ from arq.connections import RedisSettings from arq.jobs import Job from arq.jobs import JobStatus +from opentelemetry.metrics import get_meter from saturn_engine.core import PipelineResults from saturn_engine.utils.asyncutils import TasksGroup @@ -46,11 +48,38 @@ def __init__(self, options: Options, services: Services) -> None: self.options = options self.config = LazyConfig([{ARQ_EXECUTOR_NAMESPACE: self.options}]) + meter = get_meter("saturn.metrics") + self.execute_bytes = meter.create_counter( + name="saturn.executor.arq.execute", + unit="By", + description=""" + Total bytes sent to the executor + """, + ) + + self.results_bytes = meter.create_counter( + name="saturn.executor.arq.results", + unit="By", + description=""" + Total bytes received from the executor + """, + ) + if services.s.hooks.executor_initialized: self.logger.warning( "ARQExecutor does not support executor_initialized hooks" ) + def serialize(self, obj: dict[str, t.Any]) -> bytes: + data = pickle.dumps(obj) + self.execute_bytes.add(len(data), {"executor": self.name}) + return data + + def deserialize(self, data: bytes) -> dict[str, t.Any]: + self.results_bytes.add(len(data), {"executor": self.name}) + obj = pickle.loads(data) + return obj + @cached_property async def redis_queue(self) -> ArqRedis: return await create_pool(RedisSettings.from_dsn(self.options.redis_url)) diff --git a/src/saturn_engine/worker/topics/rabbitmq.py b/src/saturn_engine/worker/topics/rabbitmq.py index e8c0e8d7..75c91827 100644 --- a/src/saturn_engine/worker/topics/rabbitmq.py +++ b/src/saturn_engine/worker/topics/rabbitmq.py @@ -14,6 +14,7 @@ import aio_pika import aio_pika.abc import aio_pika.exceptions +from opentelemetry.metrics import get_meter from saturn_engine.core import TopicMessage from saturn_engine.utils.asyncutils import SharedLock @@ -104,6 +105,7 @@ def __init__(self, options: Options, services: Services, **kwargs: object) -> No self.attempt_by_message: LRUDefaultDict[str, int] = LRUDefaultDict( cache_len=1024, default_factory=lambda: 0 ) + self.queue_arguments: dict[str, t.Any] = self.options.arguments if self.options.max_length: @@ -114,6 +116,22 @@ def __init__(self, options: Options, services: Services, **kwargs: object) -> No ) self.queue_arguments.setdefault("x-overflow", self.options.overflow) + meter = get_meter("saturn.metrics") + self.publish_bytes_counter = meter.create_counter( + name="saturn.topics.rabbitmq.publish", + unit="By", + description=""" + Total bytes sent on this topic. + """, + ) + self.received_bytes_counter = meter.create_counter( + name="saturn.topics.rabbitmq.received", + unit="By", + description=""" + Total bytes received on this topic. + """, + ) + async def open(self) -> None: await self.ensure_queue() @@ -131,6 +149,9 @@ async def run(self) -> AsyncGenerator[t.AsyncContextManager[TopicMessage], None] async for message in queue_iter: attempt = 0 yield self.message_context(message) + self.received_bytes_counter.add( + message.body_size, {"topic": self.name} + ) except Exception: self.logger.exception("Failed to consume") await self.backoff_sleep(attempt) @@ -167,6 +188,7 @@ async def publish( ), routing_key=self.options.routing_key or self.options.queue_name, ) + self.publish_bytes_counter.add(len(body), {"topic": self.name}) return True except aio_pika.exceptions.DeliveryError as e: # Only handle Nack