Skip to content

Commit

Permalink
Start new job API
Browse files Browse the repository at this point in the history
  • Loading branch information
Sebastien Deschambault committed Aug 9, 2023
1 parent 2e0d413 commit 40ce315
Show file tree
Hide file tree
Showing 4 changed files with 191 additions and 0 deletions.
11 changes: 11 additions & 0 deletions src/saturn_engine/core/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,12 @@ class JobInput:
error: Optional[str] = None


@dataclasses.dataclass
class StartJobInput:
name: Optional[JobId] = None
job_definition_name: Optional[str] = None


@dataclasses.dataclass
class JobsResponse(ListResponse[JobItem]):
items: list[JobItem]
Expand All @@ -210,3 +216,8 @@ class UpdateResponse:
@dataclasses.dataclass
class JobsSyncResponse:
pass


@dataclasses.dataclass
class JobsStartResponse:
name: JobId
50 changes: 50 additions & 0 deletions src/saturn_engine/stores/jobs_store.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import typing as t
from typing import Optional

import time
from datetime import datetime

from sqlalchemy import select
Expand All @@ -11,6 +12,8 @@
from saturn_engine.core import Cursor
from saturn_engine.core import JobId
from saturn_engine.core.api import JobsStates
from saturn_engine.core.api import QueueItem
from saturn_engine.core.api import StartJobInput
from saturn_engine.models import Job
from saturn_engine.models.job import JobCursorState
from saturn_engine.models.queue import Queue
Expand All @@ -19,6 +22,7 @@
from saturn_engine.utils.sqlalchemy import AnySession
from saturn_engine.utils.sqlalchemy import AnySyncSession
from saturn_engine.utils.sqlalchemy import upsert
from saturn_engine.worker_manager.config.static_definitions import StaticDefinitions


def create_job(
Expand Down Expand Up @@ -206,3 +210,49 @@ def fetch_cursors_states(
states[row.name][row.cursor] = row.state

return states


def start(
start_input: StartJobInput,
static_definitions: StaticDefinitions,
session: AnySyncSession,
restart: bool,
) -> Job:
queue_item: QueueItem | None = None
job_definition_name: str | None = start_input.job_definition_name
job: Job | None = None

if start_input.name:
queue_item = static_definitions.jobs.get(start_input.name)
if not queue_item:
job = get_job(name=start_input.name, session=session)
if job:
job_definition_name = job.job_definition_name

if job_definition_name:
job_definition = static_definitions.job_definitions.get(job_definition_name)
if job_definition:
queue_item = job_definition.template
if not job:
job = get_last_job(
session=session, job_definition_name=job_definition_name
)

if not queue_item:
raise ValueError("Job not Found.")

if job and not job.completed_at:
if restart:
update_job(session=session, name=job.name, completed_at=utcnow())
else:
raise ValueError("Job already started.")

job_name = f"{queue_item.name}-{int(time.time())}"
job_queue = queues_store.create_queue(session=session, name=job_name)
new_job = create_job(
session=session,
name=job_name,
queue_name=job_queue.name,
job_definition_name=job_definition_name,
)
return new_job
26 changes: 26 additions & 0 deletions src/saturn_engine/worker_manager/api/jobs.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
import flask
from flask import Blueprint

from saturn_engine.core.api import FetchCursorsStatesInput
from saturn_engine.core.api import FetchCursorsStatesResponse
from saturn_engine.core.api import JobInput
from saturn_engine.core.api import JobResponse
from saturn_engine.core.api import JobsResponse
from saturn_engine.core.api import JobsStartResponse
from saturn_engine.core.api import JobsStatesSyncInput
from saturn_engine.core.api import JobsStatesSyncResponse
from saturn_engine.core.api import JobsSyncResponse
from saturn_engine.core.api import StartJobInput
from saturn_engine.core.api import UpdateResponse
from saturn_engine.database import session_scope
from saturn_engine.stores import jobs_store
Expand Down Expand Up @@ -93,3 +96,26 @@ def post_fetch_states() -> Json[FetchCursorsStatesResponse]:
session=session,
)
return jsonify(FetchCursorsStatesResponse(cursors=cursors))


@bp.route("/_start", methods=("POST",))
def post_start_job() -> Json[JobsStartResponse]:
restart: bool = flask.request.args.get("restart", "false") == "true"
start_input = marshall_request(StartJobInput)
if not start_input.name and not start_input.job_definition_name:
abort(http_code=400, error_code="MUST_SPECIFY_JOB")
elif start_input.name and start_input.job_definition_name:
abort(http_code=400, error_code="MUST_SPECIFY_ONE_JOB")

with session_scope() as session:
try:
job = jobs_store.start(
start_input=start_input,
static_definitions=current_app.saturn.static_definitions,
session=session,
restart=restart,
)
except ValueError as e:
abort(http_code=400, error_code="JOB_START_ERROR", message=str(e))

return jsonify(JobsStartResponse(name=job.name))
104 changes: 104 additions & 0 deletions tests/worker_manager/api/test_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -715,3 +715,107 @@ def test_fetch_cursors_states(
"do-not-exist": {"a": None},
}
}


