Skip to content

Commit

Permalink
Merge pull request #15 from TariqAHassan/feature/revoke-tasks
Browse files Browse the repository at this point in the history
Feature: Revoke Tasks
  • Loading branch information
TariqAHassan committed Jul 21, 2024
2 parents a83e2d4 + bb3d470 commit 9b9b613
Show file tree
Hide file tree
Showing 17 changed files with 169 additions and 27 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ capable while remaining very lightweight. Current functionality includes:
* Interactive Result Iteration
* Cron, date and interval task triggers (with result storage support)
* Robust task timeouts
* Revoking tasks
* Callbacks
* Dead Letter Queues (DLQs)

Expand Down
2 changes: 1 addition & 1 deletion alsek/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@
from alsek.core.status import StatusTracker
from alsek.core.task import task

__version__: str = "0.3.1"
__version__: str = "0.4.0"
3 changes: 2 additions & 1 deletion alsek/_defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
DEFAULT_QUEUE: str = "alsek_queue"
DEFAULT_NAMESPACE: str = "alsek"
DEFAULT_MAX_RETRIES: int = 3
DEFAULT_TASK_TTL: int = 60 * 60 * 24 * 7 * 1000
DEFAULT_TASK_TIMEOUT: int = 60 * 60 * 1000
DEFAULT_MECHANISM: SupportedMechanismType = "process"

DEFAULT_TTL: int = 60 * 60 * 24 * 7 * 1000
19 changes: 13 additions & 6 deletions alsek/core/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import logging
from typing import Optional

from alsek._defaults import DEFAULT_TASK_TTL
from alsek._defaults import DEFAULT_TTL
from alsek.core.concurrency import Lock
from alsek.core.message import Message
from alsek.exceptions import MessageAlreadyExistsError, MessageDoesNotExistsError
Expand All @@ -28,12 +28,16 @@ class Broker:
"""

def __init__(self, backend: Backend, dlq_ttl: Optional[int] = None) -> None:
def __init__(self, backend: Backend, dlq_ttl: Optional[int] = DEFAULT_TTL) -> None:
self.backend = backend
self.dlq_ttl = dlq_ttl

def __repr__(self) -> str:
return auto_repr(self, backend=self.backend, dlq_ttl=self.dlq_ttl)
return auto_repr(
self,
backend=self.backend,
dlq_ttl=self.dlq_ttl,
)

@staticmethod
def get_subnamespace(
Expand Down Expand Up @@ -94,7 +98,7 @@ def exists(self, message: Message) -> bool:
before=lambda message: log.debug("Submitting %s...", message.summary),
after=lambda input_: log.debug("Submitted %s.", input_["message"].summary),
)
def submit(self, message: Message, ttl: int = DEFAULT_TASK_TTL) -> None:
def submit(self, message: Message, ttl: int = DEFAULT_TTL) -> None:
"""Submit a message for processing.
Args:
Expand Down Expand Up @@ -163,7 +167,7 @@ def remove(self, message: Message) -> None:
None
"""
self.backend.delete(self.get_message_name(message))
self.backend.delete(self.get_message_name(message), missing_ok=True)
self._clear_lock(message)

@magic_logger(
Expand Down Expand Up @@ -202,6 +206,9 @@ def nack(self, message: Message) -> None:
"""
self._clear_lock(message)

def _make_dlq_key_name(self, message: Message) -> str:
return f"dtq:{self.get_message_name(message)}"

