Skip to content

Commit

Permalink
feat(Airbyte CDK): add with_json_schema method to ConfiguredAirbyteSt…
Browse files Browse the repository at this point in the history
…reamBuilder (#40737)
  • Loading branch information
aldogonzalez8 authored Jul 5, 2024
1 parent 162bd15 commit 1422786
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 12 deletions.
8 changes: 6 additions & 2 deletions airbyte-cdk/python/airbyte_cdk/test/catalog_builder.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.

from typing import List, Union, overload
from typing import Any, Dict, List, Union, overload

from airbyte_protocol.models import ConfiguredAirbyteCatalog, ConfiguredAirbyteStream, SyncMode


class ConfiguredAirbyteStreamBuilder:
def __init__(self) -> None:
self._stream = {
self._stream: Dict[str, Any] = {
"stream": {
"name": "any name",
"json_schema": {},
Expand All @@ -32,6 +32,10 @@ def with_primary_key(self, pk: List[List[str]]) -> "ConfiguredAirbyteStreamBuild
self._stream["stream"]["source_defined_primary_key"] = pk # type: ignore # we assume that self._stream["stream"] is a Dict[str, Any]
return self

def with_json_schema(self, json_schema: Dict[str, Any]) -> "ConfiguredAirbyteStreamBuilder":
self._stream["stream"]["json_schema"] = json_schema
return self

def build(self) -> ConfiguredAirbyteStream:
return ConfiguredAirbyteStream.parse_obj(self._stream)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
#

from datetime import datetime, timezone
from typing import List, Optional
from typing import Any, Dict, List, Optional
from unittest import TestCase

import freezegun
from airbyte_cdk.models import AirbyteStateBlob, ConfiguredAirbyteCatalog, SyncMode, Type
from airbyte_cdk.test.catalog_builder import CatalogBuilder
from airbyte_cdk.test.catalog_builder import ConfiguredAirbyteStreamBuilder
from airbyte_cdk.test.entrypoint_wrapper import read
from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest
from airbyte_cdk.test.mock_http.response_builder import (
Expand Down Expand Up @@ -51,11 +51,13 @@ def build(self) -> HttpRequest:
)


def _create_catalog(names_and_sync_modes: List[tuple[str, SyncMode]]) -> ConfiguredAirbyteCatalog:
catalog_builder = CatalogBuilder()
for stream_name, sync_mode in names_and_sync_modes:
catalog_builder.with_stream(name=stream_name, sync_mode=sync_mode)
return catalog_builder.build()
def _create_catalog(names_and_sync_modes: List[tuple[str, SyncMode, Dict[str, Any]]]) -> ConfiguredAirbyteCatalog:
stream_builder = ConfiguredAirbyteStreamBuilder()
streams = []
for stream_name, sync_mode, json_schema in names_and_sync_modes:
streams.append(stream_builder.with_name(stream_name).with_sync_mode(sync_mode).with_json_schema(json_schema or {}))

return ConfiguredAirbyteCatalog(streams=list(map(lambda builder: builder.build(), streams)))


def _create_justice_songs_request() -> RequestBuilder:
Expand Down Expand Up @@ -138,7 +140,7 @@ def test_resumable_full_refresh_sync(self, http_mocker):
)

source = SourceFixture()
actual_messages = read(source, config=config, catalog=_create_catalog([("justice_songs", SyncMode.full_refresh)]))
actual_messages = read(source, config=config, catalog=_create_catalog([("justice_songs", SyncMode.full_refresh, {})]))

assert emits_successful_sync_status_messages(actual_messages.get_stream_statuses("justice_songs"))
assert len(actual_messages.records) == 5
Expand Down Expand Up @@ -185,7 +187,7 @@ def test_resumable_full_refresh_second_attempt(self, http_mocker):
)

source = SourceFixture()
actual_messages = read(source, config=config, catalog=_create_catalog([("justice_songs", SyncMode.full_refresh)]), state=state)
actual_messages = read(source, config=config, catalog=_create_catalog([("justice_songs", SyncMode.full_refresh, {})]), state=state)

assert emits_successful_sync_status_messages(actual_messages.get_stream_statuses("justice_songs"))
assert len(actual_messages.records) == 8
Expand Down Expand Up @@ -221,7 +223,7 @@ def test_resumable_full_refresh_failure(self, http_mocker):
http_mocker.get(_create_justice_songs_request().with_page(2).build(), _create_response().with_status_code(status_code=400).build())

source = SourceFixture()
actual_messages = read(source, config=config, catalog=_create_catalog([("justice_songs", SyncMode.full_refresh)]), expecting_exception=True)
actual_messages = read(source, config=config, catalog=_create_catalog([("justice_songs", SyncMode.full_refresh, {})]), expecting_exception=True)

status_messages = actual_messages.get_stream_statuses("justice_songs")
assert status_messages[-1] == AirbyteStreamStatus.INCOMPLETE
Expand Down

0 comments on commit 1422786

Please sign in to comment.