-
Notifications
You must be signed in to change notification settings - Fork 116
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Draft: #756 - implement python workflow submissions #762
base: 1.9.latest
Are you sure you want to change the base?
Changes from 18 commits
f03b3e5
1f8a17e
16cb9df
f92af82
36be7c3
8affd9b
2e5e09d
0ba3c1d
b4dfbe7
4a46b79
afc1b5a
df173aa
0078058
3664df2
b3f6dbf
3f100c4
13033ed
0985af6
dfaa6cc
81996f4
214372b
c19b196
e9bc5fc
743ca00
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -162,3 +162,265 @@ def submit(self, compiled_code: str) -> None: | |
class ServerlessClusterPythonJobHelper(BaseDatabricksHelper):
    """Run a python model on Databricks serverless compute.

    Delegates to the base notebook-submission path with an empty cluster
    spec, which the Databricks API treats as serverless.
    """

    def submit(self, compiled_code: str) -> None:
        """Upload and execute *compiled_code* as a notebook with no cluster spec."""
        serverless_cluster_spec: dict = {}
        self._submit_through_notebook(compiled_code, serverless_cluster_spec)
|
||
|
||
class WorkflowPythonJobHelper(BaseDatabricksHelper):
    """Submit a python model as a single-task Databricks workflow (job).

    Job settings come from the model's `workflow_job_config`; an optional
    `job_cluster_config` supplies a new-cluster spec. Submission uploads the
    compiled code as a workspace notebook, creates or reuses a job wrapping
    that notebook, syncs job permissions from any configured `grants`,
    triggers (or attaches to) a run, and polls until the run terminates.
    """

    @property
    def default_job_name(self) -> str:
        # Used when the workflow spec does not supply an explicit job name.
        return f"{self.database}-{self.schema}-{self.identifier}__dbt"

    @property
    def notebook_path(self) -> str:
        return f"{self.notebook_dir}/{self.identifier}"

    @property
    def notebook_dir(self) -> str:
        # NOTE(review): placing notebooks under /Shared is considered an
        # anti-pattern; this should migrate to the workspace folder-API logic.
        return f"/Shared/dbt_python_model/{self.database}/{self.schema}"

    def check_credentials(self) -> None:
        """Raise ValueError if the model config lacks `workflow_job_config`."""
        workflow_config = self.parsed_model["config"].get("workflow_job_config", None)
        if not workflow_config:
            raise ValueError(
                "workflow_job_config is required for the `workflow_job_config` submission method."
            )

    def submit(self, compiled_code: str) -> None:
        """Entry point: build the job spec and run the workflow for this model."""
        workflow_spec = self.parsed_model["config"]["workflow_job_config"]
        cluster_spec = self.parsed_model["config"].get("job_cluster_config", None)

        # This dict gets modified throughout. Settings added through dbt are
        # popped off before the spec is sent to the Databricks API.
        workflow_spec = self._build_job_spec(workflow_spec, cluster_spec)

        self._submit_through_workflow(compiled_code, workflow_spec)

    def _build_job_spec(self, workflow_spec, cluster_spec):
        """Assemble the job settings: name, the main notebook task (with its
        cluster settings), and any post-hook tasks.

        :param workflow_spec: the user-provided `workflow_job_config` dict;
            mutated in place (dbt-only keys are popped off).
        :param cluster_spec: optional `job_cluster_config` new-cluster dict.
        :return: the completed workflow spec with a `tasks` list.
        """
        workflow_spec["name"] = workflow_spec.get("name", self.default_job_name)

        # Leaving cluster settings undefined defaults to serverless in the
        # Databricks API.
        cluster_settings = {}
        if cluster_spec is not None:
            cluster_settings["new_cluster"] = cluster_spec
        elif "existing_cluster_id" in workflow_spec:
            cluster_settings["existing_cluster_id"] = workflow_spec["existing_cluster_id"]

        notebook_task = {
            "task_key": "task_a",
            "notebook_task": {
                "notebook_path": self.notebook_path,
                "source": "WORKSPACE",
            },
        }
        notebook_task.update(cluster_settings)
        notebook_task.update(workflow_spec.pop("additional_task_settings", {}))

        post_hook_tasks = workflow_spec.pop("post_hook_tasks", [])
        for task in post_hook_tasks:
            # Post-hook tasks without their own cluster inherit the model's.
            if "existing_cluster_id" not in task and "new_cluster" not in task:
                task.update(cluster_settings)

        workflow_spec["tasks"] = [notebook_task] + post_hook_tasks
        return workflow_spec

    def _submit_through_workflow(self, compiled_code: str, workflow_spec) -> None:
        """Upload the notebook, reconcile the job with *workflow_spec*, run it,
        and poll the run to a terminal state.

        :raises DbtRuntimeError: if the run ends in any state other than SUCCESS.
        """
        self.api_client.workspace.upload_notebook(self.notebook_path, compiled_code)

        job_id, is_new = self._get_or_create_job(workflow_spec)

        # A freshly created job already matches the spec; only reset existing ones.
        if not is_new:
            self._update_job(job_id, workflow_spec)

        grants = workflow_spec.pop("grants", {})
        self._update_job_permissions(job_id, grants)

        run_id = self._get_or_trigger_job_run(job_id)

        # Track the run so it can be cancelled if dbt is interrupted.
        self.tracker.insert_run_id(run_id)
        self.polling(
            status_func=self.session.get,
            status_func_kwargs={
                "url": f"https://{self.credentials.host}/api/2.1/jobs/runs/get?run_id={run_id}",
                "headers": self.extra_headers,
            },
            get_state_func=lambda response: response.json()["state"]["life_cycle_state"],
            terminal_states=("TERMINATED", "SKIPPED", "INTERNAL_ERROR"),
            expected_end_state="TERMINATED",
            get_state_msg_func=lambda response: response.json()["state"]["state_message"],
        )

        run_output = self.api_client.session.get(
            f"https://{self.credentials.host}/api/2.1/jobs/runs/get?run_id={run_id}",
            headers=self.extra_headers,  # consistency with the other API calls
        )
        json_run_output = run_output.json()

        result_state = json_run_output["state"]["result_state"]
        if result_state != "SUCCESS":
            raise DbtRuntimeError(
                "Python model failed with traceback as:\n"
                "(Note that the line number here does not "
                "match the line number in your code due to dbt templating)\n"
                # `error_trace` is not guaranteed in the run output; don't mask
                # the real failure with a KeyError.
                f"{utils.remove_ansi(json_run_output.get('error_trace', ''))}"
            )
        self.tracker.remove_run_id(run_id)

    def _get_or_create_job(self, workflow_spec: dict) -> tuple[int, bool]:
        """Resolve the job to use for this model.

        :return: tuple of job_id and whether the job is new
        :raises DbtRuntimeError: on API error or if multiple jobs share the name.
        """
        existing_job_id = workflow_spec.pop("existing_job_id", "")
        if existing_job_id:
            return existing_job_id, False

        response = self.api_client.session.get(
            f"https://{self.credentials.host}/api/2.1/jobs/list",
            headers=self.extra_headers,  # consistency with the other API calls
            json={
                "name": workflow_spec["name"],
            },
        )

        if response.status_code != 200:
            raise DbtRuntimeError(f"Error getting job.\n {response.content!r}")
        response_json = response.json()
        logger.info(f"Job list response={response_json}")

        response_jobs = response_json.get("jobs", [])
        if len(response_jobs) > 1:
            raise DbtRuntimeError(f"Multiple jobs found with name {workflow_spec['name']}")

        if len(response_jobs) == 1:
            return response_jobs[0]["job_id"], False
        else:
            return self._create_job(workflow_spec), True

    def _create_job(self, workflow_spec: dict):
        """Create a new Databricks job from *workflow_spec*.

        :return: the job id
        :raises DbtRuntimeError: on API error.
        """
        response = self.api_client.session.post(
            f"https://{self.credentials.host}/api/2.1/jobs/create",
            headers=self.extra_headers,
            json=workflow_spec,
        )
        if response.status_code != 200:
            raise DbtRuntimeError(f"Error creating Databricks workflow.\n {response.content!r}")
        response_json = response.json()
        logger.info(f"Workflow create response={response_json}")
        return response_json["job_id"]

    def _update_job(self, job_id, workflow_spec) -> None:
        """Overwrite (reset) an existing job's settings with *workflow_spec*.

        :raises DbtRuntimeError: on API error.
        """
        request_body = {
            "job_id": job_id,
            "new_settings": workflow_spec,
        }
        response = self.api_client.session.post(
            f"https://{self.credentials.host}/api/2.1/jobs/reset",
            headers=self.extra_headers,
            json=request_body,
        )

        logger.info(f"Workflow update response={response.json()}")
        if response.status_code != 200:
            raise DbtRuntimeError(f"Error updating Databricks workflow.\n {response.content!r}")

    def _update_job_permissions(self, job_id, job_grants) -> None:
        """Replace the job's ACL with one derived from the dbt `grants` config.

        :raises DbtRuntimeError: on API error.
        """
        access_control_list = self._build_job_permissions(job_id, job_grants)

        request_body = {"access_control_list": access_control_list}

        response = self.api_client.session.put(
            f"https://{self.credentials.host}/api/2.0/permissions/jobs/{job_id}",
            headers=self.extra_headers,
            json=request_body,
        )

        logger.info(f"Workflow permissions update response={response.json()}")
        if response.status_code != 200:
            raise DbtRuntimeError(f"Error updating Databricks workflow.\n {response.content!r}")

    def _build_job_permissions(self, job_id, job_grants) -> list:
        """Translate dbt `grants` into a Databricks access_control_list, always
        preserving the job's current owner as the first entry.
        """
        # (grants key, Databricks permission level) in the order the ACL is built.
        grant_levels = (
            ("view", "CAN_VIEW"),
            ("run", "CAN_MANAGE_RUN"),
            ("manage", "CAN_MANAGE"),
        )

        current_owner, permissions_attribute = self._get_current_job_owner(job_id)
        access_control_list = [
            {
                permissions_attribute: current_owner,
                "permission_level": "IS_OWNER",
            }
        ]

        for grant_key, permission_level in grant_levels:
            for grant in job_grants.get(grant_key, []):
                # Copy so the user's config dicts are not mutated.
                acl_grant = grant.copy()
                acl_grant["permission_level"] = permission_level
                access_control_list.append(acl_grant)

        return access_control_list

    def _get_current_job_owner(self, job_id) -> tuple[str, str]:
        """
        :return: a tuple of the user id and the ACL attribute it came from ie:
            [user_name|group_name|service_principal_name]
            For example: `("mateizaharia@databricks.com", "user_name")`
        :raises DbtRuntimeError: on API error or if no direct owner is found.
        """
        response = self.api_client.session.get(
            f"https://{self.credentials.host}/api/2.0/permissions/jobs/{job_id}",
            headers=self.extra_headers,  # consistency with the other API calls
        )
        if response.status_code != 200:
            raise DbtRuntimeError(
                f"Error getting Databricks workflow permissions.\n {response.content!r}"
            )

        for principal in response.json().get("access_control_list", []):
            for permission in principal["all_permissions"]:
                # Only a directly-assigned (non-inherited) IS_OWNER counts.
                if permission["permission_level"] == "IS_OWNER" and permission["inherited"] is False:
                    if principal.get("user_name"):
                        return principal["user_name"], "user_name"
                    elif principal.get("group_name"):
                        return principal["group_name"], "group_name"
                    else:
                        return principal["service_principal_name"], "service_principal_name"

        raise DbtRuntimeError(
            f"Error getting current owner for Databricks workflow.\n {response.content!r}"
        )

    def _get_or_trigger_job_run(self, job_id):
        """Attach to an already-active run of the job, or trigger a new one.

        :param job_id: the Databricks job id (annotation fixed: this is not a dict).
        :return: the run id
        :raises DbtRuntimeError: on API error.
        """
        active_runs_response = self.api_client.session.get(
            f"https://{self.credentials.host}/api/2.1/jobs/runs/list",
            headers=self.extra_headers,  # consistency with the other API calls
            json={
                "job_id": job_id,
                "active_only": True,
            },
        )
        if active_runs_response.status_code != 200:
            raise DbtRuntimeError(f"Error getting active runs.\n {active_runs_response.content!r}")

        active_runs = active_runs_response.json().get("runs", [])
        if len(active_runs) > 0:
            # NOTE(review): attaching to an in-flight run may miss upstream table
            # updates made after that run started — see PR discussion.
            logger.info("Workflow already running, tracking active run instead of creating a new one")
            return active_runs[0]["run_id"]

        request_body = {
            "job_id": job_id,
        }
        response = self.api_client.session.post(
            f"https://{self.credentials.host}/api/2.1/jobs/run-now",
            headers=self.extra_headers,  # consistency with the other API calls
            json=request_body,
        )
        if response.status_code != 200:
            raise DbtRuntimeError(
                f"Error triggering a run for Databricks workflow.\n {response.content!r}"
            )
        response_json = response.json()
        logger.info(f"Workflow trigger response={response_json}")

        return response_json["run_id"]
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Think it might be helpful to put the dbt up front. That way if you sort in the UI, all of the dbt jobs are together.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good point, thanks Ben
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
changed