Skip to content

Commit

Permalink
If a workflow is already running, return the active run id instead of…
Browse files Browse the repository at this point in the history
… attempting to trigger a new run
  • Loading branch information
Kyle Valade committed Sep 24, 2024
1 parent b3f6dbf commit 3f100c4
Showing 1 changed file with 18 additions and 2 deletions.
20 changes: 18 additions & 2 deletions dbt/adapters/databricks/python_submissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -685,7 +685,7 @@ def _submit_through_workflow(self, compiled_code: str, workflow_spec) -> None:
grants = workflow_spec.pop("grants", {})
self._update_job_permissions(job_id, grants)

run_id = self._trigger_job(job_id)
run_id = self._get_or_trigger_job_run(job_id)

self.tracker.insert_run_id(run_id)
self.polling(
Expand Down Expand Up @@ -847,10 +847,26 @@ def _get_current_job_owner(self, job_id) -> tuple[str, str]:

raise DbtRuntimeError(f"Error getting current owner for Databricks workflow.\n {response.content!r}")

def _trigger_job(self, job_id: dict):
def _get_or_trigger_job_run(self, job_id: dict):
"""
:return: the run id
"""
active_runs_response = self.session.get(
f"https://{self.credentials.host}/api/2.1/jobs/runs/list",
headers=self.extra_headers,
json={
'job_id': job_id,
'active_only': True,
}
)
if active_runs_response.status_code != 200:
raise DbtRuntimeError(f"Error getting active runs.\n {active_runs_response.content!r}")

active_runs = active_runs_response.json().get('runs', [])
if len(active_runs) > 0:
logger.info("Workflow already running, tracking active run instead of creating a new one")
return active_runs[0]['run_id']

request_body = {
"job_id": job_id,
}
Expand Down

0 comments on commit 3f100c4

Please sign in to comment.