Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Add Cloud deploy functionality out of experimental status #419

Merged
merged 33 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
9b40875
clean up cloud functionality
aaronsteers Oct 15, 2024
4777d24
fix docs generation
aaronsteers Oct 15, 2024
6799b29
undo test skip for Cloud integration tests
aaronsteers Dec 6, 2024
1d4f53e
Merge remote-tracking branch 'origin/main' into aj/feat/cloud-deploy-…
aaronsteers Dec 6, 2024
1664110
Merge remote-tracking branch 'origin/main' into aj/feat/cloud-deploy-…
aaronsteers Dec 7, 2024
e8e948d
some name fixes
aaronsteers Dec 7, 2024
b435550
misc updates (not running tests yet)
aaronsteers Dec 7, 2024
354548e
more fixes
aaronsteers Dec 7, 2024
fe0ce9c
add: _paired_destination_name
aaronsteers Dec 7, 2024
be9225d
rename to 'cache_type_to_destination_type'
aaronsteers Dec 7, 2024
d8b40d1
fix more tests
aaronsteers Dec 7, 2024
7c9a6ac
multiple fixes
aaronsteers Dec 9, 2024
d58bf93
improve tests
aaronsteers Dec 9, 2024
e5b6ec4
xfail postgres merge support check
aaronsteers Dec 9, 2024
413abf4
update tests
aaronsteers Dec 9, 2024
3e16ee4
fix more tests
aaronsteers Dec 9, 2024
9d52230
improve tests
aaronsteers Dec 9, 2024
90bf1a2
lint fixes
aaronsteers Dec 9, 2024
2ab2e68
lint fixes
aaronsteers Dec 9, 2024
8a1092e
ignore warnings
aaronsteers Dec 9, 2024
c8a41b5
chore: convert if/then to lookup mapping
aaronsteers Dec 10, 2024
452bc0e
fixes (full code review)
aaronsteers Dec 10, 2024
13a4656
apply fixes and suggestions
aaronsteers Dec 10, 2024
e4b010f
delete redundant test
aaronsteers Dec 10, 2024
b8d63f0
fixes and apply suggestions
aaronsteers Dec 10, 2024
349df46
apply suggestion (class var)
aaronsteers Dec 10, 2024
6c3127e
improve exception handling
aaronsteers Dec 10, 2024
6325a53
Merge remote-tracking branch 'origin/main' into aj/feat/cloud-deploy-…
aaronsteers Dec 10, 2024
1bdcab1
fix list_destinations
aaronsteers Dec 10, 2024
ec5398b
fix return type
aaronsteers Dec 10, 2024
e4397d4
import types from api package
aaronsteers Dec 10, 2024
34f925e
Auto-fix lint and format issues
Dec 10, 2024
93fb13c
Auto-fix lint issues (unsafe)
Dec 10, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
347 changes: 271 additions & 76 deletions airbyte/_util/api_util.py

Large diffs are not rendered by default.

