Skip to content

Commit

Permalink
services(tracer,logger,sentry): Includes job labels in context
Browse files Browse the repository at this point in the history
  • Loading branch information
isra17 committed Jun 29, 2023
1 parent cb274f3 commit cc11297
Show file tree
Hide file tree
Showing 10 changed files with 178 additions and 93 deletions.
1 change: 1 addition & 0 deletions noxfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
"pytest-mock",
"freezegun",
"opentelemetry-sdk",
"sentry-sdk",
]
mypy_packages = [
"pytest",
Expand Down
1 change: 0 additions & 1 deletion src/saturn_engine/default_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ class config(SaturnConfig):

class services_manager(ServicesManagerConfig):
services = [
"saturn_engine.worker.services.labels_propagator.LabelsPropagator",
"saturn_engine.worker.services.tracing.Tracer",
"saturn_engine.worker.services.metrics.Metrics",
"saturn_engine.worker.services.loggers.Logger",
Expand Down
95 changes: 59 additions & 36 deletions src/saturn_engine/worker/services/extras/sentry.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
from typing import Any
from typing import Optional
from typing import Type
import typing as t

import logging
import os
Expand All @@ -26,18 +24,43 @@
from .. import BaseServices
from .. import Service

Event = dict[str, Any]
Hint = dict[str, Any]
Event = dict[str, t.Any]
Hint = dict[str, t.Any]
ExcInfo = tuple[
Optional[Type[BaseException]], Optional[BaseException], Optional[TracebackType]
t.Optional[t.Type[BaseException]],
t.Optional[BaseException],
t.Optional[TracebackType],
]


def set_event_tags_from_queue(event: dict, *, queue: QueueItem) -> None:
    """Attach job-level tags from a queue definition onto a Sentry event.

    Mutates *event* in place: ensures a ``"tags"`` mapping exists, records
    the job name under ``saturn.job.name`` and adds one
    ``saturn.job.labels.<key>`` tag per queue label.
    """
    tags = event.setdefault("tags", {})
    tags["saturn.job.name"] = queue.name
    tags.update(
        {f"saturn.job.labels.{label}": value for label, value in queue.labels.items()}
    )


def set_event_tags_from_xmsg(event: dict, *, xmsg: ExecutableMessage) -> None:
    """Attach message- and job-level tags from an executable message.

    Mutates *event* in place: records the message id and pipeline name,
    adds one ``saturn.message.tags.<key>`` tag per message tag, then
    delegates to ``set_event_tags_from_queue`` for the job-level tags of
    the message's queue definition.
    """
    tags = event.setdefault("tags", {})
    tags["saturn.message.id"] = xmsg.id
    tags["saturn.pipeline.name"] = xmsg.message.info.name
    tags.update(
        {
            f"saturn.message.tags.{tag}": value
            for tag, value in xmsg.message.message.tags.items()
        }
    )
    set_event_tags_from_queue(event, queue=xmsg.queue.definition)


def queue_data(queue: QueueItem) -> dict[str, t.Any]:
    """Return the queue fields reported as Sentry "extra" context.

    Only the identifying fields (name and labels) are included, keeping the
    payload small compared to serializing the whole queue item.
    """
    return dict(name=queue.name, labels=queue.labels)


class Sentry(Service[BaseServices, "Sentry.Options"]):
name = "sentry"

class Options:
dsn: Optional[str] = None
dsn: t.Optional[str] = None

async def open(self) -> None:
self.logger = logging.getLogger("saturn.extras.sentry")
Expand All @@ -52,7 +75,7 @@ async def open(self) -> None:
self.services.hooks.message_executed.register(self.on_message_executed)
self.services.hooks.message_published.register(self.on_message_published)

def on_before_send(self, event: Event, hint: Hint) -> Optional[Event]:
def on_before_send(self, event: Event, hint: Hint) -> t.Optional[Event]:
exc_info = hint.get("exc_info")
# RemoteException should have been unwrapped in one of the hooks,
# but some exceptions might have been caught by other integrations such
Expand All @@ -78,10 +101,10 @@ async def on_work_queue_built(

def _event_processor(event: Event, hint: Hint) -> Event:
with capture_internal_exceptions():
tags = event.setdefault("tags", {})
tags["saturn_queue"] = item.name
extra = event.setdefault("extra", {})
extra["saturn-queue"] = asdict(item)
set_event_tags_from_queue(event, queue=item)
extra = event.setdefault("extra", {}).setdefault("saturn", {})
extra["job"] = queue_data(item)

return event

scope.add_event_processor(_event_processor)
Expand All @@ -98,11 +121,10 @@ async def on_message_executed(

def _event_processor(event: Event, hint: Hint) -> Event:
with capture_internal_exceptions():
tags = event.setdefault("tags", {})
tags["saturn_message"] = xmsg.id
tags["saturn_pipeline"] = message.info.name
extra = event.setdefault("extra", {})
extra["saturn-pipeline-message"] = asdict(message)
set_event_tags_from_xmsg(event, xmsg=xmsg)
extra = event.setdefault("extra", {}).setdefault("saturn", {})
extra["pipeline_message"] = asdict(message)
extra["job"] = queue_data(xmsg.queue.definition)
return event

scope.add_event_processor(_event_processor)
Expand All @@ -118,13 +140,12 @@ async def on_message_published(

def _event_processor(event: Event, hint: Hint) -> Event:
with capture_internal_exceptions():
tags = event.setdefault("tags", {})
tags["saturn_message"] = publish.output.message.id
tags["saturn_channel"] = publish.output.channel
tags["saturn_pipeline"] = publish.xmsg.message.info.name
extra = event.setdefault("extra", {})
extra["saturn-message"] = asdict(publish.output.message)
extra["saturn-pipeline-message"] = asdict(publish.xmsg.message)
set_event_tags_from_xmsg(event, xmsg=publish.xmsg)
event["tags"]["saturn.channel.name"] = publish.output.channel
extra = event.setdefault("extra", {}).setdefault("saturn", {})
extra["output_message"] = asdict(publish.output.message)
extra["pipeline_message"] = asdict(publish.xmsg.message)
extra["job"] = queue_data(publish.xmsg.queue.definition)
return event

scope.add_event_processor(_event_processor)
Expand Down Expand Up @@ -155,7 +176,9 @@ def _capture_exception(self, exc_info: Exception) -> None:
hub.capture_event(event, hint=hint)

@staticmethod
def _sanitize_message_resources(pipeline_message: dict[str, Any]) -> dict[str, Any]:
def _sanitize_message_resources(
pipeline_message: dict[str, t.Any]
) -> dict[str, t.Any]:
for resource_arg in pipeline_message.get("info", {}).get("resources", {}):
resource = (
pipeline_message.get("message", {}).get("args", {}).get(resource_arg)
Expand All @@ -168,7 +191,7 @@ def _sanitize_message_resources(pipeline_message: dict[str, Any]) -> dict[str, A

# Following functions are adapted from Sentry-sdk to support
# TracebackData instead of regular Traceback.
def walk_traceback_chain(cause: Optional[TracebackData]) -> Iterator[TracebackData]:
def walk_traceback_chain(cause: t.Optional[TracebackData]) -> Iterator[TracebackData]:
while cause:
yield cause

Expand All @@ -181,10 +204,10 @@ def walk_traceback_chain(cause: Optional[TracebackData]) -> Iterator[TracebackDa
def serialize_frame(
frame: FrameData,
with_locals: bool = True,
) -> dict[str, Any]:
) -> dict[str, t.Any]:
abs_path = frame.filename

rv: dict[str, Any] = {
rv: dict[str, t.Any] = {
"filename": abs_path,
"abs_path": os.path.abspath(abs_path) if abs_path else None,
"function": frame.name or "<unknown>",
Expand All @@ -202,9 +225,9 @@ def serialize_frame(

def single_exception_from_remote_error_tuple(
tb: TracebackData,
client_options: Optional[dict[str, Any]] = None,
mechanism: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
client_options: t.Optional[dict[str, t.Any]] = None,
mechanism: t.Optional[dict[str, t.Any]] = None,
) -> dict[str, t.Any]:
if client_options is None:
with_locals = True
else:
Expand All @@ -227,9 +250,9 @@ def single_exception_from_remote_error_tuple(

def exceptions_from_traceback(
tb_exc: TracebackData,
client_options: Optional[dict[str, Any]] = None,
mechanism: Optional[dict[str, Any]] = None,
) -> list[dict[str, Any]]:
client_options: t.Optional[dict[str, t.Any]] = None,
mechanism: t.Optional[dict[str, t.Any]] = None,
) -> list[dict[str, t.Any]]:
rv = []
for tb in walk_traceback_chain(tb_exc):
rv.append(
Expand All @@ -243,8 +266,8 @@ def exceptions_from_traceback(

def event_from_remote_exception(
exc: RemoteException,
client_options: Optional[dict[str, Any]] = None,
mechanism: Optional[dict[str, Any]] = None,
client_options: t.Optional[dict[str, t.Any]] = None,
mechanism: t.Optional[dict[str, t.Any]] = None,
) -> tuple[Event, Hint]:
return (
{
Expand Down
15 changes: 0 additions & 15 deletions src/saturn_engine/worker/services/labels_propagator.py

This file was deleted.

7 changes: 2 additions & 5 deletions src/saturn_engine/worker/services/loggers/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,11 @@
def executable_message_data(
xmsg: ExecutableMessage, *, verbose: bool = False
) -> dict[str, t.Any]:
labels_dict = {}
if labels := xmsg.message.message.metadata.get("labels"):
labels_dict = labels.copy()

labels = xmsg.queue.definition.labels.copy()
return pipeline_message_data(xmsg.message, verbose=verbose) | {
"job": xmsg.queue.name,
"input": xmsg.queue.definition.input.name,
"labels": labels_dict,
"labels": labels,
}


Expand Down
30 changes: 15 additions & 15 deletions src/saturn_engine/worker/services/tracing/tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,25 +85,25 @@ def on_pipeline_executed(

def executable_message_attributes(
xmsg: ExecutableMessage,
) -> Mapping[str, AttributeValue]:
return {
"saturn.job.name": xmsg.queue.name,
"saturn.input.name": xmsg.queue.definition.input.name,
} | pipeline_message_attributes(xmsg.message)


def pipeline_message_attributes(
message: PipelineMessage,
) -> Mapping[str, AttributeValue]:
return (
{
"saturn.message.id": message.id,
"saturn.resources.names": [n for n in message.resource_names if n],
"saturn.pipeline.name": message.info.name,
"saturn.job.name": xmsg.queue.name,
"saturn.input.name": xmsg.queue.definition.input.name,
}
| {f"saturn.message.tags.{k}": v for k, v in message.message.tags.items()}
| pipeline_message_attributes(xmsg.message)
| {
f"saturn.labels.{k}": str(v)
for k, v in message.message.metadata.get("labels", {}).items()
f"saturn.job.labels.{k}": str(v)
for k, v in xmsg.queue.definition.labels.items()
}
)


def pipeline_message_attributes(
    message: PipelineMessage,
) -> Mapping[str, AttributeValue]:
    """Build tracing span attributes describing a pipeline message.

    Includes the message id, the non-empty resource names, the pipeline
    name, and one ``saturn.message.tags.<key>`` attribute per message tag.
    """
    attributes: dict[str, AttributeValue] = {
        "saturn.message.id": message.id,
        "saturn.resources.names": [n for n in message.resource_names if n],
        "saturn.pipeline.name": message.info.name,
    }
    for tag_name, tag_value in message.message.tags.items():
        attributes[f"saturn.message.tags.{tag_name}"] = tag_value
    return attributes
17 changes: 1 addition & 16 deletions tests/worker/services/test_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

import pytest

from saturn_engine.config import Config
from saturn_engine.core import MessageId
from saturn_engine.core import PipelineInfo
from saturn_engine.core import PipelineOutput
Expand All @@ -29,28 +28,14 @@ def fake_pipeline(x: int, r: FakeResource) -> None:
pass


@pytest.fixture
def config(config: Config) -> Config:
return config.load_object(
{
"services_manager": {
"services": [
"saturn_engine.worker.services.labels_propagator.LabelsPropagator",
]
}
}
)


@pytest.mark.asyncio
async def test_logger_message_executed(
services_manager: ServicesManager,
caplog: t.Any,
frozen_time: FreezeTime,
executable_maker: t.Callable[..., ExecutableMessage],
) -> None:
logger = services_manager._load_service(Logger)
await logger.open()
logger = services_manager.services.cast_service(Logger)

pipeline_info = PipelineInfo.from_pipeline(fake_pipeline)
xmsg = executable_maker(pipeline_info=pipeline_info)
Expand Down
Loading

0 comments on commit cc11297

Please sign in to comment.