Skip to content

Commit

Permalink
Add metrics for bytes transferred
Browse files Browse the repository at this point in the history
  • Loading branch information
isra17 committed Nov 30, 2023
1 parent 23b47bb commit 11dc0b2
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 0 deletions.
29 changes: 29 additions & 0 deletions src/saturn_engine/worker/executors/arq/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@

import asyncio
import dataclasses
import pickle

import aioredis.errors
from arq import create_pool
from arq.connections import ArqRedis
from arq.connections import RedisSettings
from arq.jobs import Job
from arq.jobs import JobStatus
from opentelemetry.metrics import get_meter

from saturn_engine.core import PipelineResults
from saturn_engine.utils.asyncutils import TasksGroup
Expand Down Expand Up @@ -46,11 +48,38 @@ def __init__(self, options: Options, services: Services) -> None:
self.options = options
self.config = LazyConfig([{ARQ_EXECUTOR_NAMESPACE: self.options}])

meter = get_meter("saturn.metrics")
self.execute_bytes = meter.create_counter(
name="saturn.executor.arq.execute",
unit="By",
description="""
Total bytes sent to the executor
""",
)

self.results_bytes = meter.create_counter(
name="saturn.executor.arq.results",
unit="By",
description="""
Total bytes received from the executor
""",
)

if services.s.hooks.executor_initialized:
self.logger.warning(
"ARQExecutor does not support executor_initialized hooks"
)

def serialize(self, obj: dict[str, t.Any]) -> bytes:
    """Pickle *obj* for transport and record its size on the execute counter.

    The pickled payload length is added to ``execute_bytes``, tagged with
    this executor's name, before the bytes are handed back to the caller.
    """
    payload: bytes = pickle.dumps(obj)
    size = len(payload)
    self.execute_bytes.add(size, {"executor": self.name})
    return payload

def deserialize(self, data: bytes) -> dict[str, t.Any]:
    """Record the received byte size on the results counter, then unpickle.

    NOTE(review): ``pickle.loads`` is unsafe on untrusted input — presumably
    *data* only ever originates from our own executor processes; confirm.
    """
    self.results_bytes.add(len(data), {"executor": self.name})
    return pickle.loads(data)

# Lazily-built ArqRedis connection pool, created from the configured
# redis_url DSN on first access and reused afterwards.
# NOTE(review): stdlib functools.cached_property on an `async def` would
# cache the coroutine object itself (awaitable only once) — presumably
# `cached_property` here is a project utility that caches the awaited
# result; confirm against the import at the top of this file.
@cached_property
async def redis_queue(self) -> ArqRedis:
    return await create_pool(RedisSettings.from_dsn(self.options.redis_url))
Expand Down
22 changes: 22 additions & 0 deletions src/saturn_engine/worker/topics/rabbitmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import aio_pika
import aio_pika.abc
import aio_pika.exceptions
from opentelemetry.metrics import get_meter

from saturn_engine.core import TopicMessage
from saturn_engine.utils.asyncutils import SharedLock
Expand Down Expand Up @@ -104,6 +105,7 @@ def __init__(self, options: Options, services: Services, **kwargs: object) -> No
self.attempt_by_message: LRUDefaultDict[str, int] = LRUDefaultDict(
cache_len=1024, default_factory=lambda: 0
)

self.queue_arguments: dict[str, t.Any] = self.options.arguments

if self.options.max_length:
Expand All @@ -114,6 +116,22 @@ def __init__(self, options: Options, services: Services, **kwargs: object) -> No
)
self.queue_arguments.setdefault("x-overflow", self.options.overflow)

meter = get_meter("saturn.metrics")
self.publish_bytes_counter = meter.create_counter(
name="saturn.topics.rabbitmq.publish",
unit="By",
description="""
Total bytes sent on this topic.
""",
)
self.received_bytes_counter = meter.create_counter(
name="saturn.topics.rabbitmq.received",
unit="By",
description="""
Total bytes received on this topic.
""",
)

async def open(self) -> None:
await self.ensure_queue()

Expand All @@ -131,6 +149,9 @@ async def run(self) -> AsyncGenerator[t.AsyncContextManager[TopicMessage], None]
async for message in queue_iter:
attempt = 0
yield self.message_context(message)
self.received_bytes_counter.add(
message.body_size, {"topic": self.name}
)
except Exception:
self.logger.exception("Failed to consume")
await self.backoff_sleep(attempt)
Expand Down Expand Up @@ -167,6 +188,7 @@ async def publish(
),
routing_key=self.options.routing_key or self.options.queue_name,
)
self.publish_bytes_counter.add(len(body), {"topic": self.name})
return True
except aio_pika.exceptions.DeliveryError as e:
# Only handle Nack
Expand Down

0 comments on commit 11dc0b2

Please sign in to comment.