Draft: #756 - implement python workflow submissions #762
base: 1.9.latest
Conversation
Commits:
- … on job (Signed-off-by: Kyle Valade <kylevalade@rivian.com>)
- …be reset (Signed-off-by: Kyle Valade <kylevalade@rivian.com>)
@kdazzle can you rebase/target your PR against 1.9.latest? I have a couple of things that I need to wrap up, but I'm planning to take some version of this into the 1.9 release.

Looks like some syntax you're using does not work with python 3.8, based on unit test failures.
```python
def __init__(self, session: Session, host: str):
    super().__init__(session, host, "/api/2.0/permissions/jobs")

def put(self, job_id, access_control_list):
```
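A hedged sketch of how the permissions wrapper above might be fleshed out. The base-class behavior and session type are assumptions from the diff context; the endpoint `PUT /api/2.0/permissions/jobs/{job_id}` is the real Databricks permissions API.

```python
# Hypothetical, self-contained sketch of the wrapper in the diff above.
# The session object is assumed to expose a requests-style put().
class JobPermissionsApi:
    def __init__(self, session, host: str):
        self.session = session
        self.host = host
        self.prefix = "/api/2.0/permissions/jobs"

    def put(self, job_id, access_control_list):
        # Replaces the job's full permission set; the Databricks API
        # requires the owner (IS_OWNER) entry to be included in the list.
        return self.session.put(
            f"https://{self.host}{self.prefix}/{job_id}",
            json={"access_control_list": access_control_list},
        )
```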
Should this be part of the operations that dbt handles? If so, should there be an equivalent for the job runs approach?
I'd argue it should be since dbt creates these workflows, so they should also at least allow for the possibility of managing permissions. Otherwise these objects are kind of orphaned and there has to be some other process. Dbt abdicates that responsibility on schemas, which puts everyone in an awkward position.
If so, should there be an equivalent for the job runs approach?
I'm not sure it's as necessary for job runs, since those are usually just a one-time thing. But I'm happy to add it in there if you think that makes the most sense.
Good point about schema. That's in my backlog of pain points :P
I'll take a look at pulling that into job runs too
```python
logger.info(f"Workflow creation response={response.json()}")
return response.json()["job_id"]

def update_by_reset(self, job_id, job_spec):
```
What is this intended for? It's obscure enough that a doc string might be useful.
Yeah, good call. It's basically a PUT, but the Databricks API calls it a "reset": it makes the job reflect the current config, including any changes.
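A hedged sketch of that reset-style update. The endpoint `POST /api/2.1/jobs/reset` is the real Databricks Jobs 2.1 operation that overwrites all job settings; the class name, method name, and session semantics here are assumptions for illustration.

```python
# Hypothetical sketch only: WorkflowJobApi and the session shape are
# assumptions; /api/2.1/jobs/reset is the real Databricks endpoint.
class WorkflowJobApi:
    def __init__(self, session, host: str):
        self.session = session
        self.host = host

    def update_job_settings(self, job_id, job_spec):
        """Overwrite the job's settings so they match job_spec exactly.

        The endpoint is called "reset" in the Databricks API, but it
        behaves like a PUT: the job is updated to reflect the config,
        including any changes.
        """
        return self.session.post(
            f"https://{self.host}/api/2.1/jobs/reset",
            json={"job_id": job_id, "new_settings": job_spec},
        )
```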
Changed method name to update_job_settings, per your other comment.
```python
@property
def default_job_name(self) -> str:
    return f"{self.database}-{self.schema}-{self.identifier}__dbt"
```
Think it might be helpful to put the dbt up front. That way if you sort in the UI, all of the dbt jobs are together.
good point, thanks Ben
changed
```python
@property
def notebook_dir(self) -> str:
    return f"/Shared/dbt_python_model/{self.database}/{self.schema}"
```
We should probably use the folder api logic. I've been told that using Shared is an anti-pattern and that we might get rid of it as we continue with improved governance.
Ah ok cool - I hadn't dug into that part of your refactorings. I'll make that change.
updated to use the new API classes
```python
job_id, is_new = self._get_or_create_job(workflow_spec)

if not is_new:
    self.api_client.workflows.update_by_reset(job_id, workflow_spec)
```
Even though the API is 'reset', I think update_job_settings is probably more apt.
Yeah, I like that
changed
```python
    :return: the run id
    """
    active_runs = self.api_client.job_runs.list_active_runs_for_job(job_id)
    if len(active_runs) > 0:
```
If the workflow is already running, I think we still need to schedule a run after it completes...this is one of the edge-cases that Amy is worried about. Consider that in the dbt run, the tables that this model depends on may have been updated after that job run started; in order to ensure that downstream tables get those updates, we need to run the job again.
Gotcha - that's a good point. That shouldn't be too hard to work in
Didn't realize there was an option for that in the Databricks API already - that made things easy. Done.
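The Databricks API option referred to here is presumably job queueing (the Jobs 2.1 `queue` setting) — an assumption, since the comment doesn't name it. With queueing enabled and `max_concurrent_runs` at 1, a run triggered while another run is active is queued and starts once the active run completes, instead of being skipped; that covers the stale-upstream edge case described above.

```python
# Hedged sketch of the relevant fragment of a job spec. The job name is
# hypothetical; "queue" and "max_concurrent_runs" are real Jobs 2.1 fields.
workflow_spec = {
    "name": "dbt__my_db-my_schema-my_model",
    # Only one run at a time; a second trigger waits in the queue rather
    # than failing or being skipped.
    "max_concurrent_runs": 1,
    "queue": {"enabled": True},
}
```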
Mostly minor. Biggest thing is that I think if there is already an active run, we need to wait and then execute another one.
WIP - Stubs out implementation for #756
This pretty much implements what a workflow job submission type would look like, though I'm sure I'm missing something. Tests haven't been added yet.
Sample
Outside of the new submission type, models are the same. Here is what one could look like:
The config for a model could look like (forgive my jsonification...yaml data structures still freak me out):
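A hypothetical sketch of what such a model could look like. In dbt python models the config is set via `dbt.config(...)` inside the model function; the `workflow_job` method name and the config keys below mirror the attributes described in the Explanation section and are assumptions, not the final API.

```python
# Hypothetical sketch only: key names follow the attributes described in
# the Explanation section and may not match the final implementation.
def model(dbt, session):
    dbt.config(
        submission_method="workflow_job",
        workflow_job_config={
            # Optional: reuse an existing workflow instead of creating one.
            "existing_job_id": "123456",
            # Optional: falls back to the default job name when omitted.
            "name": "dbt__my_db-my_schema-my_model",
            # Let additional users/teams run the job ad hoc; the format
            # follows the Databricks API (user vs. group vs. SP).
            "grants": {"view": [{"group_name": "analysts"}]},
            # Add to / override the default dbt model task.
            "additional_task_settings": {"timeout_seconds": 3600},
        },
    )
    return dbt.ref("upstream_model")
```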
Explanation
For all of the dbt configs that I added (in addition to the Databricks API attributes), I tried to balance the dbt convention of requiring minimal configuration against the full flexibility of the Databricks API. Attribute names try to split the difference between the Databricks API and the dbt API. Happy to change the approach for anything.
- `existing_job_id` in case users want to reuse an existing workflow. If no `name` is provided in this config, it will get renamed to the default job name (currently `f"{self.database}-{self.schema}-{self.identifier}__dbt"`) if `existing_job_id` is also provided
- `task_a` - configurable in `additional_task_settings`
- `new_cluster` or `existing_cluster_id`. Leaving blank reuses the model's cluster config
- `post_hook` might be a misnomer, because you could technically set the dbt model to depend on one of these tasks, making it also a pre hook
- `grants` - allow for permissions to be set on the workflow so that additional users/teams can run the job ad hoc if needed (for initial runs/backfills, etc). The owner is carried forward (partly because I wasn't sure if there was a great way to determine whether the current user is a user or service principal), and the format needs to follow the Databricks API where you specify whether the user is a user, group, or SP.
- `additional_task_settings` to add to/override the default dbt model task

Todo:
- an `all_purpose_cluster` attribute, similar to `job_cluster_config`?

Description

Checklist
- I have updated `CHANGELOG.md` and added information about my change to the "dbt-databricks next" section.