Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Start new job API #331

Merged
merged 1 commit into from
Aug 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions src/saturn_engine/core/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,12 @@ class JobInput:
error: Optional[str] = None


@dataclasses.dataclass
class StartJobInput:
    """Payload selecting which job to start.

    Both fields are optional and default to ``None``; this class performs
    no validation of its own, so callers decide which combinations are
    acceptable.
    """

    # Name of the job to start.
    name: Optional[JobId] = None
    # Name of the job definition to start a job from.
    job_definition_name: Optional[str] = None


@dataclasses.dataclass
class JobsResponse(ListResponse[JobItem]):
items: list[JobItem]
Expand All @@ -210,3 +216,8 @@ class UpdateResponse:
@dataclasses.dataclass
class JobsSyncResponse:
    """Response with no fields."""


@dataclasses.dataclass
class JobsStartResponse:
    """Response carrying the name of a job."""

    # Required; there is no default value.
    name: str
55 changes: 55 additions & 0 deletions src/saturn_engine/stores/jobs_store.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import typing as t
from typing import Optional

import time
from datetime import datetime

from sqlalchemy import select
Expand All @@ -11,6 +12,8 @@
from saturn_engine.core import Cursor
from saturn_engine.core import JobId
from saturn_engine.core.api import JobsStates
from saturn_engine.core.api import QueueItem
from saturn_engine.core.api import StartJobInput
from saturn_engine.models import Job
from saturn_engine.models.job import JobCursorState
from saturn_engine.models.queue import Queue
Expand All @@ -19,6 +22,7 @@
from saturn_engine.utils.sqlalchemy import AnySession
from saturn_engine.utils.sqlalchemy import AnySyncSession
from saturn_engine.utils.sqlalchemy import upsert
from saturn_engine.worker_manager.config.static_definitions import StaticDefinitions


