Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Add Cloud deploy functionality out of experimental status #419

Merged
merged 33 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
9b40875
clean up cloud functionality
aaronsteers Oct 15, 2024
4777d24
fix docs generation
aaronsteers Oct 15, 2024
6799b29
undo test skip for Cloud integration tests
aaronsteers Dec 6, 2024
1d4f53e
Merge remote-tracking branch 'origin/main' into aj/feat/cloud-deploy-…
aaronsteers Dec 6, 2024
1664110
Merge remote-tracking branch 'origin/main' into aj/feat/cloud-deploy-…
aaronsteers Dec 7, 2024
e8e948d
some name fixes
aaronsteers Dec 7, 2024
b435550
misc updates (not running tests yet)
aaronsteers Dec 7, 2024
354548e
more fixes
aaronsteers Dec 7, 2024
fe0ce9c
add: _paired_destination_name
aaronsteers Dec 7, 2024
be9225d
rename to 'cache_type_to_destination_type'
aaronsteers Dec 7, 2024
d8b40d1
fix more tests
aaronsteers Dec 7, 2024
7c9a6ac
multiple fixes
aaronsteers Dec 9, 2024
d58bf93
improve tests
aaronsteers Dec 9, 2024
e5b6ec4
xfail postgres merge support check
aaronsteers Dec 9, 2024
413abf4
update tests
aaronsteers Dec 9, 2024
3e16ee4
fix more tests
aaronsteers Dec 9, 2024
9d52230
improve tests
aaronsteers Dec 9, 2024
90bf1a2
lint fixes
aaronsteers Dec 9, 2024
2ab2e68
lint fixes
aaronsteers Dec 9, 2024
8a1092e
ignore warnings
aaronsteers Dec 9, 2024
c8a41b5
chore: convert if/then to lookup mapping
aaronsteers Dec 10, 2024
452bc0e
fixes (full code review)
aaronsteers Dec 10, 2024
13a4656
apply fixes and suggestions
aaronsteers Dec 10, 2024
e4b010f
delete redundant test
aaronsteers Dec 10, 2024
b8d63f0
fixes and apply suggestions
aaronsteers Dec 10, 2024
349df46
apply suggestion (class var)
aaronsteers Dec 10, 2024
6c3127e
improve exception handling
aaronsteers Dec 10, 2024
6325a53
Merge remote-tracking branch 'origin/main' into aj/feat/cloud-deploy-…
aaronsteers Dec 10, 2024
1bdcab1
fix list_destinations
aaronsteers Dec 10, 2024
ec5398b
fix return type
aaronsteers Dec 10, 2024
e4397d4
import types from api package
aaronsteers Dec 10, 2024
34f925e
Auto-fix lint and format issues
Dec 10, 2024
93fb13c
Auto-fix lint issues (unsafe)
Dec 10, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
333 changes: 260 additions & 73 deletions airbyte/_util/api_util.py

Large diffs are not rendered by default.

22 changes: 22 additions & 0 deletions airbyte/_util/text_util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Utility functions for working with text."""

from __future__ import annotations

import ulid


def generate_ulid() -> str:
"""Generate a new ULID."""
return str(ulid.ULID())


def generate_random_suffix() -> str:
"""Generate a random suffix for use in temporary names.

