# Add type three scd upserts functionality #127
base: main
@@ -578,6 +578,63 @@ This function is designed to rename a Delta table. It can operate either within
rename_delta_table(existing_delta_table, "new_table_name")
```

## Type 3 SCD Upserts

Perform a type 3 SCD (slowly changing dimension) upsert on a target Delta table.

Parameters:

- `delta_table` (`DeltaTable`): An object representing the Delta table to be upserted.
- `updates_df` (`DataFrame`): The data used to upsert the target Delta table.
- `primary_key` (`str`): The primary key (i.e. business key) that uniquely identifies each row in the target Delta table.
- `curr_prev_col_names` (`dict[str,str]`): A dictionary of column names that store the current and previous values.
  - Key: column name for the current value.
  - Value: column name for the previous value.

Suppose you have the following Delta table:

```
+----+----+---+--------+--------+------------+-------------+--------------+
|pkey|name|job|prev_job| country|prev_country|    continent|prev_continent|
+----+----+---+--------+--------+------------+-------------+--------------+
|   1|   A| AA|    null|   Japan|        null|         Asia|          null|
|   2|   B| BB|    null|  London|        null|       Europe|          null|
|   3|   C| CC|    null|  canada|        null|North America|          null|
+----+----+---+--------+--------+------------+-------------+--------------+
```

**Review comment:** Let's add some examples where the …
**Reply:** okey-dokey
The source data to be upserted into the target Delta table:

```
+----+----+---+------------+-------------+
|pkey|name|job|     country|    continent|
+----+----+---+------------+-------------+
|   1|  A1| AA|       Japan|         Asia| // update on name
|   2|  B1|BBB|        Peru|South America| // updates on name, job, country, continent -> previous values stored in prev_job, prev_country, prev_continent
|   3|   C| CC| New Zealand|      Oceania| // updates on country, continent -> previous values stored in prev_country, prev_continent
|   5|   D| DD|South Africa|       Africa| // new row
+----+----+---+------------+-------------+
```

**Review comment:** We should demonstrate these two cases: …
**Reply:** Yes.

Here's how to perform the type 3 SCD upsert:
```python
mack.type_3_scd_upsert(delta_table, updatesDF, "pkey", {"country": "prev_country", "job": "prev_job", "continent": "prev_continent"})
```

Here's the table after the upsert:
```
+----+----+---+--------+------------+------------+-------------+--------------+
|pkey|name|job|prev_job|     country|prev_country|    continent|prev_continent|
+----+----+---+--------+------------+------------+-------------+--------------+
|   1|  A1| AA|    null|       Japan|        null|         Asia|          null|
|   2|  B1|BBB|      BB|        Peru|      London|South America|        Europe|
|   3|   C| CC|    null| New Zealand|      canada|      Oceania| North America|
|   5|   D| DD|    null|South Africa|        null|       Africa|          null|
+----+----+---+--------+------------+------------+-------------+--------------+
```
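The per-row effect of the upsert can be sketched without Spark. This is a simplified, single-row illustration of the type 3 rule (the `type3_row_update` helper is hypothetical, not part of mack): when a tracked column changes, its old value moves into the matching previous-value column, and all regular columns take the incoming values.

```python
def type3_row_update(target_row, update_row, curr_prev_col_names):
    # target_row: existing row as a dict; update_row: incoming row (no prev_* columns)
    out = dict(target_row)
    # when a tracked column changes, remember the old value in the previous-value column
    for curr, prev in curr_prev_col_names.items():
        if target_row[curr] != update_row[curr]:
            out[prev] = target_row[curr]
    # all regular columns take the incoming values
    out.update(update_row)
    return out

row = {"pkey": 2, "name": "B", "job": "BB", "prev_job": None,
       "country": "London", "prev_country": None}
upd = {"pkey": 2, "name": "B1", "job": "BBB", "country": "Peru"}
print(type3_row_update(row, upd, {"job": "prev_job", "country": "prev_country"}))
# {'pkey': 2, 'name': 'B1', 'job': 'BBB', 'prev_job': 'BB', 'country': 'Peru', 'prev_country': 'London'}
```

This mirrors row 2 of the tables above: `job` moves from `BB` to `BBB`, so `prev_job` captures `BB`.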

## Dictionary

@@ -1,5 +1,6 @@
from itertools import combinations
from typing import List, Union, Dict, Optional
from collections import Counter

from delta import DeltaTable
import pyspark
@@ -735,3 +736,119 @@ def rename_delta_table(
    delta_table.toDF().write.format("delta").mode("overwrite").saveAsTable(
        new_table_name
    )

**Review comment:** How would the user use this function with an …?
**Reply:** I recommend adding it as an optional parameter. If needed, the function would have to update that column as well. [Example table structures elided.] How would we provide the column names representing it: as a single parameter that is a list of lists, or as three separate parameters? I don't like the second option unless we accept that the function cannot handle multiple SCD columns at a time.

def type_3_scd_upsert(
    delta_table: DeltaTable,
    updates_df: DataFrame,
    primary_key: str,
    curr_prev_col_names: dict[str, str],
) -> None:
    """
    Apply SCD type 3 updates on a target Delta table.

    :param delta_table: The target Delta table.
    :type delta_table: DeltaTable

    :param updates_df: The source DataFrame used to apply SCD type 3 updates to the target Delta table.
    :type updates_df: DataFrame

    :param primary_key: The primary key (i.e. business key) that uniquely identifies each row in the target Delta table.
    :type primary_key: str

    :param curr_prev_col_names: A dictionary of column names that store the current and previous values.
        Key: column name for the current value.
        Value: column name for the previous value.
    :type curr_prev_col_names: dict[str, str]

    :raises TypeError: When the values of the dictionary 'curr_prev_col_names' contain duplicates.
    :raises TypeError: When a key equals its value in the dictionary 'curr_prev_col_names'.
    :raises TypeError: When required columns are missing from the Delta table.
    :raises TypeError: When required columns are missing from the updates DataFrame.
    """
    # validate the curr_prev_col_names parameter
    # raise an error when the dict values contain duplicates
    count_dict = Counter(curr_prev_col_names.values())
    prev_col_name_duplicates = [
        (key, value)
        for key, value in curr_prev_col_names.items()
        if count_dict[value] > 1
    ]
    if prev_col_name_duplicates:
        raise TypeError(
            f"Found duplicate values in the dictionary curr_prev_col_names: {prev_col_name_duplicates!r}"
        )

    # raise an error when a key equals its value
    keys_equal_to_values = [
        (key, value) for key, value in curr_prev_col_names.items() if key == value
    ]
    if keys_equal_to_values:
        raise TypeError(
            f"Keys cannot be equal to values in the dictionary curr_prev_col_names: {keys_equal_to_values!r}"
        )
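The duplicate-value check can be exercised on its own. A minimal sketch of the same `Counter` logic (the `find_prev_col_duplicates` helper name is illustrative, not part of mack):

```python
from collections import Counter

def find_prev_col_duplicates(curr_prev_col_names):
    # count how often each previous-value column name is used
    count_dict = Counter(curr_prev_col_names.values())
    # keep every (current, previous) pair whose previous name is reused
    return [(k, v) for k, v in curr_prev_col_names.items() if count_dict[v] > 1]

# mapping two current columns to the same previous column is rejected
bad = {"country": "prev_val", "job": "prev_val", "continent": "prev_continent"}
print(find_prev_col_duplicates(bad))
# [('country', 'prev_val'), ('job', 'prev_val')]
print(find_prev_col_duplicates({"job": "prev_job"}))
# []
```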
    # validate the existing Delta table
    base_col_names = delta_table.toDF().columns
    required_base_col_names = [primary_key] + [
        col for pair in curr_prev_col_names.items() for col in pair
    ]
    missing_col_names = [
        col for col in required_base_col_names if col not in base_col_names
    ]
    if missing_col_names:
        raise TypeError(
            f"Cannot find these columns {missing_col_names!r} in the base table {base_col_names!r}"
        )
    # validate the updates DataFrame
    updates_col_names = updates_df.columns
    prev_col_names = list(curr_prev_col_names.values())
    # the updates DataFrame must have every base column except the previous-value columns
    required_updates_col_names = [
        col for col in base_col_names if col not in prev_col_names
    ]
    if sorted(updates_col_names) != sorted(required_updates_col_names):
        raise TypeError(
            f"The updates DataFrame has these columns {updates_col_names!r}, but these columns are required {required_updates_col_names!r}"
        )
    # merge condition
    merge_condition = pyspark.sql.functions.expr(
        f"trg.{primary_key} = src.{primary_key}"
    )

    # update condition: compare every column except the key and the previous-value columns
    updates_attr = [
        attr
        for attr in base_col_names
        if attr not in [primary_key] + prev_col_names
    ]
    updates_condition = " OR ".join(
        f"trg.{attr} <> src.{attr}" for attr in updates_attr
    )
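The update condition is just a generated SQL string. A sketch of what it produces for the example schema (the `build_update_condition` helper name is ours, not mack's):

```python
def build_update_condition(base_col_names, primary_key, prev_col_names):
    # exclude the key and the previous-value columns from change detection
    excluded = [primary_key] + prev_col_names
    updates_attr = [a for a in base_col_names if a not in excluded]
    return " OR ".join(f"trg.{a} <> src.{a}" for a in updates_attr)

cond = build_update_condition(
    ["pkey", "name", "job", "prev_job", "country", "prev_country"],
    "pkey",
    ["prev_job", "prev_country"],
)
print(cond)
# trg.name <> src.name OR trg.job <> src.job OR trg.country <> src.country
```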
    # rows to be inserted: new rows have no previous values yet
    previous_state_for_inserts = [f"NULL as {col}" for col in prev_col_names]

    staged_inserts_df = (
        updates_df.alias("inserts")
        .join(delta_table.toDF().alias("trg"), primary_key, "leftanti")
        .selectExpr(["inserts.*"] + previous_state_for_inserts)
    )
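For inserted rows, every previous-value column simply starts out as NULL. The generated `selectExpr` strings look like this:

```python
prev_col_names = ["prev_job", "prev_country", "prev_continent"]

# new rows have no history yet, so each previous-value column is selected as NULL
previous_state_for_inserts = [f"NULL as {col}" for col in prev_col_names]
print(previous_state_for_inserts)
# ['NULL as prev_job', 'NULL as prev_country', 'NULL as prev_continent']
```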
    # rows to be updated: when a tracked column changes, keep its old value in the previous-value column
    previous_state_for_updates = [
        f"coalesce(nullif(trg.{curr},updates.{curr}),trg.{prev}) as {prev}"
        for curr, prev in curr_prev_col_names.items()
    ]

    staged_updates_df = (
        updates_df.alias("updates")
        .join(delta_table.toDF().alias("trg"), primary_key)
        .selectExpr(["updates.*"] + previous_state_for_updates)
    )
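The `coalesce`/`nullif` pair does the history bookkeeping: `nullif(trg.c, updates.c)` returns the target value only when it differs from the incoming value (and NULL otherwise), so `coalesce` falls back to the already-stored previous value when nothing changed. A sketch of the generated expression strings:

```python
curr_prev_col_names = {"job": "prev_job", "country": "prev_country"}

# nullif(trg.c, updates.c) is trg.c when the value changed, else NULL;
# coalesce(..., trg.prev_c) keeps the existing history when nothing changed
previous_state_for_updates = [
    f"coalesce(nullif(trg.{curr},updates.{curr}),trg.{prev}) as {prev}"
    for curr, prev in curr_prev_col_names.items()
]
print(previous_state_for_updates[0])
# coalesce(nullif(trg.job,updates.job),trg.prev_job) as prev_job
```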
    # input data = staged_updates_df + staged_inserts_df
    staged_inputs_df = staged_updates_df.union(staged_inserts_df)

    # perform the merge
    (
        delta_table.alias("trg")
        .merge(source=staged_inputs_df.alias("src"), condition=merge_condition)
        .whenMatchedUpdateAll(condition=updates_condition)
        .whenNotMatchedInsertAll()
        .execute()
    )
**Review comment:** Can we separate this into two parameters, `current_col_name` and `previous_col_name`? I generally prefer exposing public interfaces with full words instead of abbreviations. Is there any advantage to grouping these two arguments in a dictionary?

**Reply:** I think it depends on whether we want to: …

Suppose we have a Delta table with this structure:

    +----+----+---+--------+--------+------------+-------------+--------------+
    |pkey|name|job|prev_job| country|prev_country|    continent|prev_continent|
    +----+----+---+--------+--------+------------+-------------+--------------+

The columns `prev_job`, `prev_country` and `prev_continent` will store the previous values of the columns `job`, `country` and `continent` after applying the type 3 SCD. Using a dictionary as the parameter for the column names (current and previous values) is less error-prone than storing them in two separate lists.

Option #1: using a dictionary -> triggering `type_3_scd_upsert` once

    col_names = {"country": "prev_country", "job": "prev_job", "continent": "prev_continent"}
    mack.type_3_scd_upsert(delta_table, updates_df, "pkey", col_names)

Option #2: using two parameters -> triggering `type_3_scd_upsert` multiple times

    current_col_names = ["country", "job", "continent"]
    previous_col_names = ["prev_country", "prev_job", "prev_continent"]
    mack.type_3_scd_upsert(delta_table, updates_df, "pkey", current_col_names, previous_col_names)

Here an error in column order can cause a catastrophe, like storing the previous values of the column `job` in the column `prev_country` (or the opposite):

    current_col_names = ["country", "job", "continent"]
    previous_col_names = ["prev_job", "prev_country", "prev_continent"]

Using the same Delta table structure mentioned above, in this case we would have something like this:

    // type 3 SCD upserts for column job
    current_col_names = "job"
    previous_col_names = "prev_job"
    mack.type_3_scd_upsert(delta_table, updates_df, "pkey", current_col_names, previous_col_names)

    // type 3 SCD upserts for column country
    current_col_names = "country"
    previous_col_names = "prev_country"
    mack.type_3_scd_upsert(delta_table, updates_df, "pkey", current_col_names, previous_col_names)

    // type 3 SCD upserts for column continent
    current_col_names = "continent"
    previous_col_names = "prev_continent"
    mack.type_3_scd_upsert(delta_table, updates_df, "pkey", current_col_names, previous_col_names)