Skip to content

Commit

Permalink
Start new job API
Browse files Browse the repository at this point in the history
  • Loading branch information
Sebastien Deschambault committed Aug 9, 2023
1 parent 2e0d413 commit 483cbd0
Show file tree
Hide file tree
Showing 4 changed files with 154 additions and 0 deletions.
11 changes: 11 additions & 0 deletions src/saturn_engine/core/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,12 @@ class JobInput:
error: Optional[str] = None


@dataclasses.dataclass
class StartJobInput:
name: Optional[JobId] = None
job_definition_name: Optional[str] = None


@dataclasses.dataclass
class JobsResponse(ListResponse[JobItem]):
items: list[JobItem]
Expand All @@ -210,3 +216,8 @@ class UpdateResponse:
@dataclasses.dataclass
class JobsSyncResponse:
pass


@dataclasses.dataclass
class JobsStartResponse:
pass
13 changes: 13 additions & 0 deletions src/saturn_engine/stores/jobs_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from saturn_engine.core import Cursor
from saturn_engine.core import JobId
from saturn_engine.core.api import JobsStates
from saturn_engine.core.api import QueueItem
from saturn_engine.models import Job
from saturn_engine.models.job import JobCursorState
from saturn_engine.models.queue import Queue
Expand Down Expand Up @@ -206,3 +207,15 @@ def fetch_cursors_states(
states[row.name][row.cursor] = row.state

return states


def start(queue_item: QueueItem, session: AnySyncSession, restart: bool) -> None:
if restart:
update_job(session=session, name=queue_item.name, completed_at=utcnow())

job_queue = queues_store.create_queue(session=session, name=queue_item.name)
create_job(
session=session,
name=queue_item.name,
queue_name=job_queue.name,
)
33 changes: 33 additions & 0 deletions src/saturn_engine/worker_manager/api/jobs.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
import flask
from flask import Blueprint

from saturn_engine.core.api import FetchCursorsStatesInput
from saturn_engine.core.api import FetchCursorsStatesResponse
from saturn_engine.core.api import JobInput
from saturn_engine.core.api import JobResponse
from saturn_engine.core.api import JobsResponse
from saturn_engine.core.api import JobsStartResponse
from saturn_engine.core.api import JobsStatesSyncInput
from saturn_engine.core.api import JobsStatesSyncResponse
from saturn_engine.core.api import JobsSyncResponse
from saturn_engine.core.api import QueueItem
from saturn_engine.core.api import StartJobInput
from saturn_engine.core.api import UpdateResponse
from saturn_engine.database import session_scope
from saturn_engine.stores import jobs_store
Expand Down Expand Up @@ -93,3 +97,32 @@ def post_fetch_states() -> Json[FetchCursorsStatesResponse]:
session=session,
)
return jsonify(FetchCursorsStatesResponse(cursors=cursors))


@bp.route("/_start", methods=("POST",))
def post_start_job() -> Json[JobsStartResponse]:
restart: bool = flask.request.args.get("cancel_prev", "false") == "true"
start_input = marshall_request(StartJobInput)
if not start_input.name and not start_input.job_definition_name:
abort(http_code=400, error_code="MUST_SPECIFY_JOB")
elif start_input.name and start_input.job_definition_name:
abort(http_code=400, error_code="MUST_SPECIFY_ONE_JOB")

static_definitions = current_app.saturn.static_definitions

with session_scope() as session:
queue_item: QueueItem | None = None
if start_input.name:
queue_item = static_definitions.jobs.get(start_input.name)
elif start_input.job_definition_name:
job_definition = static_definitions.job_definitions.get(
start_input.job_definition_name
)
queue_item = job_definition.template if job_definition else None

if not queue_item:
abort(http_code=404, error_code="JOB_NOT_FOUND")

jobs_store.start(queue_item=queue_item, session=session, restart=restart)

return jsonify(JobsStartResponse())
97 changes: 97 additions & 0 deletions tests/worker_manager/api/test_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -715,3 +715,100 @@ def test_fetch_cursors_states(
"do-not-exist": {"a": None},
}
}


def test_start_job_without_restart(
client: FlaskClient,
session: Session,
fake_job_definition: api.JobDefinition,
) -> None:
resp = client.post("/api/jobs/_start", json={})
assert resp.status_code == 400
assert resp.json
assert resp.json["error"]["code"] == "MUST_SPECIFY_JOB"

resp = client.post(
"/api/jobs/_start",
json={"name": "name", "job_definition_name": "job_definition_name"},
)
assert resp.status_code == 400
assert resp.json
assert resp.json["error"]["code"] == "MUST_SPECIFY_ONE_JOB"

resp = client.post("/api/jobs/_start", json={"name": "try to find this"})
assert resp.status_code == 404
assert resp.json
assert resp.json["error"]["code"] == "JOB_NOT_FOUND"

queue = queues_store.create_queue(session=session, name="fake-queue")
session.flush()
first_job = jobs_store.create_job(
name=queue.name,
session=session,
queue_name=queue.name,
job_definition_name=fake_job_definition.name,
)
session.commit()

resp = client.post("/api/jobs/_start", json={"name": first_job.name})

assert resp.status_code == 200
jobs = jobs_store.get_jobs(session=session)
assert len(jobs) == 2
for job in jobs:
assert job.completed_at is None

# Job name is based on time, to avoid duplicate name wait 1 second
time.sleep(1)

resp = client.post(
"/api/jobs/_start", json={"job_definition_name": fake_job_definition.name}
)

assert resp.status_code == 200
jobs = jobs_store.get_jobs(session=session)
assert len(jobs) == 3
for job in jobs:
assert job.completed_at is None


def test_start_job_with_restart(
client: FlaskClient,
session: Session,
fake_job_definition: api.JobDefinition,
) -> None:
queue = queues_store.create_queue(session=session, name="fake-queue")
session.flush()
first_job = jobs_store.create_job(
name=queue.name,
session=session,
queue_name=queue.name,
job_definition_name=fake_job_definition.name,
)
session.commit()

resp = client.post(
"/api/jobs/_start?cancel_prev=true", json={"name": first_job.name}
)

assert resp.status_code == 200
jobs = jobs_store.get_jobs(session=session)
session.refresh(first_job)

assert len(jobs) == 2
assert first_job.completed_at is not None
second_job = jobs[1]

# Job name is based on time, to avoid duplicate name wait 1 second
time.sleep(1)

resp = client.post(
"/api/jobs/_start?cancel_prev=true",
json={"job_definition_name": fake_job_definition.name},
)

assert resp.status_code == 200
jobs = jobs_store.get_jobs(session=session)
session.refresh(second_job)
assert len(jobs) == 3
assert second_job.completed_at is not None

0 comments on commit 483cbd0

Please sign in to comment.