Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: Revoke Tasks #15

Merged
merged 48 commits into from
Jul 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
094722d
Add aborted error
TariqAHassan Jul 21, 2024
487fb6e
Fix typo
TariqAHassan Jul 21, 2024
660234d
Update
TariqAHassan Jul 21, 2024
371687d
Add support for revoking a task
TariqAHassan Jul 21, 2024
3320b53
Fix
TariqAHassan Jul 21, 2024
730d939
Add default TTL
TariqAHassan Jul 21, 2024
a1446d3
Fix
TariqAHassan Jul 21, 2024
eadfd35
Fix
TariqAHassan Jul 21, 2024
28d096c
Refactor
TariqAHassan Jul 21, 2024
4621f1a
Reduce duplication for `DEFAULT_TTL`
TariqAHassan Jul 21, 2024
98ec023
Add `is_revoked()`
TariqAHassan Jul 21, 2024
56547bb
Simplify
TariqAHassan Jul 21, 2024
c37b37f
Refactor
TariqAHassan Jul 21, 2024
715b560
Add `revoke()` method to task
TariqAHassan Jul 21, 2024
630515b
Refactor
TariqAHassan Jul 21, 2024
afc53e4
Fix
TariqAHassan Jul 21, 2024
d83d9d4
Make the traceback and text of an exception optional
TariqAHassan Jul 21, 2024
ffda70f
Add exception details before revoking a task
TariqAHassan Jul 21, 2024
970e003
Fix
TariqAHassan Jul 21, 2024
8914632
Improve robustness
TariqAHassan Jul 21, 2024
34ac1d2
Revert
TariqAHassan Jul 21, 2024
9f588fa
Add `_revocation_scan`
TariqAHassan Jul 21, 2024
d9bc55e
Fix formatting
TariqAHassan Jul 21, 2024
359ed79
Add `clean_up()` method to `MockFuture()`
TariqAHassan Jul 21, 2024
9cbbce6
Fix typos
TariqAHassan Jul 21, 2024
865be92
Fix
TariqAHassan Jul 21, 2024
a1b84c3
Fix
TariqAHassan Jul 21, 2024
24e9fda
Fix
TariqAHassan Jul 21, 2024
f3eeaca
Improve robustness
TariqAHassan Jul 21, 2024
c59c1b6
Fix
TariqAHassan Jul 21, 2024
94f0440
Add checks
TariqAHassan Jul 21, 2024
e9a1538
Update docstring
TariqAHassan Jul 21, 2024
0f2c389
Add test for `exception_details`
TariqAHassan Jul 21, 2024
e38fcab
Fixes
TariqAHassan Jul 21, 2024
75f8a83
Add `test_task_revoke()`
TariqAHassan Jul 21, 2024
dde7390
Update overview
TariqAHassan Jul 21, 2024
57e641e
Update overview
TariqAHassan Jul 21, 2024
c6e661a
Fix import order
TariqAHassan Jul 21, 2024
52ef62e
Improve check
TariqAHassan Jul 21, 2024
7e57e28
Bump version
TariqAHassan Jul 21, 2024
261257e
Fix typo
TariqAHassan Jul 21, 2024
77edb1e
Add logging
TariqAHassan Jul 21, 2024
1b9fed3
Add logging
TariqAHassan Jul 21, 2024
0eb8798
Add logging
TariqAHassan Jul 21, 2024
bc25bab
Fix scan
TariqAHassan Jul 21, 2024
c3d9c99
Add note
TariqAHassan Jul 21, 2024
06c9328
Fix
TariqAHassan Jul 21, 2024
bb3d470
Improve robustness
TariqAHassan Jul 21, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ capable while remaining very lightweight. Current functionality includes:
* Interactive Result Iteration
* Cron, date and interval task triggers (with result storage support)
* Robust task timeouts
* Revoking tasks
* Callbacks
* Dead Letter Queues (DLQs)

Expand Down
2 changes: 1 addition & 1 deletion alsek/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@
from alsek.core.status import StatusTracker
from alsek.core.task import task

