Skip to content

Commit

Permalink
Merge pull request #13 from TariqAHassan/exception-parsing
Browse files Browse the repository at this point in the history
Exception Parsing
  • Loading branch information
TariqAHassan committed Jul 10, 2024
2 parents ab5c260 + 62ea2cf commit 7d54d6a
Show file tree
Hide file tree
Showing 45 changed files with 161 additions and 60 deletions.
2 changes: 1 addition & 1 deletion alsek/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@
from alsek.core.status import StatusTracker
from alsek.core.task import task

__version__: str = "0.2.2"
__version__: str = "0.3.0"
2 changes: 1 addition & 1 deletion alsek/cli/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
from types import ModuleType
from typing import Any, Iterable

from alsek._utils.logging import magic_logger
from alsek.core.task import Task
from alsek.exceptions import NoTasksFoundError, TaskNameCollisionError
from alsek.utils.logging import magic_logger

log = logging.getLogger(__name__)

Expand Down
2 changes: 1 addition & 1 deletion alsek/cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@
import click

from alsek import __version__
from alsek._utils.logging import setup_logging
from alsek.cli._helpers import collect_tasks, parse_logging_level
from alsek.core.backoff import LinearBackoff
from alsek.core.worker import WorkerPool
from alsek.utils.logging import setup_logging


@click.command()
Expand Down
2 changes: 1 addition & 1 deletion alsek/core/backoff.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from functools import lru_cache
from typing import Any, Optional, Type

from alsek._utils.printing import auto_repr
from alsek.utils.printing import auto_repr

BackoffSettingsType = dict[str, Any]

Expand Down
4 changes: 2 additions & 2 deletions alsek/core/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@
from typing import Optional

from alsek._defaults import DEFAULT_TASK_TTL
from alsek._utils.logging import magic_logger
from alsek._utils.printing import auto_repr
from alsek.core.concurrency import Lock
from alsek.core.message import Message
from alsek.exceptions import MessageAlreadyExistsError, MessageDoesNotExistsError
from alsek.storage.backends import Backend
from alsek.utils.logging import magic_logger
from alsek.utils.printing import auto_repr

log = logging.getLogger(__name__)

Expand Down
2 changes: 1 addition & 1 deletion alsek/core/concurrency.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
from types import TracebackType
from typing import Optional, Type, cast

from alsek._utils.printing import auto_repr
from alsek.storage.backends import Backend
from alsek.utils.printing import auto_repr


