🎉 Remove hash when it is not necessary from normalization outputs (#3704)

* Refactor `generate_new_table_name` to use a table name registry class instead (see the sketch after this list)

* Update normalization docs

* Enable MyPy

* Regenerate output files

* Closes #2389

* Bump normalization version
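The renames throughout this diff follow from the first bullet: generated tables keep their plain names, and a short hash suffix (the `_669`/`_64a`-style fragments removed below) is appended only when a name collides or would exceed the destination's identifier length limit. A minimal sketch of the registry idea, assuming a simplified length/collision rule — hypothetical code, not the actual `TableNameRegistry` from this commit:

```python
import hashlib


class TableNameRegistry:
    """Hypothetical sketch: hand out table names, hashing only when necessary."""

    def __init__(self, max_length: int = 63):
        self.max_length = max_length
        self.registered = set()

    def get_table_name(self, name: str) -> str:
        candidate = name
        if len(candidate) > self.max_length or candidate in self.registered:
            # Deterministic 3-character suffix derived from the full name
            # (a real registry would also have to resolve repeated collisions).
            suffix = hashlib.md5(name.encode("utf-8")).hexdigest()[:3]
            candidate = f"{name[: self.max_length - len(suffix) - 1]}_{suffix}"
        self.registered.add(candidate)
        return candidate


registry = TableNameRegistry()
print(registry.get_table_name("dedup_exchange_rate"))  # dedup_exchange_rate (no hash needed)
print(registry.get_table_name("dedup_exchange_rate"))  # dedup_exchange_rate_<hash> (collision)
```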
ChristopheDuong authored Jun 1, 2021
1 parent 1956fd3 commit bb4dcb1
Showing 304 changed files with 2,307 additions and 1,207 deletions.
2 changes: 1 addition & 1 deletion airbyte-integrations/bases/base-normalization/Dockerfile
@@ -22,5 +22,5 @@ RUN dbt deps
WORKDIR /airbyte
ENTRYPOINT ["/airbyte/entrypoint.sh"]

-LABEL io.airbyte.version=0.1.30
+LABEL io.airbyte.version=0.1.31
LABEL io.airbyte.name=airbyte/normalization
2 changes: 1 addition & 1 deletion airbyte-integrations/bases/base-normalization/README.md
@@ -31,7 +31,7 @@ dbt files.
This class tests the transform config functionality that converts a destination_config.json into the appropriate profiles.yml file for dbt to use;
see [related dbt docs on profiles.yml](https://docs.getdbt.com/reference/profiles.yml) for more context on what it actually is.

-#### test_stream_processor.py and test_stream_processor_naming.py:
+#### test_stream_processor.py and test_table_name_registry.py:

These unit test functions check how each stream is converted to dbt model files.
For example, one big focus area is how table names are chosen.
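A test in this area might, for instance, pin down the new no-unnecessary-hash behavior. Reusing the hypothetical `TableNameRegistry` sketched under the commit message above (illustrative only, not the real test file):

```python
def test_no_hash_for_short_unique_names():
    registry = TableNameRegistry(max_length=63)
    # A short, unique name passes through untouched.
    assert registry.get_table_name("exchange_rate") == "exchange_rate"


def test_hash_appended_on_collision():
    registry = TableNameRegistry(max_length=63)
    first = registry.get_table_name("exchange_rate")
    second = registry.get_table_name("exchange_rate")
    # The second request for the same name must be disambiguated.
    assert first == "exchange_rate" and second != first
```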
9 changes: 9 additions & 0 deletions airbyte-integrations/bases/base-normalization/build.gradle
@@ -28,3 +28,12 @@ task("customIntegrationTestPython", type: PythonTask, dependsOn: installTestReqs
}

integrationTest.dependsOn("customIntegrationTestPython")

+// TODO fix and use https://github.com/airbytehq/airbyte/issues/3192 instead
+task('mypyCheck', type: PythonTask) {
+    module = "mypy"
+    command = "normalization --config-file ${project.rootProject.file('tools/python/.mypy.ini').absolutePath}"
+
+    dependsOn 'blackFormat'
+}
+check.dependsOn mypyCheck
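For readers unfamiliar with the `PythonTask` wrapper, the task above boils down to running mypy over the `normalization` package with the repo-wide config. Roughly, as a plain subprocess call — an assumed equivalent for illustration, with paths relative to the repository root:

```python
import subprocess

# Approximate manual equivalent of `mypyCheck`, run from the repository root;
# check=True makes type errors fail loudly, mirroring `check.dependsOn mypyCheck`.
subprocess.run(
    ["mypy", "airbyte-integrations/bases/base-normalization/normalization",
     "--config-file", "tools/python/.mypy.ini"],
    check=True,
)
```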
@@ -101,8 +101,8 @@ def tear_down_postgres_db(self):

    @staticmethod
    def change_current_test_dir(request):
-        # This makes the test run whether it is executed from the tests folder
-        # (with pytest/gradle) or from the base-normalization folder (through pycharm)
+        # This makes the test run whether it is executed from the tests folder (with pytest/gradle)
+        # or from the base-normalization folder (through pycharm)
        integration_tests_dir = os.path.join(request.fspath.dirname, "integration_tests")
        if os.path.exists(integration_tests_dir):
            os.chdir(integration_tests_dir)
@@ -196,25 +196,34 @@ def run_check_dbt_command(command: str, cwd: str) -> bool:
print(f"Equivalent to: dbt {command} --profiles-dir={cwd} --project-dir={cwd}")
with open(os.path.join(cwd, "dbt_output.log"), "ab") as f:
process = subprocess.Popen(commands, cwd=cwd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=os.environ)
for line in iter(process.stdout.readline, b""):
for line in iter(lambda: process.stdout.readline(), b""):
f.write(line)
str_line = line.decode("utf-8")
sys.stdout.write(str_line)
# keywords to match lines as signaling errors
if "ERROR" in str_line or "FAIL" in str_line or "WARNING" in str_line:
# exception keywords in lines to ignore as errors (such as summary or expected warnings)
if not (
"Done." in str_line # DBT Summary
or "PASS=" in str_line # DBT Summary
or "Nothing to do." in str_line # When no schema/data tests are setup
or "Configuration paths exist in your dbt_project.yml" # When catalog does not generate a view or cte
):
is_exception = False
for except_clause in [
"Done.", # DBT Summary
"PASS=", # DBT Summary
"Nothing to do.", # When no schema/data tests are setup
"Configuration paths exist in your dbt_project.yml", # When no cte / view are generated
]:
if except_clause in str_line:
is_exception = True
break
if not is_exception:
# count lines signaling an error/failure/warning
error_count += 1
process.wait()
print(
f"{' '.join(commands)}\n\tterminated with return code {process.returncode} with {error_count} 'Error/Warning/Fail' mention(s)."
message = (
f"{' '.join(commands)}\n\tterminated with return code {process.returncode} "
f"with {error_count} 'Error/Warning/Fail' mention(s)."
)
print(message)
assert error_count == 0, message
assert process.returncode == 0, message
if error_count > 0:
return False
return process.returncode == 0
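The keyword filter in this function is easiest to see on concrete input. A standalone rerun of the same logic, with hypothetical log lines for illustration:

```python
sample_lines = [
    "14:21:03 | ERROR creating table model test_normalization.exchange_rate",  # genuine error
    "Done. PASS=10 WARN=0 ERROR=0 SKIP=0 TOTAL=10",  # dbt summary line: ignored
    "WARNING: Nothing to do. Try checking your model configs",  # expected warning: ignored
]
except_clauses = ["Done.", "PASS=", "Nothing to do.", "Configuration paths exist in your dbt_project.yml"]

error_count = 0
for str_line in sample_lines:
    if "ERROR" in str_line or "FAIL" in str_line or "WARNING" in str_line:
        if not any(clause in str_line for clause in except_clauses):
            error_count += 1

print(error_count)  # 1 -> only the first line is counted as a failure
```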
@@ -1,6 +1,6 @@


-create or replace view `dataline-integration-testing`._airbyte_test_normalization.`nested_stream_with_complex_columns_resulting_into_long_names_partition_669_DATA_ab1`
+create or replace view `dataline-integration-testing`._airbyte_test_normalization.`nested_stream_with_complex_columns_resulting_into_long_names_partition_DATA_ab1`
OPTIONS()
as
-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema
@@ -9,7 +9,7 @@ select
_airbyte_partition_hashid,
json_extract_scalar(DATA, "$['currency']") as currency,
_airbyte_emitted_at
-from `dataline-integration-testing`.test_normalization.`nested_stream_with_complex_columns_resulting_into_long_names_64a_partition`
+from `dataline-integration-testing`.test_normalization.`nested_stream_with_complex_columns_resulting_into_long_names_partition`
cross join unnest(DATA) as DATA
where DATA is not null
-- DATA at nested_stream_with_complex_columns_resulting_into_long_names/partition/DATA;
@@ -1,6 +1,6 @@


-create or replace view `dataline-integration-testing`._airbyte_test_normalization.`nested_stream_with_complex_columns_resulting_into_long_names_partition_669_DATA_ab2`
+create or replace view `dataline-integration-testing`._airbyte_test_normalization.`nested_stream_with_complex_columns_resulting_into_long_names_partition_DATA_ab2`
OPTIONS()
as
-- SQL model to cast each column to its adequate SQL type converted from the JSON schema type
@@ -10,6 +10,6 @@ select
string
) as currency,
_airbyte_emitted_at
-from `dataline-integration-testing`._airbyte_test_normalization.`nested_stream_with_complex_columns_resulting_into_long_names_partition_669_DATA_ab1`
+from `dataline-integration-testing`._airbyte_test_normalization.`nested_stream_with_complex_columns_resulting_into_long_names_partition_DATA_ab1`
-- DATA at nested_stream_with_complex_columns_resulting_into_long_names/partition/DATA;

@@ -1,6 +1,6 @@


-create or replace view `dataline-integration-testing`._airbyte_test_normalization.`nested_stream_with_complex_columns_resulting_into_long_names_partition_669_DATA_ab3`
+create or replace view `dataline-integration-testing`._airbyte_test_normalization.`nested_stream_with_complex_columns_resulting_into_long_names_partition_DATA_ab3`
OPTIONS()
as
-- SQL model to build a hash column based on the values of this record
@@ -13,6 +13,6 @@ select
), '')) as
string
))) as _airbyte_DATA_hashid
-from `dataline-integration-testing`._airbyte_test_normalization.`nested_stream_with_complex_columns_resulting_into_long_names_partition_669_DATA_ab2`
+from `dataline-integration-testing`._airbyte_test_normalization.`nested_stream_with_complex_columns_resulting_into_long_names_partition_DATA_ab2`
-- DATA at nested_stream_with_complex_columns_resulting_into_long_names/partition/DATA;

@@ -1,6 +1,6 @@


-create or replace view `dataline-integration-testing`._airbyte_test_normalization.`nested_stream_with_complex_columns_resulting_into_long_names_64a_partition_ab1`
+create or replace view `dataline-integration-testing`._airbyte_test_normalization.`nested_stream_with_complex_columns_resulting_into_long_names_partition_ab1`
OPTIONS()
as
-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema
@@ -1,6 +1,6 @@


-create or replace view `dataline-integration-testing`._airbyte_test_normalization.`nested_stream_with_complex_columns_resulting_into_long_names_64a_partition_ab2`
+create or replace view `dataline-integration-testing`._airbyte_test_normalization.`nested_stream_with_complex_columns_resulting_into_long_names_partition_ab2`
OPTIONS()
as
-- SQL model to cast each column to its adequate SQL type converted from the JSON schema type
@@ -9,6 +9,6 @@ select
double_array_data,
DATA,
_airbyte_emitted_at
-from `dataline-integration-testing`._airbyte_test_normalization.`nested_stream_with_complex_columns_resulting_into_long_names_64a_partition_ab1`
+from `dataline-integration-testing`._airbyte_test_normalization.`nested_stream_with_complex_columns_resulting_into_long_names_partition_ab1`
-- partition at nested_stream_with_complex_columns_resulting_into_long_names/partition;

@@ -1,6 +1,6 @@


-create or replace view `dataline-integration-testing`._airbyte_test_normalization.`nested_stream_with_complex_columns_resulting_into_long_names_64a_partition_ab3`
+create or replace view `dataline-integration-testing`._airbyte_test_normalization.`nested_stream_with_complex_columns_resulting_into_long_names_partition_ab3`
OPTIONS()
as
-- SQL model to build a hash column based on the values of this record
@@ -15,6 +15,6 @@ select
), '')) as
string
))) as _airbyte_partition_hashid
-from `dataline-integration-testing`._airbyte_test_normalization.`nested_stream_with_complex_columns_resulting_into_long_names_64a_partition_ab2`
+from `dataline-integration-testing`._airbyte_test_normalization.`nested_stream_with_complex_columns_resulting_into_long_names_partition_ab2`
-- partition at nested_stream_with_complex_columns_resulting_into_long_names/partition;

@@ -1,6 +1,6 @@


-create or replace view `dataline-integration-testing`._airbyte_test_normalization.`nested_stream_with_complex_columns_resulting_into_long_names_partition_e78_double_array_data_ab1`
+create or replace view `dataline-integration-testing`._airbyte_test_normalization.`nested_stream_with_complex_columns_resulting_into_long_names_partition_double_array_data_ab1`
OPTIONS()
as
-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema
@@ -9,7 +9,7 @@ select
_airbyte_partition_hashid,
json_extract_scalar(double_array_data, "$['id']") as id,
_airbyte_emitted_at
-from `dataline-integration-testing`.test_normalization.`nested_stream_with_complex_columns_resulting_into_long_names_64a_partition`
+from `dataline-integration-testing`.test_normalization.`nested_stream_with_complex_columns_resulting_into_long_names_partition`
cross join unnest(double_array_data) as double_array_data
where double_array_data is not null
-- double_array_data at nested_stream_with_complex_columns_resulting_into_long_names/partition/double_array_data;
@@ -1,6 +1,6 @@


-create or replace view `dataline-integration-testing`._airbyte_test_normalization.`nested_stream_with_complex_columns_resulting_into_long_names_partition_e78_double_array_data_ab2`
+create or replace view `dataline-integration-testing`._airbyte_test_normalization.`nested_stream_with_complex_columns_resulting_into_long_names_partition_double_array_data_ab2`
OPTIONS()
as
-- SQL model to cast each column to its adequate SQL type converted from the JSON schema type
@@ -10,6 +10,6 @@ select
string
) as id,
_airbyte_emitted_at
-from `dataline-integration-testing`._airbyte_test_normalization.`nested_stream_with_complex_columns_resulting_into_long_names_partition_e78_double_array_data_ab1`
+from `dataline-integration-testing`._airbyte_test_normalization.`nested_stream_with_complex_columns_resulting_into_long_names_partition_double_array_data_ab1`
-- double_array_data at nested_stream_with_complex_columns_resulting_into_long_names/partition/double_array_data;

@@ -1,6 +1,6 @@


-create or replace view `dataline-integration-testing`._airbyte_test_normalization.`nested_stream_with_complex_columns_resulting_into_long_names_partition_e78_double_array_data_ab3`
+create or replace view `dataline-integration-testing`._airbyte_test_normalization.`nested_stream_with_complex_columns_resulting_into_long_names_partition_double_array_data_ab3`
OPTIONS()
as
-- SQL model to build a hash column based on the values of this record
@@ -13,6 +13,6 @@ select
), '')) as
string
))) as _airbyte_double_array_data_hashid
-from `dataline-integration-testing`._airbyte_test_normalization.`nested_stream_with_complex_columns_resulting_into_long_names_partition_e78_double_array_data_ab2`
+from `dataline-integration-testing`._airbyte_test_normalization.`nested_stream_with_complex_columns_resulting_into_long_names_partition_double_array_data_ab2`
-- double_array_data at nested_stream_with_complex_columns_resulting_into_long_names/partition/double_array_data;

@@ -1,6 +1,6 @@


-create or replace table `dataline-integration-testing`.test_normalization.`nested_stream_with_complex_columns_resulting_into_long_names_64a_partition`
+create or replace table `dataline-integration-testing`.test_normalization.`nested_stream_with_complex_columns_resulting_into_long_names_partition`


OPTIONS()
@@ -13,7 +13,7 @@ select
DATA,
_airbyte_emitted_at,
_airbyte_partition_hashid
-from `dataline-integration-testing`._airbyte_test_normalization.`nested_stream_with_complex_columns_resulting_into_long_names_64a_partition_ab3`
+from `dataline-integration-testing`._airbyte_test_normalization.`nested_stream_with_complex_columns_resulting_into_long_names_partition_ab3`
-- partition at nested_stream_with_complex_columns_resulting_into_long_names/partition from `dataline-integration-testing`.test_normalization.`nested_stream_with_complex_columns_resulting_into_long_names`
);

@@ -1,6 +1,6 @@


-create or replace table `dataline-integration-testing`.test_normalization.`nested_stream_with_complex_columns_resulting_into_long_names_partition_669_DATA`
+create or replace table `dataline-integration-testing`.test_normalization.`nested_stream_with_complex_columns_resulting_into_long_names_partition_DATA`


OPTIONS()
@@ -12,7 +12,7 @@ select
currency,
_airbyte_emitted_at,
_airbyte_DATA_hashid
-from `dataline-integration-testing`._airbyte_test_normalization.`nested_stream_with_complex_columns_resulting_into_long_names_partition_669_DATA_ab3`
--- DATA at nested_stream_with_complex_columns_resulting_into_long_names/partition/DATA from `dataline-integration-testing`.test_normalization.`nested_stream_with_complex_columns_resulting_into_long_names_64a_partition`
+from `dataline-integration-testing`._airbyte_test_normalization.`nested_stream_with_complex_columns_resulting_into_long_names_partition_DATA_ab3`
+-- DATA at nested_stream_with_complex_columns_resulting_into_long_names/partition/DATA from `dataline-integration-testing`.test_normalization.`nested_stream_with_complex_columns_resulting_into_long_names_partition`
);

@@ -1,6 +1,6 @@


-create or replace table `dataline-integration-testing`.test_normalization.`nested_stream_with_complex_columns_resulting_into_long_names_partition_e78_double_array_data`
+create or replace table `dataline-integration-testing`.test_normalization.`nested_stream_with_complex_columns_resulting_into_long_names_partition_double_array_data`


OPTIONS()
@@ -12,7 +12,7 @@ select
id,
_airbyte_emitted_at,
_airbyte_double_array_data_hashid
-from `dataline-integration-testing`._airbyte_test_normalization.`nested_stream_with_complex_columns_resulting_into_long_names_partition_e78_double_array_data_ab3`
--- double_array_data at nested_stream_with_complex_columns_resulting_into_long_names/partition/double_array_data from `dataline-integration-testing`.test_normalization.`nested_stream_with_complex_columns_resulting_into_long_names_64a_partition`
+from `dataline-integration-testing`._airbyte_test_normalization.`nested_stream_with_complex_columns_resulting_into_long_names_partition_double_array_data_ab3`
+-- double_array_data at nested_stream_with_complex_columns_resulting_into_long_names/partition/double_array_data from `dataline-integration-testing`.test_normalization.`nested_stream_with_complex_columns_resulting_into_long_names_partition`
);

@@ -1,15 +1,15 @@
version: 2

models:
-- name: exchange_rate_f0e
+- name: exchange_rate
tests:
- dbt_utils.equality:
description: check_streams_are_equal
In this integration test, we are sending the same records to both streams
exchange_rate and dedup_exchange_rate.
The SCD table of dedup_exchange_rate in append_dedup mode should therefore mirror
the final table with append or overwrite mode from exchange_rate.
-compare_model: ref('dedup_exchange_rate_scd_81d')
+compare_model: ref('dedup_exchange_rate_scd')
compare_columns:
- id
- currency
@@ -33,7 +33,7 @@ models:
tests:
- not_null

-- name: dedup_exchange_rate_81d
+- name: dedup_exchange_rate
tests:
- dbt_utils.unique_combination_of_columns:
description: check_deduplication_by_primary_key
@@ -43,7 +43,7 @@ models:
- currency
- NZD

-- name: nested_stream_with_complex_columns_resulting_into_long_names_64a_partition_44f
+- name: nested_stream_with_complex_columns_resulting_into_long_names_partition
columns:
- name: DATA
tests:
@@ -52,12 +52,12 @@
- name: double_array_data
tests:
- not_null
-- name: nested_stream_with_complex_columns_resulting_into_long_names_partition_669_DATA_886
+- name: nested_stream_with_complex_columns_resulting_into_long_names_partition_DATA
columns:
- name: currency
tests:
- not_null
-- name: nested_stream_with_complex_columns_resulting_into_long_names_partition_e78_double_array_data_1b9
+- name: nested_stream_with_complex_columns_resulting_into_long_names_partition_double_array_data
columns:
- name: id
tests:
@@ -1,4 +1,4 @@
{{ config(alias="dedup_exchange_rate_ab1", schema="_airbyte_test_normalization", tags=["top-level-intermediate"]) }}
{{ config(schema="_airbyte_test_normalization", tags=["top-level-intermediate"]) }}
-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema
select
{{ json_extract_scalar('_airbyte_data', ['id']) }} as id,
@@ -1,4 +1,4 @@
{{ config(alias="dedup_exchange_rate_ab2", schema="_airbyte_test_normalization", tags=["top-level-intermediate"]) }}
{{ config(schema="_airbyte_test_normalization", tags=["top-level-intermediate"]) }}
-- SQL model to cast each column to its adequate SQL type converted from the JSON schema type
select
cast(id as {{ dbt_utils.type_bigint() }}) as id,
@@ -9,6 +9,6 @@ select
cast(NZD as {{ dbt_utils.type_float() }}) as NZD,
cast(USD as {{ dbt_utils.type_float() }}) as USD,
_airbyte_emitted_at
-from {{ ref('dedup_exchange_rate_ab1_281') }}
+from {{ ref('dedup_exchange_rate_ab1') }}
-- dedup_exchange_rate

@@ -1,4 +1,4 @@
{{ config(alias="dedup_exchange_rate_ab3", schema="_airbyte_test_normalization", tags=["top-level-intermediate"]) }}
{{ config(schema="_airbyte_test_normalization", tags=["top-level-intermediate"]) }}
-- SQL model to build a hash column based on the values of this record
select
*,
@@ -11,6 +11,6 @@ select
'NZD',
'USD',
]) }} as _airbyte_dedup_exchange_rate_hashid
-from {{ ref('dedup_exchange_rate_ab2_281') }}
+from {{ ref('dedup_exchange_rate_ab2') }}
-- dedup_exchange_rate
