Skip to content

Commit

Permalink
Start new job API
Browse files Browse the repository at this point in the history
  • Loading branch information
Sebastien Deschambault committed Aug 8, 2023
1 parent 2e0d413 commit 63d1f68
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 0 deletions.
11 changes: 11 additions & 0 deletions src/saturn_engine/core/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,12 @@ class JobInput:
error: Optional[str] = None


@dataclasses.dataclass
class StartJobInput:
name: Optional[JobId] = None
job_definition_name: Optional[str] = None


@dataclasses.dataclass
class JobsResponse(ListResponse[JobItem]):
items: list[JobItem]
Expand All @@ -210,3 +216,8 @@ class UpdateResponse:
@dataclasses.dataclass
class JobsSyncResponse:
pass


@dataclasses.dataclass
class JobsStartResponse:
pass
39 changes: 39 additions & 0 deletions src/saturn_engine/worker_manager/api/jobs.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,23 @@
import time

import flask
from flask import Blueprint

from saturn_engine.core.api import FetchCursorsStatesInput
from saturn_engine.core.api import FetchCursorsStatesResponse
from saturn_engine.core.api import JobInput
from saturn_engine.core.api import JobResponse
from saturn_engine.core.api import JobsResponse
from saturn_engine.core.api import JobsStartResponse
from saturn_engine.core.api import JobsStatesSyncInput
from saturn_engine.core.api import JobsStatesSyncResponse
from saturn_engine.core.api import JobsSyncResponse
from saturn_engine.core.api import StartJobInput
from saturn_engine.core.api import UpdateResponse
from saturn_engine.database import session_scope
from saturn_engine.models.job import Job
from saturn_engine.stores import jobs_store
from saturn_engine.utils import utcnow
from saturn_engine.utils.flask import Json
from saturn_engine.utils.flask import abort
from saturn_engine.utils.flask import check_found
Expand Down Expand Up @@ -93,3 +100,35 @@ def post_fetch_states() -> Json[FetchCursorsStatesResponse]:
session=session,
)
return jsonify(FetchCursorsStatesResponse(cursors=cursors))


@bp.route("/_start", methods=("POST",))
def post_start_job() -> Json[JobsStartResponse]:
restart: bool = flask.request.args.get("restart", "false") == "true"
start_input = marshall_request(StartJobInput)
if not start_input.name and not start_input.job_definition_name:
abort(http_code=404, error_code="MUST_SPECIFY_JOB")

with session_scope() as session:
job: Job | None = None
if start_input.name:
job = jobs_store.get_job(session=session, name=start_input.name)
elif start_input.job_definition_name:
job = jobs_store.get_last_job(
session=session, job_definition_name=start_input.job_definition_name
)

if not job:
abort(http_code=404, error_code="JOB_NOT_FOUND")

if restart:
jobs_store.update_job(session=session, name=job.name, completed_at=utcnow())

job_name: str = f"{job.job_definition_name}-{int(time.time())}"
jobs_store.create_job(
session=session,
name=job_name,
queue_name=job.queue_name,
job_definition_name=job.job_definition_name,
)
return jsonify(JobsStartResponse())
77 changes: 77 additions & 0 deletions tests/worker_manager/api/test_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -715,3 +715,80 @@ def test_fetch_cursors_states(
"do-not-exist": {"a": None},
}
}


def test_start_job_without_restart(
client: FlaskClient,
session: Session,
fake_job_definition: api.JobDefinition,
) -> None:
queue = queues_store.create_queue(session=session, name="fake-queue")
session.flush()
first_job = jobs_store.create_job(
name=queue.name,
session=session,
queue_name=queue.name,
job_definition_name=fake_job_definition.name,
)
session.commit()

resp = client.post("/api/jobs/_start", json={"name": first_job.name})

assert resp.status_code == 200
jobs = jobs_store.get_jobs(session=session)
assert len(jobs) == 2
for job in jobs:
assert job.completed_at is None

# Job name is based on time, to avoid duplicate name wait 1 second
time.sleep(1)

resp = client.post(
"/api/jobs/_start", json={"job_definition_name": fake_job_definition.name}
)

assert resp.status_code == 200
jobs = jobs_store.get_jobs(session=session)
assert len(jobs) == 3
for job in jobs:
assert job.completed_at is None


def test_start_job_with_restart(
client: FlaskClient,
session: Session,
fake_job_definition: api.JobDefinition,
) -> None:
queue = queues_store.create_queue(session=session, name="fake-queue")
session.flush()
first_job = jobs_store.create_job(
name=queue.name,
session=session,
queue_name=queue.name,
job_definition_name=fake_job_definition.name,
)
session.commit()

resp = client.post("/api/jobs/_start?restart=true", json={"name": first_job.name})

assert resp.status_code == 200
jobs = jobs_store.get_jobs(session=session)
session.refresh(first_job)

assert len(jobs) == 2
assert first_job.completed_at is not None
second_job = jobs[1]

# Job name is based on time, to avoid duplicate name wait 1 second
time.sleep(1)

resp = client.post(
"/api/jobs/_start?restart=true",
json={"job_definition_name": fake_job_definition.name},
)

assert resp.status_code == 200
jobs = jobs_store.get_jobs(session=session)
session.refresh(second_job)
assert len(jobs) == 3
assert second_job.completed_at is not None

0 comments on commit 63d1f68

Please sign in to comment.