
Commit: update tests
aaronsteers committed Dec 9, 2024
1 parent e5b6ec4 commit 413abf4
Showing 4 changed files with 56 additions and 10 deletions.
18 changes: 14 additions & 4 deletions airbyte/destinations/_translate_dest_to_cache.py
@@ -52,16 +52,18 @@ def destination_to_cache(
             destination_configuration.get("DESTINATION_TYPE")
             or destination_configuration["destinationType"]
         )
-        if hasattr(destination_configuration, "value"):
+        if hasattr(destination_type, "value"):
             destination_type = destination_type.value
+        if hasattr(destination_type, "_value_"):
+            destination_type = destination_type._value_
         else:
             destination_type = str(destination_type)
     except KeyError as ex:
         raise ValueError(
             f"Missing 'destinationType' in keys {list(destination_configuration.keys())}."
         ) from ex
 else:
-    destination_type = str(destination_configuration.DESTINATION_TYPE)
+    destination_type = destination_configuration.DESTINATION_TYPE.value
 
     conversion_fn = conversion_fn_map[destination_type]
     return conversion_fn(destination_configuration)
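
For context on the change above: `destinationType` can arrive either as a plain string or as an Enum member, and the fix points the `hasattr` check at `destination_type` (the extracted value) rather than `destination_configuration` (the whole config). A minimal standalone sketch of the same normalization — the `DestinationType` enum here is a hypothetical stand-in, not the API's actual model:

```python
from enum import Enum
from typing import Any


class DestinationType(Enum):
    """Hypothetical stand-in for the API's destination-type enum."""

    BIGQUERY = "bigquery"
    SNOWFLAKE = "snowflake"


def normalize_destination_type(raw: Any) -> str:
    """Coerce a destination type (str or Enum member) to a plain string."""
    if hasattr(raw, "value"):  # Enum members expose their payload via .value
        return raw.value
    if hasattr(raw, "_value_"):  # fallback for objects exposing only _value_
        return raw._value_
    return str(raw)


assert normalize_destination_type(DestinationType.SNOWFLAKE) == "snowflake"
assert normalize_destination_type("bigquery") == "bigquery"
```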
@@ -70,7 +72,11 @@ def destination_to_cache(
 def biqquery_destination_to_cache(
     destination_configuration: DestinationBigquery | dict[str, Any],
 ) -> BigQueryCache:
-    """Create a new BigQuery cache from the destination configuration."""
+    """Create a new BigQuery cache from the destination configuration.
+
+    We may have to inject credentials, because they are obfuscated when config
+    is returned from the REST API.
+    """
     credentials_path = get_secret("BIGQUERY_CREDENTIALS_PATH")
     if isinstance(destination_configuration, dict):
         destination_configuration = DestinationBigquery(**destination_configuration)
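
The expanded docstring explains the `get_secret("BIGQUERY_CREDENTIALS_PATH")` call above: configs returned by the REST API come back with secrets obfuscated, so usable credentials must be re-injected locally. A rough sketch of that pattern, assuming PyAirbyte's `get_secret` helper; the config key name is illustrative, not the model's actual field:

```python
from typing import Any

from airbyte.secrets import get_secret  # resolves env vars / secret stores


def with_bigquery_credentials(config: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of the config with obfuscated credentials replaced."""
    patched = dict(config)  # avoid mutating the caller's dict
    # "credentials_json_path" is an assumed key name for illustration only
    patched["credentials_json_path"] = get_secret("BIGQUERY_CREDENTIALS_PATH")
    return patched
```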
@@ -128,7 +134,11 @@ def snowflake_destination_to_cache(
     destination_configuration: DestinationSnowflake | dict[str, Any],
     password_secret_name: str = SNOWFLAKE_PASSWORD_SECRET_NAME,
 ) -> SnowflakeCache:
-    """Create a new Snowflake cache from the destination configuration."""
+    """Create a new Snowflake cache from the destination configuration.
+
+    We may have to inject credentials, because they are obfuscated when config
+    is returned from the REST API.
+    """
     if isinstance(destination_configuration, dict):
         destination_configuration = DestinationSnowflake(**destination_configuration)

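Snowflake follows the same credential-injection pattern, with the password resolved from a named secret (the `password_secret_name` parameter defaulting to `SNOWFLAKE_PASSWORD_SECRET_NAME` in the signature above). A hedged sketch; the secret-name value and config key are assumptions:

```python
from typing import Any

from airbyte.secrets import get_secret

# Assumed default value; the real constant may differ.
SNOWFLAKE_PASSWORD_SECRET_NAME = "SNOWFLAKE_PASSWORD"


def with_snowflake_password(
    config: dict[str, Any],
    password_secret_name: str = SNOWFLAKE_PASSWORD_SECRET_NAME,
) -> dict[str, Any]:
    """Return a copy of the config with the obfuscated password replaced."""
    patched = dict(config)
    patched["password"] = get_secret(password_secret_name)
    return patched
```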
4 changes: 2 additions & 2 deletions pyproject.toml
@@ -167,7 +167,6 @@ select = [
"TID", # flake8-tidy-imports
"TRY", # tryceratops
"TRY002", # Disallow raising vanilla Exception. Create or use a custom exception instead.
"TRY003", # Disallow vanilla string passing. Prefer kwargs to the exception constructur.
"UP", # pyupgrade
"W", # pycodestyle (warnings)
"YTT", # flake8-2020
@@ -202,7 +201,8 @@ ignore = [
"FIX002", # Allow "TODO:" until release (then switch to requiring links via TDO003)
"PLW0603", # Using the global statement to update _cache is discouraged
"PLW0108", # Lambda may be unnecessary; consider inlining inner function
# "TD003", # Require links for TODOs (now enabled)
"TRY003", # Allow exceptions to receive strings in constructors.
# "TD003", # Require links for TODOs (now enabled)
]
fixable = ["ALL"]
unfixable = [
37 changes: 37 additions & 0 deletions tests/integration_tests/cloud/test_cloud_sql_reads.py
@@ -133,6 +133,43 @@ def test_read_from_deployed_connection(
         assert pandas_df[col].notnull().all()
 
 
+@pytest.mark.parametrize(
+    "deployed_connection_id",
+    [
+        pytest.param("c7b4d838-a612-495a-9d91-a14e477add51", id="Faker->Snowflake"),
+        pytest.param("0e1d6b32-b8e3-4b68-91a3-3a314599c782", id="Faker->BigQuery"),
+        pytest.param(
+            "", id="Faker->Postgres", marks=pytest.mark.skip(reason="Not yet supported")
+        ),
+        pytest.param(
+            "",
+            id="Faker->MotherDuck",
+            marks=pytest.mark.skip(reason="Not yet supported"),
+        ),
+    ],
+)
+def test_translate_cloud_job_to_sql_cache(
+    cloud_workspace: cloud.CloudWorkspace,
+    deployed_connection_id: str,
+    previous_job_run_id: int,
+) -> None:
+    """Test reading from a cache."""
+    # Run sync and get result:
+    sync_result: SyncResult | None = cloud_workspace.get_connection(
+        connection_id=deployed_connection_id
+    ).get_sync_result(
+        job_id=previous_job_run_id,
+    )
+    assert sync_result, f"Failed to get sync result for job {previous_job_run_id}"
+
+    # Test sync result:
+    assert sync_result.is_job_complete()
+
+    cache = sync_result.get_sql_cache()
+    sqlalchemy_url = cache.get_sql_alchemy_url()
+    engine: Engine = sync_result.get_sql_engine()
+
+
 @pytest.mark.parametrize(
     "deployed_connection_id",
     [
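
The new `test_translate_cloud_job_to_sql_cache` test stops at constructing the cache and engine; a natural follow-up is querying the synced tables through that engine. A hedged sketch of such a helper — standard SQLAlchemy, with the table name left as a parameter since the Faker streams aren't named in this diff:

```python
from sqlalchemy import text
from sqlalchemy.engine import Engine


def count_rows(engine: Engine, table_name: str) -> int:
    """Count rows in a synced table via the cache's SQLAlchemy engine."""
    with engine.connect() as conn:
        result = conn.execute(text(f"SELECT COUNT(*) FROM {table_name}"))
        return int(result.scalar_one())
```

In the test above this might be called as `count_rows(engine, "users")`, assuming the Faker connection syncs a `users` stream.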
7 changes: 3 additions & 4 deletions tests/integration_tests/test_source_faker_integration.py
@@ -9,7 +9,6 @@
 from __future__ import annotations
 
 import os
-import shutil
 import sys
 import tempfile
 import warnings
@@ -133,7 +132,7 @@ def test_faker_pks(
 @pytest.mark.slow
 def test_replace_strategy(
     source_faker_seed_a: ab.Source,
-    all_cache_types: CacheBase,
+    all_cache_types: list[CacheBase],
 ) -> None:
     """Test that the append strategy works as expected."""
     for (
@@ -150,7 +149,7 @@ def test_replace_strategy(
 @pytest.mark.slow
 def test_append_strategy(
     source_faker_seed_a: ab.Source,
-    all_cache_types: CacheBase,
+    all_cache_types: list[CacheBase],
 ) -> None:
     """Test that the append strategy works as expected."""
     for (
@@ -173,7 +172,7 @@ def test_merge_strategy(
     strategy: str,
     source_faker_seed_a: ab.Source,
     source_faker_seed_b: ab.Source,
-    all_cache_types: CacheBase,
+    all_cache_types: list[CacheBase],
 ) -> None:
     """Test that the merge strategy works as expected.
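
The `CacheBase` → `list[CacheBase]` annotation fixes above bring the type hints in line with what the fixture actually yields: a list of caches that each test iterates over. A hypothetical reconstruction of such a fixture, using only the DuckDB-backed local cache that PyAirbyte can create without external services:

```python
import pytest

import airbyte as ab
from airbyte.caches.base import CacheBase


@pytest.fixture
def all_cache_types() -> list[CacheBase]:
    """Hypothetical sketch: one cache per backend under test.

    The real fixture presumably also provisions Postgres, Snowflake,
    and BigQuery caches; only a local DuckDB cache is shown here.
    """
    return [
        ab.new_local_cache(),  # DuckDB-backed cache in a temp directory
    ]
```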