def create_job(
Expand Down Expand Up @@ -206,3 +210,54 @@ def fetch_cursors_states(
states[row.name][row.cursor] = row.state

return states


def start(
    start_input: StartJobInput,
    static_definitions: StaticDefinitions,
    session: AnySyncSession,
    restart: bool,
) -> Job:
    """Create and return a new job for the job selected by ``start_input``.

    Resolution steps:

    * When ``start_input.name`` is set, it is looked up in
      ``static_definitions.jobs``. If nothing is found there, the job with
      that name is loaded from the store and, when it exists, its
      ``job_definition_name`` replaces the one from ``start_input``.
    * When a job definition name is known and present in
      ``static_definitions.job_definitions``, the definition's template
      becomes the queue item. If no job was loaded by name,
      ``get_last_job`` is asked for a job of that definition.

    The new job is named ``<queue item name>-<int(time.time())>`` and gets
    a freshly created queue of the same name.

    Args:
        start_input: Selects the job by name or by job definition name.
        static_definitions: Source of static jobs and job definitions.
        session: Database session passed to every store call.
        restart: When true, a resolved job with no ``completed_at`` is
            updated with ``completed_at=utcnow()`` and error
            ``"Cancelled"`` before the new job is created.

    Raises:
        ValueError: If no queue item could be resolved, or if the resolved
            job has no ``completed_at`` and ``restart`` is false.
    """
    template: QueueItem | None = None
    definition_name: str | None = start_input.job_definition_name
    existing: Job | None = None

    requested_name = start_input.name
    if requested_name:
        template = static_definitions.jobs.get(requested_name)
        if not template:
            # Not a static job: fall back to a stored job and its definition.
            existing = get_job(name=requested_name, session=session)
            if existing:
                definition_name = existing.job_definition_name

    if definition_name:
        definition = static_definitions.job_definitions.get(definition_name)
        if definition:
            template = definition.template
            if not existing:
                existing = get_last_job(
                    session=session, job_definition_name=definition_name
                )

    if not template:
        raise ValueError("Job not Found.")

    if existing and not existing.completed_at:
        if not restart:
            raise ValueError("Job already started.")
        # Mark the unfinished job as cancelled before starting a new one.
        update_job(
            session=session,
            name=existing.name,
            completed_at=utcnow(),
            error="Cancelled",
        )

    # The whole-second timestamp suffix is what tells successive runs apart.
    new_name = f"{template.name}-{int(time.time())}"
    new_queue = queues_store.create_queue(session=session, name=new_name)
    return create_job(
        session=session,
        name=new_name,
        queue_name=new_queue.name,
        job_definition_name=definition_name,
    )
26 changes: 26 additions & 0 deletions src/saturn_engine/worker_manager/api/jobs.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
import flask
from flask import Blueprint

from saturn_engine.core.api import FetchCursorsStatesInput
from saturn_engine.core.api import FetchCursorsStatesResponse
from saturn_engine.core.api import JobInput
from saturn_engine.core.api import JobResponse
from saturn_engine.core.api import JobsResponse
from saturn_engine.core.api import JobsStartResponse
from saturn_engine.core.api import JobsStatesSyncInput
from saturn_engine.core.api import JobsStatesSyncResponse
from saturn_engine.core.api import JobsSyncResponse
from saturn_engine.core.api import StartJobInput
from saturn_engine.core.api import UpdateResponse
from saturn_engine.database import session_scope
from saturn_engine.stores import jobs_store
Expand Down Expand Up @@ -93,3 +96,26 @@ def post_fetch_states() -> Json[FetchCursorsStatesResponse]:
session=session,
)
return jsonify(FetchCursorsStatesResponse(cursors=cursors))


@bp.route("/_start", methods=("POST",))
def post_start_job() -> Json[JobsStartResponse]:
    """Start a job selected by name or by job definition name.

    The request body is parsed as ``StartJobInput``. Exactly one of
    ``name`` and ``job_definition_name`` must be provided:

    * neither set  -> HTTP 400 with error code ``MUST_SPECIFY_JOB``
    * both set     -> HTTP 400 with error code ``MUST_SPECIFY_ONE_JOB``

    A ``ValueError`` raised by ``jobs_store.start`` is reported as HTTP 400
    with error code ``JOB_START_ERROR`` and the exception text as message.

    The ``restart`` query parameter is treated as true only when it is the
    exact string ``"true"``; it is forwarded to ``jobs_store.start``.

    Returns:
        A JSON ``JobsStartResponse`` holding the name of the started job.
    """
    restart: bool = flask.request.args.get("restart", "false") == "true"
    start_input = marshall_request(StartJobInput)
    if not start_input.name and not start_input.job_definition_name:
        abort(http_code=400, error_code="MUST_SPECIFY_JOB")
    elif start_input.name and start_input.job_definition_name:
        abort(http_code=400, error_code="MUST_SPECIFY_ONE_JOB")

    with session_scope() as session:
        try:
            job = jobs_store.start(
                start_input=start_input,
                static_definitions=current_app.saturn.static_definitions,
                session=session,
                restart=restart,
            )
        except ValueError as e:
            abort(http_code=400, error_code="JOB_START_ERROR", message=str(e))

        # Build the response while the session is still open so that
        # ``job.name`` is read from a live instance.
        return jsonify(JobsStartResponse(name=job.name))
115 changes: 115 additions & 0 deletions tests/worker_manager/api/test_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -715,3 +715,118 @@ def test_fetch_cursors_states(
"do-not-exist": {"a": None},
}
}


