
Add additional topic output to manage errors
isra17 committed Aug 7, 2023
1 parent 8177e8c commit 3475317
Showing 14 changed files with 351 additions and 57 deletions.
5 changes: 5 additions & 0 deletions example/definitions/simple.yaml
@@ -114,6 +114,11 @@ spec:
      - topic: stdout
    error::ZeroDivisionError:
      - topic: logging-error
      - error_handler:
          set_handled: false
          republish:
            channel: default
            max_retry: 5

  pipeline:
    name: example.pipelines.echo_with_error
6 changes: 4 additions & 2 deletions example/tests/pipeline_tests.yaml
@@ -67,8 +67,9 @@ spec:
      - outputs:
          - channel: error::ZeroDivisionError
            args:
              cause: __any__
              error:
                type: ZeroDivisionError
                type: builtins.ZeroDivisionError
                module: 'example.pipelines'
                message: division by zero
                traceback: __any__
@@ -91,8 +92,9 @@
      - outputs:
          - channel: error:.*Ignorable Exception.*:Exception
            args:
              cause: __any__
              error:
                type: Exception
                type: builtins.Exception
                module: 'example.pipelines'
                message: Ignorable Exception
                traceback: __any__
17 changes: 16 additions & 1 deletion src/saturn_engine/core/api.py
@@ -25,6 +25,21 @@ class ComponentDefinition:
    options: dict[str, Any] = field(default_factory=dict)


@dataclasses.dataclass
class RepublishOptions:
    channel: str
    max_retry: int


@dataclasses.dataclass
class ErrorHandler:
    set_handled: bool = True
    republish: t.Optional[RepublishOptions] = None


OutputDefinition = t.Union[ComponentDefinition, ErrorHandler]


@dataclasses.dataclass
class ResourceRateLimitItem:
    rate_limits: list[str]
@@ -52,7 +67,7 @@ class ResourcesProviderItem:
class QueueItem:
    name: JobId
    pipeline: QueuePipeline
    output: dict[str, list[ComponentDefinition]]
    output: dict[str, list[OutputDefinition]]
    input: ComponentDefinition
    config: dict[str, Any] = field(default_factory=dict)
    labels: dict[str, str] = field(default_factory=dict)
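The new `RepublishOptions` and `ErrorHandler` dataclasses, together with the `OutputDefinition` union, appear to be what the `error_handler` entry added in `example/definitions/simple.yaml` deserializes into. A minimal sketch of the equivalent objects built by hand (illustrative only, not part of the commit):

```python
from saturn_engine.core.api import ErrorHandler, OutputDefinition, RepublishOptions

# Mirrors the error_handler output added to example/definitions/simple.yaml:
# leave the failure unhandled, but republish the message up to 5 times.
handler = ErrorHandler(
    set_handled=False,
    republish=RepublishOptions(channel="default", max_retry=5),
)

# ErrorHandler entries share the per-channel output lists with plain topic outputs,
# which is why QueueItem.output is now typed dict[str, list[OutputDefinition]].
outputs: dict[str, list[OutputDefinition]] = {"error::ZeroDivisionError": [handler]}
```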
20 changes: 12 additions & 8 deletions src/saturn_engine/utils/tester/pipeline_test.py
@@ -7,6 +7,7 @@
from saturn_engine.utils.tester.json_utils import get_node_value
from saturn_engine.utils.tester.json_utils import normalize_json
from saturn_engine.utils.tester.json_utils import replace_node
from saturn_engine.worker.error_handling import HandledError
from saturn_engine.worker.error_handling import process_pipeline_exception
from saturn_engine.worker.executors.bootstrap import PipelineBootstrap
from saturn_engine.worker.pipeline_message import PipelineMessage
@@ -51,14 +52,17 @@ def run_saturn_pipeline_test(
            # Shouldn't be None but we want to make mypy happy
            assert exc_type and exc_value and exc_traceback  # noqa: S101

            pipeline_result = process_pipeline_exception(
                queue=job_definition.template,
                message=pipeline_message.message,
                exc_type=exc_type,
                exc_value=exc_value,
                exc_traceback=exc_traceback,
            )
            if pipeline_result is None:
            try:
                process_pipeline_exception(
                    queue=job_definition.template,
                    message=pipeline_message.message,
                    exc_type=exc_type,
                    exc_value=exc_value,
                    exc_traceback=exc_traceback,
                )
            except HandledError as e:
                pipeline_result = e.results
            else:
                raise

        pipeline_results.append(
113 changes: 91 additions & 22 deletions src/saturn_engine/worker/error_handling.py
@@ -1,3 +1,4 @@
import typing as t
from typing import Optional
from typing import Type

@@ -8,7 +9,10 @@
from textwrap import dedent
from types import TracebackType

from saturn_engine.core.api import ErrorHandler
from saturn_engine.core.api import OutputDefinition
from saturn_engine.core.api import QueueItem
from saturn_engine.core.api import RepublishOptions
from saturn_engine.core.error import ErrorMessageArgs
from saturn_engine.core.pipeline import PipelineOutput
from saturn_engine.core.pipeline import PipelineResults
@@ -17,6 +21,10 @@
from saturn_engine.worker.executors.bootstrap import RemoteException


class ErrorHandlerMeta(t.TypedDict):
    retried: int


@dataclass
class ExceptionDetails:
    message: str
@@ -34,19 +42,33 @@ class ExceptionFilter:
    lineno: int


class HandledError(Exception):
    def __init__(self, *, results: PipelineResults, handled: bool) -> None:
        super().__init__("Error handled")
        self.results = results
        self.handled = handled

    def reraise(self) -> None:
        """reraise self if handled, otherwise reraise the original exception"""
        cause = self.__cause__
        if self.handled or not cause:
            raise self from self.__cause__
        raise cause from cause.__cause__


def process_pipeline_exception(
    *,
    queue: QueueItem,
    message: TopicMessage,
    exc_type: Type[BaseException],
    exc_value: BaseException,
    exc_traceback: TracebackType,
) -> Optional[PipelineResults]:
) -> None:
    outputs = queue.output
    exc_details = get_exception_details(exc_type, exc_value, exc_traceback)

    # Match the exception and consume it if found, else reraise the exception
    for channel in outputs.keys():
    for channel, handlers in outputs.items():
        if not is_output_error_handler(channel):
            continue

@@ -55,33 +77,80 @@ def process_pipeline_exception(

            if not does_error_match(exception=exc_details, exception_filter=exc_filter):
                continue

            return PipelineResults(
                outputs=[
                    PipelineOutput(
                        channel=channel,
                        message=TopicMessage(
                            args={
                                "cause": message,
                                "error": ErrorMessageArgs(
                                    type=exc_details.exception_type,
                                    module=exc_details.module,
                                    message=exc_details.message,
                                    traceback=exc_details.traceback,
                                ),
                            }
                        ),
                    )
                ],
                resources=[],
            )
        except Exception:
            logging.getLogger(__name__).exception(
                "Failed to process error channel", extra={"data": {"channel": channel}}
            )

        output_msgs = [
            PipelineOutput(
                channel=channel,
                message=TopicMessage(
                    args={
                        "cause": message,
                        "error": ErrorMessageArgs(
                            type=exc_details.exception_type,
                            module=exc_details.module,
                            message=exc_details.message,
                            traceback=exc_details.traceback,
                        ),
                    }
                ),
            )
        ]
        handled = is_handled(handlers)
        republish_outputs = republish_handlers(handlers)
        if republish_outputs:
            republished_msgs = republish(message, republish_outputs)
            output_msgs.extend(republished_msgs)
            if not republished_msgs:
                handled = False

        raise HandledError(
            results=PipelineResults(
                outputs=output_msgs,
                resources=[],
            ),
            handled=handled,
        ) from exc_value
    return None


def republish(
    message: TopicMessage, republish_handlers: list[RepublishOptions]
) -> list[PipelineOutput]:
    meta: ErrorHandlerMeta = t.cast(
        ErrorHandlerMeta, message.metadata.setdefault("error_handler", {})
    )
    retried: int = meta.setdefault("retried", 0)
    meta["retried"] += 1

    outputs = []

    for handler in republish_handlers:
        if retried >= handler.max_retry:
            continue
        outputs.append(
            PipelineOutput(
                channel=handler.channel,
                message=message,
            )
        )
    return outputs


def republish_handlers(
    handlers: t.Iterable[OutputDefinition],
) -> list[RepublishOptions]:
    return [
        h.republish for h in handlers if isinstance(h, ErrorHandler) and h.republish
    ]


def is_handled(handlers: t.Iterable[OutputDefinition]) -> bool:
    return all(h.set_handled for h in handlers if isinstance(h, ErrorHandler))


def does_error_match(
    *, exception: ExceptionDetails, exception_filter: ExceptionFilter
) -> bool:
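A rough sketch of how the new helpers above compose, reusing the `ErrorHandler` and `RepublishOptions` dataclasses from `core/api.py` (handler values are illustrative, not taken from the commit):

```python
from saturn_engine.core.api import ErrorHandler, RepublishOptions
from saturn_engine.worker.error_handling import is_handled, republish_handlers

handlers = [
    # Route the error to its channel but keep the message marked as failed.
    ErrorHandler(
        set_handled=False,
        republish=RepublishOptions(channel="default", max_retry=5),
    ),
]

# republish_handlers() keeps only ErrorHandler entries that define a republish target.
assert republish_handlers(handlers) == [handlers[0].republish]

# is_handled() is True only when every ErrorHandler in the channel opts in.
assert is_handled(handlers) is False
```

`republish()` itself counts attempts in `message.metadata["error_handler"]["retried"]` and stops emitting a republished output once a handler's `max_retry` is reached.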
38 changes: 24 additions & 14 deletions src/saturn_engine/worker/executors/queue.py
@@ -9,6 +9,7 @@
from saturn_engine.utils.asyncutils import Cancellable
from saturn_engine.utils.asyncutils import TasksGroupRunner
from saturn_engine.utils.log import getLogger
from saturn_engine.worker.error_handling import HandledError
from saturn_engine.worker.error_handling import process_pipeline_exception
from saturn_engine.worker.resources.manager import ResourceUnavailable
from saturn_engine.worker.services import Services
@@ -73,26 +74,35 @@ async def scope(
                assert (  # noqa: S101
                    exc_type and exc_value and exc_traceback
                )
                result = process_pipeline_exception(
                process_pipeline_exception(
                    queue=xmsg.queue.definition,
                    message=xmsg.message.message,
                    exc_type=exc_type,
                    exc_value=exc_value,
                    exc_traceback=exc_traceback,
                )
                if not result:
                    raise
                return result

        results = await scope(processable)
        self.consuming_tasks.create_task(
            self.process_results(
                xmsg=processable,
                results=results,
                # Transfer the message context to the results processing scope.
                context=processable._context.pop_all(),
            )
        )
                raise

        results = None
        error = None
        try:
            results = await scope(processable)
        except HandledError as e:
            results = e.results
            error = e
        finally:
            if results:
                self.consuming_tasks.create_task(
                    self.process_results(
                        xmsg=processable,
                        results=results,
                        # Transfer the message context to the results
                        # processing scope.
                        context=processable._context.pop_all(),
                    )
                )
            if error:
                error.reraise()

    async def process_results(
        self,
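Since `process_pipeline_exception` now raises `HandledError` instead of returning results, any caller follows the same pattern as the executor above. A hedged sketch of that calling convention (the helper name and the `print` stand-in are illustrative, not from the commit):

```python
from saturn_engine.worker.error_handling import HandledError, process_pipeline_exception


def route_failure(queue_item, topic_message, exc: Exception) -> None:
    """Illustrative only: run a caught pipeline exception through the queue's error outputs."""
    tb = exc.__traceback__
    assert tb is not None
    try:
        process_pipeline_exception(
            queue=queue_item,         # a core.api.QueueItem
            message=topic_message,    # the TopicMessage that failed
            exc_type=type(exc),
            exc_value=exc,
            exc_traceback=tb,
        )
    except HandledError as handled:
        # handled.results holds the PipelineOutput messages for the matched error
        # channels, plus any republished copies of the original message.
        for output in handled.results.outputs:
            print(output.channel)  # stand-in for downstream result processing
        handled.reraise()  # re-raises HandledError when set_handled, else the original error
    else:
        raise exc  # no error output matched: surface the original failure
```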
5 changes: 4 additions & 1 deletion src/saturn_engine/worker/services/extras/sentry.py
@@ -1,5 +1,6 @@
import typing as t

import contextlib
import logging
import os
from collections.abc import AsyncGenerator
@@ -16,6 +17,7 @@
from saturn_engine.utils.options import asdict
from saturn_engine.utils.traceback_data import FrameData
from saturn_engine.utils.traceback_data import TracebackData
from saturn_engine.worker.error_handling import HandledError
from saturn_engine.worker.executors.bootstrap import RemoteException
from saturn_engine.worker.executors.executable import ExecutableMessage
from saturn_engine.worker.executors.executable import ExecutableQueue
@@ -131,7 +133,8 @@ def _event_processor(event: Event, hint: Hint) -> Event:

        scope.add_event_processor(_event_processor)
        try:
            yield
            with contextlib.suppress(HandledError):
                yield
        except Exception as e:
            self._capture_exception(e)

14 changes: 14 additions & 0 deletions src/saturn_engine/worker/services/loggers/logger.py
@@ -12,6 +12,7 @@
from saturn_engine.core import ResourceUsed
from saturn_engine.core import TopicMessage
from saturn_engine.core.api import QueueItem
from saturn_engine.worker.error_handling import HandledError
from saturn_engine.worker.executors.bootstrap import PipelineBootstrap
from saturn_engine.worker.executors.executable import ExecutableMessage
from saturn_engine.worker.executors.executable import ExecutableQueue
@@ -144,6 +145,19 @@ async def on_message_executed(
                    | executable_message_data(xmsg, verbose=self.verbose)
                },
            )
        except HandledError as e:
            self.message_logger.warning(
                "Failed to execute message, error handled: %s: %s",
                e.__cause__.__class__.__name__,
                str(e.__cause__),
                extra={
                    "data": {
                        "result": self.result_data(e.results),
                    }
                    | trace_info
                    | executable_message_data(xmsg, verbose=self.verbose)
                },
            )
        except Exception:
            self.message_logger.exception(
                "Failed to execute message",
11 changes: 9 additions & 2 deletions src/saturn_engine/worker/services/metrics.py
@@ -5,6 +5,7 @@

from saturn_engine.core import PipelineResults
from saturn_engine.utils.telemetry import get_timer
from saturn_engine.worker.error_handling import HandledError
from saturn_engine.worker.executors.executable import ExecutableMessage
from saturn_engine.worker.services.hooks import MessagePublished
from saturn_engine.worker.topic import Topic
@@ -86,8 +87,14 @@ async def on_message_executed(
        self.message_counter.add(1, params | {"state": "executing"})
        try:
            with get_timer(self.message_duration).time(params):
                results = yield
            self.message_counter.add(1, params | {"state": "success"})
                try:
                    results = yield
                except HandledError as e:
                    results = e.results
                    self.message_counter.add(1, params | {"state": "failed_handled"})
                else:
                    self.message_counter.add(1, params | {"state": "success"})

            for resource in results.resources:
                self.resource_counter.add(1, {"type": resource.type})

