Skip to content

Commit

Permalink
Worker: lock jobs by executors
Browse files Browse the repository at this point in the history
  • Loading branch information
Olivier Michaud committed Aug 20, 2024
1 parent 5d5ed28 commit be83064
Show file tree
Hide file tree
Showing 8 changed files with 75 additions and 18 deletions.
4 changes: 3 additions & 1 deletion src/saturn_engine/client/worker_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,11 @@ def __init__(
base_url: str,
worker_id: Optional[str] = None,
selector: Optional[str] = None,
executors: list[str] | None = None,
) -> None:
self.worker_id: str = worker_id or socket.gethostname()
self.selector: Optional[str] = selector
self.selector: str | None = selector
self.executors: list[str] | None = executors
self.http_client = http_client
self.base_url = base_url

Expand Down
4 changes: 4 additions & 0 deletions src/saturn_engine/config_definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ class SaturnConfig:
standalone: bool
# If set, select jobs matching the selector regex.
selector: t.Optional[str]

# If set, select jobs for specific executors
executors: t.Optional[list[str]]

services_manager: ServicesManagerConfig
worker_manager: WorkerManagerConfig
rabbitmq: RabbitMQConfig
Expand Down
1 change: 1 addition & 0 deletions src/saturn_engine/core/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ class LockResponse:
class LockInput:
worker_id: str
selector: t.Optional[str] = None
executors: list[str] | None = None


@dataclasses.dataclass
Expand Down
1 change: 1 addition & 0 deletions src/saturn_engine/default_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ class config(SaturnConfig):
env = Env(os.environ.get("SATURN_ENV", "development"))
worker_id = socket.gethostname()
selector: t.Optional[str] = os.environ.get("SATURN_SELECTOR")
executors: list[str] | None = os.environ.get("SATURN_EXECUTORS", "").split(",")
worker_manager_url = os.environ.get(
"SATURN_WORKER_MANAGER_URL", "http://127.0.0.1:5000"
)
Expand Down
32 changes: 16 additions & 16 deletions src/saturn_engine/stores/queues_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,28 +55,28 @@ def get_unassigned_queues(
session: AnySyncSession,
assigned_before: datetime.datetime,
selector: t.Optional[str] = None,
limit: int,
limit: int | None,
) -> list[Queue]:
extra_filters = []
if selector:
extra_filters.append(Queue.name.regexp_match(selector))
unassigned_queues: t.Sequence[Queue] = (
session.execute(
select(Queue)
.options(joinedload(Queue.job))
.where(
Queue.enabled.is_(True),
or_(
Queue.assigned_at.is_(None),
Queue.assigned_at < assigned_before,
),
*extra_filters,
)
.limit(limit)

query = (
select(Queue)
.options(joinedload(Queue.job))
.where(
Queue.enabled.is_(True),
or_(
Queue.assigned_at.is_(None),
Queue.assigned_at < assigned_before,
),
*extra_filters,
)
.scalars()
.all()
)
if limit:
query = query.limit(limit=limit)

unassigned_queues: t.Sequence[Queue] = session.execute(query).scalars().all()
return list(unassigned_queues)


Expand Down
1 change: 1 addition & 0 deletions src/saturn_engine/worker/services/api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ async def open(self) -> None:
self.client = WorkerManagerClient(
http_client=self.services.http_client.session,
base_url=self.services.config.c.worker_manager_url,
executors=self.services.config.c.executors,
worker_id=self.services.config.c.worker_id,
selector=self.services.config.c.selector,
)
Expand Down
13 changes: 12 additions & 1 deletion src/saturn_engine/worker_manager/services/lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,25 @@ def lock_jobs(
queues_store.get_unassigned_queues(
session=session,
assigned_before=assignation_expiration_cutoff,
limit=max_assigned_items - len(assigned_items),
# We don't set a limit when specifying executors
limit=(
max_assigned_items - len(assigned_items)
if not lock_input.executors
else None
),
selector=lock_input.selector,
)
)

# Join definitions and filtered out by executors
for item in assigned_items.copy():
try:
item.join_definitions(static_definitions)
if (
lock_input.executors
and item.queue_item.executor not in lock_input.executors
):
assigned_items.remove(item)
except Exception as e:
if item.job:
jobs_store.set_failed(item.job.name, session=session, error=repr(e))
Expand Down
37 changes: 37 additions & 0 deletions tests/worker_manager/api/test_lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,3 +290,40 @@ def test_executors(
assert resp.json
assert resp.json["items"][0]["name"] == "test"
assert resp.json["executors"][0]["name"] == "ray-executor"


def test_api_lock_with_executors(
client: FlaskClient,
session: Session,
frozen_time: FreezeTime,
fake_job_definition: api.JobDefinition,
) -> None:
# Make sure the job is set at the default executor
assert fake_job_definition.template.executor == "default"

queues_store.create_queue(session=session, name="test")
jobs_store.create_job(
session=session,
name="test",
queue_name="test",
job_definition_name=fake_job_definition.name,
)
session.commit()

# Try to lock the queue_item, but the wrong executor is set
resp = client.post(
"/api/lock", json={"worker_id": "worker-1", "executors": ["other-executor"]}
)
assert resp.status_code == 200
assert resp.json
assert not resp.json["items"]
assert not resp.json["resources"]

# Lock with the default executor return the job
resp = client.post(
"/api/lock", json={"worker_id": "worker-1", "executors": ["default"]}
)
assert resp.status_code == 200
assert resp.json
assert resp.json["items"][0]["name"] == "test"
assert resp.json["executors"][0]["name"] == "default"

0 comments on commit be83064

Please sign in to comment.