22 changes: 22 additions & 0 deletions airbyte/_util/text_util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Utility functions for working with text."""

from __future__ import annotations

import ulid


def generate_ulid() -> str:
    """Create and return a new ULID, rendered as a string."""
    new_id = ulid.ULID()
    return str(new_id)


def generate_random_suffix() -> str:
    """Generate a random suffix for use in temporary names.

    A fresh ULID is generated and condensed into a 9-character lowercase
    string that remains monotonically sortable. Uniqueness is not guaranteed,
    but collisions are unlikely enough for small- and medium-scale use cases.
    """
    lowered = generate_ulid().lower()
    return f"{lowered[:6]}{lowered[-3:]}"
35 changes: 27 additions & 8 deletions airbyte/caches/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from __future__ import annotations

from pathlib import Path
from typing import IO, TYPE_CHECKING, Any, final
from typing import IO, TYPE_CHECKING, Any, ClassVar, final

import pandas as pd
import pyarrow as pa
Expand All @@ -30,7 +30,6 @@

from airbyte._message_iterators import AirbyteMessageIterator
from airbyte.caches._state_backend_base import StateBackendBase
from airbyte.datasets._base import DatasetBase
from airbyte.progress import ProgressTracker
from airbyte.shared.sql_processor import SqlProcessorBase
from airbyte.shared.state_providers import StateProviderBase
Expand All @@ -57,16 +56,23 @@ class CacheBase(SqlConfig, AirbyteWriterInterface):

# Internal cache name, assigned at initialization time (not user-facing config).
_name: str = PrivateAttr()

# NOTE(review): the next three lines appear to be the removed side of the diff
# in this view — confirm whether deployment state still lives on the cache.
_deployed_api_root: str | None = PrivateAttr(default=None)
_deployed_workspace_id: str | None = PrivateAttr(default=None)
_deployed_destination_id: str | None = PrivateAttr(default=None)

# NOTE(review): old (PrivateAttr) and new (ClassVar) declarations of the same
# attribute are shown together by the diff; only the ClassVar form is current.
_sql_processor_class: type[SqlProcessorBase] = PrivateAttr()
_sql_processor_class: ClassVar[type[SqlProcessorBase]]
# SQL processor instance used to service reads from the cache.
_read_processor: SqlProcessorBase = PrivateAttr()

# Backends holding the stream catalog and the persisted sync state.
_catalog_backend: CatalogBackendBase = PrivateAttr()
_state_backend: StateBackendBase = PrivateAttr()

# Paired Airbyte destination connector name and its config model class.
# None on the base class; concrete cache subclasses override both.
paired_destination_name: ClassVar[str | None] = None
paired_destination_config_class: ClassVar[type | None] = None

@property
def paired_destination_config(self) -> Any | dict[str, Any]:  # noqa: ANN401 # Allow Any return type
    """Return the destination configuration equivalent to this cache.

    The base class has no paired destination; concrete cache subclasses
    override this property. Raises `NotImplementedError` when not overridden.
    """
    message = (
        f"The type '{type(self).__name__}' does not define an equivalent destination "
        "configuration."
    )
    raise NotImplementedError(message)

def __init__(self, **data: Any) -> None: # noqa: ANN401
"""Initialize the cache and backends."""
super().__init__(**data)
Expand Down Expand Up @@ -231,6 +237,19 @@ def streams(self) -> dict[str, CachedDataset]:

return result

@final
def __len__(self) -> int:
    """Return the number of streams registered in the cache catalog."""
    stream_names = self._catalog_backend.stream_names
    return len(stream_names)

@final
def __bool__(self) -> bool:
    """Always return True.

    Without this, a cache with zero streams would be treated as falsy
    (None-like) because `__len__` is defined.
    """
    return True

def get_state_provider(
self,
source_name: str,
Expand Down Expand Up @@ -274,7 +293,7 @@ def register_source(
incoming_stream_names=stream_names,
)

def __getitem__(self, stream: str) -> CachedDataset:
    """Look up and return the dataset for a single stream by name."""
    datasets = self.streams
    return datasets[stream]

Expand Down
21 changes: 18 additions & 3 deletions airbyte/caches/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,36 @@

from __future__ import annotations

from typing import NoReturn
from typing import TYPE_CHECKING, ClassVar, NoReturn

from pydantic import PrivateAttr
from airbyte_api.models import DestinationBigquery

from airbyte._processors.sql.bigquery import BigQueryConfig, BigQuerySqlProcessor
from airbyte.caches.base import (
CacheBase,
)
from airbyte.constants import DEFAULT_ARROW_MAX_CHUNK_SIZE
from airbyte.destinations._translate_cache_to_dest import (
bigquery_cache_to_destination_configuration,
)


if TYPE_CHECKING:
from airbyte.shared.sql_processor import SqlProcessorBase


class BigQueryCache(BigQueryConfig, CacheBase):
"""The BigQuery cache implementation."""

# NOTE(review): old (PrivateAttr) and new (ClassVar) declarations of the same
# attribute are shown together by the diff; only the ClassVar form is current.
_sql_processor_class: type[BigQuerySqlProcessor] = PrivateAttr(default=BigQuerySqlProcessor)
_sql_processor_class: ClassVar[type[SqlProcessorBase]] = BigQuerySqlProcessor

# Connector name and config model used when pairing this cache with an
# Airbyte Cloud destination.
paired_destination_name: ClassVar[str | None] = "destination-bigquery"
paired_destination_config_class: ClassVar[type | None] = DestinationBigquery

@property
def paired_destination_config(self) -> DestinationBigquery:
    """Return the equivalent `DestinationBigquery` configuration for this cache.

    Note: corrected docstring — the translation helper returns a
    `DestinationBigquery` model object (per the annotation), not a dictionary.
    """
    return bigquery_cache_to_destination_configuration(cache=self)

def get_arrow_dataset(
self,
Expand Down
18 changes: 16 additions & 2 deletions airbyte/caches/duckdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,18 @@
from __future__ import annotations

import warnings
from typing import TYPE_CHECKING, ClassVar

from airbyte_api.models import DestinationDuckdb
from duckdb_engine import DuckDBEngineWarning
from pydantic import PrivateAttr

from airbyte._processors.sql.duckdb import DuckDBConfig, DuckDBSqlProcessor
from airbyte.caches.base import CacheBase
from airbyte.destinations._translate_cache_to_dest import duckdb_cache_to_destination_configuration


if TYPE_CHECKING:
from airbyte.shared.sql_processor import SqlProcessorBase


# Suppress warnings from DuckDB about reflection on indices.
Expand All @@ -37,7 +43,15 @@
class DuckDBCache(DuckDBConfig, CacheBase):
"""A DuckDB cache."""

_sql_processor_class: type[DuckDBSqlProcessor] = PrivateAttr(default=DuckDBSqlProcessor)
# SQL processor implementation used for this cache type.
_sql_processor_class: ClassVar[type[SqlProcessorBase]] = DuckDBSqlProcessor

# Connector name and config model used when pairing this cache with an
# Airbyte Cloud destination.
paired_destination_name: ClassVar[str | None] = "destination-duckdb"
paired_destination_config_class: ClassVar[type | None] = DestinationDuckdb

@property
def paired_destination_config(self) -> DestinationDuckdb:
    """Return the equivalent `DestinationDuckdb` configuration for this cache.

    Note: corrected docstring — the translation helper returns a
    `DestinationDuckdb` model object (per the annotation), not a dictionary.
    """
    return duckdb_cache_to_destination_configuration(cache=self)


# Expose the Cache class and also the Config class.
Expand Down
22 changes: 20 additions & 2 deletions airbyte/caches/motherduck.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,33 @@
from __future__ import annotations

import warnings
from typing import TYPE_CHECKING, ClassVar

from airbyte_api.models import DestinationDuckdb
from duckdb_engine import DuckDBEngineWarning
from overrides import overrides
from pydantic import Field, PrivateAttr
from pydantic import Field

from airbyte._processors.sql.duckdb import DuckDBConfig
from airbyte._processors.sql.motherduck import MotherDuckSqlProcessor
from airbyte.caches.duckdb import DuckDBCache
from airbyte.destinations._translate_cache_to_dest import (
motherduck_cache_to_destination_configuration,
)
from airbyte.secrets import SecretString


if TYPE_CHECKING:
from airbyte.shared.sql_processor import SqlProcessorBase


class MotherDuckConfig(DuckDBConfig):
"""Configuration for the MotherDuck cache."""

# Name of the MotherDuck database to connect to.
database: str = Field()
# MotherDuck API key, held as a secret value.
api_key: SecretString = Field()
# Connection path; defaults to the "md:" MotherDuck URI prefix.
db_path: str = Field(default="md:")
# NOTE(review): appears superseded by the ClassVar `paired_destination_name`
# on the cache classes — confirm whether this private copy is still read.
_paired_destination_name: str = "destination-motherduck"

@overrides
def get_sql_alchemy_url(self) -> SecretString:
Expand Down Expand Up @@ -61,7 +71,15 @@ def get_database_name(self) -> str:
class MotherDuckCache(MotherDuckConfig, DuckDBCache):
"""Cache that uses MotherDuck for external persistent storage."""

_sql_processor_class: type[MotherDuckSqlProcessor] = PrivateAttr(default=MotherDuckSqlProcessor)
# SQL processor implementation used for this cache type.
_sql_processor_class: ClassVar[type[SqlProcessorBase]] = MotherDuckSqlProcessor

# Fixed: was "destination-bigquery", an apparent copy-paste error. The paired
# connector for MotherDuck is "destination-motherduck" (this matches the
# `_paired_destination_name` declared on `MotherDuckConfig`).
paired_destination_name: ClassVar[str | None] = "destination-motherduck"
paired_destination_config_class: ClassVar[type | None] = DestinationDuckdb

@property
def paired_destination_config(self) -> DestinationDuckdb:
    """Return the equivalent `DestinationDuckdb` configuration for this cache.

    Note: corrected docstring — the translation helper returns a
    `DestinationDuckdb` model object (per the annotation), not a dictionary.
    """
    return motherduck_cache_to_destination_configuration(cache=self)


# Expose the Cache class and also the Config class.
Expand Down
31 changes: 29 additions & 2 deletions airbyte/caches/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,19 @@

from __future__ import annotations

from pydantic import PrivateAttr
from typing import TYPE_CHECKING, ClassVar

from airbyte_api.models import DestinationPostgres

from airbyte._processors.sql.postgres import PostgresConfig, PostgresSqlProcessor
from airbyte.caches.base import CacheBase
from airbyte.destinations._translate_cache_to_dest import (
postgres_cache_to_destination_configuration,
)


if TYPE_CHECKING:
from airbyte.shared.sql_processor import SqlProcessorBase


class PostgresCache(PostgresConfig, CacheBase):
Expand All @@ -31,7 +40,25 @@ class PostgresCache(PostgresConfig, CacheBase):
Also inherits config from the JsonlWriter, which is responsible for writing files to disk.
"""

