fixes (full code review)
aaronsteers committed Dec 10, 2024
1 parent c8a41b5 commit 452bc0e
Showing 7 changed files with 52 additions and 17 deletions.
2 changes: 1 addition & 1 deletion airbyte/caches/base.py
@@ -246,7 +246,7 @@ def __len__(self) -> int:
def __bool__(self) -> bool:
"""Always True.
This is needed so that caches with zero streams are not falsely (None-like).
This is needed so that caches with zero streams are not falsey (None-like).
"""
return True

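The docstring fix above leans on Python's truthiness rules: an object that defines `__len__` evaluates as False when empty unless it also defines `__bool__`. A minimal standalone sketch of the pattern (not the actual `CacheBase` code) showing why the override matters:

```python
from __future__ import annotations


class StreamContainer:
    """Minimal sketch: a container that should never evaluate as False."""

    def __init__(self, streams: list[str] | None = None) -> None:
        self._streams = streams or []

    def __len__(self) -> int:
        # Without __bool__, bool(obj) would fall back to len(obj) != 0.
        return len(self._streams)

    def __bool__(self) -> bool:
        # Always True, so an empty container is not mistaken for None/False.
        return True


empty = StreamContainer()
assert len(empty) == 0
assert bool(empty) is True  # would be False without the __bool__ override
```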
21 changes: 18 additions & 3 deletions airbyte/cloud/connections.py
@@ -46,6 +46,13 @@ def __init__(
"""The ID of the destination."""

self._connection_info: ConnectionResponse | None = None
"""The connection info object. (Cached.)"""

self._cloud_source_object: CloudSource | None = None
"""The source object. (Cached.)"""

self._cloud_destination_object: CloudDestination | None = None
"""The destination object. (Cached.)"""

def _fetch_connection_info(self) -> ConnectionResponse:
"""Populate the connection with data from the API."""
@@ -73,10 +80,14 @@ def source_id(self) -> str:
@property
def source(self) -> CloudSource:
"""Get the source object."""
return CloudSource(
if self._cloud_source_object:
return self._cloud_source_object

self._cloud_source_object = CloudSource(
workspace=self.workspace,
connector_id=self.source_id,
)
return self._cloud_source_object

@property
def destination_id(self) -> str:
@@ -91,11 +102,15 @@ def destination_id(self) -> str:

@property
def destination(self) -> CloudDestination:
"""Get the source object."""
return CloudDestination(
"""Get the destination object."""
if self._cloud_destination_object:
return self._cloud_destination_object

self._cloud_destination_object = CloudDestination(
workspace=self.workspace,
connector_id=self.destination_id,
)
return self._cloud_destination_object

@property
def stream_names(self) -> list[str]:
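The `source` and `destination` properties above now memoize the objects they build in the private attributes added to `__init__`. A minimal sketch of the same lazy-caching property pattern, using a placeholder `Connector` class rather than the real `CloudSource`/`CloudDestination`:

```python
from __future__ import annotations


class Connector:
    """Placeholder standing in for CloudSource / CloudDestination."""

    def __init__(self, connector_id: str) -> None:
        self.connector_id = connector_id


class Connection:
    def __init__(self, source_id: str) -> None:
        self._source_id = source_id
        self._cloud_source_object: Connector | None = None  # cached instance

    @property
    def source(self) -> Connector:
        """Build the source object once, then return the cached instance."""
        if self._cloud_source_object:
            return self._cloud_source_object

        self._cloud_source_object = Connector(connector_id=self._source_id)
        return self._cloud_source_object


conn = Connection(source_id="my-source-id")
assert conn.source is conn.source  # same object on repeated access
```

Because a constructed object is always truthy, the plain `if` check behaves like an `is not None` test here; `functools.cached_property` would be an alternative, at the cost of a less explicit cache attribute.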
4 changes: 2 additions & 2 deletions airbyte/destinations/_translate_dest_to_cache.py
@@ -41,7 +41,7 @@ def destination_to_cache(
) -> CacheBase:
"""Get the destination configuration from the cache."""
conversion_fn_map: dict[str, Callable[[Any], CacheBase]] = {
"bigquery": biqquery_destination_to_cache,
"bigquery": bigquery_destination_to_cache,
"duckdb": duckdb_destination_to_cache,
"motherduck": motherduck_destination_to_cache,
"postgres": postgres_destination_to_cache,
@@ -70,7 +70,7 @@ def destination_to_cache(
return conversion_fn(destination_configuration)


def biqquery_destination_to_cache(
def bigquery_destination_to_cache(
destination_configuration: DestinationBigquery | dict[str, Any],
) -> BigQueryCache:
"""Create a new BigQuery cache from the destination configuration.
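The change in this file is a spelling fix: `biqquery_destination_to_cache` becomes `bigquery_destination_to_cache` both where it is defined and in the dispatch map that routes destination types to cache builders. A simplified sketch of that dispatch-map pattern, with stand-in builder functions and configuration keys rather than the real PyAirbyte signatures:

```python
from typing import Any, Callable


def bigquery_destination_to_cache(config: dict[str, Any]) -> str:
    # Simplified stand-in: the real function returns a BigQueryCache instance.
    return f"BigQueryCache(project={config.get('project_id')})"


def duckdb_destination_to_cache(config: dict[str, Any]) -> str:
    return f"DuckDBCache(path={config.get('destination_path')})"


# Keys are destination types, values are builder functions. The commit only
# renames the misspelled function, so lookups behave exactly as before.
conversion_fn_map: dict[str, Callable[[dict[str, Any]], str]] = {
    "bigquery": bigquery_destination_to_cache,
    "duckdb": duckdb_destination_to_cache,
}


def destination_to_cache(destination_type: str, config: dict[str, Any]) -> str:
    try:
        conversion_fn = conversion_fn_map[destination_type]
    except KeyError as ex:
        raise ValueError(f"Unsupported destination type: {destination_type}") from ex
    return conversion_fn(config)


print(destination_to_cache("bigquery", {"project_id": "my-project"}))
```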
4 changes: 2 additions & 2 deletions tests/integration_tests/cloud/test_cloud_api_util.py
@@ -7,7 +7,7 @@

from __future__ import annotations

from airbyte_api.models import SourceResponse, WorkspaceResponse
from airbyte_api.models import DestinationResponse, SourceResponse, WorkspaceResponse
from airbyte._util import api_util, text_util
from airbyte_api.models import DestinationDuckdb, SourceFaker

@@ -71,7 +71,7 @@ def test_list_destinations(
airbyte_cloud_client_id: SecretString,
airbyte_cloud_client_secret: SecretString,
) -> None:
result: list[SourceResponse] = api_util.list_sources(
result: list[DestinationResponse] = api_util.list_destinations(
workspace_id=workspace_id,
api_root=airbyte_cloud_api_root,
client_id=airbyte_cloud_client_id,
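The fix here is a copy-paste correction: `test_list_destinations` was calling `api_util.list_sources` and annotating the result as `list[SourceResponse]`; it now calls `list_destinations` and annotates `list[DestinationResponse]`. A self-contained sketch (stand-in helper and model, not the real `api_util` signatures) of how asserting on the element type catches this kind of mix-up, since the annotation alone is not checked at runtime:

```python
from dataclasses import dataclass


@dataclass
class DestinationResponse:  # stand-in for airbyte_api.models.DestinationResponse
    destination_id: str


def list_destinations(workspace_id: str) -> list[DestinationResponse]:
    """Stand-in for api_util.list_destinations()."""
    return [DestinationResponse(destination_id="abc-123")]


def test_list_destinations() -> None:
    result = list_destinations(workspace_id="workspace-1")
    # Checking the element type catches a wrong helper call (e.g. list_sources),
    # which a type annotation on the variable would not.
    assert all(isinstance(item, DestinationResponse) for item in result)
```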
27 changes: 23 additions & 4 deletions tests/integration_tests/cloud/test_cloud_sql_reads.py
@@ -8,6 +8,11 @@
import pandas as pd
import pytest
from airbyte import cloud
from airbyte.caches.base import CacheBase
from airbyte.caches.bigquery import BigQueryCache
from airbyte.caches.snowflake import SnowflakeCache
from airbyte.caches.postgres import PostgresCache
from airbyte.caches.duckdb import DuckDBCache
from airbyte.cloud.sync_results import SyncResult
from sqlalchemy.engine.base import Engine

@@ -136,15 +141,27 @@ def test_read_from_deployed_connection(


@pytest.mark.parametrize(
"deployed_connection_id",
"deployed_connection_id, cache_type",
[
pytest.param("c7b4d838-a612-495a-9d91-a14e477add51", id="Faker->Snowflake"),
pytest.param("0e1d6b32-b8e3-4b68-91a3-3a314599c782", id="Faker->BigQuery"),
pytest.param(
"", id="Faker->Postgres", marks=pytest.mark.skip(reason="Not yet supported")
"c7b4d838-a612-495a-9d91-a14e477add51",
SnowflakeCache,
id="Faker->Snowflake",
),
pytest.param(
"0e1d6b32-b8e3-4b68-91a3-3a314599c782",
BigQueryCache,
id="Faker->BigQuery",
),
pytest.param(
"",
PostgresCache,
id="Faker->Postgres",
marks=pytest.mark.skip(reason="Not yet supported"),
),
pytest.param(
"",
DuckDBCache,
id="Faker->MotherDuck",
marks=pytest.mark.skip(reason="Not yet supported"),
),
@@ -153,6 +170,7 @@ def test_read_from_deployed_connection(
def test_translate_cloud_job_to_sql_cache(
cloud_workspace: cloud.CloudWorkspace,
deployed_connection_id: str,
cache_type: type[CacheBase],
previous_job_run_id: int,
with_bigquery_credentials_env_vars,
with_snowflake_password_env_var,
@@ -170,6 +188,7 @@ def test_translate_cloud_job_to_sql_cache(
assert sync_result.is_job_complete()

cache = sync_result.get_sql_cache()
assert isinstance(cache, cache_type), f"Expected {cache_type}, got {type(cache)}"
sqlalchemy_url = cache.get_sql_alchemy_url()
engine: Engine = sync_result.get_sql_engine()

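The test above now parametrizes an expected cache class alongside each connection ID and asserts that the cache produced from the cloud job is an instance of that class. A self-contained sketch of the same pytest pattern, with stand-in cache classes and a stand-in translation function instead of the real `SyncResult.get_sql_cache()`:

```python
import pytest


class CacheBase: ...
class SnowflakeCache(CacheBase): ...
class BigQueryCache(CacheBase): ...


def translate_job_to_cache(connection_kind: str) -> CacheBase:
    """Stand-in for the cloud-job-to-cache translation under test."""
    return {"snowflake": SnowflakeCache, "bigquery": BigQueryCache}[connection_kind]()


@pytest.mark.parametrize(
    "connection_kind, cache_type",
    [
        pytest.param("snowflake", SnowflakeCache, id="Faker->Snowflake"),
        pytest.param("bigquery", BigQueryCache, id="Faker->BigQuery"),
    ],
)
def test_translate_job_to_cache(connection_kind: str, cache_type: type[CacheBase]) -> None:
    cache = translate_job_to_cache(connection_kind)
    assert isinstance(cache, cache_type), f"Expected {cache_type}, got {type(cache)}"
```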
8 changes: 5 additions & 3 deletions tests/integration_tests/cloud/test_cloud_workspaces.py
@@ -61,6 +61,7 @@ def test_deploy_connection(
deployable_dummy_destination: ab.Destination,
) -> None:
"""Test deploying a source and cache to a workspace as a new connection."""
stream_names = deployable_dummy_source.get_selected_streams()
cloud_source = cloud_workspace.deploy_source(
name="test-source",
source=deployable_dummy_source,
@@ -76,10 +77,11 @@ def test_deploy_connection(
connection_name="test-connection",
source=cloud_source,
destination=cloud_destination,
selected_streams=deployable_dummy_source.get_selected_streams(),
selected_streams=stream_names,
table_prefix="zzz_deleteme_",
)
# assert set(connection.stream_names) == set(["users", "products", "purchases"])
# assert connection.table_prefix == "abc_deleteme_"
assert set(connection.stream_names) == set(stream_names)
assert connection.table_prefix == "zzz_deleteme_"
cloud_workspace.permanently_delete_connection(
connection=connection,
cascade_delete_source=True,
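The test now captures the selected stream names once, passes them to `deploy_connection`, and asserts that the deployed connection reports the same streams and table prefix (replacing the previously commented-out assertions). A minimal sketch of that arrange/act/assert shape, using hypothetical stand-ins for the workspace and connection objects:

```python
from dataclasses import dataclass


@dataclass
class FakeConnection:
    stream_names: list[str]
    table_prefix: str


class FakeWorkspace:
    """Stand-in for CloudWorkspace; only deploy_connection() is sketched."""

    def deploy_connection(
        self, selected_streams: list[str], table_prefix: str
    ) -> FakeConnection:
        return FakeConnection(stream_names=list(selected_streams), table_prefix=table_prefix)


def test_deploy_connection_streams() -> None:
    stream_names = ["users", "products", "purchases"]  # captured once, reused below
    connection = FakeWorkspace().deploy_connection(
        selected_streams=stream_names,
        table_prefix="zzz_deleteme_",
    )
    # Assert against the same variable that drove the deployment, as the commit does.
    assert set(connection.stream_names) == set(stream_names)
    assert connection.table_prefix == "zzz_deleteme_"
```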
3 changes: 1 addition & 2 deletions tests/integration_tests/test_bigquery_cache.py
@@ -17,8 +17,7 @@ def test_bigquery_props(
assert new_bigquery_cache.dataset_name == new_bigquery_cache.schema_name, (
"Dataset name should be the same as schema name."
)
assert (
new_bigquery_cache.schema_name != "airbyte_raw"
assert new_bigquery_cache.schema_name != "airbyte_raw", (
"Schema name should not be the default value."
)

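The assertion rewrite above matters because of how Python parses `assert`: when the condition and the message share one set of parentheses, the string becomes part of the expression, adjacent string literals concatenate, and the check compares against the concatenated string, so it effectively always passes and no message is ever reported. A small sketch of the pitfall and the corrected form, using a hypothetical `schema_name` value:

```python
schema_name = "my_dataset"  # hypothetical stand-in for new_bigquery_cache.schema_name

# Buggy form: "airbyte_raw" and the message concatenate into one long literal,
# so the comparison is against that string and the assert (almost) never fails.
assert (
    schema_name != "airbyte_raw"
    "Schema name should not be the default value."
)

# Corrected form: condition, comma, then the message (parentheses only for wrapping).
assert schema_name != "airbyte_raw", (
    "Schema name should not be the default value."
)
```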
