Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Worker: lock jobs by executors #433

Merged
merged 1 commit into from
Aug 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/saturn_engine/client/worker_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,11 @@ def __init__(
base_url: str,
worker_id: Optional[str] = None,
selector: Optional[str] = None,
executors: list[str] | None = None,
) -> None:
self.worker_id: str = worker_id or socket.gethostname()
self.selector: Optional[str] = selector
self.selector: str | None = selector
infherny marked this conversation as resolved.
Show resolved Hide resolved
self.executors: list[str] | None = executors
self.http_client = http_client
self.base_url = base_url

Expand Down
4 changes: 4 additions & 0 deletions src/saturn_engine/config_definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ class SaturnConfig:
standalone: bool
# If set, select jobs matching the selector regex.
selector: t.Optional[str]

# If set, select jobs for specific executors
executors: t.Optional[list[str]]

services_manager: ServicesManagerConfig
worker_manager: WorkerManagerConfig
rabbitmq: RabbitMQConfig
Expand Down
1 change: 1 addition & 0 deletions src/saturn_engine/core/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ class LockResponse:
class LockInput:
worker_id: str
selector: t.Optional[str] = None
executors: list[str] | None = None


@dataclasses.dataclass
Expand Down
1 change: 1 addition & 0 deletions src/saturn_engine/default_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ class config(SaturnConfig):
env = Env(os.environ.get("SATURN_ENV", "development"))
worker_id = socket.gethostname()
selector: t.Optional[str] = os.environ.get("SATURN_SELECTOR")
executors: list[str] | None = os.environ.get("SATURN_EXECUTORS", "").split(",")
worker_manager_url = os.environ.get(
"SATURN_WORKER_MANAGER_URL", "http://127.0.0.1:5000"
)
Expand Down
32 changes: 16 additions & 16 deletions src/saturn_engine/stores/queues_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,28 +55,28 @@ def get_unassigned_queues(
session: AnySyncSession,
assigned_before: datetime.datetime,
selector: t.Optional[str] = None,
limit: int,
limit: int | None,
) -> list[Queue]:
extra_filters = []
if selector:
extra_filters.append(Queue.name.regexp_match(selector))
unassigned_queues: t.Sequence[Queue] = (
session.execute(
select(Queue)
.options(joinedload(Queue.job))
.where(
Queue.enabled.is_(True),
or_(
Queue.assigned_at.is_(None),
Queue.assigned_at < assigned_before,
),
*extra_filters,
)
.limit(limit)

query = (
select(Queue)
.options(joinedload(Queue.job))
.where(
Queue.enabled.is_(True),
or_(
Queue.assigned_at.is_(None),
Queue.assigned_at < assigned_before,
),
*extra_filters,
)
.scalars()
.all()
)
if limit:
query = query.limit(limit=limit)

unassigned_queues: t.Sequence[Queue] = session.execute(query).scalars().all()
return list(unassigned_queues)


Expand Down
1 change: 1 addition & 0 deletions src/saturn_engine/worker/services/api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ async def open(self) -> None:
self.client = WorkerManagerClient(
http_client=self.services.http_client.session,
base_url=self.services.config.c.worker_manager_url,
executors=self.services.config.c.executors,
worker_id=self.services.config.c.worker_id,
selector=self.services.config.c.selector,
)
Expand Down
13 changes: 12 additions & 1 deletion src/saturn_engine/worker_manager/services/lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,25 @@ def lock_jobs(
queues_store.get_unassigned_queues(
session=session,
assigned_before=assignation_expiration_cutoff,
limit=max_assigned_items - len(assigned_items),
# We don't set a limit when specifying executors
limit=(
max_assigned_items - len(assigned_items)
if not lock_input.executors
else None
),
selector=lock_input.selector,
)
)

# Join definitions and filtered out by executors
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This won't really work. You select all jobs from the database then filter them. But the get_unassigned_queues returns up to N jobs, so it might only select the wrong job and not return any job using your executor.

We probably need to do the other way around: read the potential jobs from the static_definitions with our executor selector, then get_unassigned_queues that match one of these job.

It's ugly and inneficient, but that's the quickest and most reliable hack for now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about removing this limit when we specify an executor?

if len(assigned_items) < max_assigned_items:
        assigned_items.extend(
            queues_store.get_unassigned_queues(
                session=session,
                assigned_before=assignation_expiration_cutoff,
                limit=max_assigned_items - len(assigned_items) if not lock_input.executiors,
                selector=lock_input.selector,
            )
        )

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I implemented the limit suggestion

for item in assigned_items.copy():
try:
item.join_definitions(static_definitions)
if (
lock_input.executors
and item.queue_item.executor not in lock_input.executors
):
assigned_items.remove(item)
except Exception as e:
if item.job:
jobs_store.set_failed(item.job.name, session=session, error=repr(e))
Expand Down
37 changes: 37 additions & 0 deletions tests/worker_manager/api/test_lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,3 +290,40 @@ def test_executors(
assert resp.json
assert resp.json["items"][0]["name"] == "test"
assert resp.json["executors"][0]["name"] == "ray-executor"


def test_api_lock_with_executors(
client: FlaskClient,
session: Session,
frozen_time: FreezeTime,
fake_job_definition: api.JobDefinition,
) -> None:
# Make sure the job is set at the default executor
assert fake_job_definition.template.executor == "default"

queues_store.create_queue(session=session, name="test")
jobs_store.create_job(
session=session,
name="test",
queue_name="test",
job_definition_name=fake_job_definition.name,
)
session.commit()

# Try to lock the queue_item, but the wrong executor is set
resp = client.post(
"/api/lock", json={"worker_id": "worker-1", "executors": ["other-executor"]}
)
assert resp.status_code == 200
assert resp.json
assert not resp.json["items"]
assert not resp.json["resources"]

# Lock with the default executor return the job
resp = client.post(
"/api/lock", json={"worker_id": "worker-1", "executors": ["default"]}
)
assert resp.status_code == 200
assert resp.json
assert resp.json["items"][0]["name"] == "test"
assert resp.json["executors"][0]["name"] == "default"
Loading