diff --git a/src/saturn_engine/core/api.py b/src/saturn_engine/core/api.py index e3d39f6a..bdc386f1 100644 --- a/src/saturn_engine/core/api.py +++ b/src/saturn_engine/core/api.py @@ -192,6 +192,12 @@ class JobInput: error: Optional[str] = None +@dataclasses.dataclass +class StartJobInput: + name: Optional[JobId] = None + job_definition_name: Optional[str] = None + + @dataclasses.dataclass class JobsResponse(ListResponse[JobItem]): items: list[JobItem] @@ -210,3 +216,8 @@ class UpdateResponse: @dataclasses.dataclass class JobsSyncResponse: pass + + +@dataclasses.dataclass +class JobsStartResponse: + name: str diff --git a/src/saturn_engine/stores/jobs_store.py b/src/saturn_engine/stores/jobs_store.py index 96f6ae43..20c6c139 100644 --- a/src/saturn_engine/stores/jobs_store.py +++ b/src/saturn_engine/stores/jobs_store.py @@ -1,6 +1,7 @@ import typing as t from typing import Optional +import time from datetime import datetime from sqlalchemy import select @@ -11,6 +12,8 @@ from saturn_engine.core import Cursor from saturn_engine.core import JobId from saturn_engine.core.api import JobsStates +from saturn_engine.core.api import QueueItem +from saturn_engine.core.api import StartJobInput from saturn_engine.models import Job from saturn_engine.models.job import JobCursorState from saturn_engine.models.queue import Queue @@ -19,6 +22,7 @@ from saturn_engine.utils.sqlalchemy import AnySession from saturn_engine.utils.sqlalchemy import AnySyncSession from saturn_engine.utils.sqlalchemy import upsert +from saturn_engine.worker_manager.config.static_definitions import StaticDefinitions def create_job( @@ -206,3 +210,54 @@ def fetch_cursors_states( states[row.name][row.cursor] = row.state return states + + +def start( + start_input: StartJobInput, + static_definitions: StaticDefinitions, + session: AnySyncSession, + restart: bool, +) -> Job: + queue_item: QueueItem | None = None + job_definition_name: str | None = start_input.job_definition_name + job: Job | None = None + + if start_input.name: + queue_item = static_definitions.jobs.get(start_input.name) + if not queue_item: + job = get_job(name=start_input.name, session=session) + if job: + job_definition_name = job.job_definition_name + + if job_definition_name: + job_definition = static_definitions.job_definitions.get(job_definition_name) + if job_definition: + queue_item = job_definition.template + if not job: + job = get_last_job( + session=session, job_definition_name=job_definition_name + ) + + if not queue_item: + raise ValueError("Job not Found.") + + if job and not job.completed_at: + if restart: + update_job( + session=session, + name=job.name, + completed_at=utcnow(), + error="Cancelled", + ) + else: + raise ValueError("Job already started.") + + job_name = f"{queue_item.name}-{int(time.time())}" + job_queue = queues_store.create_queue(session=session, name=job_name) + new_job = create_job( + session=session, + name=job_name, + queue_name=job_queue.name, + job_definition_name=job_definition_name, + ) + return new_job diff --git a/src/saturn_engine/worker_manager/api/jobs.py b/src/saturn_engine/worker_manager/api/jobs.py index d07ced55..dd3d4fd2 100644 --- a/src/saturn_engine/worker_manager/api/jobs.py +++ b/src/saturn_engine/worker_manager/api/jobs.py @@ -1,3 +1,4 @@ +import flask from flask import Blueprint from saturn_engine.core.api import FetchCursorsStatesInput @@ -5,9 +6,11 @@ from saturn_engine.core.api import JobInput from saturn_engine.core.api import JobResponse from saturn_engine.core.api import JobsResponse +from saturn_engine.core.api import JobsStartResponse from saturn_engine.core.api import JobsStatesSyncInput from saturn_engine.core.api import JobsStatesSyncResponse from saturn_engine.core.api import JobsSyncResponse +from saturn_engine.core.api import StartJobInput from saturn_engine.core.api import UpdateResponse from saturn_engine.database import session_scope from saturn_engine.stores import jobs_store @@ -93,3 +96,26 @@ def post_fetch_states() -> Json[FetchCursorsStatesResponse]: session=session, ) return jsonify(FetchCursorsStatesResponse(cursors=cursors)) + + +@bp.route("/_start", methods=("POST",)) +def post_start_job() -> Json[JobsStartResponse]: + restart: bool = flask.request.args.get("restart", "false") == "true" + start_input = marshall_request(StartJobInput) + if not start_input.name and not start_input.job_definition_name: + abort(http_code=400, error_code="MUST_SPECIFY_JOB") + elif start_input.name and start_input.job_definition_name: + abort(http_code=400, error_code="MUST_SPECIFY_ONE_JOB") + + with session_scope() as session: + try: + job = jobs_store.start( + start_input=start_input, + static_definitions=current_app.saturn.static_definitions, + session=session, + restart=restart, + ) + except ValueError as e: + abort(http_code=400, error_code="JOB_START_ERROR", message=str(e)) + + return jsonify(JobsStartResponse(name=job.name)) diff --git a/tests/worker_manager/api/test_jobs.py b/tests/worker_manager/api/test_jobs.py index b38f7717..89f1baa4 100644 --- a/tests/worker_manager/api/test_jobs.py +++ b/tests/worker_manager/api/test_jobs.py @@ -715,3 +715,118 @@ def test_fetch_cursors_states( "do-not-exist": {"a": None}, } } + + +def test_start_job_without_restart( + client: FlaskClient, + session: Session, + fake_job_definition: api.JobDefinition, + static_definitions: StaticDefinitions, + frozen_time: FreezeTime, +) -> None: + resp = client.post("/api/jobs/_start", json={}) + assert resp.status_code == 400 + assert resp.json + assert resp.json["error"]["code"] == "MUST_SPECIFY_JOB" + + resp = client.post( + "/api/jobs/_start", + json={"name": "name", "job_definition_name": "job_definition_name"}, + ) + assert resp.status_code == 400 + assert resp.json + assert resp.json["error"]["code"] == "MUST_SPECIFY_ONE_JOB" + + resp = client.post("/api/jobs/_start", json={"name": "try to find this"}) + assert resp.status_code == 400 + assert resp.json + assert resp.json["error"]["code"] == "JOB_START_ERROR" + + queue = queues_store.create_queue(session=session, name="fake-queue") + session.flush() + first_job = jobs_store.create_job( + name=queue.name, + session=session, + queue_name=queue.name, + job_definition_name=fake_job_definition.name, + ) + session.commit() + + frozen_time.tick() + + resp = client.post("/api/jobs/_start", json={"name": first_job.name}) + assert resp.status_code == 400 + assert resp.json + assert resp.json["error"]["code"] == "JOB_START_ERROR" + + jobs_store.update_job(session=session, name=first_job.name, completed_at=utcnow()) + session.commit() + + resp = client.post("/api/jobs/_start", json={"name": first_job.name}) + assert resp.status_code == 200 + assert resp.json + + second_job = jobs_store.get_job(session=session, name=resp.json["name"]) + assert second_job + + jobs_store.update_job(session=session, name=second_job.name, completed_at=utcnow()) + session.commit() + + frozen_time.tick() + + resp = client.post( + "/api/jobs/_start", json={"job_definition_name": fake_job_definition.name} + ) + + assert resp.status_code == 200 + assert resp.json + + third_job = jobs_store.get_job(session=session, name=resp.json["name"]) + assert third_job + + +def test_start_job_with_restart( + client: FlaskClient, + session: Session, + fake_job_definition: api.JobDefinition, + frozen_time: FreezeTime, +) -> None: + queue = queues_store.create_queue(session=session, name="fake-queue") + session.flush() + first_job = jobs_store.create_job( + name=queue.name, + session=session, + queue_name=queue.name, + job_definition_name=fake_job_definition.name, + ) + session.commit() + + frozen_time.tick() + + resp = client.post("/api/jobs/_start?restart=true", json={"name": first_job.name}) + + assert resp.status_code == 200 + assert resp.json + + second_job = jobs_store.get_job(session=session, name=resp.json["name"]) + session.refresh(first_job) + + assert second_job + assert first_job.completed_at is not None + assert first_job.error == "Cancelled" + + frozen_time.tick() + + resp = client.post( + "/api/jobs/_start?restart=true", + json={"job_definition_name": fake_job_definition.name}, + ) + assert resp.status_code == 200 + assert resp.json + + third_job = jobs_store.get_job(session=session, name=resp.json["name"]) + session.refresh(second_job) + + assert third_job + assert second_job.completed_at is not None + assert second_job.error == "Cancelled"