diff --git a/src/saturn_engine/worker/executors/arq/__init__.py b/src/saturn_engine/worker/executors/arq/__init__.py index 930a41a3..6e014f21 100644 --- a/src/saturn_engine/worker/executors/arq/__init__.py +++ b/src/saturn_engine/worker/executors/arq/__init__.py @@ -1,5 +1,4 @@ -QUEUE_TIMEOUT = 600 -RESULT_TIMEOUT = 1200 +TIMEOUT = 1200 EXECUTE_FUNC_NAME = "remote_execute" healthcheck_interval = 10 diff --git a/src/saturn_engine/worker/executors/arq/executor.py b/src/saturn_engine/worker/executors/arq/executor.py index dd21d076..c19fd41f 100644 --- a/src/saturn_engine/worker/executors/arq/executor.py +++ b/src/saturn_engine/worker/executors/arq/executor.py @@ -22,8 +22,7 @@ from .. import Executor from . import EXECUTE_FUNC_NAME -from . import QUEUE_TIMEOUT -from . import RESULT_TIMEOUT +from . import TIMEOUT from . import executor_healthcheck_key from . import healthcheck_interval from . import worker_healthcheck_key @@ -41,8 +40,7 @@ class Options: redis_url: str concurrency: int queue_name: str = "arq:queue" - queue_timeout: int = QUEUE_TIMEOUT - result_timeout: int = RESULT_TIMEOUT + timeout: int = TIMEOUT def __init__(self, options: Options, services: Services) -> None: self.logger = getLogger(__name__, self) @@ -105,7 +103,7 @@ async def process_message(self, message: ExecutableMessage) -> PipelineResults: job = await (await self.redis_queue).enqueue_job( EXECUTE_FUNC_NAME, message.message.as_remote(), - _expires=options.queue_timeout, + _expires=options.timeout + 5, _queue_name=options.queue_name, ) except (OSError, RedisError): @@ -114,7 +112,7 @@ async def process_message(self, message: ExecutableMessage) -> PipelineResults: tasks = TasksGroup(name=f"saturn.arq.process_message({message.id})") result_task: asyncio.Task[PipelineResults] = tasks.create_task( - job.result(timeout=self.options.result_timeout) + job.result(timeout=self.options.timeout) ) healthcheck_task = tasks.create_task(self.monitor_job_healthcheck(job)) async with tasks: