prdpsvs committed Nov 16, 2023
1 parent 89d59ca commit 004ab7a
Showing 4 changed files with 452 additions and 18 deletions.
8 changes: 4 additions & 4 deletions dbt/include/fabric/macros/adapters/relation.sql
@@ -38,10 +38,10 @@
type="view",
path={"schema": reference[0], "identifier": reference[1]})) }}
{% endfor %}
{% elif relation.type == 'table'%}
{%- else -%}
{{ exceptions.raise_not_implemented('Invalid relation being dropped: ' ~ relation) }}
{% endif %}
{% elif relation.type == 'table'%}
{%- else -%}
{{ exceptions.raise_not_implemented('Invalid relation being dropped: ' ~ relation) }}
{% endif %}

{{ use_database_hint() }}
EXEC('DROP {{ relation.type }} IF EXISTS {{ relation.include(database=False) }};');
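As a point of reference (not part of the commit), the EXEC template above would render roughly as follows for a hypothetical view relation my_schema.my_view; the relation name and quoting are illustrative only:

-- Illustrative rendering of the drop statement for a hypothetical view my_schema.my_view
EXEC('DROP view IF EXISTS my_schema.my_view;');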
72 changes: 72 additions & 0 deletions dbt/include/fabric/macros/materializations/models/table/clone.sql
@@ -0,0 +1,72 @@
{% macro fabric__can_clone_table() %}
    {{ return(True) }}
{% endmacro %}

{% macro fabric__create_or_replace_clone(this_relation, defer_relation) %}
    CREATE TABLE {{this_relation}}
    AS CLONE OF {{defer_relation}}
{% endmacro %}

{%- materialization clone, adapter='fabric' -%}

    {%- set relations = {'relations': []} -%}

    {%- if not defer_relation -%}
        -- nothing to do
        {{ log("No relation found in state manifest for " ~ model.unique_id, info=True) }}
        {{ return(relations) }}
    {%- endif -%}

    {%- set existing_relation = load_cached_relation(this) -%}
    {{ log("existing relation is "~existing_relation, info=True) }}
    {{ log("defer relation is "~defer_relation, info=True) }}

    {%- if existing_relation and not flags.FULL_REFRESH -%}
        -- noop!
        {{ log("Relation " ~ existing_relation ~ " already exists", info=True) }}
        {{ return(relations) }}
    {%- endif -%}

    {%- set other_existing_relation = load_cached_relation(defer_relation) -%}
    {{ log("other existing relation is "~other_existing_relation, info=True) }}
    -- If this is a database that can do zero-copy cloning of tables, and the other relation is a table, then this will be a table
    -- Otherwise, this will be a view

    {% set can_clone_table = can_clone_table() %}

    {%- if other_existing_relation and other_existing_relation.type == 'table' and can_clone_table -%}

        {%- set target_relation = this.incorporate(type='table') -%}
        {% if existing_relation is not none and not existing_relation.is_table %}
            {{ log("Dropping relation " ~ existing_relation ~ " because it is of type " ~ existing_relation.type) }}
            {{ fabric__drop_relation_script(existing_relation) }}
        {% endif %}

        -- as a general rule, data platforms that can clone tables can also do atomic 'create or replace'
        {% call statement('main') %}
            {{ create_or_replace_clone(target_relation, defer_relation) }}
        {% endcall %}

        {# {% set should_revoke = should_revoke(existing_relation, full_refresh_mode=True) %}
        {% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %}
        {% do persist_docs(target_relation, model) %} #}

        {{ return({'relations': [target_relation]}) }}

    {%- else -%}

        {%- set target_relation = this.incorporate(type='view') -%}

        -- reuse the view materialization
        -- TODO: support actual dispatch for materialization macros
        -- Tracking ticket: https://github.com/dbt-labs/dbt-core/issues/7799
        {% set search_name = "materialization_view_" ~ adapter.type() %}
        {% if not search_name in context %}
            {% set search_name = "materialization_view_default" %}
        {% endif %}
        {% set materialization_macro = context[search_name] %}
        {% set relations = materialization_macro() %}
        {{ return(relations) }}
    {% endif %}

{%- endmaterialization -%}
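
For orientation (not part of the commit), here is a rough sketch of the SQL the table-clone branch above renders, assuming a hypothetical model my_model deferred from prod_schema into dev_schema where a stale view already exists; relation names and quoting are illustrative only:

-- Illustrative only: if the existing target relation is a view, fabric__drop_relation_script renders a drop for it
EXEC('DROP view IF EXISTS dev_schema.my_model;');

-- The 'main' statement then creates the zero-copy clone from the deferred (production) relation
CREATE TABLE dev_schema.my_model
AS CLONE OF prod_schema.my_model

When the deferred relation is not a table, or the adapter reports it cannot clone tables, the else branch falls back to the regular view materialization instead.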
236 changes: 236 additions & 0 deletions tests/functional/adapter/test_dbt_clone.py
@@ -0,0 +1,236 @@
import os
import shutil
from collections import Counter
from copy import deepcopy

import pytest
from dbt.exceptions import DbtRuntimeError
from dbt.tests.adapter.dbt_clone.fixtures import (
    custom_can_clone_tables_false_macros_sql,
    ephemeral_model_sql,
    exposures_yml,
    get_schema_name_sql,
    infinite_macros_sql,
    macros_sql,
    schema_yml,
    seed_csv,
    snapshot_sql,
    table_model_sql,
    view_model_sql,
)
from dbt.tests.util import run_dbt


class BaseClone:
    @pytest.fixture(scope="class")
    def models(self):
        return {
            "table_model.sql": table_model_sql,
            "view_model.sql": view_model_sql,
            "ephemeral_model.sql": ephemeral_model_sql,
            "schema.yml": schema_yml,
            "exposures.yml": exposures_yml,
        }

    @pytest.fixture(scope="class")
    def macros(self):
        return {
            "macros.sql": macros_sql,
            "infinite_macros.sql": infinite_macros_sql,
            "get_schema_name.sql": get_schema_name_sql,
        }

    @pytest.fixture(scope="class")
    def seeds(self):
        return {
            "seed.csv": seed_csv,
        }

    @pytest.fixture(scope="class")
    def snapshots(self):
        return {
            "snapshot.sql": snapshot_sql,
        }

    @pytest.fixture(scope="class")
    def other_schema(self, unique_schema):
        return unique_schema + "_other"

    @property
    def project_config_update(self):
        return {
            "seeds": {
                "test": {
                    "quote_columns": False,
                }
            }
        }

    @pytest.fixture(scope="class")
    def profiles_config_update(self, dbt_profile_target, unique_schema, other_schema):
        outputs = {"default": dbt_profile_target, "otherschema": deepcopy(dbt_profile_target)}
        outputs["default"]["schema"] = unique_schema
        outputs["otherschema"]["schema"] = other_schema
        return {"test": {"outputs": outputs, "target": "default"}}

    def copy_state(self, project_root):
        state_path = os.path.join(project_root, "state")
        if not os.path.exists(state_path):
            os.makedirs(state_path)
        shutil.copyfile(
            f"{project_root}/target/manifest.json", f"{project_root}/state/manifest.json"
        )

    def run_and_save_state(self, project_root, with_snapshot=False):
        results = run_dbt(["seed"])
        assert len(results) == 1
        assert not any(r.node.deferred for r in results)
        results = run_dbt(["run"])
        assert len(results) == 2
        assert not any(r.node.deferred for r in results)
        results = run_dbt(["test"])
        assert len(results) == 2

        if with_snapshot:
            results = run_dbt(["snapshot"])
            assert len(results) == 1
            assert not any(r.node.deferred for r in results)

        # copy files
        self.copy_state(project_root)


# -- Below we define base test classes; import the one that matches whether your adapter supports dbt clone --
class BaseClonePossible(BaseClone):
    def test_can_clone_true(self, project, unique_schema, other_schema):
        project.create_test_schema(other_schema)
        self.run_and_save_state(project.project_root, with_snapshot=True)

        clone_args = [
            "clone",
            "--state",
            "state",
            "--target",
            "otherschema",
        ]

        results = run_dbt(clone_args)
        assert len(results) == 4

        schema_relations = project.adapter.list_relations(
            database=project.database, schema=other_schema
        )
        types = [r.type for r in schema_relations]
        count_types = Counter(types)
        assert count_types == Counter({"table": 3, "view": 1})

        # objects already exist, so this is a no-op
        results = run_dbt(clone_args)
        assert len(results) == 4
        assert all("no-op" in r.message.lower() for r in results)

        # recreate all objects
        results = run_dbt([*clone_args, "--full-refresh"])
        assert len(results) == 4

        # select only models this time
        results = run_dbt([*clone_args, "--resource-type", "model"])
        assert len(results) == 2
        assert all("no-op" in r.message.lower() for r in results)

    def test_clone_no_state(self, project, unique_schema, other_schema):
        project.create_test_schema(other_schema)
        self.run_and_save_state(project.project_root, with_snapshot=True)

        clone_args = [
            "clone",
            "--target",
            "otherschema",
        ]

        with pytest.raises(
            DbtRuntimeError,
            match="--state or --defer-state are required for deferral, but neither was provided",
        ):
            run_dbt(clone_args)


class BaseCloneNotPossible(BaseClone):
    @pytest.fixture(scope="class")
    def macros(self):
        return {
            "macros.sql": macros_sql,
            "my_can_clone_tables.sql": custom_can_clone_tables_false_macros_sql,
            "infinite_macros.sql": infinite_macros_sql,
            "get_schema_name.sql": get_schema_name_sql,
        }

    def test_can_clone_false(self, project, unique_schema, other_schema):
        project.create_test_schema(other_schema)
        self.run_and_save_state(project.project_root, with_snapshot=True)

        clone_args = [
            "clone",
            "--state",
            "state",
            "--target",
            "otherschema",
        ]

        results = run_dbt(clone_args)
        assert len(results) == 4

        schema_relations = project.adapter.list_relations(
            database=project.database, schema=other_schema
        )
        assert all(r.type == "view" for r in schema_relations)

        # objects already exist, so this is a no-op
        results = run_dbt(clone_args)
        assert len(results) == 4
        assert all("no-op" in r.message.lower() for r in results)

        # recreate all objects
        results = run_dbt([*clone_args, "--full-refresh"])
        assert len(results) == 4

        # select only models this time
        results = run_dbt([*clone_args, "--resource-type", "model"])
        assert len(results) == 2
        assert all("no-op" in r.message.lower() for r in results)


class TestPostgresCloneNotPossible(BaseCloneNotPossible):
    @pytest.fixture(autouse=True)
    def clean_up(self, project):
        yield
        with project.adapter.connection_named("__test"):
            relation = project.adapter.Relation.create(
                database=project.database, schema=f"{project.test_schema}_seeds"
            )
            project.adapter.drop_schema(relation)

            relation = project.adapter.Relation.create(
                database=project.database, schema=project.test_schema
            )
            project.adapter.drop_schema(relation)

    pass


class TestPostgresClonePossible(BaseClonePossible):
    @pytest.fixture(autouse=True)
    def clean_up(self, project):
        yield
        with project.adapter.connection_named("__test"):
            relation = project.adapter.Relation.create(
                database=project.database, schema=f"{project.test_schema}_seeds"
            )
            project.adapter.drop_schema(relation)

            relation = project.adapter.Relation.create(
                database=project.database, schema=project.test_schema
            )
            project.adapter.drop_schema(relation)

    pass