By default, this function generates a ULID and returns a 9-character string
which will be monotonically sortable. It is not guaranteed to be unique but
is sufficient for small-scale and medium-scale use cases.
"""
ulid_str = generate_ulid()
return ulid_str[:6] + ulid_str[-3:]
5 changes: 1 addition & 4 deletions airbyte/caches/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,7 @@ class CacheBase(SqlConfig, AirbyteWriterInterface):
"""Whether to clean up the cache after use."""

_name: str = PrivateAttr()

_deployed_api_root: str | None = PrivateAttr(default=None)
_deployed_workspace_id: str | None = PrivateAttr(default=None)
_deployed_destination_id: str | None = PrivateAttr(default=None)
_paired_destination_name: str

_sql_processor_class: type[SqlProcessorBase] = PrivateAttr()
_read_processor: SqlProcessorBase = PrivateAttr()
Expand Down
1 change: 1 addition & 0 deletions airbyte/caches/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class BigQueryCache(BigQueryConfig, CacheBase):
"""The BigQuery cache implementation."""

_sql_processor_class: type[BigQuerySqlProcessor] = PrivateAttr(default=BigQuerySqlProcessor)
_paired_destination_name: str = "destination-bigquery"

def get_arrow_dataset(
self,
Expand Down
1 change: 1 addition & 0 deletions airbyte/caches/duckdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class DuckDBCache(DuckDBConfig, CacheBase):
"""A DuckDB cache."""

_sql_processor_class: type[DuckDBSqlProcessor] = PrivateAttr(default=DuckDBSqlProcessor)
_paired_destination_name: str = "destination-duckdb"


# Expose the Cache class and also the Config class.
Expand Down
1 change: 1 addition & 0 deletions airbyte/caches/motherduck.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class MotherDuckConfig(DuckDBConfig):
database: str = Field()
api_key: SecretString = Field()
db_path: str = Field(default="md:")
_paired_destination_name: str = "destination-motherduck"

@overrides
def get_sql_alchemy_url(self) -> SecretString:
Expand Down
1 change: 1 addition & 0 deletions airbyte/caches/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class PostgresCache(PostgresConfig, CacheBase):
"""

_sql_processor_class = PrivateAttr(default=PostgresSqlProcessor)
_paired_destination_name: str = "destination-postgres"


# Expose the Cache class and also the Config class.
Expand Down
1 change: 1 addition & 0 deletions airbyte/caches/snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class SnowflakeCache(SnowflakeConfig, CacheBase):
dedupe_mode: RecordDedupeMode = RecordDedupeMode.APPEND

_sql_processor_class = PrivateAttr(default=SnowflakeSqlProcessor)
_paired_destination_name: str = "destination-snowflake"


# Expose the Cache class and also the Config class.
Expand Down
58 changes: 36 additions & 22 deletions airbyte/cloud/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from typing import TYPE_CHECKING, cast

from airbyte._util import api_util
from airbyte.cloud.connectors import CloudDestination, CloudSource
from airbyte.cloud.sync_results import SyncResult


Expand Down Expand Up @@ -52,7 +53,8 @@ def _fetch_connection_info(self) -> ConnectionResponse:
workspace_id=self.workspace.workspace_id,
connection_id=self.connection_id,
api_root=self.workspace.api_root,
api_key=self.workspace.api_key,
client_id=self.workspace.client_id,
client_secret=self.workspace.client_secret,
)

# Properties
Expand All @@ -68,6 +70,14 @@ def source_id(self) -> str:

return cast(str, self._source_id)

@property
def source(self) -> CloudSource:
"""Get the source object."""
return CloudSource(
workspace=self.workspace,
connector_id=self.source_id,
)

aaronsteers marked this conversation as resolved.
Show resolved Hide resolved
@property
def destination_id(self) -> str:
"""The ID of the destination."""
Expand All @@ -79,21 +89,29 @@ def destination_id(self) -> str:

return cast(str, self._destination_id)

@property
def destination(self) -> CloudDestination:
"""Get the source object."""
aaronsteers marked this conversation as resolved.
Show resolved Hide resolved
return CloudDestination(
workspace=self.workspace,
connector_id=self.destination_id,
)

@property
def stream_names(self) -> list[str]:
"""The stream names."""
if not self._connection_info:
self._connection_info = self._fetch_connection_info()

return [stream.name for stream in self._connection_info.configurations.streams]
return [stream.name for stream in self._connection_info.configurations.streams or []]

@property
def table_prefix(self) -> str:
"""The table prefix."""
if not self._connection_info:
self._connection_info = self._fetch_connection_info()

return self._connection_info.prefix
return self._connection_info.prefix or ""

