Bigquery Row-Level Deletes + Erase After on Database Collections (#5293)
- Specifying a delete masking strategy override at the collection level in a BigQuery dataset YAML causes matching rows to be deleted, rather than updated, when an erasure request runs. The collection-level override takes precedence over the policy-level override. As a first iteration, this takes effect for BigQuery only.
- Extends erase_after support so it is incorporated into the graph when specified on a dataset connector, not just a SaaS connector. Since there is no intelligent ordering for erasure deletes, this lets customers require that a specific node run after another where necessary (see the sketch below).
pattisdr committed Sep 25, 2024
1 parent ba2e4ee commit 0e2db7c
Showing 14 changed files with 400 additions and 19 deletions.
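
Before the file-by-file diff, here is a minimal, hedged sketch of how the two options above surface on the graph model once the dataset YAML is converted. Names mirror the Collection and convert_dataset_to_graph changes below; the empty fields lists are placeholders rather than a working dataset definition.

```python
# Hedged sketch, not authoritative: a collection-level delete override and an
# erase_after dependency as they land on the graph Collection model (per the
# diffs below). The empty `fields` lists are placeholders for brevity.
from fideslang.models import MaskingStrategies, MaskingStrategyOverride

from fides.api.graph.config import Collection, CollectionAddress

# Rows in `employee` are deleted outright instead of masked via UPDATE,
# regardless of the policy-level masking strategy.
employee = Collection(
    name="employee",
    fields=[],
    masking_strategy_override=MaskingStrategyOverride(
        strategy=MaskingStrategies.DELETE
    ),
)

# During erasures, the `address` node runs only after `employee` completes.
address = Collection(
    name="address",
    fields=[],
    erase_after={CollectionAddress("bigquery_example_test_dataset", "employee")},
)
```

The test changes in tests/ops/graph/test_config.py and tests/ops/graph/test_graph_traversal.py further down exercise these same shapes end to end.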
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -24,6 +24,7 @@ The types of changes are:

### Added
- Added support for hierarchical notices in Privacy Center [#5291](https://github.com/ethyca/fides/pull/5291)
- Support row-level deletes for BigQuery and add erase_after support for database connectors [#5293](https://github.com/ethyca/fides/pull/5293)
- Added PUT endpoint for dataset configs [#5324](https://github.com/ethyca/fides/pull/5324)

### Changed
9 changes: 8 additions & 1 deletion data/dataset/bigquery_example_test_dataset.yml
@@ -4,6 +4,8 @@ dataset:
description: Example of a BigQuery dataset containing a variety of related tables like customers, products, addresses, etc.
collections:
- name: address
fides_meta:
erase_after: [ bigquery_example_test_dataset.employee ]
fields:
- name: city
data_categories: [user.contact.address.city]
@@ -21,6 +23,8 @@
data_categories: [user.contact.address.postal_code]

- name: customer
fides_meta:
erase_after: [ bigquery_example_test_dataset.address ]
fields:
- name: address_id
data_categories: [system.operations]
@@ -47,14 +51,17 @@
length: 40

- name: employee
fides_meta:
masking_strategy_override:
strategy: delete
fields:
- name: address_id
data_categories: [system.operations]
fides_meta:
references:
- dataset: bigquery_example_test_dataset
field: address.id
-          direction: to
+          direction: from
- name: email
data_categories: [user.contact.email]
fides_meta:
2 changes: 1 addition & 1 deletion requirements.txt
@@ -16,7 +16,7 @@ types-defusedxml==0.7.0.20240218
expandvars==0.9.0
fastapi[all]==0.111.0
fastapi-pagination[sqlalchemy]==0.12.25
- fideslang==3.0.4
+ fideslang==3.0.6
fideslog==1.2.10
firebase-admin==5.3.0
GitPython==3.1.41
2 changes: 2 additions & 0 deletions src/fides/api/graph/config.py
@@ -84,6 +84,7 @@
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Literal, Optional, Set, Tuple, Union

from fideslang.models import MaskingStrategyOverride
from fideslang.validation import FidesKey
from pydantic import BaseModel, ConfigDict, field_serializer, field_validator

@@ -454,6 +455,7 @@ class Collection(BaseModel):
# An optional set of dependent fields that need to be queried together
grouped_inputs: Set[str] = set()
data_categories: Set[FidesKey] = set()
masking_strategy_override: Optional[MaskingStrategyOverride] = None

@property
def field_dict(self) -> Dict[FieldPath, Field]:
13 changes: 13 additions & 0 deletions src/fides/api/models/datasetconfig.py
@@ -318,10 +318,23 @@ def convert_dataset_to_graph(
CollectionAddress(*s.split(".")) for s in collection.fides_meta.after
}

collection_erase_after: Set[CollectionAddress] = set()
if collection.fides_meta and collection.fides_meta.erase_after:
collection_erase_after = {
CollectionAddress(*s.split("."))
for s in collection.fides_meta.erase_after
}

masking_override = None
if collection.fides_meta and collection.fides_meta.masking_strategy_override:
masking_override = collection.fides_meta.masking_strategy_override

graph_collection = Collection(
name=collection.name,
fields=graph_fields,
after=collection_after,
erase_after=collection_erase_after,
masking_strategy_override=masking_override,
skip_processing=collection_skip_processing,
data_categories=(
set(collection.data_categories) if collection.data_categories else set()
56 changes: 54 additions & 2 deletions src/fides/api/service/connectors/query_config.py
@@ -1,14 +1,15 @@
# pylint: disable=too-many-lines
import re
from abc import ABC, abstractmethod
- from typing import Any, Dict, Generic, List, Optional, Tuple, TypeVar
+ from typing import Any, Dict, Generic, List, Optional, Tuple, TypeVar, Union

import pydash
from boto3.dynamodb.types import TypeSerializer
from fideslang.models import MaskingStrategies
from loguru import logger
from sqlalchemy import MetaData, Table, text
from sqlalchemy.engine import Engine
- from sqlalchemy.sql import Executable, Update  # type: ignore
+ from sqlalchemy.sql import Delete, Executable, Update  # type: ignore
from sqlalchemy.sql.elements import ColumnElement, TextClause

from fides.api.graph.config import (
@@ -819,6 +820,28 @@ def get_formatted_query_string(
BigQuery reserved words."""
return f'SELECT {field_list} FROM `{self.node.collection.name}` WHERE {" OR ".join(clauses)}'

def generate_masking_stmt(
self,
node: ExecutionNode,
row: Row,
policy: Policy,
request: PrivacyRequest,
client: Engine,
) -> Union[Optional[Update], Optional[Delete]]:
"""
Generate a masking statement for BigQuery.
If a masking override is present, it will take precedence over the policy masking strategy.
"""

masking_override = node.collection.masking_strategy_override
if masking_override and masking_override.strategy == MaskingStrategies.DELETE:
logger.info(
f"Masking override detected for collection {node.address.value}: {masking_override.strategy.value}"
)
return self.generate_delete(row, client)
return self.generate_update(row, policy, request, client)

def generate_update(
self, row: Row, policy: Policy, request: PrivacyRequest, client: Engine
) -> Optional[Update]:
@@ -851,6 +874,35 @@ def generate_update(
]
return table.update().where(*pk_clauses).values(**update_value_map)

def generate_delete(self, row: Row, client: Engine) -> Optional[Delete]:
"""Returns a SQLAlchemy DELETE statement for BigQuery. Does not actually execute the delete statement.
Used when a collection-level masking override is present and the masking strategy is DELETE.
"""
non_empty_primary_keys: Dict[str, Field] = filter_nonempty_values(
{
fpath.string_path: fld.cast(row[fpath.string_path])
for fpath, fld in self.primary_key_field_paths.items()
if fpath.string_path in row
}
)

valid = len(non_empty_primary_keys) > 0
if not valid:
logger.warning(
"There is not enough data to generate a valid DELETE statement for {}",
self.node.address,
)
return None

table = Table(
self.node.address.collection, MetaData(bind=client), autoload=True
)
pk_clauses: List[ColumnElement] = [
getattr(table.c, k) == v for k, v in non_empty_primary_keys.items()
]
return table.delete().where(*pk_clauses)
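
For reference, a standalone sketch of the statement that generate_delete builds; the table and key values here are hypothetical, and the output is compiled with SQLAlchemy's default dialect rather than BigQuery's:

```python
# Hypothetical illustration: the DELETE produced from non-empty primary keys
# compiles to a single parameterized statement.
from sqlalchemy import Column, Integer, MetaData, Table

table = Table("employee", MetaData(), Column("id", Integer, primary_key=True))
non_empty_primary_keys = {"id": 123}
pk_clauses = [getattr(table.c, k) == v for k, v in non_empty_primary_keys.items()]

stmt = table.delete().where(*pk_clauses)
print(stmt)  # DELETE FROM employee WHERE employee.id = :id_1
```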


MongoStatement = Tuple[Dict[str, Any], Dict[str, Any]]
"""A mongo query is expressed in the form of 2 dicts, the first of which represents
20 changes: 12 additions & 8 deletions src/fides/api/service/connectors/sql_connector.py
@@ -562,19 +562,23 @@ def mask_data(
         request_task: RequestTask,
         rows: List[Row],
     ) -> int:
-        """Execute a masking request. Returns the number of records masked"""
+        """Execute a masking request. Returns the number of records updated or deleted"""
         query_config = self.query_config(node)
-        update_ct = 0
+        update_or_delete_ct = 0
         client = self.client()
         for row in rows:
-            update_stmt: Optional[Executable] = query_config.generate_update(
-                row, policy, privacy_request, client
+            update_or_delete_stmt: Optional[Executable] = (
+                query_config.generate_masking_stmt(
+                    node, row, policy, privacy_request, client
+                )
             )
-            if update_stmt is not None:
+            if update_or_delete_stmt is not None:
                 with client.connect() as connection:
-                    results: LegacyCursorResult = connection.execute(update_stmt)
-                    update_ct = update_ct + results.rowcount
-        return update_ct
+                    results: LegacyCursorResult = connection.execute(
+                        update_or_delete_stmt
+                    )
+                    update_or_delete_ct = update_or_delete_ct + results.rowcount
+        return update_or_delete_ct


class SnowflakeConnector(SQLConnector):
Expand Down
18 changes: 18 additions & 0 deletions tests/fixtures/bigquery_fixtures.py
@@ -120,6 +120,19 @@ def bigquery_resources(
insert into customer (id, email, name, address_id)
values ({customer_id}, '{customer_email}', '{customer_name}', {address_id});
"""

connection.execute(stmt)

stmt = "select max(id) from employee;"
res = connection.execute(stmt)
employee_id = res.all()[0][0] + 1
employee_email = f"employee-{uuid}@example.com"
employee_name = f"Jane {uuid}"

stmt = f"""
insert into employee (id, email, name, address_id)
values ({employee_id}, '{employee_email}', '{employee_name}', {address_id});
"""
connection.execute(stmt)

yield {
@@ -131,6 +144,8 @@
"city": city,
"state": state,
"connector": connector,
"employee_id": employee_id,
"employee_email": employee_email,
}
# Remove test data and close BigQuery connection in teardown
stmt = f"delete from customer where email = '{customer_email}';"
@@ -139,6 +154,9 @@
stmt = f"delete from address where id = {address_id};"
connection.execute(stmt)

stmt = f"delete from employee where address_id = {address_id};"
connection.execute(stmt)


@pytest.fixture(scope="session")
def bigquery_test_engine() -> Generator:
25 changes: 25 additions & 0 deletions tests/ops/graph/test_config.py
@@ -2,6 +2,7 @@

import pydantic
import pytest
from fideslang.models import MaskingStrategies

from fides.api.graph.config import *
from fides.api.graph.data_type import (
@@ -95,6 +96,7 @@ def test_from_string(self):
collection_to_serialize = ds = Collection(
name="t3",
skip_processing=False,
masking_strategy_override=None,
fields=[
ScalarField(
name="f1",
@@ -124,6 +126,7 @@ def test_from_string(self):
serialized_collection = {
"name": "t3",
"skip_processing": False,
"masking_strategy_override": None,
"fields": [
{
"name": "f1",
@@ -378,6 +381,28 @@ def test_parse_from_task_without_data_categories(self):
parsed = Collection.parse_from_request_task(serialized_collection)
assert parsed.data_categories == set()

def test_collection_masking_strategy_override(self):
ds = Collection(
name="t3",
masking_strategy_override=MaskingStrategyOverride(
strategy=MaskingStrategies.DELETE
),
fields=[],
)

assert ds.masking_strategy_override == MaskingStrategyOverride(
strategy=MaskingStrategies.DELETE
)

serialized_collection_with_masking_override = {
"name": "t3",
"masking_strategy_override": {"strategy": "delete"},
"fields": [],
}

coll = ds.parse_from_request_task(serialized_collection_with_masking_override)
assert coll == ds


class TestField:
def test_generate_field(self) -> None:
37 changes: 37 additions & 0 deletions tests/ops/graph/test_graph_traversal.py
@@ -1,6 +1,9 @@
import pytest
from fideslang import Dataset
from fideslang.models import MaskingStrategies

from fides.api.graph.graph import *
from fides.api.models.datasetconfig import convert_dataset_to_graph

from .graph_test_util import *

@@ -31,6 +34,40 @@ def test_graph_creation() -> None:
assert graph.identity_keys == {FieldAddress("dr_1", "ds_1", "f1"): "x"}


@pytest.mark.integration_external
@pytest.mark.integration_bigquery
def test_graph_creation_with_collection_level_meta(
example_datasets, bigquery_connection_config
):
dataset = Dataset(**example_datasets[7])
graph = convert_dataset_to_graph(dataset, bigquery_connection_config.key)
dg = DatasetGraph(*[graph])

# Assert erase_after
customer_collection = dg.nodes[
CollectionAddress("bigquery_example_test_dataset", "customer")
].collection
assert customer_collection.erase_after == {
CollectionAddress("bigquery_example_test_dataset", "address")
}

address_collection = dg.nodes[
CollectionAddress("bigquery_example_test_dataset", "address")
].collection
assert address_collection.erase_after == {
CollectionAddress("bigquery_example_test_dataset", "employee")
}
assert address_collection.masking_strategy_override is None

employee_collection = dg.nodes[
CollectionAddress("bigquery_example_test_dataset", "employee")
].collection
assert employee_collection.erase_after == set()
assert employee_collection.masking_strategy_override == MaskingStrategyOverride(
strategy=MaskingStrategies.DELETE
)


def test_extract_seed_nodes() -> None:
# TEST INIT:
t = generate_graph_resources(3)
3 changes: 2 additions & 1 deletion tests/ops/integration_tests/setup_scripts/postgres_setup.py
@@ -15,6 +15,7 @@

# Need to manually import this model because it's used in src/fides/api/models/property.py
# but that file only imports it conditionally if TYPE_CHECKING is true
from fides.api.models.experience_notices import ExperienceNotices
from fides.api.models.privacy_experience import PrivacyExperienceConfig
from fides.api.service.connectors.sql_connector import PostgreSQLConnector
from fides.config import CONFIG
@@ -29,7 +30,7 @@ def seed_postgres_data(db: Session, query_file_path: str) -> Session:
that contains the query to seed the data in the DB. e.g.,
`./docker/sample_data/postgres_example.sql`
-    Using the provided sesion, creates the database, dropping it if it
+    Using the provided session, creates the database, dropping it if it
already existed. Seeds the created database using the query found
in the relative path provided. Some processing is done on the query
text so that it can be executed properly.
Expand Down
