Skip to content

Commit

Permalink
Fix integ test for Iceberg
Browse files Browse the repository at this point in the history
  • Loading branch information
moomindani committed Jan 10, 2025
1 parent b1fc499 commit 9942a2f
Showing 1 changed file with 119 additions and 91 deletions.
210 changes: 119 additions & 91 deletions tests/functional/adapter/test_iceberg.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,86 +174,60 @@ class TestIncrementalGlue(BaseIncremental):
@pytest.fixture(scope="class")
def models(self):
incremental_sql = """
{{ config(
materialized="incremental",
incremental_strategy="append",
file_format="iceberg"
) }}
{% set source_relation = source('raw', 'seed') %}
with source_data as (
select * from {{ source_relation }}
{% if is_incremental() %}
where id > (
select max(id)
from {{ this }}
)
{% endif %}
)
select
id,
name,
some_date
from source_data
{{ config(materialized="incremental",incremental_strategy="append", file_format="iceberg") }}
select * from {{ source('raw', 'seed') }}
{% if is_incremental() %}
where id > (select max(id) from glue_catalog.{{ this }})
{% endif %}
""".strip()
return {"incremental.sql": incremental_sql, "schema.yml": schema_base_yml}

return {
"incremental.sql": incremental_sql,
"schema.yml": schema_base_yml
}

def _compare_data(self, project, relation1, relation2):
"""Helper method to compare data between two relations"""
sql = f"""
with relation1 as (
select id, name, some_date from {relation1}
),
relation2 as (
select * from {relation2}
),
diff as (
select * from (
select id, name, some_date from relation1
UNION ALL
select id, name, some_date from relation2
) t
group by id, name, some_date
having count(*) = 1
)
select count(*) as diff_count from diff
"""
result = project.run_sql(sql, fetch="one")
return result[0] if result else None

# test_incremental with refresh table
def test_incremental(self, project):
# Initial seed
# seed command
results = run_dbt(["seed"])
assert len(results) == 2

# Verify base table
# base table rowcount
relation = relation_from_name(project.adapter, "base")
project.run_sql(f"refresh table {relation}")
# run refresh table to disable the previous parquet file paths
result = project.run_sql(f"select count(*) as num_rows from {relation}", fetch="one")
assert result[0] == 10

# Verify added table
# added table rowcount
relation = relation_from_name(project.adapter, "added")
project.run_sql(f"refresh table {relation}")
# run refresh table to disable the previous parquet file paths
result = project.run_sql(f"select count(*) as num_rows from {relation}", fetch="one")
assert result[0] == 20

# Run model with base data
# run command
# the "seed_name" var changes the seed identifier in the schema file
results = run_dbt(["run", "--vars", "seed_name: base"])
assert len(results) == 1

# Compare data
# check relations equal
incremental_relation = relation_from_name(project.adapter, "incremental")
base_relation = relation_from_name(project.adapter, "base")
project.run_sql(f"refresh table {incremental_relation}")
check_relations_equal(project.adapter, ["base", "incremental"])

# change seed_name var
# the "seed_name" var changes the seed identifier in the schema file
results = run_dbt(["run", "--vars", "seed_name: added"])
assert len(results) == 1

# check relations equal
project.run_sql(f"refresh table {incremental_relation}")
check_relations_equal(project.adapter, ["added", "incremental"])

# get catalog from docs generate
catalog = run_dbt(["docs", "generate"])
assert len(catalog.nodes) == 3
assert len(catalog.sources) == 1

pass

# Use helper method to compare data
diff_count = self._compare_data(project, incremental_relation, base_relation)
assert diff_count == 0, "Data should match between base and incremental"

