Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WorkerManager: Add selector parameter to lock a subset of jobs #323

Merged
merged 1 commit into from
Jul 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/saturn_engine/client/worker_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,16 @@ def __init__(
http_client: aiohttp.ClientSession,
base_url: str,
worker_id: Optional[str] = None,
selector: Optional[str] = None,
) -> None:
self.worker_id: str = worker_id or socket.gethostname()
self.selector: Optional[str] = selector
self.http_client = http_client
self.base_url = base_url

async def lock(self) -> LockResponse:
lock_url = urlcat(self.base_url, "api/lock")
json = asdict(LockInput(worker_id=self.worker_id))
json = asdict(LockInput(worker_id=self.worker_id, selector=self.selector))
async with self.http_client.post(lock_url, json=json) as response:
return fromdict(await response.json(), LockResponse)

Expand Down
2 changes: 2 additions & 0 deletions src/saturn_engine/config_definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ class SaturnConfig:
# Worker Manager URL used by clients and workers.
worker_id: str
worker_manager_url: str
# If set, select jobs matching the selector regex.
selector: t.Optional[str]
services_manager: ServicesManagerConfig
worker_manager: WorkerManagerConfig
rabbitmq: RabbitMQConfig
Expand Down
1 change: 1 addition & 0 deletions src/saturn_engine/core/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ class LockResponse:
@dataclasses.dataclass
class LockInput:
worker_id: str
selector: t.Optional[str] = None


@dataclasses.dataclass
Expand Down
1 change: 1 addition & 0 deletions src/saturn_engine/default_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
class config(SaturnConfig):
env = Env(os.environ.get("SATURN_ENV", "development"))
worker_id = socket.gethostname()
selector: t.Optional[str] = os.environ.get("SATURN_SELECTOR")
worker_manager_url = os.environ.get(
"SATURN_WORKER_MANAGER_URL", "http://127.0.0.1:5000"
)
Expand Down
12 changes: 12 additions & 0 deletions src/saturn_engine/stores/queues_store.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import typing as t

import datetime

from sqlalchemy import or_
Expand Down Expand Up @@ -25,7 +27,11 @@ def get_assigned_queues(
session: AnySyncSession,
worker_id: str,
assigned_after: datetime.datetime,
selector: t.Optional[str] = None,
) -> list[Queue]:
extra_filters = []
if selector:
extra_filters.append(Queue.name.regexp_match(selector))
assigned_jobs: list[Queue] = (
session.execute(
select(Queue)
Expand All @@ -34,6 +40,7 @@ def get_assigned_queues(
Queue.enabled.is_(True),
Queue.assigned_to == worker_id,
Queue.assigned_at >= assigned_after,
*extra_filters,
)
.order_by(Queue.name)
)
Expand All @@ -47,8 +54,12 @@ def get_unassigned_queues(
*,
session: AnySyncSession,
assigned_before: datetime.datetime,
selector: t.Optional[str] = None,
limit: int,
) -> list[Queue]:
extra_filters = []
if selector:
extra_filters.append(Queue.name.regexp_match(selector))
unassigned_queues: list[Queue] = (
session.execute(
select(Queue)
Expand All @@ -59,6 +70,7 @@ def get_unassigned_queues(
Queue.assigned_at.is_(None),
Queue.assigned_at < assigned_before,
),
*extra_filters,
)
.limit(limit)
)
Expand Down
1 change: 1 addition & 0 deletions src/saturn_engine/worker/services/api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,5 @@ async def open(self) -> None:
http_client=self.services.http_client.session,
base_url=self.services.config.c.worker_manager_url,
worker_id=self.services.config.c.worker_id,
selector=self.services.config.c.selector,
)
2 changes: 2 additions & 0 deletions src/saturn_engine/worker_manager/services/lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ def lock_jobs(
queues_store.get_assigned_queues(
session=session,
worker_id=lock_input.worker_id,
selector=lock_input.selector,
assigned_after=assignation_expiration_cutoff,
)
)
Expand All @@ -52,6 +53,7 @@ def lock_jobs(
session=session,
assigned_before=assignation_expiration_cutoff,
limit=max_assigned_items - len(assigned_items),
selector=lock_input.selector,
)
)

Expand Down
4 changes: 4 additions & 0 deletions tests/worker_manager/api/test_lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,10 @@ def create_job(
"job-10",
}

resp = client.post("/api/lock", json={"worker_id": "worker-2", "selector": "j.*-9"})
assert resp.status_code == 200
assert ids(resp) == {"job-9"}


def test_api_lock_with_resources(
client: FlaskClient,
Expand Down
Loading