Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Add Cloud deploy functionality out of experimental status #419

Merged
merged 33 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
9b40875
clean up cloud functionality
aaronsteers Oct 15, 2024
4777d24
fix docs generation
aaronsteers Oct 15, 2024
6799b29
undo test skip for Cloud integration tests
aaronsteers Dec 6, 2024
1d4f53e
Merge remote-tracking branch 'origin/main' into aj/feat/cloud-deploy-…
aaronsteers Dec 6, 2024
1664110
Merge remote-tracking branch 'origin/main' into aj/feat/cloud-deploy-…
aaronsteers Dec 7, 2024
e8e948d
some name fixes
aaronsteers Dec 7, 2024
b435550
misc updates (not running tests yet)
aaronsteers Dec 7, 2024
354548e
more fixes
aaronsteers Dec 7, 2024
fe0ce9c
add: _paired_destination_name
aaronsteers Dec 7, 2024
be9225d
rename to 'cache_type_to_destination_type'
aaronsteers Dec 7, 2024
d8b40d1
fix more tests
aaronsteers Dec 7, 2024
7c9a6ac
multiple fixes
aaronsteers Dec 9, 2024
d58bf93
improve tests
aaronsteers Dec 9, 2024
e5b6ec4
xfail postgres merge support check
aaronsteers Dec 9, 2024
413abf4
update tests
aaronsteers Dec 9, 2024
3e16ee4
fix more tests
aaronsteers Dec 9, 2024
9d52230
improve tests
aaronsteers Dec 9, 2024
90bf1a2
lint fixes
aaronsteers Dec 9, 2024
2ab2e68
lint fixes
aaronsteers Dec 9, 2024
8a1092e
ignore warnings
aaronsteers Dec 9, 2024
c8a41b5
chore: convert if/then to lookup mapping
aaronsteers Dec 10, 2024
452bc0e
fixes (full code review)
aaronsteers Dec 10, 2024
13a4656
apply fixes and suggestions
aaronsteers Dec 10, 2024
e4b010f
delete redundant test
aaronsteers Dec 10, 2024
b8d63f0
fixes and apply suggestions
aaronsteers Dec 10, 2024
349df46
apply suggestion (class var)
aaronsteers Dec 10, 2024
6c3127e
improve exception handling
aaronsteers Dec 10, 2024
6325a53
Merge remote-tracking branch 'origin/main' into aj/feat/cloud-deploy-…
aaronsteers Dec 10, 2024
1bdcab1
fix list_destinations
aaronsteers Dec 10, 2024
ec5398b
fix return type
aaronsteers Dec 10, 2024
e4397d4
import types from api package
aaronsteers Dec 10, 2024
34f925e
Auto-fix lint and format issues
Dec 10, 2024
93fb13c
Auto-fix lint issues (unsafe)
Dec 10, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
190 changes: 151 additions & 39 deletions airbyte/_util/api_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from __future__ import annotations

import json
from typing import Any
from typing import TYPE_CHECKING, Any

import airbyte_api
from airbyte_api import api, models
Expand All @@ -24,9 +24,14 @@
AirbyteError,
AirbyteMissingResourceError,
AirbyteMultipleResourcesError,
PyAirbyteInputError,
)


if TYPE_CHECKING:
from collections.abc import Callable