class Lock:
Expand Down
2 changes: 1 addition & 1 deletion alsek/core/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
"""
from typing import Iterable, Optional, Union

from alsek._utils.system import StopSignalListener
from alsek.core.backoff import Backoff, ConstantBackoff, LinearBackoff
from alsek.core.broker import Broker
from alsek.core.concurrency import Lock
from alsek.core.message import Message
from alsek.storage.backends import Backend
from alsek.utils.system import StopSignalListener


class _ConsumptionMutex(Lock):
Expand Down
19 changes: 16 additions & 3 deletions alsek/core/futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@
import dill

from alsek import Message
from alsek._utils.logging import get_logger, setup_logging
from alsek._utils.system import thread_raise
from alsek._utils.temporal import utcnow_timestamp_ms
from alsek.core.status import TaskStatus
from alsek.core.task import Task
from alsek.utils.logging import get_logger, setup_logging
from alsek.utils.parsing import parse_exception
from alsek.utils.system import thread_raise
from alsek.utils.temporal import utcnow_timestamp_ms

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -156,13 +157,16 @@ def _wrapper(self) -> None:
log.info("Received %s...", self.message.summary)
self.task._update_status(self.message, status=TaskStatus.RUNNING)

self.task.pre_op(self.message)
result, exception = None, None
try:
result = self.task.execute(self.message)
self.message.update(exception_details=None) # clear any existing errors
log.info("Successfully processed %s.", self.message.summary)
except BaseException as error:
log.error("Error processing %s.", self.message.summary, exc_info=True)
exception = error
self.message.update(exception_details=parse_exception(exception).as_dict())

if self._wrapper_exit:
log.debug("Thread task future finished after termination.")
Expand All @@ -171,6 +175,9 @@ def _wrapper(self) -> None:
else:
_complete_future_handler(self.task, self.message, result=result)

# Post op is called here so that exception_details can be set
self.task.post_op(self.message, result=result)

self._wrapper_exit = True

def stop(self, exception: Type[BaseException]) -> None:
Expand Down Expand Up @@ -250,13 +257,16 @@ def _wrapper(
log.info("Received %s...", message.summary)
task._update_status(message, status=TaskStatus.RUNNING)

task.pre_op(message)
result, exception = None, None
try:
result = task.execute(message)
message.update(exception_details=None) # clear any existing errors
log.info("Successfully processed %s.", message.summary)
except BaseException as error:
log.error("Error processing %s.", message.summary, exc_info=True)
exception = error
message.update(exception_details=parse_exception(exception).as_dict())

if not wrapper_exit_queue.empty():
log.debug("Process task future finished after termination.")
Expand All @@ -265,6 +275,9 @@ def _wrapper(
else:
_complete_future_handler(task, message=message, result=result)

# Post op is called here so that exception_details can be set
task.post_op(message, result=result)

wrapper_exit_queue.put(1)

def _shutdown(self) -> None:
Expand Down
7 changes: 5 additions & 2 deletions alsek/core/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@
from uuid import uuid1

from alsek._defaults import DEFAULT_MECHANISM, DEFAULT_QUEUE, DEFAULT_TASK_TIMEOUT
from alsek._utils.printing import auto_repr
from alsek._utils.temporal import fromtimestamp_ms, utcnow_timestamp_ms
from alsek.core.backoff import ExponentialBackoff, settings2backoff
from alsek.core.concurrency import Lock
from alsek.types import SupportedMechanismType
from alsek.utils.printing import auto_repr
from alsek.utils.temporal import fromtimestamp_ms, utcnow_timestamp_ms


def _make_uuid() -> str:
Expand Down Expand Up @@ -86,6 +86,7 @@ def __init__(
args: Optional[Union[list[Any], tuple[Any, ...]]] = None,
kwargs: Optional[dict[Any, Any]] = None,
metadata: Optional[dict[Any, Any]] = None,
exception_details: Optional[dict[str, Any]] = None,
result_ttl: Optional[int] = None,
uuid: Optional[str] = None,
progenitor_uuid: Optional[str] = None,
Expand All @@ -105,6 +106,7 @@ def __init__(
self.args = tuple(args) if args else tuple()
self.kwargs = kwargs or dict()
self.metadata = metadata
self.exception_details = exception_details
self.result_ttl = result_ttl
self.retries = retries
self.timeout = timeout
Expand Down Expand Up @@ -135,6 +137,7 @@ def data(self) -> dict[str, Any]:
args=self.args,
kwargs=self.kwargs,
metadata=self.metadata,
exception_details=self.exception_details,
result_ttl=self.result_ttl,
uuid=self.uuid,
progenitor_uuid=self.progenitor_uuid,
Expand Down
16 changes: 8 additions & 8 deletions alsek/core/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"""
from enum import Enum
from typing import Any, Iterable, Optional, Union, NamedTuple
from typing import Any, Iterable, NamedTuple, Optional, Union

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
Expand Down Expand Up @@ -35,12 +35,12 @@ class StatusUpdate(NamedTuple):
"""Status information."""

status: TaskStatus
detail: Optional[Any]
details: Optional[Any]

def as_dict(self) -> dict[str, Any]:
return dict(
status=self.status.name,
detail=self.detail,
details=self.details,
)