__version__: str = "0.3.1"
__version__: str = "0.4.0"
3 changes: 2 additions & 1 deletion alsek/_defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
DEFAULT_QUEUE: str = "alsek_queue"
DEFAULT_NAMESPACE: str = "alsek"
DEFAULT_MAX_RETRIES: int = 3
DEFAULT_TASK_TTL: int = 60 * 60 * 24 * 7 * 1000
DEFAULT_TASK_TIMEOUT: int = 60 * 60 * 1000
DEFAULT_MECHANISM: SupportedMechanismType = "process"

DEFAULT_TTL: int = 60 * 60 * 24 * 7 * 1000
19 changes: 13 additions & 6 deletions alsek/core/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import logging
from typing import Optional

from alsek._defaults import DEFAULT_TASK_TTL
from alsek._defaults import DEFAULT_TTL
from alsek.core.concurrency import Lock
from alsek.core.message import Message
from alsek.exceptions import MessageAlreadyExistsError, MessageDoesNotExistsError
Expand All @@ -28,12 +28,16 @@ class Broker:

"""

def __init__(self, backend: Backend, dlq_ttl: Optional[int] = None) -> None:
def __init__(self, backend: Backend, dlq_ttl: Optional[int] = DEFAULT_TTL) -> None:
self.backend = backend
self.dlq_ttl = dlq_ttl

def __repr__(self) -> str:
return auto_repr(self, backend=self.backend, dlq_ttl=self.dlq_ttl)
return auto_repr(
self,
backend=self.backend,
dlq_ttl=self.dlq_ttl,
)

@staticmethod
def get_subnamespace(
Expand Down Expand Up @@ -94,7 +98,7 @@ def exists(self, message: Message) -> bool:
before=lambda message: log.debug("Submitting %s...", message.summary),
after=lambda input_: log.debug("Submitted %s.", input_["message"].summary),
)
def submit(self, message: Message, ttl: int = DEFAULT_TASK_TTL) -> None:
def submit(self, message: Message, ttl: int = DEFAULT_TTL) -> None:
"""Submit a message for processing.

Args:
Expand Down Expand Up @@ -163,7 +167,7 @@ def remove(self, message: Message) -> None:
None

"""
self.backend.delete(self.get_message_name(message))
self.backend.delete(self.get_message_name(message), missing_ok=True)
self._clear_lock(message)

@magic_logger(
Expand Down Expand Up @@ -202,6 +206,9 @@ def nack(self, message: Message) -> None:
"""
self._clear_lock(message)

def _make_dlq_key_name(self, message: Message) -> str:
return f"dtq:{self.get_message_name(message)}"

