From 9db23f4f44525ecb0827d8b071839c09c4f04122 Mon Sep 17 00:00:00 2001 From: isra17 Date: Thu, 28 Mar 2024 09:51:46 -0400 Subject: [PATCH] Handle job failing to sync --- .../worker_manager/services/sync.py | 89 +++++++++++-------- 1 file changed, 50 insertions(+), 39 deletions(-) diff --git a/src/saturn_engine/worker_manager/services/sync.py b/src/saturn_engine/worker_manager/services/sync.py index 1dbb771e..ccc13af1 100644 --- a/src/saturn_engine/worker_manager/services/sync.py +++ b/src/saturn_engine/worker_manager/services/sync.py @@ -1,3 +1,4 @@ +import logging import threading import time from datetime import datetime @@ -28,52 +29,62 @@ def _sync_jobs( static_definitions: StaticDefinitions, session: AnySyncSession, ) -> None: + logger = logging.getLogger(__name__) + # Jobs with no interval for saturn_job in static_definitions.jobs.values(): - # Check if job exists and create if not - existing_job = jobs_store.get_job(saturn_job.name, session) - if not existing_job: - job_queue = queues_store.create_queue(session=session, name=saturn_job.name) - jobs_store.create_job( - name=saturn_job.name, - session=session, - queue_name=job_queue.name, - ) + try: + # Check if job exists and create if not + existing_job = jobs_store.get_job(saturn_job.name, session) + if not existing_job: + job_queue = queues_store.create_queue( + session=session, name=saturn_job.name + ) + jobs_store.create_job( + name=saturn_job.name, + session=session, + queue_name=job_queue.name, + ) + except Exception: + logger.exception("Failed to create %s", saturn_job.name) # Jobs ran at an interval for job_definition in static_definitions.job_definitions.values(): - last_job = jobs_store.get_last_job( - session=session, - job_definition_name=job_definition.name, - ) - - if last_job: - # If a job already exists, check it has completed and - # the interval has elapsed to start a new one. 
- if not last_job.completed_at: - continue + try: + last_job = jobs_store.get_last_job( + session=session, + job_definition_name=job_definition.name, + ) - # If the last job completed with success, we check for - # the job definition interval. - if not last_job.error: - scheduled_at = croniter( - job_definition.minimal_interval, - last_job.started_at, - ).get_next(ret_type=datetime) - if scheduled_at > utcnow(): + if last_job: + # If a job already exists, check it has completed and + # the interval has elapsed to start a new one. + if not last_job.completed_at: continue - job_name: str = f"{job_definition.name}-{int(time.time())}" - queue = queues_store.create_queue(session=session, name=job_name) - job = jobs_store.create_job( - name=job_name, - session=session, - queue_name=queue.name, - job_definition_name=job_definition.name, - ) + # If the last job completed with success, we check for + # the job definition interval. + if not last_job.error: + scheduled_at = croniter( + job_definition.minimal_interval, + last_job.started_at, + ).get_next(ret_type=datetime) + if scheduled_at > utcnow(): + continue + + job_name: str = f"{job_definition.name}-{int(time.time())}" + queue = queues_store.create_queue(session=session, name=job_name) + job = jobs_store.create_job( + name=job_name, + session=session, + queue_name=queue.name, + job_definition_name=job_definition.name, + ) - # If the last job was an error, we resume from where we were. - if last_job and last_job.error: - job.cursor = last_job.cursor + # If the last job was an error, we resume from where we were. + if last_job and last_job.error: + job.cursor = last_job.cursor - session.commit() + session.commit() + except Exception: + logger.exception("Failed to create %s", job_definition.name)