Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Init Topics #377

Merged
merged 2 commits into from
Nov 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/saturn_engine/worker/executors/executable.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from saturn_engine.core import ResourceUsed
from saturn_engine.core import TopicMessage
from saturn_engine.core.api import QueueItem
from saturn_engine.utils import flatten
from saturn_engine.utils import iterators
from saturn_engine.utils.config import LazyConfig
from saturn_engine.utils.log import getLogger
Expand Down Expand Up @@ -138,6 +139,8 @@ class Options:
max_concurrency: t.Optional[int] = None

async def run(self) -> AsyncGenerator[ExecutableMessage, None]:
await self.open()

try:
async for context, message in self._make_iterator():
with message_context(message):
Expand Down Expand Up @@ -175,6 +178,11 @@ async def close(self) -> None:
for topic in topics:
await topic.close()

async def open(self) -> None:
await self.topic.open()
for output in flatten(self.output.values()):
await output.open()

async def wait_for_done(self) -> None:
if self.pending_messages_count:
await self.done.wait()
Expand Down
3 changes: 3 additions & 0 deletions src/saturn_engine/worker/inventories/batching.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ def __init__(

self.inventory = build_inventory(options.inventory, services=services)

async def open(self) -> None:
await self.inventory.open()

async def _next_batch(self) -> list[Item]:
batch: list[Item] = await alib.list(
alib.islice(self._run_iter, self.batch_size)
Expand Down
10 changes: 7 additions & 3 deletions src/saturn_engine/worker/inventories/chained.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,16 @@ def __init__(self, options: Options, services: Services, **kwargs: object) -> No
self.__services = services
self.__current: t.Optional[CurrentInventory] = None

async def iterate(self, after: t.Optional[Cursor] = None) -> AsyncIterator[Item]:
cursors = json.loads(after) if after else {}
inventories = self.options.inventories

self.inventories = [(i.name, self.build_inventory(i)) for i in inventories]

async def open(self) -> None:
for _, inventory in self.inventories:
await inventory.open()

async def iterate(self, after: t.Optional[Cursor] = None) -> AsyncIterator[Item]:
cursors = json.loads(after) if after else {}

start_inventory = 0
if cursors:
for i, (name, _) in enumerate(self.inventories):
Expand Down
4 changes: 4 additions & 0 deletions src/saturn_engine/worker/inventories/fanin.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ def __init__(self, options: Options, services: Services, **kwargs: object) -> No
for input_def in options.inputs
}

async def open(self) -> None:
for inventory in self.inputs.values():
await inventory.open()

async def iterate(self, after: t.Optional[Cursor] = None) -> t.AsyncIterator[Item]:
cursors = json.loads(after) if after else {}

Expand Down
5 changes: 5 additions & 0 deletions src/saturn_engine/worker/inventories/joined.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ async def iterate(self, after: t.Optional[Cursor] = None) -> t.AsyncIterator[Ite
async for item in self.scheduler.run():
yield item

async def open(self) -> None:
await self.root.open()

async def run(self, after: t.Optional[Cursor] = None) -> t.AsyncIterator[Item]:
async for item in self.iterate(after=after):
yield item
Expand All @@ -85,6 +88,8 @@ async def join_inventories(self) -> None:
options=self.options.join.options | {"parent_item": item},
)
subinv = self.build_inventory(subdef)
await subinv.open()

subcursors = self._multi_cursors.process_sub(subinv, parent=item)

fut = self.scheduler.add(
Expand Down
3 changes: 3 additions & 0 deletions src/saturn_engine/worker/inventories/topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ def __init__(self, options: Options, services: Services, **kwargs: object) -> No

self.topic = build_topic(options.topic, services=services)

async def open(self) -> None:
await self.topic.open()

async def iterate(self, after: t.Optional[Cursor] = None) -> t.AsyncIterator[Item]:
async for message_ctx in self.topic.run():
try:
Expand Down
3 changes: 3 additions & 0 deletions src/saturn_engine/worker/inventory.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,9 @@ def has_pendings(self) -> bool:
return self._cursors.has_pendings()
return False

async def open(self) -> None:
pass


class IteratorInventory(Inventory):
def __init__(self, *, batch_size: t.Optional[int] = None, **kwargs: object) -> None:
Expand Down
3 changes: 3 additions & 0 deletions src/saturn_engine/worker/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,6 @@ async def item_to_topic(self, item_ctx: Item) -> t.AsyncIterator[TopicMessage]:
self.state_service.set_job_cursor(
self.queue_item.name, cursor=self.inventory.cursor
)

async def open(self) -> None:
await self.inventory.open()
3 changes: 3 additions & 0 deletions src/saturn_engine/worker/topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ async def run(self) -> AsyncGenerator[TopicOutput, None]:
async def publish(self, message: TopicMessage, wait: bool) -> bool:
raise NotImplementedError()

async def open(self) -> None:
pass

async def close(self) -> None:
pass

Expand Down
3 changes: 3 additions & 0 deletions src/saturn_engine/worker/topics/batching.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ def __init__(self, options: Options, services: Services, **kwargs: object) -> No
self.batch: list[TopicOutput] = []
self.start_time: datetime = datetime.utcnow()

async def open(self) -> None:
await self.topic.open()

@property
def _is_done(self) -> bool:
return self.force_done and self.queue.empty()
Expand Down
3 changes: 3 additions & 0 deletions src/saturn_engine/worker/topics/rabbitmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,9 @@ def __init__(self, options: Options, services: Services, **kwargs: object) -> No
)
self.queue_arguments.setdefault("x-overflow", self.options.overflow)

async def open(self) -> None:
await self.ensure_queue()

async def run(self) -> AsyncGenerator[t.AsyncContextManager[TopicMessage], None]:
if self.is_closed:
raise TopicClosedError()
Expand Down
Loading