def test_start_job_without_restart(
client: FlaskClient,
session: Session,
fake_job_definition: api.JobDefinition,
static_definitions: StaticDefinitions,
frozen_time: FreezeTime,
) -> None:
resp = client.post("/api/jobs/_start", json={})
assert resp.status_code == 400
assert resp.json
assert resp.json["error"]["code"] == "MUST_SPECIFY_JOB"

resp = client.post(
"/api/jobs/_start",
json={"name": "name", "job_definition_name": "job_definition_name"},
)
assert resp.status_code == 400
assert resp.json
assert resp.json["error"]["code"] == "MUST_SPECIFY_ONE_JOB"

resp = client.post("/api/jobs/_start", json={"name": "try to find this"})
assert resp.status_code == 400
assert resp.json
assert resp.json["error"]["code"] == "JOB_START_ERROR"

queue = queues_store.create_queue(session=session, name="fake-queue")
session.flush()
first_job = jobs_store.create_job(
name=queue.name,
session=session,
queue_name=queue.name,
job_definition_name=fake_job_definition.name,
)
session.commit()

frozen_time.tick()

resp = client.post("/api/jobs/_start", json={"name": first_job.name})
assert resp.status_code == 400
assert resp.json
assert resp.json["error"]["code"] == "JOB_START_ERROR"

jobs_store.update_job(session=session, name=first_job.name, completed_at=utcnow())
session.commit()

resp = client.post("/api/jobs/_start", json={"name": first_job.name})
assert resp.status_code == 200
second_job = jobs_store.get_job(session=session, name=resp.json["name"])
assert second_job

jobs_store.update_job(session=session, name=second_job.name, completed_at=utcnow())
session.commit()

frozen_time.tick()

resp = client.post(
"/api/jobs/_start", json={"job_definition_name": fake_job_definition.name}
)

assert resp.status_code == 200
third_job = jobs_store.get_job(session=session, name=resp.json["name"])
assert third_job


def test_start_job_with_restart(
client: FlaskClient,
session: Session,
fake_job_definition: api.JobDefinition,
frozen_time: FreezeTime,
) -> None:
queue = queues_store.create_queue(session=session, name="fake-queue")
session.flush()
first_job = jobs_store.create_job(
name=queue.name,
session=session,
queue_name=queue.name,
job_definition_name=fake_job_definition.name,
)
session.commit()

frozen_time.tick()

resp = client.post("/api/jobs/_start?restart=true", json={"name": first_job.name})

assert resp.status_code == 200
second_job = jobs_store.get_job(session=session, name=resp.json["name"])
session.refresh(first_job)

assert second_job
assert first_job.completed_at is not None

frozen_time.tick()

resp = client.post(
"/api/jobs/_start?restart=true",
json={"job_definition_name": fake_job_definition.name},
)
assert resp.status_code == 200
session.refresh(second_job)
third_job = jobs_store.get_job(session=session, name=resp.json["name"])
assert third_job
assert second_job.completed_at is not None

0 comments on commit 40ce315

Please sign in to comment.