Skip to content

Commit

Permalink
Revise the doc: Add snowflake uri decompose in structure dataset
Browse files Browse the repository at this point in the history
Signed-off-by: HH <hhcs9527@gmail.com>
  • Loading branch information
hhcs9527 committed Sep 13, 2023
1 parent 26fcccf commit 3c7f046
Showing 1 changed file with 45 additions and 8 deletions.
53 changes: 45 additions & 8 deletions examples/data_types_and_io/data_types_and_io/structured_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
#
# Structured dataset is a superset of Flyte Schema.
#
# The `StructuredDataset` Transformer can write a dataframe to BigQuery, s3, or any storage by registering new structured dataset encoder and decoder.
# The `StructuredDataset` Transformer can write a dataframe to BigQuery, s3, Snowflake, or any storage by registering new structured dataset encoder and decoder.
#
# Flytekit makes it possible to return or accept a {py:class}`pandas.DataFrame` which is automatically
# converted into Flyte's abstract representation of a structured dataset object.
Expand Down Expand Up @@ -81,24 +81,34 @@ def get_schema_df(a: int) -> FlyteSchema[superset_cols]:
def get_subset_df(df: Annotated[StructuredDataset, subset_cols]) -> Annotated[StructuredDataset, subset_cols]:
df = df.open(pd.DataFrame).all()
df = pd.concat([df, pd.DataFrame([[30]], columns=["Age"])])
# On specifying BigQuery uri for StructuredDataset, flytekit writes a pandas dataframe to a BigQuery table
# When specifying a BigQuery or Snowflake URI for a StructuredDataset, flytekit exports a Pandas DataFrame to a table in BigQuery or Snowflake.
return StructuredDataset(dataframe=df)


# %% [markdown]
# ## StructuredDataset with `uri` Argument
#
# BigQuery `uri` allows you to load and retrieve data from cloud using the `uri`. The `uri` comprises of the bucket name and the filename prefixed with `gs://`.
# If you specify BigQuery `uri` for StructuredDataset, BigQuery creates a table in the location specified by the `uri`.
# The `uri` in StructuredDataset reads from or writes to S3, GCP, BigQuery, or any storage.
# Let's understand how to convert a pandas DataFrame to a BigQuery table and vice-versa through an example.
# Both Snowflake and BigQuery `uri` allows you to load and retrieve data from cloud using the `uri`.
# The `uri` comprises of the bucket name and the filename prefixed with `bq://` for BigQuery and `snowflake://` for Snowflake.
# If you specify in either BigQuery or Snowflake `uri` for StructuredDataset, it will create a table in the location specified by the `uri`.
# The `uri` in StructuredDataset reads from or writes to S3, GCP, BigQuery, Snowflake or any storage.
# Let's understand how to convert a pandas DataFrame to a BigQuery or Snowflake table and vice-versa through an example.
#
# Before writing DataFrame to a BigQuery table,
#
# 1. Create a [GCP account](https://cloud.google.com/docs/authentication/getting-started) and create a service account.
# 2. Create a project and add the `GOOGLE_APPLICATION_CREDENTIALS` environment variable to your .bashrc file.
# 3. Create a dataset in your project.

# Before writing DataFrame to a Snowflake table,
#
# 1. Create a [Snowflake account](https://signup.snowflake.com/) and create a service account.
# 2. Create a dataset in your project.
# 3. Use [Key Pair Authentication](https://docs.snowflake.com/en/user-guide/key-pair-auth) to setup the connections with Snowflake.
# 4. run the following command to setup the secret
# ```bash
# kubectl create secret generic snowflake --namespace=flyte --from-literal=private_key={your_private_key_above}
# ```
# %% [markdown]
# Import the dependencies.
# %%
Expand All @@ -118,6 +128,17 @@ def pandas_to_bq() -> StructuredDataset:
return StructuredDataset(dataframe=df, uri="bq://sample-project-1-352610.sample_352610.test1")


# %% [markdown]
# Define a task that converts a pandas DataFrame to a Snowflake table.
# %%
@task
def pandas_to_sf() -> StructuredDataset:
# create a pandas dataframe
df = pd.DataFrame({"Name": ["Tom", "Joseph"], "Age": [20, 22]})
# convert the dataframe to StructuredDataset
return StructuredDataset(dataframe=df, uri="snowflake://<user>:<your_account>/<database>/<schema>/<warehouse>/<table>")


# %% [markdown]
# :::{note}
# The BigQuery uri's format is `bq://<project_name>.<dataset_name>.<table_name>`.
Expand All @@ -131,6 +152,19 @@ def bq_to_pandas(sd: StructuredDataset) -> pd.DataFrame:
# convert to pandas dataframe
return sd.open(pd.DataFrame).all()

# %% [markdown]
# :::{note}
# The Snowflake uri's format is `snowflake://<user>:<your_account>/<database>/<schema>/<warehouse>/<table>`.
# :::

# %% [markdown]
# Define a task that converts the Snowflake table to a pandas DataFrame.
# %%
@task
def sf_to_pandas(sd: StructuredDataset) -> pd.DataFrame:
# convert to pandas dataframe
return sd.open(pd.DataFrame).all()


# %% [markdown]
# :::{note}
Expand All @@ -141,8 +175,11 @@ def bq_to_pandas(sd: StructuredDataset) -> pd.DataFrame:
# Trigger the tasks locally.
# %%
if __name__ == "__main__":
o1 = bq_to_pandas(sd=StructuredDataset(uri="bq://sample-project-1-352610.sample_352610.test1"))
o2 = pandas_to_bq()
obj_bq_1 = bq_to_pandas(sd=StructuredDataset(uri="bq://sample-project-1-352610.sample_352610.test1"))
obj_bq_2 = pandas_to_bq()

obj_sf_1 = sf_to_pandas(sd=StructuredDataset(uri="snowflake://<user>:<your_account>/<database>/<schema>/<warehouse>/<table>"))
obj_sf_2 = pandas_to_sf()


# %% [markdown]
Expand Down

0 comments on commit 3c7f046

Please sign in to comment.