def test_start_job_without_restart(
    client: FlaskClient,
    session: Session,
    fake_job_definition: api.JobDefinition,
    static_definitions: StaticDefinitions,
    frozen_time: FreezeTime,
) -> None:
    """Exercise ``/api/jobs/_start`` without the ``restart`` flag.

    Covers the validation errors, an unknown job name, refusal to start a
    job that is not completed, and successful starts by job name and by
    job definition name once the previous job is completed.
    """
    start_url = "/api/jobs/_start"

    # Neither ``name`` nor ``job_definition_name`` is given.
    response = client.post(start_url, json={})
    assert response.status_code == 400
    assert response.json
    assert response.json["error"]["code"] == "MUST_SPECIFY_JOB"

    # Both selectors are given at once.
    both_selectors = {"name": "name", "job_definition_name": "job_definition_name"}
    response = client.post(start_url, json=both_selectors)
    assert response.status_code == 400
    assert response.json
    assert response.json["error"]["code"] == "MUST_SPECIFY_ONE_JOB"

    # A name that matches nothing.
    response = client.post(start_url, json={"name": "try to find this"})
    assert response.status_code == 400
    assert response.json
    assert response.json["error"]["code"] == "JOB_START_ERROR"

    # Seed a job that has not been completed yet.
    seeded_queue = queues_store.create_queue(session=session, name="fake-queue")
    session.flush()
    first_job = jobs_store.create_job(
        name=seeded_queue.name,
        session=session,
        queue_name=seeded_queue.name,
        job_definition_name=fake_job_definition.name,
    )
    session.commit()

    # Presumably advances the frozen clock so that time-derived job names
    # differ between starts -- confirm against the FreezeTime fixture.
    frozen_time.tick()

    # Starting an uncompleted job without restart is rejected.
    response = client.post(start_url, json={"name": first_job.name})
    assert response.status_code == 400
    assert response.json
    assert response.json["error"]["code"] == "JOB_START_ERROR"

    # Once the first job is completed, starting by name succeeds.
    jobs_store.update_job(session=session, name=first_job.name, completed_at=utcnow())
    session.commit()

    response = client.post(start_url, json={"name": first_job.name})
    assert response.status_code == 200
    assert response.json

    second_job = jobs_store.get_job(session=session, name=response.json["name"])
    assert second_job

    # Complete the second job, then start by job definition name.
    jobs_store.update_job(session=session, name=second_job.name, completed_at=utcnow())
    session.commit()

    frozen_time.tick()

    response = client.post(
        start_url, json={"job_definition_name": fake_job_definition.name}
    )

    assert response.status_code == 200
    assert response.json

    third_job = jobs_store.get_job(session=session, name=response.json["name"])
    assert third_job


def test_start_job_with_restart(
    client: FlaskClient,
    session: Session,
    fake_job_definition: api.JobDefinition,
    frozen_time: FreezeTime,
) -> None:
    """Exercise ``/api/jobs/_start?restart=true``.

    Each restart must succeed and leave the previously running job with a
    ``completed_at`` value and the error ``"Cancelled"``.
    """
    restart_url = "/api/jobs/_start?restart=true"

    # Seed a job that has not been completed.
    seeded_queue = queues_store.create_queue(session=session, name="fake-queue")
    session.flush()
    first_job = jobs_store.create_job(
        name=seeded_queue.name,
        session=session,
        queue_name=seeded_queue.name,
        job_definition_name=fake_job_definition.name,
    )
    session.commit()

    # Presumably advances the frozen clock so that time-derived job names
    # differ between starts -- confirm against the FreezeTime fixture.
    frozen_time.tick()

    # Restart selected by job name.
    response = client.post(restart_url, json={"name": first_job.name})

    assert response.status_code == 200
    assert response.json

    second_job = jobs_store.get_job(session=session, name=response.json["name"])
    # Reload the first job to observe the changes made through the API.
    session.refresh(first_job)

    assert second_job
    assert first_job.completed_at is not None
    assert first_job.error == "Cancelled"

    frozen_time.tick()

    # Restart selected by job definition name.
    response = client.post(
        restart_url,
        json={"job_definition_name": fake_job_definition.name},
    )
    assert response.status_code == 200
    assert response.json

    third_job = jobs_store.get_job(session=session, name=response.json["name"])
    session.refresh(second_job)

    assert third_job
    assert second_job.completed_at is not None
    assert second_job.error == "Cancelled"
Loading