From 3ec8a4f383ca1db0e2b3c17e5341ddc730d5a1c8 Mon Sep 17 00:00:00 2001 From: Olivier Michaud Date: Tue, 20 Aug 2024 10:03:59 -0400 Subject: [PATCH] Worker: lock jobs by executors --- src/saturn_engine/client/worker_manager.py | 4 +- src/saturn_engine/config_definitions.py | 4 ++ src/saturn_engine/core/api.py | 1 + src/saturn_engine/default_config.py | 1 + .../worker/services/api_client.py | 1 + src/saturn_engine/worker/topics/rabbitmq.py | 2 +- .../worker_manager/services/lock.py | 6 +++ tests/worker_manager/api/test_lock.py | 37 +++++++++++++++++++ 8 files changed, 54 insertions(+), 2 deletions(-) diff --git a/src/saturn_engine/client/worker_manager.py b/src/saturn_engine/client/worker_manager.py index bd50e65e..f2a22c01 100644 --- a/src/saturn_engine/client/worker_manager.py +++ b/src/saturn_engine/client/worker_manager.py @@ -39,9 +39,11 @@ def __init__( base_url: str, worker_id: Optional[str] = None, selector: Optional[str] = None, + executors: list[str] | None = None, ) -> None: self.worker_id: str = worker_id or socket.gethostname() - self.selector: Optional[str] = selector + self.selector: str | None = selector + self.executors: list[str] | None = executors self.http_client = http_client self.base_url = base_url diff --git a/src/saturn_engine/config_definitions.py b/src/saturn_engine/config_definitions.py index 975d802e..7fd7cc8f 100644 --- a/src/saturn_engine/config_definitions.py +++ b/src/saturn_engine/config_definitions.py @@ -54,6 +54,10 @@ class SaturnConfig: standalone: bool # If set, select jobs matching the selector regex. 
selector: t.Optional[str] + + # If set, select jobs for specific executors + executors: t.Optional[list[str]] + services_manager: ServicesManagerConfig worker_manager: WorkerManagerConfig rabbitmq: RabbitMQConfig diff --git a/src/saturn_engine/core/api.py b/src/saturn_engine/core/api.py index 4ff5ebf9..d4a8fcee 100644 --- a/src/saturn_engine/core/api.py +++ b/src/saturn_engine/core/api.py @@ -108,6 +108,7 @@ class LockResponse: class LockInput: worker_id: str selector: t.Optional[str] = None + executors: list[str] | None = None @dataclasses.dataclass diff --git a/src/saturn_engine/default_config.py b/src/saturn_engine/default_config.py index 48f4bac0..86bd0413 100644 --- a/src/saturn_engine/default_config.py +++ b/src/saturn_engine/default_config.py @@ -15,6 +15,7 @@ class config(SaturnConfig): env = Env(os.environ.get("SATURN_ENV", "development")) worker_id = socket.gethostname() selector: t.Optional[str] = os.environ.get("SATURN_SELECTOR") + executors: list[str] | None = [e.strip() for e in os.environ.get("SATURN_EXECUTORS", "").split(",") if e.strip()] or None  # comma-separated names; unset/empty -> None worker_manager_url = os.environ.get( "SATURN_WORKER_MANAGER_URL", "http://127.0.0.1:5000" ) diff --git a/src/saturn_engine/worker/services/api_client.py b/src/saturn_engine/worker/services/api_client.py index 2c44619a..1ed7194a 100644 --- a/src/saturn_engine/worker/services/api_client.py +++ b/src/saturn_engine/worker/services/api_client.py @@ -27,6 +27,7 @@ async def open(self) -> None: self.client = WorkerManagerClient( http_client=self.services.http_client.session, base_url=self.services.config.c.worker_manager_url, + executors=self.services.config.c.executors, worker_id=self.services.config.c.worker_id, selector=self.services.config.c.selector, ) diff --git a/src/saturn_engine/worker/topics/rabbitmq.py b/src/saturn_engine/worker/topics/rabbitmq.py index 0c5fcc7d..7bd03a01 100644 --- a/src/saturn_engine/worker/topics/rabbitmq.py +++ b/src/saturn_engine/worker/topics/rabbitmq.py @@ -301,8 +301,8 @@ def channel_closed( self.logger.error("Channel closed: %s", 
reason, extra=extra) def channel_reopened(self, channel: aio_pika.abc.AbstractChannel) -> None: - del self.queue del self.exchange + del self.queue self.logger.info( "Channel reopening", extra={"data": {"topic": {"id": self.name}}} diff --git a/src/saturn_engine/worker_manager/services/lock.py b/src/saturn_engine/worker_manager/services/lock.py index db2f2d88..9dba91b5 100644 --- a/src/saturn_engine/worker_manager/services/lock.py +++ b/src/saturn_engine/worker_manager/services/lock.py @@ -57,9 +57,15 @@ def lock_jobs( ) ) + # Join definitions and filter out items whose executor was not requested for item in assigned_items.copy(): try: item.join_definitions(static_definitions) + if ( + lock_input.executors + and item.queue_item.executor not in lock_input.executors + ): + assigned_items.remove(item) except Exception as e: if item.job: jobs_store.set_failed(item.job.name, session=session, error=repr(e)) diff --git a/tests/worker_manager/api/test_lock.py b/tests/worker_manager/api/test_lock.py index 4c404176..a05dd203 100644 --- a/tests/worker_manager/api/test_lock.py +++ b/tests/worker_manager/api/test_lock.py @@ -290,3 +290,40 @@ def test_executors( assert resp.json assert resp.json["items"][0]["name"] == "test" assert resp.json["executors"][0]["name"] == "ray-executor" + + +def test_api_lock_with_executors( + client: FlaskClient, + session: Session, + frozen_time: FreezeTime, + fake_job_definition: api.JobDefinition, +) -> None: + # Make sure the job uses the default executor + assert fake_job_definition.template.executor == "default" + + queues_store.create_queue(session=session, name="test") + jobs_store.create_job( + session=session, + name="test", + queue_name="test", + job_definition_name=fake_job_definition.name, + ) + session.commit() + + # Try to lock the queue_item, but the wrong executor is set + resp = client.post( + "/api/lock", json={"worker_id": "worker-1", "executors": ["other-executor"]} + ) + assert resp.status_code == 200 + assert resp.json + assert not 
resp.json["items"] + assert not resp.json["resources"] + + # Locking with the default executor returns the job + resp = client.post( + "/api/lock", json={"worker_id": "worker-1", "executors": ["default"]} + ) + assert resp.status_code == 200 + assert resp.json + assert resp.json["items"][0]["name"] == "test" + assert resp.json["executors"][0]["name"] == "default"