Skip to content

Commit

Permalink
WorkerManager: Add selector parameter to lock a subset of jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
isra17 committed Jul 28, 2023
1 parent 4914b50 commit 173bf3e
Show file tree
Hide file tree
Showing 8 changed files with 26 additions and 1 deletion.
4 changes: 3 additions & 1 deletion src/saturn_engine/client/worker_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,16 @@ def __init__(
http_client: aiohttp.ClientSession,
base_url: str,
worker_id: Optional[str] = None,
selector: Optional[str] = None,
) -> None:
self.worker_id: str = worker_id or socket.gethostname()
self.selector: Optional[str] = selector
self.http_client = http_client
self.base_url = base_url

async def lock(self) -> LockResponse:
lock_url = urlcat(self.base_url, "api/lock")
json = asdict(LockInput(worker_id=self.worker_id))
json = asdict(LockInput(worker_id=self.worker_id, selector=self.selector))
async with self.http_client.post(lock_url, json=json) as response:
return fromdict(await response.json(), LockResponse)

Expand Down
2 changes: 2 additions & 0 deletions src/saturn_engine/config_definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ class SaturnConfig:
# Worker Manager URL used by clients and workers.
worker_id: str
worker_manager_url: str
# If set, select jobs matching the selector regex.
selector: t.Optional[str]
services_manager: ServicesManagerConfig
worker_manager: WorkerManagerConfig
rabbitmq: RabbitMQConfig
Expand Down
1 change: 1 addition & 0 deletions src/saturn_engine/core/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ class LockResponse:
@dataclasses.dataclass
class LockInput:
worker_id: str
selector: t.Optional[str] = None


@dataclasses.dataclass
Expand Down
1 change: 1 addition & 0 deletions src/saturn_engine/default_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
class config(SaturnConfig):
env = Env(os.environ.get("SATURN_ENV", "development"))
worker_id = socket.gethostname()
selector: t.Optional[str] = os.environ.get("SATURN_SELECTOR")
worker_manager_url = os.environ.get(
"SATURN_WORKER_MANAGER_URL", "http://127.0.0.1:5000"
)
Expand Down
12 changes: 12 additions & 0 deletions src/saturn_engine/stores/queues_store.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import typing as t

import datetime

from sqlalchemy import or_
Expand Down Expand Up @@ -25,7 +27,11 @@ def get_assigned_queues(
session: AnySyncSession,
worker_id: str,
assigned_after: datetime.datetime,
selector: t.Optional[str] = None,
) -> list[Queue]:
extra_filters = []
if selector:
extra_filters.append(Queue.name.regexp_match(selector))
assigned_jobs: list[Queue] = (
session.execute(
select(Queue)
Expand All @@ -34,6 +40,7 @@ def get_assigned_queues(
Queue.enabled.is_(True),
Queue.assigned_to == worker_id,
Queue.assigned_at >= assigned_after,
*extra_filters,
)
.order_by(Queue.name)
)
Expand All @@ -47,8 +54,12 @@ def get_unassigned_queues(
*,
session: AnySyncSession,
assigned_before: datetime.datetime,
selector: t.Optional[str] = None,
limit: int,
) -> list[Queue]:
extra_filters = []
if selector:
extra_filters.append(Queue.name.regexp_match(selector))
unassigned_queues: list[Queue] = (
session.execute(
select(Queue)
Expand All @@ -59,6 +70,7 @@ def get_unassigned_queues(
Queue.assigned_at.is_(None),
Queue.assigned_at < assigned_before,
),
*extra_filters,
)
.limit(limit)
)
Expand Down
1 change: 1 addition & 0 deletions src/saturn_engine/worker/services/api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,5 @@ async def open(self) -> None:
http_client=self.services.http_client.session,
base_url=self.services.config.c.worker_manager_url,
worker_id=self.services.config.c.worker_id,
selector=self.services.config.c.selector,
)
2 changes: 2 additions & 0 deletions src/saturn_engine/worker_manager/services/lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ def lock_jobs(
queues_store.get_assigned_queues(
session=session,
worker_id=lock_input.worker_id,
selector=lock_input.selector,
assigned_after=assignation_expiration_cutoff,
)
)
Expand All @@ -52,6 +53,7 @@ def lock_jobs(
session=session,
assigned_before=assignation_expiration_cutoff,
limit=max_assigned_items - len(assigned_items),
selector=lock_input.selector,
)
)

Expand Down
4 changes: 4 additions & 0 deletions tests/worker_manager/api/test_lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,10 @@ def create_job(
"job-10",
}

resp = client.post("/api/lock", json={"worker_id": "worker-2", "selector": "j.*-9"})
assert resp.status_code == 200
assert ids(resp) == {"job-9"}


def test_api_lock_with_resources(
client: FlaskClient,
Expand Down

0 comments on commit 173bf3e

Please sign in to comment.