Skip to content

Commit

Permalink
Worker: lock jobs by executors
Browse files Browse the repository at this point in the history
  • Loading branch information
Olivier Michaud committed Aug 20, 2024
1 parent 5d5ed28 commit 3ec8a4f
Show file tree
Hide file tree
Showing 8 changed files with 54 additions and 2 deletions.
4 changes: 3 additions & 1 deletion src/saturn_engine/client/worker_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,11 @@ def __init__(
base_url: str,
worker_id: Optional[str] = None,
selector: Optional[str] = None,
executors: list[str] | None = None,
) -> None:
self.worker_id: str = worker_id or socket.gethostname()
self.selector: Optional[str] = selector
self.selector: str | None = selector
self.executors: list[str] | None = executors
self.http_client = http_client
self.base_url = base_url

Expand Down
4 changes: 4 additions & 0 deletions src/saturn_engine/config_definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ class SaturnConfig:
standalone: bool
# If set, select jobs matching the selector regex.
selector: t.Optional[str]

# If set, select jobs for specific executors
executors: t.Optional[list[str]]

services_manager: ServicesManagerConfig
worker_manager: WorkerManagerConfig
rabbitmq: RabbitMQConfig
Expand Down
1 change: 1 addition & 0 deletions src/saturn_engine/core/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ class LockResponse:
class LockInput:
    """Input describing which worker is requesting a lock and how to filter jobs.

    NOTE(review): presumably the parsed JSON body of the ``/api/lock``
    request -- confirm against the API handler.
    """

    # Identifier of the worker requesting the lock.
    worker_id: str
    # Optional job selector; the config describes it as a regex matched
    # against jobs. None means no selector filtering.
    selector: t.Optional[str] = None
    # Optional list of executor names. When set and non-empty, items whose
    # executor is not in this list are dropped from the lock assignment.
    executors: list[str] | None = None


@dataclasses.dataclass
Expand Down
1 change: 1 addition & 0 deletions src/saturn_engine/default_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ class config(SaturnConfig):
env = Env(os.environ.get("SATURN_ENV", "development"))
worker_id = socket.gethostname()
selector: t.Optional[str] = os.environ.get("SATURN_SELECTOR")
executors: list[str] | None = os.environ.get("SATURN_EXECUTORS")
worker_manager_url = os.environ.get(
"SATURN_WORKER_MANAGER_URL", "http://127.0.0.1:5000"
)
Expand Down
1 change: 1 addition & 0 deletions src/saturn_engine/worker/services/api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ async def open(self) -> None:
self.client = WorkerManagerClient(
http_client=self.services.http_client.session,
base_url=self.services.config.c.worker_manager_url,
executors=self.services.config.c.executors,
worker_id=self.services.config.c.worker_id,
selector=self.services.config.c.selector,
)
Expand Down
2 changes: 1 addition & 1 deletion src/saturn_engine/worker/topics/rabbitmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,8 +301,8 @@ def channel_closed(
self.logger.error("Channel closed: %s", reason, extra=extra)

def channel_reopened(self, channel: aio_pika.abc.AbstractChannel) -> None:
del self.queue
del self.exchange
del self.queue

self.logger.info(
"Channel reopening", extra={"data": {"topic": {"id": self.name}}}
Expand Down
6 changes: 6 additions & 0 deletions src/saturn_engine/worker_manager/services/lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,15 @@ def lock_jobs(
)
)

# Join definitions, then filter out items whose executor was not requested
for item in assigned_items.copy():
try:
item.join_definitions(static_definitions)
if (
lock_input.executors
and item.queue_item.executor not in lock_input.executors
):
assigned_items.remove(item)
except Exception as e:
if item.job:
jobs_store.set_failed(item.job.name, session=session, error=repr(e))
Expand Down
37 changes: 37 additions & 0 deletions tests/worker_manager/api/test_lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,3 +290,40 @@ def test_executors(
assert resp.json
assert resp.json["items"][0]["name"] == "test"
assert resp.json["executors"][0]["name"] == "ray-executor"


def test_api_lock_with_executors(
    client: FlaskClient,
    session: Session,
    frozen_time: FreezeTime,
    fake_job_definition: api.JobDefinition,
) -> None:
    """Check that /api/lock only hands out items matching the requested executors."""
    # Precondition: the fixture job definition runs on the "default" executor.
    assert fake_job_definition.template.executor == "default"

    queue_name = "test"
    queues_store.create_queue(session=session, name=queue_name)
    jobs_store.create_job(
        session=session,
        name="test",
        queue_name=queue_name,
        job_definition_name=fake_job_definition.name,
    )
    session.commit()

    # A worker restricted to a non-matching executor must get nothing back.
    mismatched_payload = {"worker_id": "worker-1", "executors": ["other-executor"]}
    rejected = client.post("/api/lock", json=mismatched_payload)
    assert rejected.status_code == 200
    assert rejected.json
    assert not rejected.json["items"]
    assert not rejected.json["resources"]

    # The same worker asking for the default executor receives the job.
    matching_payload = {"worker_id": "worker-1", "executors": ["default"]}
    accepted = client.post("/api/lock", json=matching_payload)
    assert accepted.status_code == 200
    assert accepted.json
    assert accepted.json["items"][0]["name"] == "test"
    assert accepted.json["executors"][0]["name"] == "default"

0 comments on commit 3ec8a4f

Please sign in to comment.