diff --git a/bigquery/@default/bqtest/@routines/zbqt_gensql__udf_snapshot/ddl.sql b/bigquery/@default/bqtest/@routines/zbqt_gensql__udf_snapshot/ddl.sql
index 325bad4..f979cbd 100644
--- a/bigquery/@default/bqtest/@routines/zbqt_gensql__udf_snapshot/ddl.sql
+++ b/bigquery/@default/bqtest/@routines/zbqt_gensql__udf_snapshot/ddl.sql
@@ -15,20 +15,19 @@ create or replace function `bqtest.zbqt_gensql__udf_snapshot`(
         signature
         , ret
       from
-        unnest([
-          struct(string(null) as signature, string(NULL) as ret)
-          , %s
-        ]) as R
+        unnest(
+          array<struct<signature string, ret string>>[
+            %s
+          ]
+        ) as R
       %s
-      where
-        signature is not null
     """
     , 0)
     , ifnull(
       ltrim(`bqmake.v0.zreindent`(array_to_string(array(
-        select
-          format("(%t, format('%%T', %s))", format("%T", s), s)
-        from unnest(signature) as s
+          select
+            format("(%t, format('%%T', %s))", format("%T", s), s)
+          from unnest(signature) as s
         )
         , '\n, '
       ), 6))
diff --git a/bigquery/@default/v0/@routines/partition_table__update/ddl.sql b/bigquery/@default/v0/@routines/partition_table__update/ddl.sql
index f91d959..62abe2d 100644
--- a/bigquery/@default/v0/@routines/partition_table__update/ddl.sql
+++ b/bigquery/@default/v0/@routines/partition_table__update/ddl.sql
@@ -22,6 +22,7 @@ Arguments
     * force_expired_at: The timestamp to force expire partitions. If the destination's partition timestamp is older than this timestamp, the procedure stale the partitions. [Default: null].
     * bq_location: BigQuery Location of job. This is used for query analysis to get dependencies. [Default: "region-us"]
     * backfill_direction: The direction to backfill partitions. [Default: "backward"]
+    * auto_recreate: How to treat the destination table before updating. "error_if_not_exists" raises an error if the destination table cannot be queried; "replace_if_changed" recreates the whole table when the query result is no longer schema-compatible with it. [Default: "error_if_not_exists"]
 
 Examples
 ===
@@ -60,9 +61,18 @@ begin
   declare partition_range struct;
   declare partition_column struct;
   declare partition_unit string;
+  declare table_identifier string;
 
   -- Options
-  declare _options struct default (
+  declare _options struct<
+    dry_run BOOL,
+    tolerate_delay INTERVAL,
+    max_update_partition_range INTERVAL,
+    via_temp_table BOOL,
+    bq_location string,
+    backfill_direction int64,
+    auto_recreate string
+  > default (
     ifnull(bool(options.dry_run), false)
     , ifnull(safe_cast(string(options.tolerate_delay) as interval), interval 0 minute)
     , ifnull(safe_cast(string(options.max_update_partition_range) as interval), interval 1 month)
@@ -73,11 +83,12 @@ begin
       when "forward" then -1
       else error(format("Invalid backfill_direction: %p", options.backfill_direction))
     end
+    , ifnull(string(options.auto_recreate), "error_if_not_exists")
   );
 
   -- Assert invalid options
   select logical_and(if(
-    key in ('dry_run', 'tolerate_delay', 'max_update_partition_range', 'via_temp_table', 'backfill_direction')
+    key in ('dry_run', 'tolerate_delay', 'max_update_partition_range', 'via_temp_table', 'bq_location', 'backfill_direction', 'auto_recreate')
     , true
     , error(format("Invalid Option: name=%t in %t'", key, `options`))
   ))
@@ -169,6 +180,7 @@ begin
       when safe.parse_datetime('%Y%m%d', partition_range.begin) is not null then 'DAY'
       when safe.parse_datetime('%Y%m', partition_range.begin) is not null then 'MONTH'
       when safe.parse_datetime('%Y', partition_range.begin) is not null then 'YEAR'
+      when partition_range.begin = '__NULL__' then 'NO_PARTITION'
       else error(format('Invalid partition_id: %s', partition_range.begin))
     end
     , (
@@ -177,16 +189,64 @@ begin
         , format_datetime('%Y-%m-%d', safe.parse_datetime('%Y%m%d', partition_range.begin))
         , format_datetime('%Y-%m-%d', safe.parse_datetime('%Y%m', partition_range.begin))
         , format_datetime('%Y-%m-%d', safe.parse_datetime('%Y', partition_range.begin))
+        , null
       )
       , coalesce(
         format_datetime('%Y-%m-%d %T', safe.parse_datetime('%Y%m%d%H', partition_range.`end`))
         , format_datetime('%Y-%m-%d', safe.parse_datetime('%Y%m%d', partition_range.`end`))
         , format_datetime('%Y-%m-%d', safe.parse_datetime('%Y%m', partition_range.`end`))
         , format_datetime('%Y-%m-%d', safe.parse_datetime('%Y', partition_range.`end`))
+        , null
       )
     )
   );
 
+  set table_identifier = coalesce(format(
+    '%s.%s.%s'
+    , ifnull(destination.project_id, @@project_id)
+    , destination.dataset_id
+    , destination.table_id
+  ), error("Invalid destination"));
+
+  -- Assert or recreate destination table
+  case _options.auto_recreate
+    when 'error_if_not_exists' then
+      -- Probe the destination with a zero-row query; the statement raises if the table cannot be queried.
+      execute immediate format("select * from `%s` limit 0", table_identifier);
+    when 'replace_if_changed' then
+      begin
+        -- Zero scan amount table schema compatibility check
+        execute immediate format(
+          "select * from `%s` union all %s limit 0", table_identifier, update_job_query
+        ) using
+          partition_range.`begin` as `begin`
+          , partition_range.`end` as `end`
+        ;
+      exception when error then
+        begin
+          declare table_ddl, column_ddl string;
+          execute immediate `v0.zgensql__table_recreation`(
+            (coalesce(destination.project_id, @@project_id), destination.dataset_id, destination.table_id)
+            , update_job_query || ' limit 0'
+          ) into table_ddl, column_ddl;
+          execute immediate table_ddl
+            using
+              partition_range.`begin` as `begin`
+              , partition_range.`end` as `end`
+          ;
+          -- column_ddl is NULL when no column carries a description or rounding mode
+          -- (string_agg over zero rows yields NULL); there is nothing to restore in that case.
+          if column_ddl is not null then
+            execute immediate column_ddl;
+          end if;
+        end;
+      end
+      ;
+    else
+      select error('Invalid option');
+    end case;
+
+  -- Build new data and merge
   if ifnull(_options.via_temp_table, false) then
     execute immediate format("""
       create or replace temp table temp_table as
@@ -213,13 +273,7 @@ begin
           delete
       """
       -- Destination
-      , ifnull(format(
-          '%s.%s.%s'
-          , ifnull(destination.project_id, @@project_id)
-          , destination.dataset_id
-          , destination.table_id
-        ), 'invalid destination'
-      )
+      , table_identifier
       , if(ifnull(_options.via_temp_table, false), 'temp_table', update_job_query)
       , case
         when partition_unit = 'DAY' then
@@ -236,7 +290,7 @@ begin
         end
       )
       , error(format(
-        "arguments is invalud: %T", (destination, partition_column.name, partition_range)
+        "arguments are invalid: %T", (destination, table_identifier, partition_column, partition_range)
       ))
     )
     using
diff --git a/bigquery/@default/v0/@routines/partition_table__update/test_scinario__table_recreation_.sql b/bigquery/@default/v0/@routines/partition_table__update/test_scinario__table_recreation_.sql
new file mode 100644
index 0000000..fc1eccb
--- /dev/null
+++ b/bigquery/@default/v0/@routines/partition_table__update/test_scinario__table_recreation_.sql
@@ -0,0 +1,56 @@
+begin
+  declare temp_schema, init_sql, defer_sql string;
+  set (temp_schema, init_sql, defer_sql) = (`v0.zgensql__temporary_dataset`(false));
+  execute immediate init_sql;
+  begin
+    -- Prepare fixtures
+    execute immediate format("""
+      create table `%s.bikeshare_stations`
+      like bigquery-public-data.austin_bikeshare.bikeshare_stations
+    """
+      , temp_schema
+    );
+    execute immediate format("""
+      alter table `%s.bikeshare_stations`
+      alter column station_id set options (description = 'station id')
+    """
+      , temp_schema
+    );
+
+    -- Scenario
+    call `v0.partition_table__update`(
+      (null, temp_schema, 'bikeshare_stations')
+      , [('bigquery-public-data', "austin_bikeshare", "bikeshare_stations")]
+      , [('__NULL__', ["__NULL__"])]
+      , "select 1 as `new_column`, * from bigquery-public-data.austin_bikeshare.bikeshare_stations"
+      , to_json(struct(
+        "replace_if_changed" as auto_recreate
+      ))
+    );
+    set @@dataset_id = temp_schema;
+    -- The fixture already sets the station_id description, so the description check alone
+    -- would pass even if nothing was recreated; also require the column added by the new query.
+    assert exists(
+      select
+        column_name
+      from `INFORMATION_SCHEMA.COLUMN_FIELD_PATHS`
+      where table_name = "bikeshare_stations"
+        and column_name = 'new_column'
+    ) as "destination table should be recreated with the schema of the new query";
+    -- Column metadata must be carried over to the recreated table
+    assert exists(
+      select
+        description
+      from `INFORMATION_SCHEMA.COLUMN_FIELD_PATHS`
+      where table_name = "bikeshare_stations"
+        and column_name = 'station_id'
+        and description is not null
+    ) as "column description should survive table recreation";
+
+    -- Tear down fixtures
+    execute immediate defer_sql;
+  exception when error then
+    execute immediate defer_sql;
+    raise using message = @@error.message;
+  end;
+end;
diff --git a/bigquery/@default/v0/@routines/zgensql__table_recreation/ddl.sql b/bigquery/@default/v0/@routines/zgensql__table_recreation/ddl.sql
new file mode 100644
index 0000000..be4bb95
--- /dev/null
+++ b/bigquery/@default/v0/@routines/zgensql__table_recreation/ddl.sql
@@ -0,0 +1,111 @@
+CREATE OR REPLACE FUNCTION `v0.zgensql__table_recreation`(
+  target_table struct<catalog string, schema string, name string>
+  , new_query string
+)
+OPTIONS(
+  description="""Private function to build DDL for table recreation with metadata.
+
+`CREATE TABLE LIKE` is not suitable when the source table query and target table schema is different.
+This SQL generator's goal is to generate DDL for table recreation with metadata like `CREATE TABLE LIKE` operator.
+"""
+)
+AS (REPLACE(REPLACE(REPLACE(REPLACE(
+  -- Query template; the placeholder tokens wrapped in exclamation marks are substituted by the REPLACE calls below.
+  r"""
+  with table_definition as (
+    select as value
+      ddl
+    from `!TABLE_SCHEMA!.INFORMATION_SCHEMA.TABLES`
+    where table_name = !TABLE_NAME!
+  )
+  , alter_column_options as (
+    select as value
+      "ALTER TABLE `!TABLE_IDENTITY!`\n"
+      || string_agg(
+        format(
+          " ALTER COLUMN IF EXISTS `%s` SET OPTIONS(%s)"
+          , field_path
+          , array_to_string([
+            'description=' || format('%T', description)
+            -- rounding_mode takes a string value, so render it as a quoted literal like description
+            , 'rounding_mode=' || format('%T', rounding_mode)
+          ], ',')
+        )
+        , ',\n'
+      )
+    from `!TABLE_SCHEMA!.INFORMATION_SCHEMA.COLUMN_FIELD_PATHS`
+    where table_name = !TABLE_NAME!
+      and (description is not null or rounding_mode is not null)
+      -- ALTER COLUMN syntax don't support subfields structure
+      and field_path = column_name
+  )
+
+  select as value
+    struct(
+      array_to_string(
+        [
+          "CREATE OR REPLACE TABLE `!TABLE_IDENTITY!`"
+        ]
+        || ifnull(
+          array(
+            select as value line
+            from unnest(split(table_definition, '\n')) as line
+            where
+              starts_with(line, 'PARTITION BY')
+              or starts_with(line, 'CLUSTER BY')
+              or starts_with(line, 'DEFAULT COLLATE')
+          )
+          , []
+        )
+        || (
+          [
+            "OPTIONS("
+          ]
+          || table_options
+          || [")"]
+        )
+        || [
+          "AS",
+          !TABLE_NEW_QUERY!
+        ]
+      , '\n')
+      as new_table_ddl
+      , alter_column_options as column_metadata_ddl
+    )
+  from table_definition, alter_column_options
+  left join unnest([struct(
+    ifnull(
+      array(
+        select as value line
+        from unnest(split(table_definition, '\n')) as line
+        where
+          -- match the table description option line regardless of how far it is indented
+          starts_with(ltrim(line), 'description=')
+      )
+      , []
+    ) as table_options
+  )])
+"""
+  , '!TABLE_SCHEMA!', coalesce(format('%s.%s', target_table.catalog, target_table.schema), target_table.schema, error('Invalid schema argument')))
+  , '!TABLE_NAME!', format('%T', target_table.name))
+  , '!TABLE_IDENTITY!', coalesce(
+    format('%s.%s.%s', target_table.catalog, target_table.schema, target_table.name)
+    , format('%s.%s', target_table.schema, target_table.name)
+    , format('%s', target_table.name)
+    , error('Invalid schema argument')))
+  , '!TABLE_NEW_QUERY!', format('%T', new_query))
+);
+
+begin
+  call `bqmake.v0.assert_golden`(
+    (null, "bqtest", "zgolden_routines"), -- Profiling query
+    `bqtest.zbqt_gensql__udf_snapshot`(
+      [
+        """`v0.zgensql__table_recreation`(("bigquery-public-data", "san_francisco", "bikeshare_stations"), "SELECT *, 'new' as n FROM `bigquery-public-data.san_francisco.bikeshare_stations` LIMIT 0")"""
+      ],
+      "zgolden_routines"
+    ),
+    'signature',
+    true
+  );
+end