Skip to content

Commit

Permalink
Handle job failing to sync
Browse files Browse the repository at this point in the history
  • Loading branch information
isra17 committed Mar 28, 2024
1 parent f484b3a commit 9db23f4
Showing 1 changed file with 50 additions and 39 deletions.
89 changes: 50 additions & 39 deletions src/saturn_engine/worker_manager/services/sync.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
import threading
import time
from datetime import datetime
Expand Down Expand Up @@ -28,52 +29,62 @@ def _sync_jobs(
static_definitions: StaticDefinitions,
session: AnySyncSession,
) -> None:
logger = logging.getLogger(__name__)

# Jobs with no interval
for saturn_job in static_definitions.jobs.values():
# Check if job exists and create if not
existing_job = jobs_store.get_job(saturn_job.name, session)
if not existing_job:
job_queue = queues_store.create_queue(session=session, name=saturn_job.name)
jobs_store.create_job(
name=saturn_job.name,
session=session,
queue_name=job_queue.name,
)
try:
# Check if job exists and create if not
existing_job = jobs_store.get_job(saturn_job.name, session)
if not existing_job:
job_queue = queues_store.create_queue(
session=session, name=saturn_job.name
)
jobs_store.create_job(
name=saturn_job.name,
session=session,
queue_name=job_queue.name,
)
except Exception:
logger.exception("Failed to create %s", saturn_job.name)

# Jobs run at an interval
for job_definition in static_definitions.job_definitions.values():
last_job = jobs_store.get_last_job(
session=session,
job_definition_name=job_definition.name,
)

if last_job:
# If a job already exists, check it has completed and
# the interval has elapsed to start a new one.
if not last_job.completed_at:
continue
try:
last_job = jobs_store.get_last_job(
session=session,
job_definition_name=job_definition.name,
)

# If the last job completed with success, we check for
# the job definition interval.
if not last_job.error:
scheduled_at = croniter(
job_definition.minimal_interval,
last_job.started_at,
).get_next(ret_type=datetime)
if scheduled_at > utcnow():
if last_job:
# If a job already exists, check it has completed and
# the interval has elapsed to start a new one.
if not last_job.completed_at:
continue

job_name: str = f"{job_definition.name}-{int(time.time())}"
queue = queues_store.create_queue(session=session, name=job_name)
job = jobs_store.create_job(
name=job_name,
session=session,
queue_name=queue.name,
job_definition_name=job_definition.name,
)
# If the last job completed with success, we check for
# the job definition interval.
if not last_job.error:
scheduled_at = croniter(
job_definition.minimal_interval,
last_job.started_at,
).get_next(ret_type=datetime)
if scheduled_at > utcnow():
continue

job_name: str = f"{job_definition.name}-{int(time.time())}"
queue = queues_store.create_queue(session=session, name=job_name)
job = jobs_store.create_job(
name=job_name,
session=session,
queue_name=queue.name,
job_definition_name=job_definition.name,
)

# If the last job was an error, we resume from where we were.
if last_job and last_job.error:
job.cursor = last_job.cursor
# If the last job was an error, we resume from where we were.
if last_job and last_job.error:
job.cursor = last_job.cursor

session.commit()
session.commit()
except Exception:
logger.exception("Failed to create %s", job_definition.name)

0 comments on commit 9db23f4

Please sign in to comment.