Draft: #756 - implement python workflow submissions #762

Open
wants to merge 24 commits into base: 1.9.latest
Changes from 18 commits
Commits
f03b3e5
#756 - stub out implementation for python workflow submissions
Aug 8, 2024
1f8a17e
Make this runnable
Aug 12, 2024
16cb9df
Allow for job updates
Aug 12, 2024
f92af82
doccomments to code
Aug 12, 2024
36be7c3
Allow for existing cluster id; pull notebook paths + dirs into proper…
Aug 12, 2024
8affd9b
Build path from dir, not vice versa
Aug 12, 2024
2e5e09d
Remove comments
Aug 12, 2024
0ba3c1d
linting
Aug 12, 2024
b4dfbe7
Allow user to specify an existing workflow/job id
Aug 13, 2024
4a46b79
Don't override job name - provide a default
Aug 13, 2024
afc1b5a
Allow additional tasks to get added to the workflow as 'post_hook_tasks'
Aug 13, 2024
df173aa
Allow for additional model task settings; allow permissions to be set…
Aug 14, 2024
0078058
Update permissions regardless of whether the job is new or not
Aug 14, 2024
3664df2
Don't skip setting job grants if they aren't defined, as they should …
Aug 14, 2024
b3f6dbf
Use different run endpoint for result status
Sep 20, 2024
3f100c4
If a workflow is already running, return the active run id instead of…
Sep 24, 2024
13033ed
Allow for serverless tasks
Sep 27, 2024
0985af6
BROKEN: starting to merge with 1.9.latest. Need to fix api_client calls
Sep 27, 2024
dfaa6cc
Add new API classes to support workflow jobs
Oct 1, 2024
81996f4
Add lots of types
Oct 1, 2024
214372b
Fix types for python 3.8
Oct 1, 2024
c19b196
Missed a Tuple
Oct 1, 2024
e9bc5fc
Use FolderApi; change dbt namespacing placement
Oct 1, 2024
743ca00
Queue each workflow run behind an active run
Oct 1, 2024
4 changes: 4 additions & 0 deletions dbt/adapters/databricks/impl.py
@@ -50,6 +50,9 @@
from dbt.adapters.databricks.python_models.python_submissions import (
ServerlessClusterPythonJobHelper,
)
from dbt.adapters.databricks.python_models.python_submissions import (
WorkflowPythonJobHelper,
)
from dbt.adapters.databricks.relation import DatabricksRelation
from dbt.adapters.databricks.relation import DatabricksRelationType
from dbt.adapters.databricks.relation import KEY_TABLE_PROVIDER
@@ -623,6 +626,7 @@ def python_submission_helpers(self) -> Dict[str, Type[PythonJobHelper]]:
"job_cluster": JobClusterPythonJobHelper,
"all_purpose_cluster": AllPurposeClusterPythonJobHelper,
"serverless_cluster": ServerlessClusterPythonJobHelper,
"workflow_job": WorkflowPythonJobHelper,
}

@available
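For context, a Python model opts into this helper through its config. A minimal sketch, assuming a hypothetical model and upstream ref - only `submission_method` and the `workflow_job_config` keys read by the new helper below carry meaning here:

# models/my_python_model.py (hypothetical example, not part of this PR)
def model(dbt, session):
    dbt.config(
        submission_method="workflow_job",
        workflow_job_config={
            "name": "my-dbt-workflow",                      # optional; falls back to default_job_name
            "grants": {"view": [{"group_name": "users"}]},  # applied through the permissions API
        },
    )
    return dbt.ref("upstream_model")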
262 changes: 262 additions & 0 deletions dbt/adapters/databricks/python_models/python_submissions.py
@@ -162,3 +162,265 @@ def submit(self, compiled_code: str) -> None:
class ServerlessClusterPythonJobHelper(BaseDatabricksHelper):
def submit(self, compiled_code: str) -> None:
self._submit_through_notebook(compiled_code, {})


class WorkflowPythonJobHelper(BaseDatabricksHelper):

@property
def default_job_name(self) -> str:
return f"{self.database}-{self.schema}-{self.identifier}__dbt"
Collaborator: Think it might be helpful to put the dbt up front. That way if you sort in the UI, all of the dbt jobs are together.

Author: Good point, thanks Ben.

Author: Changed.

@property
def notebook_path(self) -> str:
return f"{self.notebook_dir}/{self.identifier}"

@property
def notebook_dir(self) -> str:
return f"/Shared/dbt_python_model/{self.database}/{self.schema}"
Collaborator: We should probably use the folder API logic. I've been told that using Shared is an anti-pattern and that we might get rid of it as we continue with improved governance.

Author: Ah, OK, cool - I hadn't dug into that part of your refactorings. I'll make that change.

Author: Updated to use the new API classes.

def check_credentials(self) -> None:
workflow_config = self.parsed_model["config"].get("workflow_job_config", None)
if not workflow_config:
raise ValueError(
"workflow_job_config is required for the `workflow_job_config` submission method."
)

def submit(self, compiled_code: str) -> None:
workflow_spec = self.parsed_model["config"]["workflow_job_config"]
cluster_spec = self.parsed_model["config"].get("job_cluster_config", None)

# This dict gets modified throughout. Settings added through dbt are popped off
# before the spec is sent to the Databricks API
workflow_spec = self._build_job_spec(workflow_spec, cluster_spec)

self._submit_through_workflow(compiled_code, workflow_spec)

def _build_job_spec(self, workflow_spec, cluster_spec):
workflow_spec["name"] = workflow_spec.get('name', self.default_job_name)

        # Undefined cluster settings default to serverless in the Databricks API
        cluster_settings = {}
if cluster_spec is not None:
cluster_settings["new_cluster"] = cluster_spec
elif 'existing_cluster_id' in workflow_spec:
cluster_settings['existing_cluster_id'] = workflow_spec['existing_cluster_id']

notebook_task = {
'task_key': 'task_a',
'notebook_task': {
"notebook_path": self.notebook_path,
"source": "WORKSPACE",
},
}
notebook_task.update(cluster_settings)
notebook_task.update(workflow_spec.pop("additional_task_settings", {}))

post_hook_tasks = workflow_spec.pop("post_hook_tasks", [])
for task in post_hook_tasks:
            if 'existing_cluster_id' not in task and 'new_cluster' not in task:
task.update(cluster_settings)

workflow_spec["tasks"] = [notebook_task] + post_hook_tasks
return workflow_spec
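For illustration, here is roughly how a hypothetical `workflow_job_config` would be expanded by `_build_job_spec` (names and values below are made up; only the key names match what the method reads):

# Hypothetical input from the model config:
workflow_spec = {
    "existing_cluster_id": "1234-567890-abcde123",
    "additional_task_settings": {"task_key": "my_model_task"},
    "post_hook_tasks": [
        {"task_key": "vacuum", "notebook_task": {"notebook_path": "/Shared/vacuum"}},
    ],
}

# Roughly what _build_job_spec returns for it (no job_cluster_config supplied):
# {
#     "name": "<default_job_name>",
#     "existing_cluster_id": "1234-567890-abcde123",
#     "tasks": [
#         {"task_key": "my_model_task",                     # overridden via additional_task_settings
#          "notebook_task": {"notebook_path": "<notebook_path>", "source": "WORKSPACE"},
#          "existing_cluster_id": "1234-567890-abcde123"},
#         {"task_key": "vacuum",
#          "notebook_task": {"notebook_path": "/Shared/vacuum"},
#          "existing_cluster_id": "1234-567890-abcde123"},  # inherits the cluster settings
#     ],
# }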

def _submit_through_workflow(self, compiled_code: str, workflow_spec) -> None:
self.api_client.workspace.upload_notebook(self.notebook_path, compiled_code)

job_id, is_new = self._get_or_create_job(workflow_spec)

if not is_new:
self._update_job(job_id, workflow_spec)

grants = workflow_spec.pop("grants", {})
self._update_job_permissions(job_id, grants)

run_id = self._get_or_trigger_job_run(job_id)

self.tracker.insert_run_id(run_id)
self.polling(
status_func=self.session.get,
status_func_kwargs={
"url": f"https://{self.credentials.host}/api/2.1/jobs/runs/get?run_id={run_id}",
"headers": self.extra_headers,
},
get_state_func=lambda response: response.json()["state"]["life_cycle_state"],
terminal_states=("TERMINATED", "SKIPPED", "INTERNAL_ERROR"),
expected_end_state="TERMINATED",
get_state_msg_func=lambda response: response.json()["state"]["state_message"],
)
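        # For reference, the runs/get payload fields that the polling above relies on look
        # roughly like this (illustrative values, abbreviated to the keys actually read):
        #   {"state": {"life_cycle_state": "TERMINATED",  # polled until a terminal state
        #              "result_state": "SUCCESS",         # checked after polling below
        #              "state_message": ""}}              # surfaced in status messages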

run_output = self.api_client.session.get(
f"https://{self.credentials.host}" f"/api/2.1/jobs/runs/get?run_id={run_id}"
)
json_run_output = run_output.json()

result_state = json_run_output["state"]["result_state"]
if result_state != "SUCCESS":
raise DbtRuntimeError(
"Python model failed with traceback as:\n"
"(Note that the line number here does not "
"match the line number in your code due to dbt templating)\n"
f"{utils.remove_ansi(json_run_output['error_trace'])}"
)
self.tracker.remove_run_id(run_id)

def _get_or_create_job(self, workflow_spec: dict) -> tuple[int, bool]:
"""
:return: tuple of job_id and whether the job is new
"""
existing_job_id = workflow_spec.pop('existing_job_id', '')
if existing_job_id:
return existing_job_id, False

response = self.api_client.session.get(
f"https://{self.credentials.host}/api/2.1/jobs/list",
json={
"name": workflow_spec['name'],
}
)

if response.status_code != 200:
raise DbtRuntimeError(f"Error getting job.\n {response.content!r}")
response_json = response.json()
logger.info(f"Job list response={response_json}")

response_jobs = response_json.get("jobs", [])
if len(response_jobs) > 1:
raise DbtRuntimeError(f"Multiple jobs found with name {workflow_spec['name']}")

        if len(response_jobs) == 1:
            return response_jobs[0]["job_id"], False
else:
return self._create_job(workflow_spec), True

def _create_job(self, workflow_spec: dict):
"""
:return: the job id
"""
response = self.api_client.session.post(
f"https://{self.credentials.host}/api/2.1/jobs/create",
headers=self.extra_headers,
json=workflow_spec,
)
if response.status_code != 200:
raise DbtRuntimeError(f"Error creating Databricks workflow.\n {response.content!r}")
response_json = response.json()
logger.info(f"Workflow create response={response_json}")
return response_json["job_id"]

def _update_job(self, job_id, workflow_spec):
request_body = {
"job_id": job_id,
"new_settings": workflow_spec,
}
response = self.api_client.session.post(
f"https://{self.credentials.host}/api/2.1/jobs/reset",
headers=self.extra_headers,
json=request_body,
)

logger.info(f"Workflow update response={response.json()}")
if response.status_code != 200:
raise DbtRuntimeError(f"Error updating Databricks workflow.\n {response.content!r}")

def _update_job_permissions(self, job_id, job_grants):
access_control_list = self._build_job_permissions(job_id, job_grants)

request_body = {
"access_control_list": access_control_list
}

response = self.api_client.session.put(
f"https://{self.credentials.host}/api/2.0/permissions/jobs/{job_id}",
headers=self.extra_headers,
json=request_body,
)

logger.info(f"Workflow permissions update response={response.json()}")
if response.status_code != 200:
raise DbtRuntimeError(f"Error updating Databricks workflow.\n {response.content!r}")

def _build_job_permissions(self, job_id, job_grants) -> list:
access_control_list = []
current_owner, permissions_attribute = self._get_current_job_owner(job_id)
access_control_list.append({
permissions_attribute: current_owner,
'permission_level': 'IS_OWNER',
})

for grant in job_grants.get('view', []):
acl_grant = grant.copy()
acl_grant.update({
'permission_level': 'CAN_VIEW',
})
access_control_list.append(acl_grant)
for grant in job_grants.get('run', []):
acl_grant = grant.copy()
acl_grant.update({
'permission_level': 'CAN_MANAGE_RUN',
})
access_control_list.append(acl_grant)
for grant in job_grants.get('manage', []):
acl_grant = grant.copy()
acl_grant.update({
'permission_level': 'CAN_MANAGE',
})
access_control_list.append(acl_grant)

return access_control_list
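For reference, a hypothetical `grants` value and the ACL this method builds from it (principals below are made up; the IS_OWNER entry comes from `_get_current_job_owner`):

# Hypothetical grants from workflow_job_config:
job_grants = {
    "view": [{"group_name": "marketing"}],
    "run": [{"user_name": "someone@example.com"}],
}

# Resulting access_control_list (assuming the current owner is a service principal):
# [
#     {"service_principal_name": "<owner-id>", "permission_level": "IS_OWNER"},
#     {"group_name": "marketing", "permission_level": "CAN_VIEW"},
#     {"user_name": "someone@example.com", "permission_level": "CAN_MANAGE_RUN"},
# ]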

def _get_current_job_owner(self, job_id) -> tuple[str, str]:
"""
:return: a tuple of the user id and the ACL attribute it came from ie:
[user_name|group_name|service_principal_name]
For example: `("mateizaharia@databricks.com", "user_name")`
"""
response = self.api_client.session.get(
f"https://{self.credentials.host}/api/2.0/permissions/jobs/{job_id}"
)
if response.status_code != 200:
raise DbtRuntimeError(f"Error getting Databricks workflow permissions.\n {response.content!r}")

for principal in response.json().get("access_control_list", []):
for permission in principal['all_permissions']:
if permission['permission_level'] == 'IS_OWNER' and permission['inherited'] is False:
if principal.get('user_name'):
return principal['user_name'], 'user_name'
elif principal.get('group_name'):
return principal['group_name'], 'group_name'
else:
return principal['service_principal_name'], 'service_principal_name'

raise DbtRuntimeError(f"Error getting current owner for Databricks workflow.\n {response.content!r}")

    def _get_or_trigger_job_run(self, job_id: int):
"""
:return: the run id
"""
active_runs_response = self.api_client.session.get(
f"https://{self.credentials.host}/api/2.1/jobs/runs/list",
json={
'job_id': job_id,
'active_only': True,
}
)
if active_runs_response.status_code != 200:
raise DbtRuntimeError(f"Error getting active runs.\n {active_runs_response.content!r}")

active_runs = active_runs_response.json().get('runs', [])
if len(active_runs) > 0:
Collaborator: If the workflow is already running, I think we still need to schedule a run after it completes... this is one of the edge cases that Amy is worried about. Consider that in the dbt run, the tables that this model depends on may have been updated after that job run started; in order to ensure that downstream tables get those updates, we need to run the job again.

Author: Gotcha - that's a good point. That shouldn't be too hard to work in.

Author: Didn't realize there was an option for that in the Databricks API already - that made things easy. Done.

logger.info("Workflow already running, tracking active run instead of creating a new one")
return active_runs[0]['run_id']

request_body = {
"job_id": job_id,
}
response = self.api_client.session.post(
f"https://{self.credentials.host}/api/2.1/jobs/run-now",
json=request_body
)
if response.status_code != 200:
raise DbtRuntimeError(f"Error triggering a run for Databricks workflow.\n {response.content!r}")
response_json = response.json()
logger.info(f"Workflow trigger response={response_json}")

return response_json["run_id"]
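Regarding the review thread above about scheduling another run when one is already active: the later commit "Queue each workflow run behind an active run" is not part of these 18 commits, but a minimal sketch of one way to do it with the Jobs API's job-level queue setting might look like this (an assumption, not the code in this PR):

def enable_queueing(workflow_spec: dict) -> dict:
    # Assumed approach: with "queue": {"enabled": True} set on the job, a run triggered
    # while another is active is queued behind it instead of being skipped.
    workflow_spec.setdefault("queue", {"enabled": True})
    return workflow_spec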