Skip to content

Commit

Permalink
Clean message config to remove unserializable items
Browse files Browse the repository at this point in the history
  • Loading branch information
isra17 committed Sep 21, 2023
1 parent 323fa6e commit 08630c1
Show file tree
Hide file tree
Showing 4 changed files with 7 additions and 3 deletions.
1 change: 0 additions & 1 deletion example/definitions/executor.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ spec:
type: ProcessExecutor
options:
max_workers: 1
pool_type: thread
---
apiVersion: saturn.flared.io/v1alpha1
kind: SaturnExecutor
Expand Down
2 changes: 1 addition & 1 deletion src/saturn_engine/worker/executors/arq/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ async def process_message(self, message: ExecutableMessage) -> PipelineResults:
try:
job = await (await self.redis_queue).enqueue_job(
EXECUTE_FUNC_NAME,
message.message,
message.message.as_remote(),
_expires=options.queue_timeout,
_queue_name=options.queue_name,
)
Expand Down
2 changes: 1 addition & 1 deletion src/saturn_engine/worker/executors/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def __init__(self, options: Options, services: Services) -> None:

async def process_message(self, message: ExecutableMessage) -> PipelineResults:
loop = asyncio.get_running_loop()
execute = partial(self.remote_execute, message=message.message)
execute = partial(self.remote_execute, message=message.message.as_remote())
return await loop.run_in_executor(self.pool_executor, execute)

@property
Expand Down
5 changes: 5 additions & 0 deletions src/saturn_engine/worker/pipeline_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,8 @@ def execute(self) -> object:

pipeline_args = fromdict(args, pipeline_args_def)
return pipeline_args.call(kwargs=args)

def as_remote(self) -> "PipelineMessage":
return dataclasses.replace(
self, message=dataclasses.replace(self.message, config={})
)

0 comments on commit 08630c1

Please sign in to comment.