Skip to content

Commit

Permalink
Cancel publish task on client sync timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
isra17 committed Jul 20, 2023
1 parent 1c995fc commit ea42df5
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 2 deletions.
7 changes: 6 additions & 1 deletion src/saturn_engine/client/saturn.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from typing import TypeVar

import asyncio
import concurrent.futures
import threading

import aiohttp
Expand Down Expand Up @@ -161,7 +162,11 @@ def _run_sync(
timeout: Optional[float] = MEDIUM_TIMEOUT,
) -> T:
future = asyncio.run_coroutine_threadsafe(coroutine, self._loop)
return future.result(timeout=timeout)
try:
return future.result(timeout=timeout)
except concurrent.futures.TimeoutError:
future.cancel()
raise

def close(self) -> None:
try:
Expand Down
34 changes: 33 additions & 1 deletion tests/client/test_saturn_client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import typing as t

import asyncio
import concurrent.futures
import threading

import pytest
Expand All @@ -10,6 +11,7 @@
from saturn_engine.config import Config
from saturn_engine.core import TopicMessage
from saturn_engine.utils.inspect import get_import_name
from saturn_engine.worker.topic import Topic
from saturn_engine.worker.topics.memory import MemoryTopic
from saturn_engine.worker.topics.memory import get_queue
from saturn_engine.worker_manager.config.declarative import load_definitions_from_str
Expand All @@ -36,6 +38,22 @@ async def delayed_publish(self, message: TopicMessage, wait: bool) -> None:
self.published_event.set()


class HangingTopic(Topic):
def __init__(self, *args: t.Any, **kwargs: t.Any) -> None:
super().__init__(*args, **kwargs)
self.publish_done = threading.Event()
self.publish_result: t.Any = None

async def publish(self, message: TopicMessage, wait: bool) -> bool:
try:
await asyncio.Event().wait()
except BaseException as e:
self.publish_result = e
finally:
self.publish_done.set()
return True


def test_saturn_client_publish_sync(
config: Config,
http_client_mock: HttpClientMock,
Expand All @@ -47,7 +65,12 @@ def test_saturn_client_publish_sync(
"name": "test-topic",
"options": {},
"type": get_import_name(DelayedMemoryTopic),
}
},
{
"name": "hanging-topic",
"options": {},
"type": get_import_name(HangingTopic),
},
]
}

Expand All @@ -71,6 +94,15 @@ async def set_event() -> None:
queue.task_done()
assert queue.qsize() == 0

# Publish on blocking topic should cancel their async task.
with pytest.raises(concurrent.futures.TimeoutError):
saturn_client.publish("hanging-topic", TopicMessage({"a": 0}), True, timeout=1)

hanging_topic = t.cast(HangingTopic, saturn_client._client.topics["hanging-topic"])
hanging_topic.publish_done.wait()
assert isinstance(hanging_topic.publish_result, asyncio.CancelledError)

# Publish on invalid topic fail.
with pytest.raises(KeyError):
saturn_client.publish("test-topic2", TopicMessage({"a": 0}), True)
assert queue.qsize() == 0
Expand Down

0 comments on commit ea42df5

Please sign in to comment.