Commit
fix more tests
aaronsteers committed Dec 9, 2024
1 parent 413abf4 commit 3e16ee4
Showing 4 changed files with 159 additions and 61 deletions.
24 changes: 22 additions & 2 deletions airbyte/destinations/_translate_dest_to_cache.py
@@ -18,6 +18,7 @@
from airbyte.caches.motherduck import MotherDuckCache
from airbyte.caches.postgres import PostgresCache
from airbyte.caches.snowflake import SnowflakeCache
from airbyte.exceptions import PyAirbyteSecretNotFoundError
from airbyte.secrets import get_secret
from airbyte.secrets.base import SecretString

@@ -54,7 +55,7 @@ def destination_to_cache(
)
if hasattr(destination_type, "value"):
destination_type = destination_type.value
if hasattr(destination_type, "_value_"):
elif hasattr(destination_type, "_value_"):
destination_type = destination_type._value_
else:
destination_type = str(destination_type)
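For context (not part of the diff): the switch from `if` to `elif` makes the three branches mutually exclusive, so a value already extracted via `.value` no longer falls through to the `str()` fallback. A minimal sketch of the intended control flow, using a hypothetical enum that is not from the codebase:

```python
from enum import Enum


class DestinationType(Enum):  # hypothetical example enum, for illustration only
    SNOWFLAKE = "destination-snowflake"


destination_type = DestinationType.SNOWFLAKE
if hasattr(destination_type, "value"):          # Enum members expose .value
    destination_type = destination_type.value   # -> "destination-snowflake"
elif hasattr(destination_type, "_value_"):      # fallback for objects exposing only _value_
    destination_type = destination_type._value_
else:
    destination_type = str(destination_type)    # last resort: plain string conversion
```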
@@ -142,12 +143,31 @@ def snowflake_destination_to_cache(
if isinstance(destination_configuration, dict):
destination_configuration = DestinationSnowflake(**destination_configuration)

snowflake_password: str | None = None
if (
destination_configuration.credentials
and hasattr(destination_configuration.credentials, "password")
and isinstance(destination_configuration.credentials.password, str) # type: ignore [attr-defined]
):
destination_password = str(destination_configuration.credentials.password) # type: ignore [attr-defined]
if "****" in destination_password:
try:
snowflake_password = get_secret(password_secret_name)
except ValueError as ex:
raise PyAirbyteSecretNotFoundError(
"Password is required for Snowflake cache, but it was not available."
) from ex
else:
snowflake_password = get_secret(destination_password)
else:
snowflake_password = get_secret(password_secret_name)

return SnowflakeCache(
account=destination_configuration.host.split(".snowflakecomputing")[0],
database=destination_configuration.database,
schema_name=destination_configuration.schema,
warehouse=destination_configuration.warehouse,
role=destination_configuration.role,
username=destination_configuration.username,
password=get_secret(password_secret_name),
password=snowflake_password,
)
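For context (not part of the diff): the change above replaces the unconditional `get_secret(password_secret_name)` lookup with a resolution order that depends on what the destination config contains. A hedged sketch of that order, mirroring the logic added in `snowflake_destination_to_cache()`; the helper name and example values below are illustrative only:

```python
from airbyte.exceptions import PyAirbyteSecretNotFoundError
from airbyte.secrets import get_secret


def resolve_snowflake_password(config_password: str | None, password_secret_name: str) -> str:
    """Illustrative helper mirroring the password resolution added above."""
    if config_password is None:
        # No password in the destination config: look up the named secret directly.
        return get_secret(password_secret_name)
    if "****" in config_password:
        # The REST API obfuscates secrets, so an asterisked value is unusable;
        # fall back to the named secret and surface a clearer error if it is missing.
        try:
            return get_secret(password_secret_name)
        except ValueError as ex:
            raise PyAirbyteSecretNotFoundError(
                "Password is required for Snowflake cache, but it was not available."
            ) from ex
    # Otherwise the config value is usable; resolve it through the secrets layer,
    # exactly as the new code does with `get_secret(destination_password)`.
    return get_secret(config_password)


# Hypothetical usage: an obfuscated config value falls back to the named secret,
# e.g. the SNOWFLAKE_PASSWORD env var set by the test fixture added in conftest.py.
# resolve_snowflake_password("****", "SNOWFLAKE_PASSWORD")
```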
63 changes: 59 additions & 4 deletions tests/integration_tests/cloud/conftest.py
@@ -3,12 +3,16 @@

from __future__ import annotations

from enum import auto
import json
import os
import sys
from pathlib import Path
from typing import Any, Generator

import pytest
from airbyte._util.api_util import CLOUD_API_ROOT
from airbyte._util.temp_files import as_temp_files
from airbyte._util.venv_util import get_bin_dir
from airbyte.cloud import CloudWorkspace
from airbyte.destinations.base import Destination
@@ -22,7 +26,7 @@
)

from airbyte.sources.base import Source
from airbyte.sources.util import get_benchmark_source
from airbyte.sources.util import get_source

AIRBYTE_CLOUD_WORKSPACE_ID = "19d7a891-8e0e-40ac-8a8c-5faf8d11e47c"

@@ -97,9 +101,17 @@ def cloud_workspace(

@pytest.fixture
def deployable_dummy_source() -> Source:
"""A local PyAirbyte `Source` object."""
return get_benchmark_source(
install_if_missing=False,
"""A local PyAirbyte `Source` object.
For some reason `source-hardcoded-records` and `source-e2e-tests` are not working.
"""
return get_source(
"source-faker",
streams=["products"],
config={
"count": 100,
},
# install_if_missing=False,
)


@@ -148,3 +160,46 @@ def pytest_generate_tests(metafunc: pytest.Metafunc) -> None:
indirect=True,
scope="function",
)

@pytest.fixture(scope="session")
def with_bigquery_credentials_env_vars(
ci_secret_manager: GoogleGSMSecretManager,
) -> Generator[None, Any, None]:
"""This fixture sets up the BigQuery credentials file for the session.
This is needed because when retrieving config from the REST API, the credentials are
obfuscated.
"""
dest_bigquery_config = ci_secret_manager.get_secret(
secret_name="SECRET_DESTINATION-BIGQUERY_CREDENTIALS__CREDS"
).parse_json()

credentials_json = dest_bigquery_config["credentials_json"]
with as_temp_files(files_contents=[credentials_json]) as (credentials_path,):
os.environ["BIGQUERY_CREDENTIALS_PATH"] = credentials_path
os.environ["BIGQUERY_CREDENTIALS_JSON"] = json.dumps(credentials_json)

yield

return


@pytest.fixture(scope="session")
def snowflake_creds(ci_secret_manager: GoogleGSMSecretManager) -> dict:
return ci_secret_manager.get_secret(
"AIRBYTE_LIB_SNOWFLAKE_CREDS",
).parse_json()


@pytest.fixture(scope="session")
def with_snowflake_password_env_var(snowflake_creds: dict):
"""This fixture sets up Snowflake credentials for tests.
This is needed because when retrieving config from the REST API, the credentials are
obfuscated.
"""
os.environ["SNOWFLAKE_PASSWORD"] = snowflake_creds["password"]

yield

return
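For context (not part of the diff): the two new env-var fixtures are opted into simply by listing them as test parameters, as the updated tests in `test_cloud_sql_reads.py` below do. A minimal sketch, with the test name and body as placeholders:

```python
from airbyte.cloud import CloudWorkspace


def test_read_with_deobfuscated_credentials(
    cloud_workspace: CloudWorkspace,
    with_snowflake_password_env_var,      # sets SNOWFLAKE_PASSWORD for the session
    with_bigquery_credentials_env_vars,   # sets BIGQUERY_CREDENTIALS_PATH / _JSON
) -> None:
    """Placeholder body: credential env vars are in place for any cache lookups here."""
    ...
```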
114 changes: 61 additions & 53 deletions tests/integration_tests/cloud/test_cloud_sql_reads.py
@@ -27,59 +27,61 @@ def previous_job_run_id() -> int:
return 10136196


@pytest.mark.super_slow
def test_deploy_and_run_and_read(
cloud_workspace: cloud.CloudWorkspace,
deployable_dummy_destination: ab.Destination,
deployable_dummy_source: ab.Source,
) -> None:
"""Test reading from a cache."""

# Deploy source, destination, and connection:
cloud_source: CloudSource = cloud_workspace.deploy_source(
name="test-source",
source=deployable_dummy_source,
random_name_suffix=True,
)
cloud_destination = cloud_workspace.deploy_destination(
name="test-destination",
destination=deployable_dummy_destination,
random_name_suffix=True,
)
connection: cloud.CloudConnection = cloud_workspace.deploy_connection(
connection_name="test-connection-{text_util.generate_random_suffix()}",
source=cloud_source,
destination=cloud_destination,
# selected_streams=deployable_source.get_selected_streams(),
)

# Run sync and get result:
sync_result: SyncResult = connection.run_sync()

# TODO: Remove this second run after Destination bug is resolved:
# https://github.com/airbytehq/airbyte/issues/36875
sync_result: SyncResult = connection.run_sync()

# Check sync result:
assert sync_result.is_job_complete()
assert set(sync_result.stream_names) == set(["users", "products", "purchases"])

dataset: ab.CachedDataset = sync_result.get_dataset(stream_name="users")
assert dataset.stream_name == "users"
data_as_list = list(dataset)
assert len(data_as_list) == 100

# Cleanup
with suppress(Exception):
cloud_workspace.permanently_delete_connection(
connection=connection,
cascade_delete_source=True,
cascade_delete_destination=True,
)
with suppress(Exception):
cloud_workspace.permanently_delete_source(source=cloud_source)
with suppress(Exception):
cloud_workspace.permanently_delete_destination(destination=cloud_destination)
# This test is redundant with the other tests, and auto-retries in Cloud make the test untenable
# for us to have multiple tests running the sync operation for now.
# @pytest.mark.super_slow
# def test_deploy_and_run_and_read(
# cloud_workspace: cloud.CloudWorkspace,
# deployable_dummy_destination: ab.Destination,
# deployable_dummy_source: ab.Source,
# ) -> None:
# """Test reading from a cache."""

# # Deploy source, destination, and connection:
# cloud_source: CloudSource = cloud_workspace.deploy_source(
# name="test-source",
# source=deployable_dummy_source,
# random_name_suffix=True,
# )
# cloud_destination = cloud_workspace.deploy_destination(
# name="test-destination",
# destination=deployable_dummy_destination,
# random_name_suffix=True,
# )
# connection: cloud.CloudConnection = cloud_workspace.deploy_connection(
# connection_name="test-connection-{text_util.generate_random_suffix()}",
# source=cloud_source,
# destination=cloud_destination,
# selected_streams=deployable_source.get_selected_streams(),
# )

# # Run sync and get result:
# sync_result: SyncResult = connection.run_sync()

# # TODO: Remove this second run after Destination bug is resolved:
# # https://github.com/airbytehq/airbyte/issues/36875
# sync_result: SyncResult = connection.run_sync()

# # Check sync result:
# assert sync_result.is_job_complete()
# assert set(sync_result.stream_names) == set(["users", "products", "purchases"])

# dataset: ab.CachedDataset = sync_result.get_dataset(stream_name="users")
# assert dataset.stream_name == "users"
# data_as_list = list(dataset)
# assert len(data_as_list) == 100

# # Cleanup
# with suppress(Exception):
# cloud_workspace.permanently_delete_connection(
# connection=connection,
# cascade_delete_source=True,
# cascade_delete_destination=True,
# )
# with suppress(Exception):
# cloud_workspace.permanently_delete_source(source=cloud_source)
# with suppress(Exception):
# cloud_workspace.permanently_delete_destination(destination=cloud_destination)


@pytest.mark.parametrize(
@@ -100,6 +102,8 @@ def test_deploy_and_run_and_read(
def test_read_from_deployed_connection(
cloud_workspace: cloud.CloudWorkspace,
deployed_connection_id: str,
with_snowflake_password_env_var,
with_bigquery_credentials_env_vars,
) -> None:
"""Test reading from a cache."""
# Run sync and get result:
@@ -152,6 +156,8 @@ def test_translate_cloud_job_to_sql_cache(
cloud_workspace: cloud.CloudWorkspace,
deployed_connection_id: str,
previous_job_run_id: int,
with_bigquery_credentials_env_vars,
with_snowflake_password_env_var,
) -> None:
"""Test reading from a cache."""
# Run sync and get result:
@@ -189,6 +195,8 @@ def test_read_from_previous_job(
cloud_workspace: cloud.CloudWorkspace,
deployed_connection_id: str,
previous_job_run_id: int,
with_bigquery_credentials_env_vars,
with_snowflake_password_env_var,
) -> None:
"""Test reading from a cache."""
# Run sync and get result:
19 changes: 17 additions & 2 deletions tests/integration_tests/cloud/test_cloud_workspaces.py
@@ -41,10 +41,24 @@ def test_deploy_source(
cloud_workspace.permanently_delete_source(cloud_source)


def test_deploy_dummy_source(
deployable_dummy_source: ab.Source,
cloud_workspace: CloudWorkspace,
) -> None:
"""Test deploying a source to a workspace."""
deployable_dummy_source.check()

cloud_source: CloudSource = cloud_workspace.deploy_source(
name="test-source",
source=deployable_dummy_source,
)
cloud_workspace.permanently_delete_source(cloud_source)


def test_deploy_connection(
cloud_workspace: CloudWorkspace,
deployable_dummy_source,
deployable_dummy_destination,
deployable_dummy_source: ab.Source,
deployable_dummy_destination: ab.Destination,
) -> None:
"""Test deploying a source and cache to a workspace as a new connection."""
cloud_source = cloud_workspace.deploy_source(
@@ -62,6 +76,7 @@ def test_deploy_connection(
connection_name="test-connection",
source=cloud_source,
destination=cloud_destination,
selected_streams=deployable_dummy_source.get_selected_streams(),
)
# assert set(connection.stream_names) == set(["users", "products", "purchases"])
# assert connection.table_prefix == "abc_deleteme_"
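For context (not part of the diff): the connection deployment now passes the dummy source's selected streams explicitly; since `deployable_dummy_source` is configured with `streams=["products"]`, the assumed result is a single-stream selection. A hedged sketch of that assumption:

```python
import airbyte as ab

# Mirrors the deployable_dummy_source fixture defined in conftest.py above.
source = ab.get_source(
    "source-faker",
    config={"count": 100},
    streams=["products"],
    install_if_missing=False,  # assumption: skip install for a quick illustration
)

# Assumed return shape: the list of stream names selected above.
print(source.get_selected_streams())  # -> ["products"]
```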
