From bf68fa13d977c2ee69fa175aef14fbce7d13a994 Mon Sep 17 00:00:00 2001 From: isra17 Date: Thu, 9 May 2024 16:17:16 -0400 Subject: [PATCH] Fix cursor fetch query --- src/saturn_engine/stores/jobs_store.py | 20 +++++++------------- 1 file changed, 7 insertions(+), 13 deletions(-) diff --git a/src/saturn_engine/stores/jobs_store.py b/src/saturn_engine/stores/jobs_store.py index 94836108..232adb9e 100644 --- a/src/saturn_engine/stores/jobs_store.py +++ b/src/saturn_engine/stores/jobs_store.py @@ -189,21 +189,15 @@ def fetch_cursors_states( ) -> CursorsStates: # Generate a query for each jobs so we can UNION them. fetch_stmts = [] + jobs = session.execute( + select(Job.name, Job.job_definition_name).where(Job.name.in_(query.keys())) + ).all() + job_names = {j.name: j.job_definition_name for j in jobs} for job, cursors in query.items(): + job_name = job_names.get(job) or job fetch_stmts.append( - select(JobCursorState, sa.func.coalesce(Job.name, job).label("name")) - .join( - Job, - Job.job_definition_name == JobCursorState.job_definition_name, - isouter=True, - ) - .where( - sa.or_( - Job.name == job, - sa.and_( - Job.name.is_(None), JobCursorState.job_definition_name == job - ), - ), + select(JobCursorState, sa.literal(job).label("name")).where( + JobCursorState.job_definition_name == job_name, JobCursorState.cursor.in_(cursors), ) )