Add metrics about bytes transferred #380

Merged
merged 1 commit on Nov 30, 2023
35 changes: 34 additions & 1 deletion src/saturn_engine/worker/executors/arq/executor.py
@@ -2,13 +2,15 @@

import asyncio
import dataclasses
import pickle # noqa: S403

import aioredis.errors
from arq import create_pool
from arq.connections import ArqRedis
from arq.connections import RedisSettings
from arq.jobs import Job
from arq.jobs import JobStatus
from opentelemetry.metrics import get_meter

from saturn_engine.core import PipelineResults
from saturn_engine.utils.asyncutils import TasksGroup
@@ -46,14 +48,45 @@ def __init__(self, options: Options, services: Services) -> None:
self.options = options
self.config = LazyConfig([{ARQ_EXECUTOR_NAMESPACE: self.options}])

meter = get_meter("saturn.metrics")
self.execute_bytes = meter.create_counter(
name="saturn.executor.arq.execute",
unit="By",
description="""
Total bytes sent to the executor
""",
)

self.results_bytes = meter.create_counter(
name="saturn.executor.arq.results",
unit="By",
description="""
Total bytes received from the executor
""",
)

if services.s.hooks.executor_initialized:
self.logger.warning(
"ARQExecutor does not support executor_initialized hooks"
)

def serialize(self, obj: dict[str, t.Any]) -> bytes:
data = pickle.dumps(obj)
self.execute_bytes.add(len(data), {"executor": self.name})
return data

def deserialize(self, data: bytes) -> dict[str, t.Any]:
self.results_bytes.add(len(data), {"executor": self.name})
obj = pickle.loads(data) # noqa: S301
return obj

@cached_property
async def redis_queue(self) -> ArqRedis:
-        return await create_pool(RedisSettings.from_dsn(self.options.redis_url))
+        return await create_pool(
+            RedisSettings.from_dsn(self.options.redis_url),
+            job_serializer=self.serialize,
+            job_deserializer=self.deserialize,
+        )

async def process_message(self, message: ExecutableMessage) -> PipelineResults:
config = self.config.load_object(message.config)
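The serializer hooks above count every byte that crosses the executor boundary at pickle time. As a rough illustration of the counter pattern (not part of this PR), the sketch below records a pickled payload's size against an in-memory reader; the opentelemetry-sdk dependency, the InMemoryMetricReader, the sample payload, and the executor name are assumptions for the example:

import pickle

from opentelemetry.metrics import get_meter, set_meter_provider
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import InMemoryMetricReader

# Collect metrics in memory so the counter can be inspected, e.g. in a test.
reader = InMemoryMetricReader()
set_meter_provider(MeterProvider(metric_readers=[reader]))

meter = get_meter("saturn.metrics")
execute_bytes = meter.create_counter(
    name="saturn.executor.arq.execute",
    unit="By",
    description="Total bytes sent to the executor",
)

# Mirror ARQExecutor.serialize: measure the pickled job payload and
# attribute it to the executor that sent it ("example-executor" is made up).
payload = pickle.dumps({"job": "example"})
execute_bytes.add(len(payload), {"executor": "example-executor"})

# The recorded sum for saturn.executor.arq.execute now equals len(payload).
metrics = reader.get_metrics_data()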
22 changes: 22 additions & 0 deletions src/saturn_engine/worker/topics/rabbitmq.py
@@ -14,6 +14,7 @@
import aio_pika
import aio_pika.abc
import aio_pika.exceptions
from opentelemetry.metrics import get_meter

from saturn_engine.core import TopicMessage
from saturn_engine.utils.asyncutils import SharedLock
@@ -104,6 +105,7 @@ def __init__(self, options: Options, services: Services, **kwargs: object) -> None:
self.attempt_by_message: LRUDefaultDict[str, int] = LRUDefaultDict(
cache_len=1024, default_factory=lambda: 0
)

self.queue_arguments: dict[str, t.Any] = self.options.arguments

if self.options.max_length:
@@ -114,6 +116,22 @@ def __init__(self, options: Options, services: Services, **kwargs: object) -> None:
)
self.queue_arguments.setdefault("x-overflow", self.options.overflow)

meter = get_meter("saturn.metrics")
self.publish_bytes_counter = meter.create_counter(
name="saturn.topics.rabbitmq.publish",
unit="By",
description="""
Total bytes sent on this topic.
""",
)
self.received_bytes_counter = meter.create_counter(
name="saturn.topics.rabbitmq.received",
unit="By",
description="""
Total bytes received on this topic.
""",
)

async def open(self) -> None:
await self.ensure_queue()

@@ -131,6 +149,9 @@ async def run(self) -> AsyncGenerator[t.AsyncContextManager[TopicMessage], None]:
async for message in queue_iter:
attempt = 0
yield self.message_context(message)
self.received_bytes_counter.add(
message.body_size, {"topic": self.name}
)
except Exception:
self.logger.exception("Failed to consume")
await self.backoff_sleep(attempt)
@@ -167,6 +188,7 @@ async def publish(
),
routing_key=self.options.routing_key or self.options.queue_name,
)
self.publish_bytes_counter.add(len(body), {"topic": self.name})
return True
except aio_pika.exceptions.DeliveryError as e:
# Only handle Nack
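On the topic side, publish sizes come from the serialized message body and receive sizes from aio_pika's body_size property on incoming messages. A minimal standalone sketch of the same publish-and-count pattern, assuming aio_pika and a reachable broker; the URL amqp://localhost/ and the queue name are placeholders, not values from this PR:

import asyncio

import aio_pika
from opentelemetry.metrics import get_meter

meter = get_meter("saturn.metrics")
publish_bytes = meter.create_counter(
    name="saturn.topics.rabbitmq.publish",
    unit="By",
    description="Total bytes sent on this topic.",
)

async def publish_with_metrics(url: str, queue_name: str, body: bytes) -> None:
    # Publish one message and record its size keyed by topic name,
    # mirroring the counter call added to RabbitMQTopic.publish above.
    connection = await aio_pika.connect_robust(url)
    async with connection:
        channel = await connection.channel()
        await channel.declare_queue(queue_name)
        await channel.default_exchange.publish(
            aio_pika.Message(body=body), routing_key=queue_name
        )
        publish_bytes.add(len(body), {"topic": queue_name})

asyncio.run(publish_with_metrics("amqp://localhost/", "example-queue", b"payload"))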