Skip to content

Commit

Permalink
Add auto-recreate option into v0.partition_table__update when schem…
Browse files Browse the repository at this point in the history
…a change is detected (#69)

Resolve #64
  • Loading branch information
takegue committed Jul 17, 2023
1 parent 0018f69 commit 11d4770
Show file tree
Hide file tree
Showing 4 changed files with 221 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,19 @@ create or replace function `bqtest.zbqt_gensql__udf_snapshot`(
signature
, ret
from
unnest([
struct(string(null) as signature, string(NULL) as ret)
, %s
]) as R
unnest(
array<struct<signature string, ret string>>[
%s
]
) as R
%s
where
signature is not null
"""
, 0)
, ifnull(
ltrim(`bqmake.v0.zreindent`(array_to_string(array(
select
format("(%t, format('%%T', %s))", format("%T", s), s)
from unnest(signature) as s
select
format("(%t, format('%%T', %s))", format("%T", s), s)
from unnest(signature) as s
)
, '\n, '
), 6))
Expand Down
69 changes: 59 additions & 10 deletions bigquery/@default/v0/@routines/partition_table__update/ddl.sql
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ Arguments
* force_expired_at: The timestamp to force expire partitions. If the destination's partition timestamp is older than this timestamp, the procedure stale the partitions. [Default: null].
* bq_location: BigQuery Location of job. This is used for query analysis to get dependencies. [Default: "region-us"]
* backfill_direction: The direction to backfill partitions. [Default: "backward"]
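* auto_recreate: How the destination table is checked before updating. "error_if_not_exists" raises an error if the destination table cannot be queried; "replace_if_changed" recreates the whole table when a schema change is detected. [Default: "error_if_not_exists"]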
Examples
===
Expand Down Expand Up @@ -60,9 +61,18 @@ begin
declare partition_range struct<begin string, `end` string>;
declare partition_column struct<name string, type string>;
declare partition_unit string;
declare table_identifier string;

-- Options
declare _options struct<dry_run BOOL, tolerate_delay INTERVAL, max_update_partition_range INTERVAL, via_temp_table BOOL, bq_location string, backfill_direction int64> default (
declare _options struct<
dry_run BOOL,
tolerate_delay INTERVAL,
max_update_partition_range INTERVAL,
via_temp_table BOOL,
bq_location string,
backfill_direction int64,
auto_recreate string
> default (
ifnull(bool(options.dry_run), false)
, ifnull(safe_cast(string(options.tolerate_delay) as interval), interval 0 minute)
, ifnull(safe_cast(string(options.max_update_partition_range) as interval), interval 1 month)
Expand All @@ -73,11 +83,12 @@ begin
when "forward" then -1
else error(format("Invalid backfill_direction: %p", options.backfill_direction))
end
, ifnull(string(options.auto_recreate), "error_if_not_exists")
);

-- Assert invalid options
select logical_and(if(
key in ('dry_run', 'tolerate_delay', 'max_update_partition_range', 'via_temp_table', 'backfill_direction')
key in ('dry_run', 'tolerate_delay', 'max_update_partition_range', 'via_temp_table', 'backfill_direction', 'auto_recreate')
, true
, error(format("Invalid Option: name=%t in %t'", key, `options`))
))
Expand Down Expand Up @@ -169,6 +180,7 @@ begin
when safe.parse_datetime('%Y%m%d', partition_range.begin) is not null then 'DAY'
when safe.parse_datetime('%Y%m', partition_range.begin) is not null then 'MONTH'
when safe.parse_datetime('%Y', partition_range.begin) is not null then 'YEAR'
when partition_range.begin = '__NULL__' then 'NO_PARTITION'
else error(format('Invalid partition_id: %s', partition_range.begin))
end
, (
Expand All @@ -177,16 +189,59 @@ begin
, format_datetime('%Y-%m-%d', safe.parse_datetime('%Y%m%d', partition_range.begin))
, format_datetime('%Y-%m-%d', safe.parse_datetime('%Y%m', partition_range.begin))
, format_datetime('%Y-%m-%d', safe.parse_datetime('%Y', partition_range.begin))
, null
)
, coalesce(
format_datetime('%Y-%m-%d %T', safe.parse_datetime('%Y%m%d%H', partition_range.`end`))
, format_datetime('%Y-%m-%d', safe.parse_datetime('%Y%m%d', partition_range.`end`))
, format_datetime('%Y-%m-%d', safe.parse_datetime('%Y%m', partition_range.`end`))
, format_datetime('%Y-%m-%d', safe.parse_datetime('%Y', partition_range.`end`))
, null
)
)
);

set table_identifier = coalesce(format(
'%s.%s.%s'
, ifnull(destination.project_id, @@project_id)
, destination.dataset_id
, destination.table_id
), error("Invalid destination"));

-- Assert or recreate destination table
case _options.auto_recreate
when 'error_if_not_exists' THEN
execute immediate format("select * from `%s` limit 0", table_identifier);
when 'replace_if_changed' THEN
begin
-- Zero scan amount table schema compatibility check
execute immediate format(
"select * from `%s` union all %s limit 0", table_identifier, update_job_query
) using
partition_range.`begin` as `begin`
, partition_range.`end` as `end`
;
exception when error then
begin
declare table_ddl, column_ddl string;
execute immediate `v0.zgensql__table_recreation`(
(coalesce(destination.project_id, @@project_id), destination.dataset_id, destination.table_id)
, update_job_query || ' limit 0'
) into table_ddl, column_ddl;
execute immediate table_ddl
using
partition_range.`begin` as `begin`
, partition_range.`end` as `end`
;
execute immediate column_ddl;
end;
end
;
else
select error('Invalid option');
end case;

-- Build new data and merge
if ifnull(_options.via_temp_table, false) then
execute immediate format("""
create or replace temp table temp_table as
Expand All @@ -213,13 +268,7 @@ begin
delete
"""
-- Destination
, ifnull(format(
'%s.%s.%s'
, ifnull(destination.project_id, @@project_id)
, destination.dataset_id
, destination.table_id
), 'invalid destination'
)
, table_identifier
, if(ifnull(_options.via_temp_table, false), 'temp_table', update_job_query)
, case
when partition_unit = 'DAY' then
Expand All @@ -236,7 +285,7 @@ begin
end
)
, error(format(
"arguments is invalud: %T", (destination, partition_column.name, partition_range)
"arguments are invalid: %T", (destination, table_identifier, partition_column, partition_range)
))
)
using
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
-- Scenario test for `v0.partition_table__update` with
-- auto_recreate = "replace_if_changed".
-- A copy of the public bikeshare_stations table gets a column description,
-- then the procedure is called with a query that adds `new_column`.
-- The final assertion checks that `station_id` still carries a description.
BEGIN
  -- Values returned by `v0.zgensql__temporary_dataset`; used below as the
  -- fixture dataset name, the SQL run before the scenario and the SQL run
  -- on teardown.
  DECLARE fixture_dataset, setup_sql, teardown_sql STRING;
  SET (fixture_dataset, setup_sql, teardown_sql) = (`v0.zgensql__temporary_dataset`(false));
  EXECUTE IMMEDIATE setup_sql;

  BEGIN
    -- Fixture: destination table with the same definition as the public table.
    EXECUTE IMMEDIATE FORMAT(
      """
create table `%s.bikeshare_stations`
like bigquery-public-data.austin_bikeshare.bikeshare_stations
"""
      , fixture_dataset
    );

    -- Fixture: column metadata that must still be present after the update.
    EXECUTE IMMEDIATE FORMAT(
      """
alter table `%s.bikeshare_stations`
alter column station_id set options (description = 'station id')
"""
      , fixture_dataset
    );

    -- Scenario: the update query adds `new_column`, so its schema differs
    -- from the destination table created above.
    CALL `v0.partition_table__update`(
      (NULL, fixture_dataset, 'bikeshare_stations')
      , [('bigquery-public-data', "austin_bikeshare", "bikeshare_stations")]
      , [('__NULL__', ["__NULL__"])]
      , "select 1 as `new_column`, * from bigquery-public-data.austin_bikeshare.bikeshare_stations"
      , TO_JSON(STRUCT("replace_if_changed" AS auto_recreate))
    );

    -- Resolve the unqualified INFORMATION_SCHEMA reference against the fixture dataset.
    SET @@dataset_id = fixture_dataset;
    ASSERT EXISTS(
      SELECT
        description
      FROM `INFORMATION_SCHEMA.COLUMN_FIELD_PATHS`
      WHERE
        table_name = "bikeshare_stations"
        AND column_name = 'station_id'
        AND description IS NOT NULL
    );

    -- Tear down fixtures on success.
    EXECUTE IMMEDIATE teardown_sql;
  EXCEPTION WHEN ERROR THEN
    -- Tear down fixtures on failure too, then re-raise the original message.
    EXECUTE IMMEDIATE teardown_sql;
    RAISE USING MESSAGE = @@error.message;
  END;
END;
108 changes: 108 additions & 0 deletions bigquery/@default/v0/@routines/zgensql__table_recreation/ddl.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
-- SQL generator: returns the text of a query. When that query is executed it
-- yields a single struct value with two fields:
--   * new_table_ddl       : "CREATE OR REPLACE TABLE <target> ... AS <new_query>".
--                           The PARTITION BY / CLUSTER BY / DEFAULT COLLATE lines and
--                           the description option line are copied from the target
--                           table's current DDL in INFORMATION_SCHEMA.TABLES.
--   * column_metadata_ddl : "ALTER TABLE <target> ALTER COLUMN IF EXISTS ... SET OPTIONS(...)"
--                           restoring description / rounding_mode for top-level columns
--                           (rows where field_path = column_name).
--
-- Arguments:
--   target_table : (catalog, schema, name) of the table to recreate. catalog may be
--                  NULL; a NULL schema raises 'Invalid schema argument'.
--   new_query    : query text that becomes the AS clause of the new table. It is
--                  embedded as a string literal via format('%T', ...).
--
-- The template placeholders (!TABLE_SCHEMA!, !TABLE_NAME!, !TABLE_IDENTITY!,
-- !TABLE_NEW_QUERY!) are substituted by the nested REPLACE calls, innermost first,
-- so !TABLE_NEW_QUERY! is replaced last.
--
-- rounding_mode is rendered with %T so the generated SET OPTIONS clause contains a
-- quoted string value (rounding_mode="ROUND_HALF_EVEN"). With %s it was emitted as
-- a bare identifier.
--
-- NOTE(review): when no column has a description or rounding_mode, string_agg runs
-- over zero rows, so column_metadata_ddl is presumably NULL. Confirm that callers
-- which EXECUTE IMMEDIATE the value handle that case.
CREATE OR REPLACE FUNCTION `v0.zgensql__table_recreation`(
  target_table struct<catalog string, schema string, name string>
  , new_query string
)
OPTIONS(
  description="""Private function to build DDL for table recreation with metadata.
`CREATE TABLE LIKE` is not suitable when the source table query and target table schema is different.
This SQL generator's goal is to generate DDL for table recreation with metadata like `CREATE TABLE LIKE` operator.
"""
)
AS (REPLACE(REPLACE(REPLACE(REPLACE(
r"""
with table_definition as (
select as value
ddl
from `!TABLE_SCHEMA!.INFORMATION_SCHEMA.TABLES`
where table_name = !TABLE_NAME!
)
, alter_column_options as (
select as value
"ALTER TABLE `!TABLE_IDENTITY!`\n"
|| string_agg(
format(
" ALTER COLUMN IF EXISTS `%s` SET OPTIONS(%s)"
, field_path
, array_to_string([
'description=' || format('%T', description)
, 'rounding_mode=' || format('%T', rounding_mode)
], ',')
)
, ',\n'
)
from `!TABLE_SCHEMA!.INFORMATION_SCHEMA.COLUMN_FIELD_PATHS`
where table_name = !TABLE_NAME!
and (description is not null or rounding_mode is not null)
-- ALTER COLUMN syntax don't support subfields structure
and field_path = column_name
)
select as value
struct(
array_to_string(
[
"CREATE OR REPLACE TABLE `!TABLE_IDENTITY!`"
]
|| ifnull(
array(
select as value line
from unnest(split(table_definition, '\n')) as line
where
starts_with(line, 'PARTITION BY')
or starts_with(line, 'CLUSTER BY')
or starts_with(line, 'DEFAULT COLLATE')
)
, []
)
|| (
[
"OPTIONS("
]
|| table_options
|| [")"]
)
|| [
"AS",
!TABLE_NEW_QUERY!
]
, '\n')
as new_table_ddl
, alter_column_options as column_metadata_ddl
)
from table_definition, alter_column_options
left join unnest([struct(
ifnull(
array(
select as value line
from unnest(split(table_definition, '\n')) as line
where
starts_with(line, ' description=')
)
, []
) as table_options
)])
"""
  -- Dataset qualifier for INFORMATION_SCHEMA: "catalog.schema" when catalog is given, else "schema".
  , '!TABLE_SCHEMA!', coalesce(format('%s.%s', target_table.catalog, target_table.schema), target_table.schema, error('Invalid schema argument')))
  -- Table name as a quoted string literal for the table_name filters.
  , '!TABLE_NAME!', format('%T', target_table.name))
  -- Most qualified table path that can be built from the non-null parts.
  , '!TABLE_IDENTITY!', coalesce(
    format('%s.%s.%s', target_table.catalog, target_table.schema, target_table.name)
    , format('%s.%s', target_table.schema, target_table.name)
    , format('%s', target_table.name)
    , error('Invalid schema argument')))
  -- New query embedded as a string literal element of the DDL line array.
  , '!TABLE_NEW_QUERY!', format('%T', new_query))
);

-- Golden test: passes a snapshot query for one `v0.zgensql__table_recreation`
-- call to `bqmake.v0.assert_golden`, using the `bqtest.zgolden_routines` table.
BEGIN
  CALL `bqmake.v0.assert_golden`(
    (NULL, "bqtest", "zgolden_routines")
    -- Profiling query
    , `bqtest.zbqt_gensql__udf_snapshot`(
      [
        """`v0.zgensql__table_recreation`(("bigquery-public-data", "san_francisco", "bikeshare_stations"), "SELECT *, 'new' as n FROM `bigquery-public-data.san_francisco.bikeshare_stations` LIMIT 0")"""
      ]
      , "zgolden_routines"
    )
    -- NOTE(review): presumably the snapshot key column and an update flag; confirm against assert_golden.
    , 'signature'
    , TRUE
  );
END

0 comments on commit 11d4770

Please sign in to comment.