@property
def connection_url(self) -> str | None:
Expand All @@ -117,8 +135,9 @@ def run_sync(
connection_response = api_util.run_connection(
connection_id=self.connection_id,
api_root=self.workspace.api_root,
api_key=self.workspace.api_key,
workspace_id=self.workspace.workspace_id,
client_id=self.workspace.client_id,
client_secret=self.workspace.client_secret,
)
sync_result = SyncResult(
workspace=self.workspace,
Expand Down Expand Up @@ -146,9 +165,10 @@ def get_previous_sync_logs(
sync_logs: list[JobResponse] = api_util.get_job_logs(
connection_id=self.connection_id,
api_root=self.workspace.api_root,
api_key=self.workspace.api_key,
workspace_id=self.workspace.workspace_id,
limit=limit,
client_id=self.workspace.client_id,
client_secret=self.workspace.client_secret,
)
return [
SyncResult(
Expand All @@ -162,7 +182,7 @@ def get_previous_sync_logs(

def get_sync_result(
self,
job_id: str | None = None,
job_id: int | None = None,
) -> SyncResult | None:
"""Get the sync result for the connection.

Expand All @@ -189,28 +209,22 @@ def get_sync_result(

# Deletions

def _permanently_delete(
def permanently_delete(
self,
*,
delete_source: bool = False,
delete_destination: bool = False,
cascade_delete_source: bool = False,
cascade_delete_destination: bool = False,
) -> None:
"""Delete the connection.

Args:
delete_source: Whether to also delete the source.
delete_destination: Whether to also delete the destination.
cascade_delete_source: Whether to also delete the source.
cascade_delete_destination: Whether to also delete the destination.
"""
self.workspace._permanently_delete_connection( # noqa: SLF001 # Non-public API (for now)
connection=self
)
self.workspace.permanently_delete_connection(self)

if delete_source:
self.workspace._permanently_delete_source( # noqa: SLF001 # Non-public API (for now)
source=self.source_id
)
if cascade_delete_source:
self.workspace.permanently_delete_source(self.source_id)

if delete_destination:
self.workspace._permanently_delete_destination( # noqa: SLF001 # Non-public API
destination=self.destination_id,
)
if cascade_delete_destination:
self.workspace.permanently_delete_destination(self.destination_id)
81 changes: 81 additions & 0 deletions airbyte/cloud/connectors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Cloud connectors module for working with Cloud sources and destinations."""

from __future__ import annotations

import abc
from typing import TYPE_CHECKING, Literal


if TYPE_CHECKING:
from airbyte.cloud.workspaces import CloudWorkspace


class CloudConnector(abc.ABC):
"""A cloud connector is a deployed source or destination on Airbyte Cloud.

You can use a connector object to manage the connector.
"""

@property
@abc.abstractmethod
def connector_type(self) -> Literal["source", "destination"]:
"""Get the type of the connector."""
...

def __init__(
self,
workspace: CloudWorkspace,
connector_id: str,
) -> None:
"""Initialize a cloud connector object."""
self.workspace = workspace
"""The workspace that the connector belongs to."""
self.connector_id = connector_id
"""The ID of the connector."""

@property
def connector_url(self) -> str:
"""Get the URL of the source connector."""
return f"{self.workspace.workspace_url}/{self.connector_type}s/{self.connector_id}"

def permanently_delete(self) -> None:
"""Permanently delete the connector."""
if self.connector_type == "source":
self.workspace.permanently_delete_source(self.connector_id)
else:
self.workspace.permanently_delete_destination(self.connector_id)


class CloudSource(CloudConnector):
"""A cloud source is a source that is deployed on Airbyte Cloud."""

@property
def source_id(self) -> str:
"""Get the ID of the source.

This is an alias for `connector_id`.
"""
return self.connector_id

@property
def connector_type(self) -> Literal["source", "destination"]:
"""Get the type of the connector."""
return "source"

aaronsteers marked this conversation as resolved.
Show resolved Hide resolved

class CloudDestination(CloudConnector):
"""A cloud destination is a destination that is deployed on Airbyte Cloud."""

@property
def destination_id(self) -> str:
"""Get the ID of the destination.

This is an alias for `connector_id`.
"""
return self.connector_id

@property
def connector_type(self) -> Literal["source", "destination"]:
"""Get the type of the connector."""
return "destination"
Empty file added airbyte/cloud/destinations.py
Empty file.
60 changes: 0 additions & 60 deletions airbyte/cloud/experimental.py

This file was deleted.

Loading
Loading