@magic_logger(
before=lambda message: log.debug("Failing %s...", message.summary),
after=lambda input_: log.debug("Failed %s.", input_["message"].summary),
Expand All @@ -221,7 +228,7 @@ def fail(self, message: Message) -> None:
self.ack(message)
if self.dlq_ttl:
self.backend.set(
f"dlq:{self.get_message_name(message)}",
self._make_dlq_key_name(message),
value=message.data,
ttl=self.dlq_ttl,
)
Expand Down
59 changes: 58 additions & 1 deletion alsek/core/futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,22 @@
Futures

"""
from __future__ import annotations

import logging
import os
from abc import ABC, abstractmethod
from copy import deepcopy
from platform import python_implementation
from threading import Thread
from threading import Event, Thread
from typing import Any, Type, cast

import dill

from alsek import Message
from alsek.core.status import TaskStatus
from alsek.core.task import Task
from alsek.exceptions import RevokedError
from alsek.utils.logging import get_logger, setup_logging
from alsek.utils.parsing import parse_exception
from alsek.utils.system import thread_raise
Expand Down Expand Up @@ -64,6 +67,9 @@ def _retry_future_handler(
message: Message,
exception: BaseException,
) -> None:
if task.is_revoked(message):
return None

if task.do_retry(message, exception=exception):
task._update_status(message, status=TaskStatus.RETRYING)
task.broker.retry(message)
Expand All @@ -74,6 +80,9 @@ def _retry_future_handler(


def _complete_future_handler(task: Task, message: Message, result: Any) -> None:
if task.is_revoked(message):
return None

if task.result_store:
task.result_store.set(message, result=result)
if message.callback_message_data and task.do_callback(message, result=result):
Expand Down Expand Up @@ -104,6 +113,12 @@ def __init__(self, task: Task, message: Message) -> None:

self.created_at = utcnow_timestamp_ms()

self._revocation_stop_event = Event()
self._revocation_scan_thread = Thread(
target=self._revocation_scan,
daemon=True,
)

@property
@abstractmethod
def complete(self) -> bool:
Expand Down Expand Up @@ -131,6 +146,26 @@ def stop(self, exception: Type[BaseException]) -> None:
"""
raise NotImplementedError()

def _revocation_scan(self, check_interval: int | float = 0.5) -> None:
while not self.complete and not self._revocation_stop_event.is_set():
if self.task.is_revoked(self.message):
self.stop(RevokedError)
break
self._revocation_stop_event.wait(check_interval)

def clean_up(self, ignore_errors: bool = False) -> None:
try:
self._revocation_stop_event.set()
self._revocation_scan_thread.join(timeout=0)
except BaseException as error: # noqa
log.error(
"Clean up error encountered for task %s with message %s.",
self.task.name,
self.message.summary,
)
if not ignore_errors:
raise error


class ThreadTaskFuture(TaskFuture):
"""Future for task execution in a separate thread.
Expand All @@ -148,6 +183,10 @@ def __init__(self, task: Task, message: Message) -> None:
self._thread = Thread(target=self._wrapper, daemon=True)
self._thread.start()

# Note: this must go here b/c the scan depends on
# `.complete`, which in turn depends on `_thread`.
self._revocation_scan_thread.start()

@property
def complete(self) -> bool:
"""Whether the task has finished."""
Expand All @@ -161,6 +200,13 @@ def _wrapper(self) -> None:
result, exception = None, None
try:
result = self.task.execute(self.message)
if self.task.is_revoked(self.message):
log.info(
"Result for %s recovered after revocation. Discarding.",
self.message.summary,
)
return None

self.message.update(exception_details=None) # clear any existing errors
log.info("Successfully processed %s.", self.message.summary)
except BaseException as error:
Expand Down Expand Up @@ -241,6 +287,10 @@ def __init__(self, task: Task, message: Message, patience: int = 1 * 1000) -> No
)
self._process.start()

# Note: this must go here b/c the scan depends on
# `.complete`, which in turn depends on `_process`.
self._revocation_scan_thread.start()

@property
def complete(self) -> bool:
"""Whether the task has finished."""
Expand All @@ -261,6 +311,13 @@ def _wrapper(
result, exception = None, None
try:
result = task.execute(message)
if task.is_revoked(message):
log.info(
"Result for %s recovered after revocation. Discarding.",
message.summary,
)
return None

message.update(exception_details=None) # clear any existing errors
log.info("Successfully processed %s.", message.summary)
except BaseException as error:
Expand Down
2 changes: 2 additions & 0 deletions alsek/core/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ class Message:
the task's function during the execution of ``op()``
metadata (dict, optional): a dictionary of user-defined message metadata.
This can store any data types supported by the backend's serializer.
exception_details (dict, optional): a dictionary of information about exceptions
with ``name``, ``text`` and ``traceback`` information. See ``ExceptionDetails()``.
result_ttl (int, optional): time to live (in milliseconds) for the
result in the result store. If a result store is provided and
this parameter is ``None``, the result will be persisted indefinitely.
Expand Down
5 changes: 3 additions & 2 deletions alsek/core/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from apscheduler.triggers.date import DateTrigger
from apscheduler.triggers.interval import IntervalTrigger

from alsek._defaults import DEFAULT_TTL
from alsek.core.broker import Broker
from alsek.core.message import Message
from alsek.exceptions import ValidationError
Expand Down Expand Up @@ -56,7 +57,7 @@ class StatusTracker:
broker (Broker): broker used by tasks.
ttl (int, optional): time to live (in milliseconds) for the status
enable_pubsub (bool, optional): if ``True`` automatically publish PUBSUB updates.
If ``None`` determine automatically given the capabilies of the backend
If ``None`` determine automatically given the capabilities of the backend
used by ``broker``.
integrity_scan_trigger (CronTrigger, DateTrigger, IntervalTrigger, optional):
trigger which determines how often to scan for messages with non-terminal
Expand All @@ -69,7 +70,7 @@ class StatusTracker:
def __init__(
self,
broker: Broker,
ttl: Optional[int] = 60 * 60 * 24 * 7 * 1000,
ttl: Optional[int] = DEFAULT_TTL,
enable_pubsub: Optional[bool] = None,
integrity_scan_trigger: Optional[
Union[CronTrigger, DateTrigger, IntervalTrigger]
Expand Down
56 changes: 54 additions & 2 deletions alsek/core/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,18 @@
DEFAULT_MECHANISM,
DEFAULT_QUEUE,
DEFAULT_TASK_TIMEOUT,
DEFAULT_TTL,
)
from alsek.core.backoff import Backoff, ConstantBackoff, ExponentialBackoff
from alsek.core.broker import Broker
from alsek.core.message import Message
from alsek.core.status import StatusTracker, TaskStatus
from alsek.exceptions import SchedulingError, ValidationError
from alsek.exceptions import RevokedError, SchedulingError, ValidationError
from alsek.storage.result import ResultStore
from alsek.types import SUPPORTED_MECHANISMS, SupportedMechanismType
from alsek.utils.aggregation import gather_init_params
from alsek.utils.logging import magic_logger
from alsek.utils.parsing import ExceptionDetails
from alsek.utils.printing import auto_repr

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -394,6 +397,8 @@ def do_retry(self, message: Message, exception: BaseException) -> bool: # noqa
bool

"""
if self.is_revoked(message):
return False
return self.max_retries is None or message.retries < self.max_retries

def do_callback(self, message: Message, result: Any) -> bool: # noqa
Expand All @@ -408,11 +413,58 @@ def do_callback(self, message: Message, result: Any) -> bool: # noqa

Warning:
* If the task message does not have a callback this
this method will *not* be invoked.
method will *not* be invoked.

"""
return True

def _make_revoked_key_name(self, message: Message) -> str:
return f"revoked:{self.broker.get_message_name(message)}"

@magic_logger(
before=lambda message: log.info("Revoking %s...", message.summary),
after=lambda input_: log.debug("Revoked %s.", input_["message"].summary),
)
def revoke(self, message: Message) -> None:
"""Revoke the task.

Args:
message (Message): message to revoke

Returns:
None

"""
self.broker.backend.set(
self._make_revoked_key_name(message),
value=True,
ttl=DEFAULT_TTL,
)
message.update(
exception_details=ExceptionDetails(
name=RevokedError.__name__,
text="task revoked",
traceback=None,
).as_dict()
)
self._update_status(message, status=TaskStatus.FAILED)
self.broker.fail(message)

def is_revoked(self, message: Message) -> bool:
"""Check if a message is revoked.

Args:
message (Message): an Alsek message

Returns:
None

"""
if self.broker.backend.get(self._make_revoked_key_name(message)):
return True
else:
return False


class TriggerTask(Task):
"""Triggered Task.
Expand Down
2 changes: 2 additions & 0 deletions alsek/core/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ def _stop_all_futures(self) -> None:
for futures in self._futures.values():
for future in futures:
future.stop(TerminationError)
future.clean_up(ignore_errors=True)

def _manage_futures(self) -> None:
for mechanism, futures in self._futures.items():
Expand All @@ -135,6 +136,7 @@ def _manage_futures(self) -> None:
to_remove.append(future)
elif future.time_limit_exceeded:
future.stop(TimeoutError)
future.clean_up(ignore_errors=True)
to_remove.append(future)

for f in to_remove:
Expand Down
6 changes: 5 additions & 1 deletion alsek/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class MessageAlreadyExistsError(AlsekError):


class MessageDoesNotExistsError(AlsekError):
"""Message does not exists in backend."""
"""Message does not exist in backend."""


class MultipleBrokersError(AlsekError):
Expand All @@ -39,3 +39,7 @@ class TaskNameCollisionError(AlsekError):

class TerminationError(AlsekError):
"""Alsek termination error."""


class RevokedError(AlsekError):
"""Alsek task revoked error."""
6 changes: 3 additions & 3 deletions alsek/utils/parsing.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@

"""
import traceback
from typing import NamedTuple
from typing import NamedTuple, Optional


class ExceptionDetails(NamedTuple):
name: str
text: str
traceback: str
text: Optional[str] = None
traceback: Optional[str] = None

def as_dict(self) -> dict[str, str]:
return self._asdict()
Expand Down
Loading
Loading