Draft: #756 - implement python workflow submissions #762

Open
wants to merge 24 commits into base: 1.9.latest

Conversation

@kdazzle commented Aug 8, 2024

WIP - Stubs out implementation for #756

This pretty much implements what a workflow job submission type would look like, though I'm sure I'm missing something. Tests haven't been added yet.

Sample

Outside of the new submission type, models are the same. Here is what one could look like:

# my_model.py
import pyspark.sql.types as T
import pyspark.sql.functions as F


def model(dbt, session):
    dbt.config(
        materialized='incremental',
        submission_method='workflow_job'
    )

    output_schema = T.StructType([
        T.StructField("id", T.StringType(), True),
        T.StructField("odometer_meters", T.DoubleType(), True),
        T.StructField("timestamp", T.TimestampType(), True),
    ])
    # Placeholder: return an empty DataFrame with the model's output schema
    return session.createDataFrame(data=session.sparkContext.emptyRDD(), schema=output_schema)

The config for a model could look like (forgive my jsonification...yaml data structures still freak me out):

models:
  - name: my_model
    workflow_job_config:
      email_notifications: {
        on_failure: ["reynoldxin@databricks.com"]
      }
      max_retries: 2
      timeout_seconds: 18000
      existing_job_id: 12341234  # added: optional
      additional_task_settings: {  # added: optional
        "task_key": "my_dbt_task"
      }
      post_hook_tasks: [{  # added: optional
        "depends_on": [{ "task_key": "my_dbt_task" }],
        "task_key": "OPTIMIZE_AND_VACUUM",
        "notebook_task": {
          "notebook_path": "/my_notebook_path",
          "source": "WORKSPACE",
        },
      }]
      grants:  # added: optional
        view: [
          {"group_name": "marketing-team"},
        ]
        run: [
          {"user_name": "alighodsi@databricks.com"}
        ]
        manage: []
    job_cluster_config:
      spark_version: "12.2.x-scala2.12"
      node_type_id: "rd-fleet.8xlarge"
      runtime_engine: "{{ var('job_cluster_defaults.runtime_engine') }}"
      data_security_mode: "{{ var('job_cluster_defaults.data_security_mode') }}"
      autoscale: {
        "min_workers": 1,
        "max_workers": 4
      }

Explanation

For the dbt configs I added (on top of the Databricks API attributes), I tried to strike a balance between the dbt convention of requiring minimal configuration and exposing the full flexibility of the Databricks API. Attribute names split the difference between the Databricks API and dbt conventions. Happy to change the approach on any of this. A rough sketch of how these settings could be merged into a job spec follows the list below.

  • Added existing_job_id in case users want to reuse an existing workflow. If no name is provided in the config, the workflow gets renamed to the default job name (currently f"{self.database}-{self.schema}-{self.identifier}__dbt")
  • Job names must be unique unless existing_job_id is also provided
  • The task key for the model-run task is hardcoded as task_a, but is configurable via additional_task_settings
  • Allows for "post_hook" tasks
    • A different cluster can be specified using Databricks' new_cluster or existing_cluster_id; leaving these blank reuses the model's cluster config
    • post_hook might be a misnomer, because you could technically set the dbt model task to depend on one of these tasks, making it effectively a pre-hook
  • grants - allows permissions to be set on the workflow so that additional users/teams can run the job ad hoc if needed (for initial runs, backfills, etc.). The owner is carried forward (partly because I wasn't sure there was a good way to determine whether the current user is a user or a service principal), and the format follows the Databricks API, where each grantee is specified as a user, group, or service principal.
  • additional_task_settings adds to or overrides the default dbt model task
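
To make the merging behavior concrete, here is a rough sketch (not the exact PR code) of how workflow_job_config could be folded into a Databricks Jobs API 2.1 job spec; the helper name build_job_spec and the notebook path are illustrative:

# Illustrative sketch only, not the PR's implementation.
DEFAULT_TASK_KEY = "task_a"  # overridable via additional_task_settings


def build_job_spec(default_job_name: str, cluster_spec: dict, config: dict) -> dict:
    """Merge the model's workflow_job_config with the default dbt model task."""
    workflow_config = dict(config.get("workflow_job_config", {}))

    # Settings that are handled outside of the job spec itself
    additional_task_settings = workflow_config.pop("additional_task_settings", {})
    post_hook_tasks = workflow_config.pop("post_hook_tasks", [])
    workflow_config.pop("grants", None)           # applied separately via the permissions API
    workflow_config.pop("existing_job_id", None)  # used to find the job, not part of its settings

    # Default dbt model task; additional_task_settings adds to or overrides it
    model_task = {
        "task_key": DEFAULT_TASK_KEY,
        "new_cluster": cluster_spec,
        "notebook_task": {"notebook_path": "/path/to/uploaded/notebook", "source": "WORKSPACE"},
    }
    model_task.update(additional_task_settings)

    job_spec = {
        "name": workflow_config.pop("name", default_job_name),
        "tasks": [model_task] + post_hook_tasks,
    }
    # Everything else (email_notifications, max_retries, timeout_seconds, ...) passes through
    job_spec.update(workflow_config)
    return job_spec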

Todo:

  • Reuse all_purpose_cluster attribute, similar to job_cluster_config?
  • Can I use a serverless job cluster? (by not defining any cluster)
  • Fix the run tracker
  • What happens if the workflow is already running?
    • I'd like the new dbt job run to start tracking the current Databricks workflow run, rather than failing
  • Log when workflow permissions are being changed? (Kind of mimicking Terraform apply logs, which have been helpful in the past when table permissions were unexpectedly broadened)

Checklist

  • I have run this code in development and it appears to resolve the stated issue
  • This PR includes tests, or tests are not required/relevant for this PR
  • I have updated the CHANGELOG.md and added information about my change to the "dbt-databricks next" section.

@kdazzle kdazzle changed the title #756 - stub out implementation for python workflow submissions Draft: #756 - stub out implementation for python workflow submissions Aug 8, 2024
Kyle Valade added 2 commits August 12, 2024 14:21
Signed-off-by: Kyle Valade <kylevalade@rivian.com>
@kdazzle kdazzle changed the title Draft: #756 - stub out implementation for python workflow submissions Draft: #756 - implement for python workflow submissions Aug 14, 2024
@kdazzle kdazzle changed the title Draft: #756 - implement for python workflow submissions Draft: #756 - implement python workflow submissions Aug 14, 2024
@benc-db (Collaborator) commented Sep 27, 2024

@kdazzle can you rebase/target your PR against 1.9.latest? I have a couple of things that I need to wrap up, but I'm planning to take some version of this into the 1.9 release.

@kdazzle kdazzle changed the base branch from main to 1.9.latest September 27, 2024 21:12
@benc-db (Collaborator) commented Oct 1, 2024

Looks like some of the syntax you're using doesn't work with Python 3.8, based on the unit test failures.

def __init__(self, session: Session, host: str):
    super().__init__(session, host, "/api/2.0/permissions/jobs")

def put(self, job_id, access_control_list):
benc-db (Collaborator):

Should this be part of the operations that dbt handles? If so, should there be an equivalent for the job runs approach?

kdazzle (Author):

I'd argue it should be: since dbt creates these workflows, it should at least allow for the possibility of managing their permissions. Otherwise these objects are kind of orphaned and some other process has to own them. dbt abdicates that responsibility for schemas, which puts everyone in an awkward position.

If so, should there be an equivalent for the job runs approach?

I'm not sure it's as necessary for job runs, since those are usually just a one-time thing. But I'm happy to add it in there if you think that makes the most sense.

benc-db (Collaborator):

Good point about schema. That's in my backlog of pain points :P

kdazzle (Author):

I'll take a look at pulling that into job runs too
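
For reference, an update through the Databricks permissions API could look roughly like this; this is an illustrative sketch rather than the PR's client code, and the view/run/manage mapping to permission levels is my reading of the intended config:

import requests

# Assumed mapping from the grants config keys to Databricks job permission levels
PERMISSION_LEVELS = {"view": "CAN_VIEW", "run": "CAN_MANAGE_RUN", "manage": "CAN_MANAGE"}


def put_job_permissions(host: str, token: str, job_id: int, grants: dict) -> None:
    """Replace the job's access control list based on the model's grants config."""
    access_control_list = []
    for grant_name, entries in grants.items():
        for entry in entries:
            # Each entry already names a user_name, group_name, or service_principal_name
            access_control_list.append({**entry, "permission_level": PERMISSION_LEVELS[grant_name]})

    # Note: the PR carries the existing owner forward; that step is omitted here
    response = requests.put(
        f"{host}/api/2.0/permissions/jobs/{job_id}",
        headers={"Authorization": f"Bearer {token}"},
        json={"access_control_list": access_control_list},
    )
    response.raise_for_status()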

    logger.info(f"Workflow creation response={response.json()}")
    return response.json()["job_id"]

def update_by_reset(self, job_id, job_spec):
benc-db (Collaborator):

What is this intended for? It's obscure enough that a doc string might be useful.

kdazzle (Author):

Yeah, good call. It's basically a PUT, but it's called a reset in the Databricks API: it makes the job reflect the config, including any changes.

kdazzle (Author):

Changed method name to update_job_settings per your other comment
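
For context, the underlying call is the Jobs API reset endpoint, which overwrites all of a job's settings; a minimal sketch (the PR routes this through its own API client, so the names here are illustrative):

import requests


def update_job_settings(host: str, token: str, job_id: int, job_spec: dict) -> None:
    """Overwrite an existing job's settings with job_spec ("reset" in the Jobs API)."""
    response = requests.post(
        f"{host}/api/2.1/jobs/reset",
        headers={"Authorization": f"Bearer {token}"},
        json={"job_id": job_id, "new_settings": job_spec},
    )
    response.raise_for_status()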


@property
def default_job_name(self) -> str:
    return f"{self.database}-{self.schema}-{self.identifier}__dbt"
benc-db (Collaborator):

Think it might be helpful to put the dbt up front. That way if you sort in the UI, all of the dbt jobs are together.

kdazzle (Author):

good point, thanks Ben

kdazzle (Author):

changed


@property
def notebook_dir(self) -> str:
    return f"/Shared/dbt_python_model/{self.database}/{self.schema}"
benc-db (Collaborator):

We should probably use the folder api logic. I've been told that using Shared is an anti-pattern and that we might get rid of it as we continue with improved governance.

kdazzle (Author):

Ah ok cool - I hadn't dug into that part of your refactorings. I'll make that change.

kdazzle (Author):

updated to use the new API classes
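
For reference, creating the notebook directory somewhere other than /Shared boils down to the workspace mkdirs endpoint; a minimal sketch, assuming a plain REST call rather than the PR's API classes:

import requests


def ensure_notebook_dir(host: str, token: str, path: str) -> None:
    """Create the target workspace directory (and any parents) if it does not exist."""
    response = requests.post(
        f"{host}/api/2.0/workspace/mkdirs",
        headers={"Authorization": f"Bearer {token}"},
        json={"path": path},
    )
    response.raise_for_status()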

job_id, is_new = self._get_or_create_job(workflow_spec)

if not is_new:
    self.api_client.workflows.update_by_reset(job_id, workflow_spec)
benc-db (Collaborator):

Even though the API is 'reset', I think update_job_settings is probably more apt.

kdazzle (Author):

Yeah, I like that

kdazzle (Author):

changed

:return: the run id
"""
active_runs = self.api_client.job_runs.list_active_runs_for_job(job_id)
if len(active_runs) > 0:
benc-db (Collaborator):

If the workflow is already running, I think we still need to schedule a run after it completes...this is one of the edge-cases that Amy is worried about. Consider that in the dbt run, the tables that this model depends on may have been updated after that job run started; in order to ensure that downstream tables get those updates, we need to run the job again.

kdazzle (Author):

Gotcha - that's a good point. That shouldn't be too hard to work in

kdazzle (Author):

Didn't realize there was an option for that in the Databricks API already - that made things easy. Done.
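
For anyone following along, one Databricks-side option that matches this behavior is job queueing, which holds a run-now request until the active run finishes; a sketch of the idea (not necessarily the exact mechanism this PR uses):

import requests


def trigger_run(host: str, token: str, job_id: int) -> int:
    """Start a new run; with queueing enabled on the job, it waits its turn if a run is active."""
    response = requests.post(
        f"{host}/api/2.1/jobs/run-now",
        headers={"Authorization": f"Bearer {token}"},
        json={"job_id": job_id},
    )
    response.raise_for_status()
    return response.json()["run_id"]


# In the job spec, queueing can be enabled with:
# job_spec["queue"] = {"enabled": True}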

@benc-db (Collaborator) left a review:

Mostly minor. Biggest thing is that I think if there is already an active run, we need to wait and then execute another one.
