Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed snapshot-related bug where snapshots did not work with CTEs #153

Merged
merged 1 commit into from
Apr 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dbt/adapters/fabric/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version = "1.8.0"
version = "1.8.1"
7 changes: 7 additions & 0 deletions dbt/adapters/fabric/fabric_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,13 @@ def render_limited(self) -> str:
else:
return f"(select TOP {self.limit} * from {rendered}) _dbt_top_subq"

# TODO: Standardizing quote characters
# def quoted(self, identifier):
# return "[{identifier}]".format(
# quote_char=self.quote_character,
# identifier=identifier,
# )

@available
@classmethod
def render_column_constraint(cls, constraint: ColumnLevelConstraint) -> Optional[str]:
Expand Down
1 change: 1 addition & 0 deletions dbt/include/fabric/macros/adapters/columns.sql
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
{% set tempTableName %}
{{ relation.schema }}.{{ relation.identifier }}_{{ range(1300, 19000) | random }}
{% endset %}
{{ log("Cannot Alter table type, as it is not supported. Using random table as a temp table. - " ~ tempTableName) }}

{% set tempTable %}
CREATE TABLE {{tempTableName}}
Expand Down
2 changes: 1 addition & 1 deletion dbt/include/fabric/macros/adapters/relation.sql
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@
{% set tempTableName %}
{{ relation.include(database=False).identifier.replace("#", "") }}_{{ range(21000, 109000) | random }}
{% endset %}