Expand Down Expand Up @@ -177,7 +177,7 @@ def listen_to_updates(
if i.get("type", "").lower() == "message":
update = StatusUpdate(
status=TaskStatus[i["data"]["status"]],
detail=i["data"]["detail"],
details=i["data"]["details"],
)
yield update
if auto_exit and update.status in TERMINAL_TASK_STATUSES:
Expand All @@ -187,20 +187,20 @@ def set(
self,
message: Message,
status: TaskStatus,
detail: Optional[Any] = None,
details: Optional[Any] = None,
) -> None:
"""Set a ``status`` for ``message``.
Args:
message (Message): an Alsek message
status (TaskStatus): a status to set
detail (Any, optional): additional information about the status (e.g., progress percentage)
details (Any, optional): additional information about the status (e.g., progress percentage)
Returns:
None
"""
update = StatusUpdate(status=status, detail=detail)
update = StatusUpdate(status=status, details=details)
self._backend.set(
self.get_storage_name(message),
value=update.as_dict(),
Expand All @@ -222,7 +222,7 @@ def get(self, message: Message) -> StatusUpdate:
value = self._backend.get(self.get_storage_name(message))
return StatusUpdate(
status=TaskStatus[value["status"]],
detail=value["detail"],
details=value["details"],
)

def delete(self, message: Message, check: bool = True) -> None:
Expand Down
9 changes: 3 additions & 6 deletions alsek/core/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@
DEFAULT_QUEUE,
DEFAULT_TASK_TIMEOUT,
)
from alsek._utils.aggregation import gather_init_params
from alsek._utils.printing import auto_repr
from alsek.core.backoff import Backoff, ConstantBackoff, ExponentialBackoff
from alsek.core.broker import Broker
from alsek.core.message import Message
from alsek.core.status import StatusTracker, TaskStatus
from alsek.exceptions import SchedulingError, ValidationError
from alsek.storage.result import ResultStore
from alsek.types import SUPPORTED_MECHANISMS, SupportedMechanismType
from alsek.utils.aggregation import gather_init_params
from alsek.utils.printing import auto_repr

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -381,10 +381,7 @@ def execute(self, message: Message) -> Any:
result (Any): output of ``op()``
"""
self.pre_op(message)
result = self.op(message)
self.post_op(message, result=result)
return result
return self.op(message)

def do_retry(self, message: Message, exception: BaseException) -> bool: # noqa
"""Whether a failed task should be retried.
Expand Down
8 changes: 4 additions & 4 deletions alsek/core/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,6 @@
from apscheduler.triggers.interval import IntervalTrigger

from alsek import __version__
from alsek._utils.checks import has_duplicates
from alsek._utils.logging import magic_logger
from alsek._utils.sorting import dict_sort
from alsek._utils.system import smart_cpu_count
from alsek.core.broker import Broker
from alsek.core.consumer import Consumer
from alsek.core.futures import (
Expand All @@ -27,6 +23,10 @@
from alsek.core.task import Task
from alsek.exceptions import MultipleBrokersError, NoTasksFoundError, TerminationError
from alsek.types import SupportedMechanismType
from alsek.utils.checks import has_duplicates
from alsek.utils.logging import magic_logger
from alsek.utils.sorting import dict_sort
from alsek.utils.system import smart_cpu_count

log = logging.getLogger(__name__)

Expand Down
4 changes: 2 additions & 2 deletions alsek/storage/backends/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
import dill

from alsek._defaults import DEFAULT_NAMESPACE
from alsek._utils.aggregation import gather_init_params
from alsek._utils.printing import auto_repr
from alsek.storage.serialization import JsonSerializer, Serializer
from alsek.utils.aggregation import gather_init_params
from alsek.utils.printing import auto_repr

log = logging.getLogger(__name__)

Expand Down
4 changes: 2 additions & 2 deletions alsek/storage/backends/disk.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@
from typing import Any, Callable, Iterable, Optional, Union, cast

from alsek._defaults import DEFAULT_NAMESPACE
from alsek._utils.printing import auto_repr
from alsek._utils.string import name_matcher
from alsek.storage.backends import Backend, LazyClient
from alsek.storage.serialization import JsonSerializer, Serializer
from alsek.utils.printing import auto_repr
from alsek.utils.string import name_matcher

try:
from diskcache import Cache as DiskCache
Expand Down
5 changes: 3 additions & 2 deletions alsek/storage/backends/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@
from redis import ConnectionPool, Redis

from alsek._defaults import DEFAULT_NAMESPACE
from alsek._utils.aggregation import gather_init_params
from alsek._utils.printing import auto_repr
from alsek.storage.backends import Backend, LazyClient
from alsek.storage.serialization import JsonSerializer, Serializer
from alsek.utils.aggregation import gather_init_params
from alsek.utils.printing import auto_repr


class RedisBackend(Backend):
"""Redis Backend.
Expand Down
4 changes: 2 additions & 2 deletions alsek/storage/result.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
"""
from typing import Any, Iterable, Union

from alsek._utils.temporal import utcnow_timestamp_ms
from alsek._utils.waiting import waiter
from alsek.core.message import Message
from alsek.storage.backends import Backend
from alsek.utils.temporal import utcnow_timestamp_ms
from alsek.utils.waiting import waiter

_GET_RESULT_WAIT_SLEEP_INTERVAL: int = 500

Expand Down
2 changes: 1 addition & 1 deletion alsek/storage/serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

import dill

from alsek._utils.printing import auto_repr
from alsek.utils.printing import auto_repr


class Serializer(ABC):
Expand Down
4 changes: 2 additions & 2 deletions alsek/tools/iteration.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
"""
from typing import Any, Iterable

from alsek._utils.checks import has_duplicates
from alsek._utils.system import StopSignalListener
from alsek.core.message import Message
from alsek.exceptions import ValidationError
from alsek.storage.result import ResultStore
from alsek.utils.checks import has_duplicates
from alsek.utils.system import StopSignalListener


def _idx_drop(items: list[Any], indexes: set[int]) -> list[Any]:
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
Loading

0 comments on commit 7d54d6a

Please sign in to comment.