JOB_WAIT_INTERVAL_SECS = 2.0
JOB_WAIT_TIMEOUT_SECS_DEFAULT = 60 * 60 # 1 hour
CLOUD_API_ROOT = "https://api.airbyte.com/v1"
Expand All @@ -43,7 +48,7 @@ def get_airbyte_server_instance(
*,
api_key: str,
api_root: str,
) -> airbyte_api.Airbyte:
) -> airbyte_api.AirbyteAPI:
"""Get an Airbyte instance."""
return airbyte_api.AirbyteAPI(
security=models.Security(
Expand Down Expand Up @@ -84,36 +89,126 @@ def get_workspace(
)


# List, get, and run connections
# List resources


def list_connections(
workspace_id: str,
*,
api_root: str,
api_key: str,
) -> list[api.ConnectionResponse]:
name: str | None = None,
name_filter: Callable[[str], bool] | None = None,
) -> list[models.ConnectionResponse]:
"""Get a connection."""
if name and name_filter:
raise PyAirbyteInputError(message="You can provide name or name_filter, but not both.")

name_filter = (lambda n: n == name) if name else name_filter or (lambda _: True)

_ = workspace_id # Not used (yet)
airbyte_instance = get_airbyte_server_instance(
api_key=api_key,
api_root=api_root,
)
response = airbyte_instance.connections.list_connections(
api.ListConnectionsRequest()(
api.ListConnectionsRequest(
workspace_ids=[workspace_id],
),
)

if status_ok(response.status_code) and response.connections_response:
return response.connections_response.data
if not status_ok(response.status_code) and response.connections_response:
raise AirbyteError(
context={
"workspace_id": workspace_id,
"response": response,
}
)
assert response.connections_response is not None
return [
connection
for connection in response.connections_response.data
if name_filter(connection.name)
]


raise AirbyteError(
context={
"workspace_id": workspace_id,
"response": response,
}
def list_sources(
workspace_id: str,
*,
api_root: str,
api_key: str,
name: str | None = None,
name_filter: Callable[[str], bool] | None = None,
) -> list[models.SourceResponse]:
"""Get a connection."""
if name and name_filter:
raise PyAirbyteInputError(message="You can provide name or name_filter, but not both.")

name_filter = (lambda n: n == name) if name else name_filter or (lambda _: True)

_ = workspace_id # Not used (yet)
airbyte_instance = get_airbyte_server_instance(
api_key=api_key,
api_root=api_root,
)
response = airbyte_instance.sources.list_sources(
api.ListSourcesRequest(
workspace_ids=[workspace_id],
),
)

if not status_ok(response.status_code) and response.sources_response:
raise AirbyteError(
context={
"workspace_id": workspace_id,
"response": response,
}
)
assert response.sources_response is not None
return [source for source in response.sources_response.data if name_filter(source.name)]


def list_destinations(
workspace_id: str,
*,
api_root: str,
api_key: str,
name: str | None = None,
name_filter: Callable[[str], bool] | None = None,
) -> list[models.DestinationResponse]:
"""Get a connection."""
if name and name_filter:
raise PyAirbyteInputError(message="You can provide name or name_filter, but not both.")

name_filter = (lambda n: n == name) if name else name_filter or (lambda _: True)

_ = workspace_id # Not used (yet)
airbyte_instance = get_airbyte_server_instance(
api_key=api_key,
api_root=api_root,
)
response = airbyte_instance.destinations.list_destinations(
api.ListDestinationsRequest(
workspace_ids=[workspace_id],
),
)

if not status_ok(response.status_code) and response.destinations_response:
raise AirbyteError(
context={
"workspace_id": workspace_id,
"response": response,
}
)
assert response.destinations_response is not None
return [
destination
for destination in response.destinations_response.data
if name_filter(destination.name)
]


# Get and run connections


def get_connection(
Expand All @@ -122,7 +217,7 @@ def get_connection(
*,
api_root: str,
api_key: str,
) -> api.ConnectionResponse:
) -> models.ConnectionResponse:
"""Get a connection."""
_ = workspace_id # Not used (yet)
airbyte_instance = get_airbyte_server_instance(
Expand All @@ -137,7 +232,11 @@ def get_connection(
if status_ok(response.status_code) and response.connection_response:
return response.connection_response

raise AirbyteMissingResourceError(connection_id, "connection", response.text)
raise AirbyteMissingResourceError(
resource_name_or_id=connection_id,
resource_type="connection",
log_text=response.raw_response.text,
)


def run_connection(
Expand All @@ -146,7 +245,7 @@ def run_connection(
*,
api_root: str,
api_key: str,
) -> api.ConnectionResponse:
) -> models.JobResponse:
"""Get a connection.

If block is True, this will block until the connection is finished running.
Expand Down Expand Up @@ -186,7 +285,7 @@ def get_job_logs(
*,
api_root: str,
api_key: str,
) -> list[api.JobResponse]:
) -> list[models.JobResponse]:
"""Get a job's logs."""
airbyte_instance = get_airbyte_server_instance(
api_key=api_key,
Expand All @@ -213,11 +312,11 @@ def get_job_logs(


def get_job_info(
job_id: str,
job_id: int,
*,
api_root: str,
api_key: str,
) -> api.JobResponse:
) -> models.JobResponse:
"""Get a job."""
airbyte_instance = get_airbyte_server_instance(
api_key=api_key,
Expand All @@ -231,7 +330,11 @@ def get_job_info(
if status_ok(response.status_code) and response.job_response:
return response.job_response

raise AirbyteMissingResourceError(job_id, "job", response.text)
raise AirbyteMissingResourceError(
resource_name_or_id=str(job_id),
resource_type="job",
log_text=response.raw_response.text,
)


# Create, get, and delete sources
Expand All @@ -244,7 +347,7 @@ def create_source(
config: dict[str, Any],
api_root: str,
api_key: str,
) -> api.SourceResponse:
) -> models.SourceResponse:
"""Get a connection."""
airbyte_instance = get_airbyte_server_instance(
api_key=api_key,
Expand Down Expand Up @@ -273,7 +376,7 @@ def get_source(
*,
api_root: str,
api_key: str,
) -> api.SourceResponse:
) -> models.SourceResponse:
"""Get a connection."""
airbyte_instance = get_airbyte_server_instance(
api_key=api_key,
Expand All @@ -284,10 +387,14 @@ def get_source(
source_id=source_id,
),
)
if status_ok(response.status_code) and response.connection_response:
return response.connection_response
if status_ok(response.status_code) and response.source_response:
return response.source_response

raise AirbyteMissingResourceError(source_id, "source", response.text)
raise AirbyteMissingResourceError(
resource_name_or_id=source_id,
resource_type="source",
log_text=response.raw_response.text,
)


def delete_source(
Expand Down Expand Up @@ -327,7 +434,7 @@ def create_destination(
config: dict[str, Any],
api_root: str,
api_key: str,
) -> api.DestinationResponse:
) -> models.DestinationResponse:
"""Get a connection."""
airbyte_instance = get_airbyte_server_instance(
api_key=api_key,
Expand All @@ -354,7 +461,7 @@ def get_destination(
*,
api_root: str,
api_key: str,
) -> api.DestinationResponse:
) -> models.DestinationResponse:
"""Get a connection."""
airbyte_instance = get_airbyte_server_instance(
api_key=api_key,
Expand All @@ -365,33 +472,38 @@ def get_destination(
destination_id=destination_id,
),
)
if status_ok(response.status_code):
if status_ok(response.status_code) and response.destination_response:
# TODO: This is a temporary workaround to resolve an issue where
# the destination API response is of the wrong type.
# https://github.com/airbytehq/pyairbyte/issues/320
raw_response: dict[str, Any] = json.loads(response.raw_response.text)
raw_configuration: dict[str, Any] = raw_response["configuration"]

destination_type = raw_response.get("destinationType")
if destination_type == "snowflake":
response.destination_response.configuration = models.DestinationSnowflake.from_dict(
raw_configuration,
response.destination_response.configuration = models.DestinationSnowflake(
**raw_configuration,
)
if destination_type == "bigquery":
response.destination_response.configuration = models.DestinationBigquery.from_dict(
raw_configuration,
response.destination_response.configuration = models.DestinationBigquery(
**raw_configuration,
)
if destination_type == "postgres":
response.destination_response.configuration = models.DestinationPostgres.from_dict(
raw_configuration,
response.destination_response.configuration = models.DestinationPostgres(
**raw_configuration,
)
if destination_type == "duckdb":
response.destination_response.configuration = models.DestinationDuckdb.from_dict(
raw_configuration,
response.destination_response.configuration = models.DestinationDuckdb(
**raw_configuration,
aaronsteers marked this conversation as resolved.
Show resolved Hide resolved
)

return response.destination_response

raise AirbyteMissingResourceError(destination_id, "destination", response.text)
raise AirbyteMissingResourceError(
resource_name_or_id=destination_id,
resource_type="destination",
log_text=response.raw_response.text,
)


def delete_destination(
Expand Down Expand Up @@ -448,17 +560,17 @@ def create_connection(
)
stream_configurations.append(stream_configuration)

stream_configurations = models.StreamConfigurations(stream_configurations)
stream_configurations_obj = models.StreamConfigurations(stream_configurations)
response = airbyte_instance.connections.create_connection(
models.ConnectionCreateRequest(
name=name,
source_id=source_id,
destination_id=destination_id,
configurations=stream_configurations,
configurations=stream_configurations_obj,
prefix=prefix,
),
)
if not status_ok(response.status_code):
if not status_ok(response.status_code) or response.connection_response is None:
raise AirbyteError(
context={
"source_id": source_id,
Expand Down
22 changes: 22 additions & 0 deletions airbyte/_util/text_util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Utility functions for working with text."""

from __future__ import annotations

import ulid


def generate_ulid() -> str:
"""Generate a new ULID."""
return str(ulid.ULID())


def generate_random_suffix() -> str:
"""Generate a random suffix for use in temporary names.

By default, this function generates a ULID and returns a 9-character string
which will be monotonically sortable. It is not guaranteed to be unique but
is sufficient for small-scale and medium-scale use cases.
"""
ulid_str = generate_ulid()
return ulid_str[:6] + ulid_str[-3:]
5 changes: 1 addition & 4 deletions airbyte/caches/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,7 @@ class CacheBase(SqlConfig, AirbyteWriterInterface):
"""Whether to clean up the cache after use."""

_name: str = PrivateAttr()

_deployed_api_root: str | None = PrivateAttr(default=None)
_deployed_workspace_id: str | None = PrivateAttr(default=None)
_deployed_destination_id: str | None = PrivateAttr(default=None)
_paired_destination_name: str

_sql_processor_class: type[SqlProcessorBase] = PrivateAttr()
_read_processor: SqlProcessorBase = PrivateAttr()
Expand Down
Loading
Loading