{{ log("Truncate Statement is not supported, Using random table as a temp table. - " ~ tempTableName) }}
{% call statement('truncate_relation') -%}
CREATE TABLE {{ tempTableName }} AS SELECT * FROM {{ relation }} WHERE 1=2
EXEC('DROP TABLE IF EXISTS {{ relation.include(database=False) }};');
Expand Down
199 changes: 199 additions & 0 deletions dbt/include/fabric/macros/materializations/snapshots/helpers.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
{% macro fabric__post_snapshot(staging_relation) %}
{# Post-snapshot cleanup: drops the relation passed in as `staging_relation` via drop_relation. #}
-- Clean up the snapshot temp table
{% do drop_relation(staging_relation) %}
{% endmacro %}

-- ALTER is not supported, so we have to rely on this workaround temporarily
{% macro fabric__create_columns(relation, columns) %}
    {#
        Adds `columns` to `relation` as NULL-valued columns.

        Per the original note, the default__ macro uses "add column"; this
        implementation instead rebuilds the table in four statements:
          1. CTAS a randomly suffixed copy of the table that carries the extra columns,
          2. drop the original table,
          3. CTAS the original name back from the copy,
          4. drop the copy.

        Parameters:
          relation - table to extend; its database, schema and identifier are read.
          columns  - iterable of column objects; `name` and `data_type` are read.

        Nothing is executed when `columns` is empty, so a call without new
        columns does not drop and rebuild the table.
    #}
    {% set columns_to_add = columns | list %}

    {% if columns_to_add %}

        {# One ", CAST(NULL AS <type>) AS <name>" fragment per new column. #}
        {% set new_column_exprs %}
            {% for column in columns_to_add %}
                , CAST(NULL AS {{ column.data_type }}) AS {{ column.name }}
            {% endfor %}
        {% endset %}

        {# The random suffix keeps the intermediate table name from colliding with the target. #}
        {% set tempTableName %}
            [{{relation.database}}].[{{ relation.schema }}].[{{ relation.identifier }}_{{ range(1300, 19000) | random }}]
        {% endset %}
        {{ log("Creating new columns are not supported without dropping a table. Using random table as a temp table. - " ~ tempTableName) }}

        {% set tempTable %}
            CREATE TABLE {{tempTableName}}
            AS SELECT * {{ new_column_exprs }} FROM [{{relation.database}}].[{{ relation.schema }}].[{{ relation.identifier }}] {{ information_schema_hints() }}
        {% endset %}

        {% call statement('create_temp_table') -%}
            {{ tempTable }}
        {%- endcall %}

        {% set dropTable %}
            DROP TABLE [{{relation.database}}].[{{ relation.schema }}].[{{ relation.identifier }}]
        {% endset %}

        {% call statement('drop_table') -%}
            {{ dropTable }}
        {%- endcall %}

        {% set createTable %}
            CREATE TABLE {{ relation }}
            AS SELECT * FROM {{tempTableName}} {{ information_schema_hints() }}
        {% endset %}

        {% call statement('create_Table') -%}
            {{ createTable }}
        {%- endcall %}

        {% set dropTempTable %}
            DROP TABLE {{tempTableName}}
        {% endset %}

        {% call statement('drop_temp_table') -%}
            {{ dropTempTable }}
        {%- endcall %}

    {% endif %}
{% endmacro %}

{% macro fabric__get_true_sql() %}
{# Returns the string '1=1', a predicate that is always true in SQL. #}
{{ return('1=1') }}
{% endmacro %}


{% macro fabric__build_snapshot_table(strategy, relation) %}
{#
  Renders a SELECT over `relation` that returns every source column plus the
  snapshot columns dbt_scd_id, dbt_updated_at, dbt_valid_from and dbt_valid_to,
  all derived from expressions supplied by `strategy`.
  dbt_valid_to is nullif(x, x) over strategy.updated_at, which always evaluates
  to NULL (presumably so the NULL takes the type of that expression - confirm).
#}

select *,
{{ strategy.scd_id }} as dbt_scd_id,
{{ strategy.updated_at }} as dbt_updated_at,
{{ strategy.updated_at }} as dbt_valid_from,
nullif({{ strategy.updated_at }}, {{ strategy.updated_at }}) as dbt_valid_to
from (
select * from {{ relation }}
) sbq

{% endmacro %}

{% macro fabric__snapshot_staging_table(strategy, temp_snapshot_relation, target_relation) -%}
{#
  Renders the query that produces the snapshot staging rows: a union of
  'insert' and 'update' rows, plus 'delete' rows when
  strategy.invalidate_hard_deletes is set.

  Parameters:
    strategy               - supplies the unique_key, updated_at, scd_id and
                             row_changed SQL expressions and the
                             invalidate_hard_deletes flag.
    temp_snapshot_relation - relation the source rows are selected from.
    target_relation        - existing snapshot table.

  The branches are combined with "select * ... union all", so the column order
  inside each CTE matters.
#}

with snapshot_query as (

select * from {{ temp_snapshot_relation }}

),

{# Rows of the target with dbt_valid_to still null, keyed by dbt_unique_key. #}
snapshotted_data as (

select *,
{{ strategy.unique_key }} as dbt_unique_key

from {{ target_relation }}
where dbt_valid_to is null

),

{# Source rows shaped as new snapshot records; nullif(x, x) always evaluates to NULL, so dbt_valid_to is NULL here. #}
insertions_source_data as (

select
*,
{{ strategy.unique_key }} as dbt_unique_key,
{{ strategy.updated_at }} as dbt_updated_at,
{{ strategy.updated_at }} as dbt_valid_from,
nullif({{ strategy.updated_at }}, {{ strategy.updated_at }}) as dbt_valid_to,
{{ strategy.scd_id }} as dbt_scd_id

from snapshot_query
),

{# Source rows shaped as updates: dbt_valid_to is set to the strategy's updated_at expression. #}
updates_source_data as (

select
*,
{{ strategy.unique_key }} as dbt_unique_key,
{{ strategy.updated_at }} as dbt_updated_at,
{{ strategy.updated_at }} as dbt_valid_from,
{{ strategy.updated_at }} as dbt_valid_to

from snapshot_query
),

{%- if strategy.invalidate_hard_deletes %}

{# Source rows with their unique key, used only to detect keys missing from the source. #}
deletes_source_data as (

select
*,
{{ strategy.unique_key }} as dbt_unique_key
from snapshot_query
),
{% endif %}

{# 'insert' rows: source rows with no matching row in snapshotted_data, or whose match satisfies strategy.row_changed. #}
insertions as (

select
'insert' as dbt_change_type,
source_data.*

from insertions_source_data as source_data
left outer join snapshotted_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
where snapshotted_data.dbt_unique_key is null
or (
snapshotted_data.dbt_unique_key is not null
and (
{{ strategy.row_changed }}
)
)

),

{# 'update' rows: source rows whose matching row in snapshotted_data satisfies strategy.row_changed; carries that row's dbt_scd_id. #}
updates as (

select
'update' as dbt_change_type,
source_data.*,
snapshotted_data.dbt_scd_id

from updates_source_data as source_data
join snapshotted_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
where (
{{ strategy.row_changed }}
)
)

{%- if strategy.invalidate_hard_deletes -%}
,

{# 'delete' rows: rows of snapshotted_data with no matching source key; the source_data.* columns are therefore all NULL and the three timestamps come from snapshot_get_time(). #}
deletes as (

select
'delete' as dbt_change_type,
source_data.*,
{{ snapshot_get_time() }} as dbt_valid_from,
{{ snapshot_get_time() }} as dbt_updated_at,
{{ snapshot_get_time() }} as dbt_valid_to,
snapshotted_data.dbt_scd_id

from snapshotted_data
left join deletes_source_data as source_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
where source_data.dbt_unique_key is null
)
{%- endif %}

select * from insertions
union all
select * from updates
{%- if strategy.invalidate_hard_deletes %}
union all
select * from deletes
{%- endif %}

{%- endmacro %}

{% macro build_snapshot_staging_table(strategy, temp_snapshot_relation, target_relation) %}
    {#
        Builds the snapshot staging relation and returns it.

        A temp relation is derived from `target_relation`, the staging query is
        rendered by fabric__snapshot_staging_table, and the result is created
        through create_table_as (first argument True) inside the
        'build_snapshot_staging_relation' statement.
    #}
    {% set staging_relation = make_temp_relation(target_relation) %}
    {% set staging_query = fabric__snapshot_staging_table(strategy, temp_snapshot_relation, target_relation) %}

    {% call statement('build_snapshot_staging_relation') %}
        {{ create_table_as(True, staging_relation, staging_query) }}
    {% endcall %}

    {{ return(staging_relation) }}
{% endmacro %}
163 changes: 105 additions & 58 deletions dbt/include/fabric/macros/materializations/snapshots/snapshot.sql
Original file line number Diff line number Diff line change
@@ -1,58 +1,105 @@
{% macro fabric__post_snapshot(staging_relation) %}
{# Post-snapshot cleanup: drops the relation passed in as `staging_relation` via drop_relation. #}
-- Clean up the snapshot temp table
{% do drop_relation(staging_relation) %}
{% endmacro %}

{% macro fabric__create_columns(relation, columns) %}
    {#
        Adds `columns` to `relation` as NULL-valued columns.

        Per the original note, the default__ macro uses "add column"; this
        implementation instead rebuilds the table in four statements:
          1. CTAS a randomly suffixed copy of the table that carries the extra columns,
          2. drop the original table,
          3. CTAS the original name back from the copy,
          4. drop the copy.

        Parameters:
          relation - table to extend; its database, schema and identifier are read.
          columns  - iterable of column objects; `name` and `data_type` are read.

        Nothing is executed when `columns` is empty, so a call without new
        columns does not drop and rebuild the table.
    #}
    {% set columns_to_add = columns | list %}

    {% if columns_to_add %}

        {# One ", CAST(NULL AS <type>) AS <name>" fragment per new column. #}
        {% set new_column_exprs %}
            {% for column in columns_to_add %}
                , CAST(NULL AS {{ column.data_type }}) AS {{ column.name }}
            {% endfor %}
        {% endset %}

        {# The random suffix keeps the intermediate table name from colliding with the target. #}
        {% set tempTableName %}
            [{{relation.database}}].[{{ relation.schema }}].[{{ relation.identifier }}_{{ range(1300, 19000) | random }}]
        {% endset %}

        {% set tempTable %}
            CREATE TABLE {{tempTableName}}
            AS SELECT * {{ new_column_exprs }} FROM [{{relation.database}}].[{{ relation.schema }}].[{{ relation.identifier }}] {{ information_schema_hints() }}
        {% endset %}

        {% call statement('create_temp_table') -%}
            {{ tempTable }}
        {%- endcall %}

        {% set dropTable %}
            DROP TABLE [{{relation.database}}].[{{ relation.schema }}].[{{ relation.identifier }}]
        {% endset %}

        {% call statement('drop_table') -%}
            {{ dropTable }}
        {%- endcall %}

        {% set createTable %}
            CREATE TABLE {{ relation }}
            AS SELECT * FROM {{tempTableName}} {{ information_schema_hints() }}
        {% endset %}

        {% call statement('create_Table') -%}
            {{ createTable }}
        {%- endcall %}

        {% set dropTempTable %}
            DROP TABLE {{tempTableName}}
        {% endset %}

        {% call statement('drop_temp_table') -%}
            {{ dropTempTable }}
        {%- endcall %}

    {% endif %}
{% endmacro %}

{% macro fabric__get_true_sql() %}
{# Returns the string '1=1', a predicate that is always true in SQL. #}
{{ return('1=1') }}
{% endmacro %}
{% materialization snapshot, adapter='fabric' %}
{#
    Snapshot materialization for the fabric adapter.

    Flow:
      1. Resolve (or create) the target table relation and run the pre-hooks.
      2. Wrap the model's compiled SQL in a view named
         <target>_snapshot_staging_temp_view, so that user SQL beginning with
         a CTE can be selected from like a table.
      3. First run: build the snapshot table from that view.
         Later runs: build a staging table, add any missing columns to the
         target, and merge the staging rows into it.
      4. Drop the view, apply grants, persist docs, create indexes (first run
         only), run post-hooks, commit, and drop the staging table.

    Returns a dict whose 'relations' entry lists the target relation.
#}

{%- set config = model['config'] -%}
{%- set target_table = model.get('alias', model.get('name')) -%}
{%- set strategy_name = config.get('strategy') -%}
{%- set unique_key = config.get('unique_key') %}
-- grab the current table's grants config for comparison later on
{%- set grant_config = config.get('grants') -%}

{% set target_relation_exists, target_relation = get_or_create_relation(
database=model.database,
schema=model.schema,
identifier=target_table,
type='table') -%}

{%- if not target_relation.is_table -%}
{% do exceptions.relation_wrong_type(target_relation, 'table') %}
{%- endif -%}

{{ run_hooks(pre_hooks, inside_transaction=False) }}
{{ run_hooks(pre_hooks, inside_transaction=True) }}

{% set strategy_macro = strategy_dispatch(strategy_name) %}
{% set strategy = strategy_macro(model, "snapshotted_data", "source_data", config, target_relation_exists) %}

{% set temp_snapshot_relation_exists, temp_snapshot_relation = get_or_create_relation(
database=model.database,
schema=model.schema,
identifier=target_table+"_snapshot_staging_temp_view",
type='view') -%}

-- Create a temporary view so that user SQL containing CTEs can be selected from
{# Single quotes are doubled because the compiled SQL is embedded in an EXEC('...') string literal. #}
{% set temp_snapshot_relation_sql = model['compiled_code'].replace("'", "''") %}
{% call statement('create temp_snapshot_relation') %}
EXEC('DROP VIEW IF EXISTS {{ temp_snapshot_relation.include(database=False) }};');
EXEC('create view {{ temp_snapshot_relation.include(database=False) }} as {{ temp_snapshot_relation_sql }};');
{% endcall %}

{% if not target_relation_exists %}

{% set build_sql = build_snapshot_table(strategy, temp_snapshot_relation) %}
{% set final_sql = create_table_as(False, target_relation, build_sql) %}

{% else %}

{{ adapter.valid_snapshot_target(target_relation) }}
{% set staging_table = build_snapshot_staging_table(strategy, temp_snapshot_relation, target_relation) %}
-- this may no-op if the database does not require column expansion
{% do adapter.expand_target_column_types(from_relation=staging_table,
to_relation=target_relation) %}
{# dbt_change_type and dbt_unique_key exist only on the staging table, so they are excluded in either letter case. #}
{% set missing_columns = adapter.get_missing_columns(staging_table, target_relation)
| rejectattr('name', 'equalto', 'dbt_change_type')
| rejectattr('name', 'equalto', 'DBT_CHANGE_TYPE')
| rejectattr('name', 'equalto', 'dbt_unique_key')
| rejectattr('name', 'equalto', 'DBT_UNIQUE_KEY')
| list %}
{% do create_columns(target_relation, missing_columns) %}
{% set source_columns = adapter.get_columns_in_relation(staging_table)
| rejectattr('name', 'equalto', 'dbt_change_type')
| rejectattr('name', 'equalto', 'DBT_CHANGE_TYPE')
| rejectattr('name', 'equalto', 'dbt_unique_key')
| rejectattr('name', 'equalto', 'DBT_UNIQUE_KEY')
| list %}
{% set quoted_source_columns = [] %}
{% for column in source_columns %}
{% do quoted_source_columns.append(adapter.quote(column.name)) %}
{% endfor %}

{% set final_sql = snapshot_merge_sql(
target = target_relation,
source = staging_table,
insert_cols = quoted_source_columns
)
%}

{% endif %}

{% call statement('main') %}
{{ final_sql }}
{% endcall %}

{# Drop the helper view once the main statement has consumed it. The original line was bare text outside any Jinja tag, so it never executed and the view was left behind. #}
{% do adapter.drop_relation(temp_snapshot_relation) %}

{% set should_revoke = should_revoke(target_relation_exists, full_refresh_mode=False) %}
{% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %}

{% do persist_docs(target_relation, model) %}

{% if not target_relation_exists %}
{% do create_indexes(target_relation) %}
{% endif %}

{{ run_hooks(post_hooks, inside_transaction=True) }}

{{ adapter.commit() }}

{# staging_table is only set on the incremental path (target already existed). #}
{% if staging_table is defined %}
{% do post_snapshot(staging_table) %}
{% endif %}

{{ run_hooks(post_hooks, inside_transaction=False) }}

{{ return({'relations': [target_relation]}) }}

{% endmaterialization %}
Loading