_sql_processor_class = PrivateAttr(default=PostgresSqlProcessor)
# SQL processor implementation used for this cache type.
_sql_processor_class: ClassVar[type[SqlProcessorBase]] = PostgresSqlProcessor

# Fixed: was "destination-bigquery", an apparent copy-paste error. The paired
# connector for a Postgres cache is "destination-postgres", consistent with
# the `DestinationPostgres` config class on the next line.
paired_destination_name: ClassVar[str | None] = "destination-postgres"
paired_destination_config_class: ClassVar[type | None] = DestinationPostgres

@property
def paired_destination_config(self) -> DestinationPostgres:
    """Return the equivalent `DestinationPostgres` configuration for this cache.

    Note: corrected docstring — the translation helper returns a
    `DestinationPostgres` model object (per the annotation), not a dictionary.
    """
    return postgres_cache_to_destination_configuration(cache=self)

def clone_as_cloud_destination_config(self) -> DestinationPostgres:
    """Build and return a `DestinationPostgres` carrying this cache's settings.

    Only connection settings (host, port, credentials, database) are copied.
    """
    connection_settings = {
        "host": self.host,
        "port": self.port,
        "username": self.username,
        "password": self.password,
        "database": self.database,
    }
    return DestinationPostgres(**connection_settings)