class TestIncrementalGlueWithCustomLocation(TestIncrementalGlue):
@pytest.fixture(scope="class")
Expand Down Expand Up @@ -306,6 +280,7 @@ class TestValidateConnectionGlue(BaseValidateConnection):
class TestIcebergTimestamp(BaseSimpleMaterializations):
@pytest.fixture(scope="class")
def project_config_update(self):
"""Project level configuration"""
return {
"name": "iceberg_timestamp_test",
"models": {
Expand All @@ -315,67 +290,120 @@ def project_config_update(self):

@pytest.fixture(scope="class")
def models(self):
"""Model definitions for timestamp testing"""
# Model with timestamp enabled (default behavior)
timestamp_enabled_sql = """
{{ config(
materialized="table",
file_format="iceberg",
add_iceberg_timestamp=true
) }}
select * from {{ source('raw', 'seed') }}
"""

# Model with timestamp explicitly disabled
timestamp_disabled_sql = """
{{ config(
materialized="table",
file_format="iceberg",
add_iceberg_timestamp=false
) }}
select * from {{ source('raw', 'seed') }}
"""

# Schema definition with source configuration
schema_yml = """
version: 2
sources:
- name: raw
schema: "{{ target.schema }}"
database: "{{ target.schema }}"
tables:
- name: seed
identifier: "base"
"""

return {
"enabled_timestamp.sql": timestamp_enabled_sql,
"disabled_timestamp.sql": timestamp_disabled_sql,
"schema.yml": schema_base_yml,
"schema.yml": schema_yml,
}

def test_base(self, project):
"""Test basic table creation and model count"""
# Run initial seed
results = run_dbt(["seed"])
assert len(results) == 1

# Run models
results = run_dbt(["run"])
assert len(results) == 2 # Two models: enabled and disabled

# Get relations
relation_enabled = relation_from_name(project.adapter, "enabled_timestamp")
relation_disabled = relation_from_name(project.adapter, "disabled_timestamp")

# Refresh tables to ensure metadata is up to date
project.run_sql(f"refresh table {relation_enabled}")
project.run_sql(f"refresh table {relation_disabled}")

# Verify basic table functionality
for relation in [relation_enabled, relation_disabled]:
result = project.run_sql(f"select count(*) from {relation}", fetch="one")
assert result[0] > 0, f"Table {relation} should contain rows"

def test_iceberg_timestamp(self, project):
# Run initial seed and models
"""Test timestamp column behavior in Iceberg tables"""
# Run initial setup
results = run_dbt(["seed"])
assert len(results) == 1

results = run_dbt(["run"])
assert len(results) == 2

# Check enabled model using describe
# Get relations
relation_enabled = relation_from_name(project.adapter, "enabled_timestamp")
sql = f"describe {relation_enabled}"

try:
result = project.run_sql(sql, fetch="all")
enabled_columns = [row[0].lower() for row in result]
assert "update_iceberg_ts" in enabled_columns, \
"update_iceberg_ts column should exist"
except Exception as e:
print(f"Failed to describe table: {e}")
# Fallback to direct column check
sql = f"SELECT update_iceberg_ts FROM {relation_enabled} LIMIT 1"
result = project.run_sql(sql, fetch="one")
assert result is not None, "update_iceberg_ts column should exist"

# Check disabled model
relation_disabled = relation_from_name(project.adapter, "disabled_timestamp")
sql = f"describe {relation_disabled}"

# Refresh tables to ensure metadata is up to date
project.run_sql(f"refresh table {relation_enabled}")
project.run_sql(f"refresh table {relation_disabled}")

# Verify timestamp column exists in enabled table
try:
result = project.run_sql(f"select update_iceberg_ts from {relation_enabled} limit 1", fetch="one")
assert result is not None, "update_iceberg_ts should contain valid timestamps"
has_timestamp = True
except Exception:
has_timestamp = False
assert has_timestamp, "update_iceberg_ts column should exist in enabled table"

# Verify timestamp column does not exist in disabled table
try:
result = project.run_sql(sql, fetch="all")
disabled_columns = [row[0].lower() for row in result]
assert "update_iceberg_ts" not in disabled_columns, \
"update_iceberg_ts column should not exist"
except Exception as e:
print(f"Failed to describe table: {e}")
# Verify column does not exist
sql = f"SELECT update_iceberg_ts FROM {relation_disabled} LIMIT 1"
try:
project.run_sql(sql, fetch="one")
assert False, "update_iceberg_ts column should not exist"
except Exception:
pass # Expected error as column should not exist
project.run_sql(f"select update_iceberg_ts from {relation_disabled} limit 1", fetch="one")
has_timestamp = True
except Exception:
has_timestamp = False
assert not has_timestamp, "update_iceberg_ts column should not exist in disabled table"

# Verify data consistency
base_relation = relation_from_name(project.adapter, "base")
enabled_base_sql = f"""
select count(*) from (
select id, name, some_date from {relation_enabled}
except
select * from {base_relation}
)
"""
disabled_base_sql = f"""
select count(*) from (
select * from {relation_disabled}
except
select * from {base_relation}
)
"""
enabled_diff = project.run_sql(enabled_base_sql, fetch="one")[0]
disabled_diff = project.run_sql(disabled_base_sql, fetch="one")[0]

assert enabled_diff == 0, "Data in enabled model should match base (excluding timestamp)"
assert disabled_diff == 0, "Data in disabled model should match base"

0 comments on commit 9942a2f

Please sign in to comment.