Skip to content

Commit

Permalink
Fix bug with cursor state naemspace
Browse files Browse the repository at this point in the history
  • Loading branch information
isra17 committed May 7, 2024
1 parent d4b7727 commit bf18f8e
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 6 deletions.
1 change: 1 addition & 0 deletions example/definitions/simple.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ spec:
config:
job_state:
cursors_states_enabled: true
cursors_states_namespace: example-namespace
job:
buffer_size: 5

13 changes: 10 additions & 3 deletions src/saturn_engine/stores/jobs_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import time
from datetime import datetime

import sqlalchemy as sa
from sqlalchemy import select
from sqlalchemy import union_all
from sqlalchemy import update
Expand Down Expand Up @@ -190,13 +191,19 @@ def fetch_cursors_states(
fetch_stmts = []
for job, cursors in query.items():
fetch_stmts.append(
select(Job.name, JobCursorState)
select(JobCursorState, sa.func.coalesce(Job.name, job).label("name"))
.join(
JobCursorState,
Job,
Job.job_definition_name == JobCursorState.job_definition_name,
isouter=True,
)
.where(
Job.name == job,
sa.or_(
Job.name == job,
sa.and_(
Job.name.is_(None), JobCursorState.job_definition_name == job
),
),
JobCursorState.cursor.in_(cursors),
)
)
Expand Down
17 changes: 14 additions & 3 deletions tests/worker_manager/api/test_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -630,7 +630,7 @@ def test_sync_states(
"completed_at": "2020-01-01T01:02:03+04:00",
"error": "ValueError: boo",
},
}
},
}
}
},
Expand All @@ -649,7 +649,7 @@ def test_sync_states(
},
},
orphan_job.name: {"cursors_states": {"a": {"x": 1}}},
}
},
}
},
)
Expand Down Expand Up @@ -677,6 +677,7 @@ def test_fetch_cursors_states(
new_job: Job,
orphan_job: Job,
) -> None:
custom_ns = "custom-namespace"
resp = client.post(
"/api/jobs/_states",
json={
Expand All @@ -687,7 +688,12 @@ def test_fetch_cursors_states(
"a": {"x": 1},
"b": {"x": 2},
},
}
},
custom_ns: {
"cursors_states": {
"c": {"x": 1},
}
},
}
}
},
Expand All @@ -702,6 +708,7 @@ def test_fetch_cursors_states(
new_job.name: ["a", "c"],
orphan_job.name: ["a"],
"do-not-exist": ["a"],
custom_ns: ["c", "d"],
}
},
)
Expand All @@ -714,6 +721,10 @@ def test_fetch_cursors_states(
},
orphan_job.name: {"a": None},
"do-not-exist": {"a": None},
custom_ns: {
"c": {"x": 1},
"d": None,
},
}
}

Expand Down

0 comments on commit bf18f8e

Please sign in to comment.