# Expose the Cache class and also the Config class.
Expand Down
19 changes: 16 additions & 3 deletions airbyte/caches/snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,32 @@

from __future__ import annotations

from pydantic import PrivateAttr
from typing import ClassVar

from airbyte_api.models import DestinationSnowflake

from airbyte._processors.sql.snowflake import SnowflakeConfig, SnowflakeSqlProcessor
from airbyte.caches.base import CacheBase
from airbyte.shared.sql_processor import RecordDedupeMode
from airbyte.destinations._translate_cache_to_dest import (
snowflake_cache_to_destination_configuration,
)
from airbyte.shared.sql_processor import RecordDedupeMode, SqlProcessorBase


class SnowflakeCache(SnowflakeConfig, CacheBase):
    """Configuration for the Snowflake cache."""

    # Default dedupe mode for this cache.
    # NOTE(review): confirm why APPEND is forced here rather than inherited.
    dedupe_mode: RecordDedupeMode = RecordDedupeMode.APPEND

    # SQL processor implementation used for this cache type.
    _sql_processor_class: ClassVar[type[SqlProcessorBase]] = SnowflakeSqlProcessor

    # Fixed: was "destination-bigquery", an apparent copy-paste error. The
    # paired connector for a Snowflake cache is "destination-snowflake",
    # consistent with the `DestinationSnowflake` config class below.
    paired_destination_name: ClassVar[str | None] = "destination-snowflake"
    paired_destination_config_class: ClassVar[type | None] = DestinationSnowflake

    @property
    def paired_destination_config(self) -> DestinationSnowflake:
        """Return the equivalent `DestinationSnowflake` configuration object.

        Note: corrected docstring — the translation helper returns a model
        object (per the annotation), not a dictionary.
        """
        return snowflake_cache_to_destination_configuration(cache=self)


# Expose the Cache class and also the Config class.
Expand Down
Loading
Loading