Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(Airbyte CDK): add with_json_schema method to ConfiguredAirbyteStreamBuilder #40737

Merged
merged 12 commits into from
Jul 5, 2024
Merged
14 changes: 10 additions & 4 deletions airbyte-cdk/python/airbyte_cdk/test/catalog_builder.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.

from typing import List, Union, overload
from typing import Any, Dict, List, Union, overload

from airbyte_protocol.models import ConfiguredAirbyteCatalog, ConfiguredAirbyteStream, SyncMode

Expand Down Expand Up @@ -32,6 +32,10 @@ def with_primary_key(self, pk: List[List[str]]) -> "ConfiguredAirbyteStreamBuild
self._stream["stream"]["source_defined_primary_key"] = pk # type: ignore # we assume that self._stream["stream"] is a Dict[str, Any]
return self

def with_json_schema(self, json_schema: Dict[str, Any]) -> "ConfiguredAirbyteStreamBuilder":
    """Attach a JSON schema to the stream being built.

    The schema is stored under the ``"json_schema"`` key of the nested
    ``"stream"`` dictionary; the very same object is stored (no copy is made).

    :param json_schema: schema to store on the stream definition
    :return: this builder, so calls can be chained
    """
    stream_definition = self._stream["stream"]
    stream_definition["json_schema"] = json_schema
    return self

def build(self) -> ConfiguredAirbyteStream:
    """Turn the accumulated stream dictionary into a ``ConfiguredAirbyteStream``.

    :return: the result of ``ConfiguredAirbyteStream.parse_obj`` applied to the builder's internal dictionary
    """
    raw_stream = self._stream
    return ConfiguredAirbyteStream.parse_obj(raw_stream)

Expand All @@ -45,10 +49,12 @@ def with_stream(self, name: ConfiguredAirbyteStreamBuilder) -> "CatalogBuilder":
...

@overload
def with_stream(self, name: str, sync_mode: SyncMode) -> "CatalogBuilder":
def with_stream(self, name: str, sync_mode: SyncMode, json_schema: Dict[str, Any]) -> "CatalogBuilder":
...

def with_stream(self, name: Union[str, ConfiguredAirbyteStreamBuilder], sync_mode: Union[SyncMode, None] = None) -> "CatalogBuilder":
def with_stream(
aldogonzalez8 marked this conversation as resolved.
Show resolved Hide resolved
self, name: Union[str, ConfiguredAirbyteStreamBuilder], sync_mode: Union[SyncMode, None] = None, json_schema: Dict[str, Any] = None
) -> "CatalogBuilder":
# As we are introducing a fully fledged ConfiguredAirbyteStreamBuilder, we would like to deprecate the previous interface
# with_stream(str, SyncMode)

