🎉 Base Normalization: clean up Redshift tmp_schemas after SAT (#14015)

After the `base-normalization` SAT run, the Redshift destination is now automatically cleaned of test leftovers. Other destinations are not yet covered.
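For context, the cleanup is wired into the pytest fixtures changed below: each suite builds a `clean_up_args` dict before the tests run and invokes `clean_tmp_tables` once they finish. A condensed sketch of that flow, assuming the suites' module-level `dbt_test_utils` helper and `git_versioned_tests` list from the test files in this diff:

import pytest
from normalization.destination_type import DestinationType

@pytest.fixture(scope="module", autouse=True)
def before_all_tests(request):
    # destinations under test; "ephemeral" suites pass tmp_folders=[...] instead
    clean_up_args = {
        "destination_type": [DestinationType.REDSHIFT],
        "test_type": "normalization",
        "git_versioned_tests": git_versioned_tests,
    }
    yield  # the test session runs here
    dbt_test_utils.clean_tmp_tables(**clean_up_args)  # drop leftover tmp schemas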
bazarnov authored Jun 27, 2022
1 parent 4bf1ab1 commit 062b12f
Showing 4 changed files with 188 additions and 5 deletions.
@@ -0,0 +1,19 @@
{% macro clean_tmp_tables(schemas) -%}
{{ adapter.dispatch('clean_tmp_tables')(schemas) }}
{%- endmacro %}

-- default
{% macro default__clean_tmp_tables(schemas) -%}
{% do exceptions.warn("\tINFO: CLEANING TEST LEFTOVERS IS NOT IMPLEMENTED FOR THIS DESTINATION. CONSIDER REMOVING TEST TABLES MANUALLY.\n") %}
{%- endmacro %}

-- for redshift
{% macro redshift__clean_tmp_tables(schemas) %}
{%- for tmp_schema in schemas -%}
{% do log("\tDROP SCHEMA IF EXISTS " ~ tmp_schema, info=True) %}
{%- set drop_query -%}
drop schema if exists {{ tmp_schema }} cascade;
{%- endset -%}
{%- do run_query(drop_query) -%}
{%- endfor -%}
{% endmacro %}
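`adapter.dispatch` resolves `clean_tmp_tables` to `redshift__clean_tmp_tables` when the target is Redshift and falls back to `default__clean_tmp_tables`, which only prints a warning, for every other destination. The macro is invoked through `dbt run-operation` with the schema list serialized as a JSON `--args` payload, as the test harness below does; a minimal sketch with made-up schema names:

import json

# Schema names here are illustrative; the harness derives them from sources.yml files.
schemas = ["test_ephemeral_abc123", "_airbyte_test_ephemeral_abc123"]
macro_args = json.dumps({"schemas": schemas})
# Equivalent CLI call (as printed by run_dbt_run_operation below):
# dbt run-operation clean_tmp_tables --args '{"schemas": [...]}' --profiles-dir=/workspace --project-dir=/workspace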
@@ -5,6 +5,7 @@

import json
import os
import pathlib
import random
import re
import socket
@@ -14,8 +15,9 @@
import threading
import time
from copy import copy
from typing import Any, Callable, Dict, List
from typing import Any, Callable, Dict, List, Union

import yaml
from normalization.destination_type import DestinationType
from normalization.transform_catalog.transform import read_yaml_config, write_yaml_config
from normalization.transform_config.transform import TransformConfig
@@ -414,8 +416,15 @@ def dbt_run(self, destination_type: DestinationType, test_root_dir: str, force_f
# Compile dbt model files into the destination SQL dialect, then run the transformation queries
assert self.run_check_dbt_command(normalization_image, "run", test_root_dir, force_full_refresh)

@staticmethod
def run_check_dbt_command(normalization_image: str, command: str, cwd: str, force_full_refresh: bool = False) -> bool:
def dbt_run_macro(self, destination_type: DestinationType, test_root_dir: str, macro: str, macro_args: str = None):
"""
Run the dbt CLI to perform transformations on the test raw data in the destination, using an independent macro.
"""
normalization_image: str = self.get_normalization_image(destination_type)
# Run the given macro against the destination via `dbt run-operation`
assert self.run_dbt_run_operation(normalization_image, test_root_dir, macro, macro_args)

def run_check_dbt_command(self, normalization_image: str, command: str, cwd: str, force_full_refresh: bool = False) -> bool:
"""
Run the dbt subprocess while checking for and counting "ERROR", "FAIL" or "WARNING" messages printed in its output
"""
@@ -424,7 +433,6 @@ def run_check_dbt_command(normalization_image: str, command: str, cwd: str, forc
else:
dbtAdditionalArgs = ["--event-buffer-size=10000"]

error_count = 0
commands = (
[
"docker",
@@ -458,6 +466,45 @@ def run_check_dbt_command(normalization_image: str, command: str, cwd: str, forc
command = f"{command} --full-refresh"
print("Executing: ", " ".join(commands))
print(f"Equivalent to: dbt {command} --profiles-dir={cwd} --project-dir={cwd}")
return self.run_check_dbt_subprocess(commands, cwd)

def run_dbt_run_operation(self, normalization_image: str, cwd: str, macro: str, macro_args: str = None) -> bool:
"""
Run a dbt macro via `dbt run-operation` in a subprocess, checking its output for "ERROR", "FAIL" or "WARNING" messages
"""
args = ["--args", macro_args] if macro_args else []
commands = (
[
"docker",
"run",
"--rm",
"--init",
"-v",
f"{cwd}:/workspace",
"-v",
f"{cwd}/build:/build",
"-v",
f"{cwd}/logs:/logs",
"-v",
f"{cwd}/build/dbt_packages:/dbt",
"--network",
"host",
"--entrypoint",
"/usr/local/bin/dbt",
"-i",
normalization_image,
]
+ ["run-operation", macro]
+ args
+ ["--profiles-dir=/workspace", "--project-dir=/workspace"]
)

print("Executing: ", " ".join(commands))
print(f"Equivalent to: dbt run-operation {macro} --args {macro_args} --profiles-dir={cwd} --project-dir={cwd}")
return self.run_check_dbt_subprocess(commands, cwd)

def run_check_dbt_subprocess(self, commands: list, cwd: str):
error_count = 0
with open(os.path.join(cwd, "dbt_output.log"), "ab") as f:
process = subprocess.Popen(commands, cwd=cwd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=os.environ)
for line in iter(lambda: process.stdout.readline(), b""):
@@ -552,3 +599,94 @@ def update_yaml_file(filename: str, callback: Callable):
updated, config = callback(config)
if updated:
write_yaml_config(config, filename)

def clean_tmp_tables(
self,
destination_type: Union[DestinationType, List[DestinationType]],
test_type: str,
tmp_folders: list = None,
git_versioned_tests: list = None,
):
"""
Cleans up all temporary schemas created during the test session.
It parses the provided `tmp_folders: List[str]` or uses `git_versioned_tests` to find the sources.yml files generated for the tests.
It collects the target schemas created by the tests and removes them using the destination-specific scenario defined in the
`dbt-project-template/macros/clean_tmp_tables.sql` macro.
REQUIREMENTS:
1) Ideally, the schemas should have unique names like `test_normalization_<some_random_string>` to avoid conflicts.
2) The `clean_tmp_tables.sql` file should define a macro specific to the target destination for cleanup to proceed.
INPUT ARGUMENTS:
:: destination_type : either a single destination or a list of destinations
:: test_type: either "ephemeral" or "normalization" should be supplied.
:: tmp_folders: should be supplied if test_type = "ephemeral", to get schemas from /build/normalization_test_output folders
:: git_versioned_tests: should be supplied if test_type = "normalization", to get schemas from integration_tests/normalization_test_output folders
EXAMPLE:
clean_up_args = {
"destination_type": [ DestinationType.REDSHIFT, DestinationType.POSTGRES, ... ]
"test_type": "normalization",
"git_versioned_tests": git_versioned_tests,
}
"""

path_to_sources: str = "/models/generated/sources.yml"
test_folders: dict = {}
source_files: dict = {}
schemas_to_remove: dict = {}

# collecting information about tmp_tables created for the test for each destination
for destination in destination_type:
test_folders[destination.value] = []
source_files[destination.value] = []
schemas_to_remove[destination.value] = []

# based on test_type select path to source files
if test_type == "ephemeral":
if not tmp_folders:
raise TypeError("`tmp_folders` arg is not provided.")
for folder in tmp_folders:
if destination.value in folder:
test_folders[destination.value].append(folder)
source_files[destination.value].append(f"{folder}{path_to_sources}")
elif test_type == "normalization":
if not git_versioned_tests:
raise TypeError("`git_versioned_tests` arg is not provided.")
base_path = f"{pathlib.Path().absolute()}/integration_tests/normalization_test_output"
for test in git_versioned_tests:
test_root_dir: str = f"{base_path}/{destination.value}/{test}"
test_folders[destination.value].append(test_root_dir)
source_files[destination.value].append(f"{test_root_dir}{path_to_sources}")
else:
raise TypeError(f"\n`test_type`: {test_type} is not a registered, use `ephemeral` or `normalization` instead.\n")

# parse the sources.yml files from the test folders to get the schemas and table names created for the tests
for file in source_files[destination.value]:
source_yml = {}
try:
with open(file, "r") as source_file:
source_yml = yaml.safe_load(source_file)
except FileNotFoundError:
print(f"\n{destination.value}: {file} doesn't exist, consider removing any temp_tables and schemas manually!\n")
test_sources: list = source_yml.get("sources", []) if source_yml else []

for source in test_sources:
target_schema: str = source.get("name")
if target_schema not in schemas_to_remove[destination.value]:
schemas_to_remove[destination.value].append(target_schema)
# adding _airbyte_* tmp schemas to be removed
schemas_to_remove[destination.value].append(f"_airbyte_{target_schema}")

# cleaning up tmp_tables generated by the tests
for destination in destination_type:
if not schemas_to_remove[destination.value]:
print(f"\n\t{destination.value.upper()} DESTINATION: SKIP CLEANING, NOTHING TO REMOVE.\n")
else:
print(f"\n\t{destination.value.upper()} DESTINATION: CLEANING LEFTOVERS...\n")
print(f"\t{schemas_to_remove[destination.value]}\n")
test_root_folder = test_folders[destination.value][0]
args = json.dumps({"schemas": schemas_to_remove[destination.value]})
self.dbt_check(destination, test_root_folder)
self.dbt_run_macro(destination, test_root_folder, "clean_tmp_tables", args)
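To illustrate the schema-collection step above: every `sources[].name` found in a test's generated sources.yml becomes a schema to drop, together with its `_airbyte_`-prefixed raw counterpart. A worked example with an illustrative payload:

# Illustrative sources.yml content, as returned by yaml.safe_load
source_yml = {"sources": [{"name": "test_normalization_abc123", "tables": []}]}

schemas_to_remove = []
for source in source_yml.get("sources", []):
    target_schema = source["name"]
    schemas_to_remove.append(target_schema)
    schemas_to_remove.append(f"_airbyte_{target_schema}")  # raw tables land in _airbyte_<schema>

# schemas_to_remove == ["test_normalization_abc123", "_airbyte_test_normalization_abc123"]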
@@ -23,13 +23,20 @@
@pytest.fixture(scope="module", autouse=True)
def before_all_tests(request):
destinations_to_test = dbt_test_utils.get_test_targets()
# set clean-up args to clean the target destination after the test session
clean_up_args = {
"destination_type": [d for d in DestinationType if d.value in destinations_to_test],
"test_type": "ephemeral",
"tmp_folders": temporary_folders,
}
if DestinationType.POSTGRES.value not in destinations_to_test:
destinations_to_test.append(DestinationType.POSTGRES.value)
dbt_test_utils.set_target_schema("test_ephemeral")
dbt_test_utils.change_current_test_dir(request)
dbt_test_utils.setup_db(destinations_to_test)
os.environ["PATH"] = os.path.abspath("../.venv/bin/") + ":" + os.environ["PATH"]
yield
dbt_test_utils.clean_tmp_tables(**clean_up_args)
dbt_test_utils.tear_down_db()
for folder in temporary_folders:
print(f"Deleting temporary test folder {folder}")
@@ -91,6 +98,9 @@ def run_test(destination_type: DestinationType, column_count: int, expected_exce
if destination_type.value == DestinationType.ORACLE.value:
# Oracle does not allow changing to random schema
dbt_test_utils.set_target_schema("test_normalization")
elif destination_type.value == DestinationType.REDSHIFT.value:
# set a unique schema for the Redshift test
dbt_test_utils.set_target_schema(dbt_test_utils.generate_random_string("test_ephemeral_"))
else:
dbt_test_utils.set_target_schema("test_ephemeral")
print("Testing ephemeral")
@@ -28,6 +28,12 @@
@pytest.fixture(scope="module", autouse=True)
def before_all_tests(request):
destinations_to_test = dbt_test_utils.get_test_targets()
# set clean-up args to clean the target destination after the test session
clean_up_args = {
"destination_type": [d for d in DestinationType if d.value in destinations_to_test],
"test_type": "normalization",
"git_versioned_tests": git_versioned_tests,
}
for integration_type in [d.value for d in DestinationType]:
if integration_type in destinations_to_test:
test_root_dir = f"{pathlib.Path().absolute()}/normalization_test_output/{integration_type.lower()}"
@@ -39,11 +45,11 @@ def before_all_tests(request):
dbt_test_utils.setup_db(destinations_to_test)
os.environ["PATH"] = os.path.abspath("../.venv/bin/") + ":" + os.environ["PATH"]
yield
dbt_test_utils.clean_tmp_tables(**clean_up_args)
dbt_test_utils.tear_down_db()
for folder in temporary_folders:
print(f"Deleting temporary test folder {folder}")
shutil.rmtree(folder, ignore_errors=True)
# TODO delete target_schema in destination by copying dbt_project.yml and injecting an on-run-end hook to clean up


@pytest.fixture
@@ -78,6 +84,9 @@ def test_normalization(destination_type: DestinationType, test_resource_name: st
if destination_type.value == DestinationType.ORACLE.value:
# Oracle does not allow changing to random schema
dbt_test_utils.set_target_schema("test_normalization")
elif destination_type.value == DestinationType.REDSHIFT.value:
# set a unique schema for the Redshift test
dbt_test_utils.set_target_schema(dbt_test_utils.generate_random_string("test_normalization_"))
try:
run_test_normalization(destination_type, test_resource_name)
finally:
@@ -498,6 +507,11 @@ def to_lower_identifier(input: re.Match) -> str:

def test_redshift_normalization_migration(tmp_path, setup_test_path):
destination_type = DestinationType.REDSHIFT
clean_up_args = {
"destination_type": [destination_type],
"test_type": "ephemeral", # "ephemeral", because we parse /tmp folders
"tmp_folders": [str(tmp_path)],
}
if destination_type.value not in dbt_test_utils.get_test_targets():
pytest.skip(f"Destinations {destination_type} is not in NORMALIZATION_TEST_TARGET env variable")
base_dir = pathlib.Path(os.path.realpath(os.path.join(__file__, "../..")))
@@ -535,3 +549,5 @@ def test_redshift_normalization_migration(tmp_path, setup_test_path):
run_destination_process(destination_type, tmp_path, messages_file2, "destination_catalog.json", docker_tag="dev")
dbt_test_utils.dbt_run(destination_type, tmp_path, force_full_refresh=False)
dbt_test(destination_type, tmp_path)
# clean up test tables created for this test
dbt_test_utils.clean_tmp_tables(**clean_up_args)
