Skip to content

Commit

Permalink
clean up cloud functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
aaronsteers committed Oct 15, 2024
1 parent f3bc4c6 commit 9b40875
Show file tree
Hide file tree
Showing 9 changed files with 499 additions and 277 deletions.
190 changes: 151 additions & 39 deletions airbyte/_util/api_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from __future__ import annotations

import json
from typing import Any
from typing import TYPE_CHECKING, Any

import airbyte_api
from airbyte_api import api, models
Expand All @@ -24,9 +24,14 @@
AirbyteError,
AirbyteMissingResourceError,
AirbyteMultipleResourcesError,
PyAirbyteInputError,
)


if TYPE_CHECKING:
from collections.abc import Callable


JOB_WAIT_INTERVAL_SECS = 2.0
JOB_WAIT_TIMEOUT_SECS_DEFAULT = 60 * 60 # 1 hour
CLOUD_API_ROOT = "https://api.airbyte.com/v1"
Expand All @@ -43,7 +48,7 @@ def get_airbyte_server_instance(
*,
api_key: str,
api_root: str,
) -> airbyte_api.Airbyte:
) -> airbyte_api.AirbyteAPI:
"""Get an Airbyte instance."""
return airbyte_api.AirbyteAPI(
security=models.Security(
Expand Down Expand Up @@ -84,36 +89,126 @@ def get_workspace(
)


# List, get, and run connections
# List resources


def list_connections(
    workspace_id: str,
    *,
    api_root: str,
    api_key: str,
    name: str | None = None,
    name_filter: Callable[[str], bool] | None = None,
) -> list[models.ConnectionResponse]:
    """List the connections in a workspace, optionally filtered by name.

    Args:
        workspace_id: ID of the workspace to list connections for.
        api_root: API root, passed through to `get_airbyte_server_instance`.
        api_key: API key, passed through to `get_airbyte_server_instance`.
        name: If provided, only connections whose name equals this value are returned.
        name_filter: If provided, only connections whose name makes this callable
            return a truthy value are returned. Mutually exclusive with `name`.

    Returns:
        The connections from the API response that pass the name filter. When neither
        `name` nor `name_filter` is provided, all returned connections are included.

    Raises:
        PyAirbyteInputError: If both `name` and `name_filter` are provided.
        AirbyteError: If the API call did not succeed or returned no connections payload.
    """
    if name and name_filter:
        raise PyAirbyteInputError(message="You can provide name or name_filter, but not both.")

    # Collapse the three cases (exact name, custom filter, no filter) into one predicate.
    name_filter = (lambda n: n == name) if name else (name_filter or (lambda _: True))

    airbyte_instance = get_airbyte_server_instance(
        api_key=api_key,
        api_root=api_root,
    )
    response = airbyte_instance.connections.list_connections(
        api.ListConnectionsRequest(
            workspace_ids=[workspace_id],
        ),
    )

    # Raise explicitly for both a bad status and a missing payload, rather than relying
    # on `assert`, which is stripped under `python -O` and raises the wrong error type.
    if not status_ok(response.status_code) or response.connections_response is None:
        raise AirbyteError(
            context={
                "workspace_id": workspace_id,
                "response": response,
            }
        )

    return [
        connection
        for connection in response.connections_response.data
        if name_filter(connection.name)
    ]
def list_sources(
    workspace_id: str,
    *,
    api_root: str,
    api_key: str,
    name: str | None = None,
    name_filter: Callable[[str], bool] | None = None,
) -> list[models.SourceResponse]:
    """List the sources in a workspace, optionally filtered by name.

    Args:
        workspace_id: ID of the workspace to list sources for.
        api_root: API root, passed through to `get_airbyte_server_instance`.
        api_key: API key, passed through to `get_airbyte_server_instance`.
        name: If provided, only sources whose name equals this value are returned.
        name_filter: If provided, only sources whose name makes this callable
            return a truthy value are returned. Mutually exclusive with `name`.

    Returns:
        The sources from the API response that pass the name filter. When neither
        `name` nor `name_filter` is provided, all returned sources are included.

    Raises:
        PyAirbyteInputError: If both `name` and `name_filter` are provided.
        AirbyteError: If the API call did not succeed or returned no sources payload.
    """
    if name and name_filter:
        raise PyAirbyteInputError(message="You can provide name or name_filter, but not both.")

    # Collapse the three cases (exact name, custom filter, no filter) into one predicate.
    name_filter = (lambda n: n == name) if name else (name_filter or (lambda _: True))

    airbyte_instance = get_airbyte_server_instance(
        api_key=api_key,
        api_root=api_root,
    )
    response = airbyte_instance.sources.list_sources(
        api.ListSourcesRequest(
            workspace_ids=[workspace_id],
        ),
    )

    # Raise explicitly for both a bad status and a missing payload, rather than relying
    # on `assert`, which is stripped under `python -O` and raises the wrong error type.
    if not status_ok(response.status_code) or response.sources_response is None:
        raise AirbyteError(
            context={
                "workspace_id": workspace_id,
                "response": response,
            }
        )

    return [source for source in response.sources_response.data if name_filter(source.name)]


def list_destinations(
    workspace_id: str,
    *,
    api_root: str,
    api_key: str,
    name: str | None = None,
    name_filter: Callable[[str], bool] | None = None,
) -> list[models.DestinationResponse]:
    """List the destinations in a workspace, optionally filtered by name.

    Args:
        workspace_id: ID of the workspace to list destinations for.
        api_root: API root, passed through to `get_airbyte_server_instance`.
        api_key: API key, passed through to `get_airbyte_server_instance`.
        name: If provided, only destinations whose name equals this value are returned.
        name_filter: If provided, only destinations whose name makes this callable
            return a truthy value are returned. Mutually exclusive with `name`.

    Returns:
        The destinations from the API response that pass the name filter. When neither
        `name` nor `name_filter` is provided, all returned destinations are included.

    Raises:
        PyAirbyteInputError: If both `name` and `name_filter` are provided.
        AirbyteError: If the API call did not succeed or returned no destinations payload.
    """
    if name and name_filter:
        raise PyAirbyteInputError(message="You can provide name or name_filter, but not both.")

    # Collapse the three cases (exact name, custom filter, no filter) into one predicate.
    name_filter = (lambda n: n == name) if name else (name_filter or (lambda _: True))

    airbyte_instance = get_airbyte_server_instance(
        api_key=api_key,
        api_root=api_root,
    )
    response = airbyte_instance.destinations.list_destinations(
        api.ListDestinationsRequest(
            workspace_ids=[workspace_id],
        ),
    )

    # Raise explicitly for both a bad status and a missing payload, rather than relying
    # on `assert`, which is stripped under `python -O` and raises the wrong error type.
    if not status_ok(response.status_code) or response.destinations_response is None:
        raise AirbyteError(
            context={
                "workspace_id": workspace_id,
                "response": response,
            }
        )

    return [
        destination
        for destination in response.destinations_response.data
        if name_filter(destination.name)
    ]


# Get and run connections


def get_connection(
Expand All @@ -122,7 +217,7 @@ def get_connection(
*,
api_root: str,
api_key: str,
) -> api.ConnectionResponse:
) -> models.ConnectionResponse:
"""Get a connection."""
_ = workspace_id # Not used (yet)
airbyte_instance = get_airbyte_server_instance(
Expand All @@ -137,7 +232,11 @@ def get_connection(
if status_ok(response.status_code) and response.connection_response:
return response.connection_response

raise AirbyteMissingResourceError(connection_id, "connection", response.text)
raise AirbyteMissingResourceError(
resource_name_or_id=connection_id,
resource_type="connection",
log_text=response.raw_response.text,
)


def run_connection(
Expand All @@ -146,7 +245,7 @@ def run_connection(
*,
api_root: str,
api_key: str,
) -> api.ConnectionResponse:
) -> models.JobResponse:
"""Get a connection.
If block is True, this will block until the connection is finished running.
Expand Down Expand Up @@ -186,7 +285,7 @@ def get_job_logs(
*,
api_root: str,
api_key: str,
) -> list[api.JobResponse]:
) -> list[models.JobResponse]:
"""Get a job's logs."""
airbyte_instance = get_airbyte_server_instance(
api_key=api_key,
Expand All @@ -213,11 +312,11 @@ def get_job_logs(


def get_job_info(
job_id: str,
job_id: int,
*,
api_root: str,
api_key: str,
) -> api.JobResponse:
) -> models.JobResponse:
"""Get a job."""
airbyte_instance = get_airbyte_server_instance(
api_key=api_key,
Expand All @@ -231,7 +330,11 @@ def get_job_info(
if status_ok(response.status_code) and response.job_response:
return response.job_response

raise AirbyteMissingResourceError(job_id, "job", response.text)
raise AirbyteMissingResourceError(
resource_name_or_id=str(job_id),
resource_type="job",
log_text=response.raw_response.text,
)


# Create, get, and delete sources
Expand All @@ -244,7 +347,7 @@ def create_source(
config: dict[str, Any],
api_root: str,
api_key: str,
) -> api.SourceResponse:
) -> models.SourceResponse:
"""Get a connection."""
airbyte_instance = get_airbyte_server_instance(
api_key=api_key,
Expand Down Expand Up @@ -273,7 +376,7 @@ def get_source(
*,
api_root: str,
api_key: str,
) -> api.SourceResponse:
) -> models.SourceResponse:
"""Get a connection."""
airbyte_instance = get_airbyte_server_instance(
api_key=api_key,
Expand All @@ -284,10 +387,14 @@ def get_source(
source_id=source_id,
),
)
if status_ok(response.status_code) and response.connection_response:
return response.connection_response
if status_ok(response.status_code) and response.source_response:
return response.source_response

raise AirbyteMissingResourceError(source_id, "source", response.text)
raise AirbyteMissingResourceError(
resource_name_or_id=source_id,
resource_type="source",
log_text=response.raw_response.text,
)


def delete_source(
Expand Down Expand Up @@ -327,7 +434,7 @@ def create_destination(
config: dict[str, Any],
api_root: str,
api_key: str,
) -> api.DestinationResponse:
) -> models.DestinationResponse:
"""Get a connection."""
airbyte_instance = get_airbyte_server_instance(
api_key=api_key,
Expand All @@ -354,7 +461,7 @@ def get_destination(
*,
api_root: str,
api_key: str,
) -> api.DestinationResponse:
) -> models.DestinationResponse:
"""Get a connection."""
airbyte_instance = get_airbyte_server_instance(
api_key=api_key,
Expand All @@ -365,33 +472,38 @@ def get_destination(
destination_id=destination_id,
),
)
if status_ok(response.status_code):
if status_ok(response.status_code) and response.destination_response:
# TODO: This is a temporary workaround to resolve an issue where
# the destination API response is of the wrong type.
# https://github.com/airbytehq/pyairbyte/issues/320
raw_response: dict[str, Any] = json.loads(response.raw_response.text)
raw_configuration: dict[str, Any] = raw_response["configuration"]

destination_type = raw_response.get("destinationType")
if destination_type == "snowflake":
response.destination_response.configuration = models.DestinationSnowflake.from_dict(
raw_configuration,
response.destination_response.configuration = models.DestinationSnowflake(
**raw_configuration,
)
if destination_type == "bigquery":
response.destination_response.configuration = models.DestinationBigquery.from_dict(
raw_configuration,
response.destination_response.configuration = models.DestinationBigquery(
**raw_configuration,
)
if destination_type == "postgres":
response.destination_response.configuration = models.DestinationPostgres.from_dict(
raw_configuration,
response.destination_response.configuration = models.DestinationPostgres(
**raw_configuration,
)
if destination_type == "duckdb":
response.destination_response.configuration = models.DestinationDuckdb.from_dict(
raw_configuration,
response.destination_response.configuration = models.DestinationDuckdb(
**raw_configuration,
)

return response.destination_response

raise AirbyteMissingResourceError(destination_id, "destination", response.text)
raise AirbyteMissingResourceError(
resource_name_or_id=destination_id,
resource_type="destination",
log_text=response.raw_response.text,
)


def delete_destination(
Expand Down Expand Up @@ -448,17 +560,17 @@ def create_connection(
)
stream_configurations.append(stream_configuration)

stream_configurations = models.StreamConfigurations(stream_configurations)
stream_configurations_obj = models.StreamConfigurations(stream_configurations)
response = airbyte_instance.connections.create_connection(
models.ConnectionCreateRequest(
name=name,
source_id=source_id,
destination_id=destination_id,
configurations=stream_configurations,
configurations=stream_configurations_obj,
prefix=prefix,
),
)
if not status_ok(response.status_code):
if not status_ok(response.status_code) or response.connection_response is None:
raise AirbyteError(
context={
"source_id": source_id,
Expand Down
22 changes: 22 additions & 0 deletions airbyte/_util/text_util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Utility functions for working with text."""

from __future__ import annotations

import ulid


def generate_ulid() -> str:
    """Create a fresh ULID and return its string form."""
    new_ulid = ulid.ULID()
    return str(new_ulid)


def generate_random_suffix() -> str:
    """Generate a short random suffix for use in temporary names.

    The suffix is built from a freshly generated ULID string: its first six
    characters followed by its last three, giving a 9-character result.

    Per the ULID specification, the leading characters encode the creation
    timestamp and the trailing characters are random. Because only part of the
    timestamp is kept, suffixes sort by creation time only at a coarse
    granularity (roughly 17.5-minute windows); within a window the order is
    effectively random. The result is not guaranteed to be unique, but it is
    sufficient for small-scale and medium-scale use cases.

    Returns:
        A 9-character string derived from a new ULID.
    """
    ulid_str = generate_ulid()
    # Keep a coarse time-based prefix plus a few random characters to reduce collisions.
    return ulid_str[:6] + ulid_str[-3:]
4 changes: 0 additions & 4 deletions airbyte/caches/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,6 @@ class CacheBase(SqlConfig, AirbyteWriterInterface):

_name: str = PrivateAttr()

_deployed_api_root: str | None = PrivateAttr(default=None)
_deployed_workspace_id: str | None = PrivateAttr(default=None)
_deployed_destination_id: str | None = PrivateAttr(default=None)

_sql_processor_class: type[SqlProcessorBase] = PrivateAttr()
_read_processor: SqlProcessorBase = PrivateAttr()

Expand Down
Loading

0 comments on commit 9b40875

Please sign in to comment.