Expand All @@ -57,7 +63,7 @@ def with_stream(self, name: Union[str, ConfiguredAirbyteStreamBuilder], sync_mod
builder = (
name_or_builder
if isinstance(name_or_builder, ConfiguredAirbyteStreamBuilder)
else ConfiguredAirbyteStreamBuilder().with_name(name_or_builder).with_sync_mode(sync_mode)
else ConfiguredAirbyteStreamBuilder().with_name(name_or_builder).with_sync_mode(sync_mode).with_json_schema(json_schema or {})
)
self._streams.append(builder)
return self
Expand Down
25 changes: 25 additions & 0 deletions airbyte-cdk/python/airbyte_cdk/test/entrypoint_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from pathlib import Path
from typing import Any, List, Mapping, Optional, Union

from airbyte_cdk.connector import BaseConnector
from airbyte_cdk.entrypoint import AirbyteEntrypoint
from airbyte_cdk.exception_handler import assemble_uncaught_exception
from airbyte_cdk.logger import AirbyteLogFormatter
Expand Down Expand Up @@ -145,6 +146,30 @@ def _run_command(source: Source, args: List[str], expecting_exception: bool = Fa
return EntrypointOutput(messages + captured_logs, uncaught_exception)


def _get_catalog(args: List[str]) -> Mapping[str, Any]:
    """Load the catalog referenced by an entrypoint command line.

    The arguments are handed to ``AirbyteEntrypoint.extract_catalog`` and the
    value it returns is passed to ``BaseConnector.read_config``.
    NOTE(review): presumably ``extract_catalog`` yields the path given after
    ``--catalog`` -- confirm against ``AirbyteEntrypoint``.

    :param args: entrypoint arguments, e.g. ``["read", "--config", ..., "--catalog", ...]``
    :return: whatever ``BaseConnector.read_config`` returns for the extracted catalog
    :raises Exception: any error raised while extracting or reading the catalog;
        it is printed first and then re-raised unchanged
    """
    try:
        catalog = AirbyteEntrypoint.extract_catalog(args)
        return BaseConnector.read_config(catalog)
    except Exception as exception:
        print("Printing unexpected error from entrypoint_wrapper")
        print("".join(traceback.format_exception(None, exception, exception.__traceback__)))
        # Re-raise instead of falling through: an implicit ``None`` would violate the
        # declared return type and surface later as an unrelated error in the caller.
        raise


def get_catalog(config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog):
    """Return the catalog as loaded from a ``read`` command line.

    ``config`` and ``catalog.json()`` are written with ``make_file`` to
    ``config.json`` and ``catalog.json`` inside a temporary directory. A
    ``read`` argument list pointing at both files is then given to
    ``_get_catalog``. The temporary directory is removed on exit.

    :param config: connector configuration to write next to the catalog
    :param catalog: configured catalog to serialize and load back
    :return: the value produced by ``_get_catalog`` for the generated arguments
    """
    with tempfile.TemporaryDirectory() as scratch_dir:
        scratch_path = Path(scratch_dir)
        config_path = make_file(scratch_path / "config.json", config)
        catalog_path = make_file(scratch_path / "catalog.json", catalog.json())
        return _get_catalog(["read", "--config", config_path, "--catalog", catalog_path])


def discover(
source: Source,
config: Mapping[str, Any],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@
#

from datetime import datetime, timezone
from typing import List, Optional
from typing import Any, Dict, List, Optional
from unittest import TestCase

import freezegun
from airbyte_cdk.models import AirbyteStateBlob, ConfiguredAirbyteCatalog, SyncMode, Type
from airbyte_cdk.test.catalog_builder import CatalogBuilder
from airbyte_cdk.test.entrypoint_wrapper import read
from airbyte_cdk.test.entrypoint_wrapper import get_catalog, read
from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest
from airbyte_cdk.test.mock_http.response_builder import (
FieldPath,
Expand Down Expand Up @@ -58,6 +58,12 @@ def _create_catalog(names_and_sync_modes: List[tuple[str, SyncMode]]) -> Configu
return catalog_builder.build()


def _create_catalog_with_configured_schema(
    names_and_sync_modes: List[tuple[str, SyncMode, Dict[str, Any]]]
) -> ConfiguredAirbyteCatalog:
    """Build a configured catalog whose streams carry an explicit JSON schema.

    :param names_and_sync_modes: one ``(stream name, sync mode, JSON schema)`` triple per stream to add
    :return: the catalog produced by ``CatalogBuilder.build()`` after adding every stream
    """
    catalog_builder = CatalogBuilder()
    for stream_name, sync_mode, json_schema in names_and_sync_modes:
        catalog_builder.with_stream(name=stream_name, sync_mode=sync_mode, json_schema=json_schema)
    return catalog_builder.build()

def _create_justice_songs_request() -> RequestBuilder:
    """Return the ``RequestBuilder`` obtained from ``RequestBuilder.justice_songs_endpoint()``."""
    request_builder = RequestBuilder.justice_songs_endpoint()
    return request_builder

Expand Down Expand Up @@ -157,6 +163,43 @@ def test_resumable_full_refresh_sync(self, http_mocker):
assert actual_messages.state_messages[3].state.stream.stream_state == AirbyteStateBlob()
assert actual_messages.state_messages[3].state.sourceStats.recordCount == 0.0

@HttpMocker()
def test_resumable_full_refresh_sync_with_configured_schema(self, http_mocker):
    """Read ``justice_songs`` with a schema set on the configured catalog.

    A single, final page holding one record is mocked. The test checks the
    emitted records and state messages, then verifies that the catalog loaded
    back through ``get_catalog`` still carries the configured JSON schema.
    """
    config = {}

    http_mocker.get(
        _create_justice_songs_request().build(),
        _create_response(pagination_has_more=False).with_record(record=_create_record("justice_songs")).build(),
    )

    source = SourceFixture()
    json_schema: Dict[str, Any] = {
        "properties": {
            "id": {
                "description": "id",
                "type": "string",
            }
        }
    }
    catalog = _create_catalog_with_configured_schema([("justice_songs", SyncMode.full_refresh, json_schema)])
    actual_messages = read(source, config=config, catalog=catalog)

    assert emits_successful_sync_status_messages(actual_messages.get_stream_statuses("justice_songs"))
    assert len(actual_messages.records) == 1
    assert len(actual_messages.state_messages) == 2
    validate_message_order([Type.RECORD, Type.STATE, Type.STATE], actual_messages.records_and_state_messages)
    assert actual_messages.state_messages[0].state.stream.stream_descriptor.name == "justice_songs"
    assert actual_messages.state_messages[0].state.stream.stream_state == AirbyteStateBlob()
    assert actual_messages.state_messages[0].state.sourceStats.recordCount == 1.0
    assert actual_messages.state_messages[1].state.stream.stream_descriptor.name == "justice_songs"
    assert actual_messages.state_messages[1].state.stream.stream_state == AirbyteStateBlob()
    assert actual_messages.state_messages[1].state.sourceStats.recordCount == 0.0

    # The schema given to the catalog builder must survive serialization and loading.
    entrypoint_catalog = get_catalog(config=config, catalog=catalog)
    assert entrypoint_catalog["streams"][0]["stream"]["json_schema"] == json_schema

@HttpMocker()
def test_resumable_full_refresh_second_attempt(self, http_mocker):
config = {}
Expand Down
Loading