@magic_logger(
before=lambda message: log.debug("Failing %s...", message.summary),
after=lambda input_: log.debug("Failed %s.", input_["message"].summary),
Expand All @@ -221,7 +228,7 @@ def fail(self, message: Message) -> None:
self.ack(message)
if self.dlq_ttl:
self.backend.set(
f"dlq:{self.get_message_name(message)}",
self._make_dlq_key_name(message),
value=message.data,
ttl=self.dlq_ttl,
)
Expand Down
59 changes: 58 additions & 1 deletion alsek/core/futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,22 @@
Futures
"""
from __future__ import annotations

import logging
import os
from abc import ABC, abstractmethod
from copy import deepcopy
from platform import python_implementation
from threading import Thread
from threading import Event, Thread
from typing import Any, Type, cast

import dill

from alsek import Message
from alsek.core.status import TaskStatus
from alsek.core.task import Task
from alsek.exceptions import RevokedError
from alsek.utils.logging import get_logger, setup_logging
from alsek.utils.parsing import parse_exception
from alsek.utils.system import thread_raise
Expand Down Expand Up @@ -64,6 +67,9 @@ def _retry_future_handler(
message: Message,
exception: BaseException,
) -> None:
if task.is_revoked(message):
return None

if task.do_retry(message, exception=exception):
task._update_status(message, status=TaskStatus.RETRYING)
task.broker.retry(message)
Expand All @@ -74,6 +80,9 @@ def _retry_future_handler(


def _complete_future_handler(task: Task, message: Message, result: Any) -> None:
if task.is_revoked(message):
return None

if task.result_store:
task.result_store.set(message, result=result)
if message.callback_message_data and task.do_callback(message, result=result):
Expand Down Expand Up @@ -104,6 +113,12 @@ def __init__(self, task: Task, message: Message) -> None:

self.created_at = utcnow_timestamp_ms()

self._revocation_stop_event = Event()
self._revocation_scan_thread = Thread(
target=self._revocation_scan,
daemon=True,
)

@property
@abstractmethod
def complete(self) -> bool:
Expand Down Expand Up @@ -131,6 +146,26 @@ def stop(self, exception: Type[BaseException]) -> None:
"""
raise NotImplementedError()

def _revocation_scan(self, check_interval: int | float = 0.5) -> None:
while not self.complete and not self._revocation_stop_event.is_set():
if self.task.is_revoked(self.message):
self.stop(RevokedError)
break
self._revocation_stop_event.wait(check_interval)

def clean_up(self, ignore_errors: bool = False) -> None:
try:
self._revocation_stop_event.set()
self._revocation_scan_thread.join(timeout=0)
except BaseException as error: # noqa
log.error(
"Clean up error encountered for task %s with message %s.",
self.task.name,
self.message.summary,
)
if not ignore_errors:
raise error


class ThreadTaskFuture(TaskFuture):
"""Future for task execution in a separate thread.
Expand All @@ -148,6 +183,10 @@ def __init__(self, task: Task, message: Message) -> None:
self._thread = Thread(target=self._wrapper, daemon=True)
self._thread.start()

# Note: this must go here b/c the scan depends on
# `.complete`, which in turn depends on `_thread`.
self._revocation_scan_thread.start()

@property
def complete(self) -> bool:
"""Whether the task has finished."""
Expand All @@ -161,6 +200,13 @@ def _wrapper(self) -> None:
result, exception = None, None
try:
result = self.task.execute(self.message)
if self.task.is_revoked(self.message):
log.info(
"Result for %s recovered after revocation. Discarding.",
self.message.summary,
)
return None

self.message.update(exception_details=None) # clear any existing errors
log.info("Successfully processed %s.", self.message.summary)
except BaseException as error:
Expand Down Expand Up @@ -241,6 +287,10 @@ def __init__(self, task: Task, message: Message, patience: int = 1 * 1000) -> No
)
self._process.start()

# Note: this must go here b/c the scan depends on
# `.complete`, which in turn depends on `_process`.
self._revocation_scan_thread.start()

@property
def complete(self) -> bool:
"""Whether the task has finished."""
Expand All @@ -261,6 +311,13 @@ def _wrapper(
result, exception = None, None
try:
result = task.execute(message)
if task.is_revoked(message):
log.info(
"Result for %s recovered after revocation. Discarding.",
message.summary,
)
return None

message.update(exception_details=None) # clear any existing errors
log.info("Successfully processed %s.", message.summary)
except BaseException as error:
Expand Down
2 changes: 2 additions & 0 deletions alsek/core/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ class Message:
the task's function during the execution of ``op()``
metadata (dict, optional): a dictionary of user-defined message metadata.
This can store any data types supported by the backend's serializer.
exception_details (dict, optional): a dictionary of information about exceptions
with ``name``, ``text`` and ``traceback`` information. See ``ExceptionDetails()``.
result_ttl (int, optional): time to live (in milliseconds) for the
result in the result store. If a result store is provided and
this parameter is ``None``, the result will be persisted indefinitely.
Expand Down
5 changes: 3 additions & 2 deletions alsek/core/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from apscheduler.triggers.date import DateTrigger
from apscheduler.triggers.interval import IntervalTrigger

from alsek._defaults import DEFAULT_TTL
from alsek.core.broker import Broker
from alsek.core.message import Message
from alsek.exceptions import ValidationError
Expand Down Expand Up @@ -56,7 +57,7 @@ class StatusTracker:
broker (Broker): broker used by tasks.
ttl (int, optional): time to live (in milliseconds) for the status
enable_pubsub (bool, optional): if ``True`` automatically publish PUBSUB updates.
If ``None`` determine automatically given the capabilies of the backend
If ``None`` determine automatically given the capabilities of the backend
used by ``broker``.
integrity_scan_trigger (CronTrigger, DateTrigger, IntervalTrigger, optional):
trigger which determines how often to scan for messages with non-terminal
Expand All @@ -69,7 +70,7 @@ class StatusTracker:
def __init__(
self,
broker: Broker,
ttl: Optional[int] = 60 * 60 * 24 * 7 * 1000,
ttl: Optional[int] = DEFAULT_TTL,
enable_pubsub: Optional[bool] = None,
integrity_scan_trigger: Optional[
Union[CronTrigger, DateTrigger, IntervalTrigger]
Expand Down
56 changes: 54 additions & 2 deletions alsek/core/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,18 @@
DEFAULT_MECHANISM,
DEFAULT_QUEUE,
DEFAULT_TASK_TIMEOUT,
DEFAULT_TTL,
)
from alsek.core.backoff import Backoff, ConstantBackoff, ExponentialBackoff
from alsek.core.broker import Broker
from alsek.core.message import Message
from alsek.core.status import StatusTracker, TaskStatus
from alsek.exceptions import SchedulingError, ValidationError
from alsek.exceptions import RevokedError, SchedulingError, ValidationError
from alsek.storage.result import ResultStore
from alsek.types import SUPPORTED_MECHANISMS, SupportedMechanismType
from alsek.utils.aggregation import gather_init_params
from alsek.utils.logging import magic_logger
from alsek.utils.parsing import ExceptionDetails
from alsek.utils.printing import auto_repr

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -394,6 +397,8 @@ def do_retry(self, message: Message, exception: BaseException) -> bool: # noqa
bool
"""
if self.is_revoked(message):
return False
return self.max_retries is None or message.retries < self.max_retries

def do_callback(self, message: Message, result: Any) -> bool: # noqa
Expand All @@ -408,11 +413,58 @@ def do_callback(self, message: Message, result: Any) -> bool: # noqa
Warning:
* If the task message does not have a callback this
this method will *not* be invoked.
method will *not* be invoked.
"""
return True

def _make_revoked_key_name(self, message: Message) -> str:
return f"revoked:{self.broker.get_message_name(message)}"

@magic_logger(
before=lambda message: log.info("Revoking %s...", message.summary),
after=lambda input_: log.debug("Revoked %s.", input_["message"].summary),
)
def revoke(self, message: Message) -> None:
"""Revoke the task.
Args:
message (Message): message to revoke
Returns:
None
"""
self.broker.backend.set(
self._make_revoked_key_name(message),
value=True,
ttl=DEFAULT_TTL,
)
message.update(
exception_details=ExceptionDetails(
name=RevokedError.__name__,
text="task revoked",
traceback=None,
).as_dict()
)
self._update_status(message, status=TaskStatus.FAILED)
self.broker.fail(message)

def is_revoked(self, message: Message) -> bool:
"""Check if a message is revoked.
Args:
message (Message): an Alsek message
Returns:
None
"""
if self.broker.backend.get(self._make_revoked_key_name(message)):
return True
else:
return False


class TriggerTask(Task):
"""Triggered Task.
Expand Down
2 changes: 2 additions & 0 deletions alsek/core/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ def _stop_all_futures(self) -> None:
for futures in self._futures.values():
for future in futures:
future.stop(TerminationError)
future.clean_up(ignore_errors=True)

def _manage_futures(self) -> None:
for mechanism, futures in self._futures.items():
Expand All @@ -135,6 +136,7 @@ def _manage_futures(self) -> None:
to_remove.append(future)
elif future.time_limit_exceeded:
future.stop(TimeoutError)
future.clean_up(ignore_errors=True)
to_remove.append(future)

for f in to_remove:
Expand Down
6 changes: 5 additions & 1 deletion alsek/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class MessageAlreadyExistsError(AlsekError):


class MessageDoesNotExistsError(AlsekError):
"""Message does not exists in backend."""
"""Message does not exist in backend."""


class MultipleBrokersError(AlsekError):
Expand All @@ -39,3 +39,7 @@ class TaskNameCollisionError(AlsekError):

class TerminationError(AlsekError):
"""Alsek termination error."""


class RevokedError(AlsekError):
"""Alsek task revoked error."""
6 changes: 3 additions & 3 deletions alsek/utils/parsing.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@
"""
import traceback
from typing import NamedTuple
from typing import NamedTuple, Optional


class ExceptionDetails(NamedTuple):
name: str
text: str
traceback: str
text: Optional[str] = None
traceback: Optional[str] = None

def as_dict(self) -> dict[str, str]:
return self._asdict()
Expand Down
Loading

0 comments on commit 9b9b613

Please sign in to comment.