From 05fece7c02caf8efbe916f570ddcc1a4712b7c66 Mon Sep 17 00:00:00 2001 From: Aldo Gonzalez Date: Thu, 4 Jul 2024 13:11:23 -0600 Subject: [PATCH 1/9] Airbyte CDK: add with_json_schema method to CatalogBuilder --- .../airbyte_cdk/test/catalog_builder.py | 14 ++++-- .../airbyte_cdk/test/entrypoint_wrapper.py | 25 ++++++++++ .../test_resumable_full_refresh.py | 47 ++++++++++++++++++- 3 files changed, 80 insertions(+), 6 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/test/catalog_builder.py b/airbyte-cdk/python/airbyte_cdk/test/catalog_builder.py index bb4cc162f4cef..7fee5f4c4908f 100644 --- a/airbyte-cdk/python/airbyte_cdk/test/catalog_builder.py +++ b/airbyte-cdk/python/airbyte_cdk/test/catalog_builder.py @@ -1,6 +1,6 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. -from typing import List, Union, overload +from typing import Any, Dict, List, Union, overload from airbyte_protocol.models import ConfiguredAirbyteCatalog, ConfiguredAirbyteStream, SyncMode @@ -32,6 +32,10 @@ def with_primary_key(self, pk: List[List[str]]) -> "ConfiguredAirbyteStreamBuild self._stream["stream"]["source_defined_primary_key"] = pk # type: ignore # we assume that self._stream["stream"] is a Dict[str, Any] return self + def with_json_schema(self, json_schema: Dict[str, Any]) -> "ConfiguredAirbyteStreamBuilder": + self._stream["stream"]["json_schema"] = json_schema + return self + def build(self) -> ConfiguredAirbyteStream: return ConfiguredAirbyteStream.parse_obj(self._stream) @@ -45,10 +49,12 @@ def with_stream(self, name: ConfiguredAirbyteStreamBuilder) -> "CatalogBuilder": ... @overload - def with_stream(self, name: str, sync_mode: SyncMode) -> "CatalogBuilder": + def with_stream(self, name: str, sync_mode: SyncMode, json_schema: Dict[str, Any]) -> "CatalogBuilder": ... - def with_stream(self, name: Union[str, ConfiguredAirbyteStreamBuilder], sync_mode: Union[SyncMode, None] = None) -> "CatalogBuilder": + def with_stream( + self, name: Union[str, ConfiguredAirbyteStreamBuilder], sync_mode: Union[SyncMode, None] = None, json_schema: Dict[str, Any] = None + ) -> "CatalogBuilder": # As we are introducing a fully fledge ConfiguredAirbyteStreamBuilder, we would like to deprecate the previous interface # with_stream(str, SyncMode) @@ -57,7 +63,7 @@ def with_stream(self, name: Union[str, ConfiguredAirbyteStreamBuilder], sync_mod builder = ( name_or_builder if isinstance(name_or_builder, ConfiguredAirbyteStreamBuilder) - else ConfiguredAirbyteStreamBuilder().with_name(name_or_builder).with_sync_mode(sync_mode) + else ConfiguredAirbyteStreamBuilder().with_name(name_or_builder).with_sync_mode(sync_mode).with_json_schema(json_schema or {}) ) self._streams.append(builder) return self diff --git a/airbyte-cdk/python/airbyte_cdk/test/entrypoint_wrapper.py b/airbyte-cdk/python/airbyte_cdk/test/entrypoint_wrapper.py index 35f68af210f07..8b3a0fd123b36 100644 --- a/airbyte-cdk/python/airbyte_cdk/test/entrypoint_wrapper.py +++ b/airbyte-cdk/python/airbyte_cdk/test/entrypoint_wrapper.py @@ -22,6 +22,7 @@ from pathlib import Path from typing import Any, List, Mapping, Optional, Union +from airbyte_cdk.connector import BaseConnector from airbyte_cdk.entrypoint import AirbyteEntrypoint from airbyte_cdk.exception_handler import assemble_uncaught_exception from airbyte_cdk.logger import AirbyteLogFormatter @@ -145,6 +146,30 @@ def _run_command(source: Source, args: List[str], expecting_exception: bool = Fa return EntrypointOutput(messages + captured_logs, uncaught_exception) +def _get_catalog(args: List[str]) -> Mapping[str, Any]: + try: + catalog = AirbyteEntrypoint.extract_catalog(args) + return BaseConnector.read_config(catalog) + except Exception as exception: + print("Printing unexpected error from entrypoint_wrapper") + print("".join(traceback.format_exception(None, exception, exception.__traceback__))) + + +def get_catalog(config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog): + with tempfile.TemporaryDirectory() as tmp_directory: + tmp_directory_path = Path(tmp_directory) + config_file = make_file(tmp_directory_path / "config.json", config) + catalog_file = make_file(tmp_directory_path / "catalog.json", catalog.json()) + args = [ + "read", + "--config", + config_file, + "--catalog", + catalog_file, + ] + return _get_catalog(args) + + def discover( source: Source, config: Mapping[str, Any], diff --git a/airbyte-cdk/python/unit_tests/sources/mock_server_tests/test_resumable_full_refresh.py b/airbyte-cdk/python/unit_tests/sources/mock_server_tests/test_resumable_full_refresh.py index 473b0781125f8..36a341bcee214 100644 --- a/airbyte-cdk/python/unit_tests/sources/mock_server_tests/test_resumable_full_refresh.py +++ b/airbyte-cdk/python/unit_tests/sources/mock_server_tests/test_resumable_full_refresh.py @@ -3,13 +3,13 @@ # from datetime import datetime, timezone -from typing import List, Optional +from typing import Any, Dict, List, Optional from unittest import TestCase import freezegun from airbyte_cdk.models import AirbyteStateBlob, ConfiguredAirbyteCatalog, SyncMode, Type from airbyte_cdk.test.catalog_builder import CatalogBuilder -from airbyte_cdk.test.entrypoint_wrapper import read +from airbyte_cdk.test.entrypoint_wrapper import get_catalog, read from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest from airbyte_cdk.test.mock_http.response_builder import ( FieldPath, @@ -58,6 +58,12 @@ def _create_catalog(names_and_sync_modes: List[tuple[str, SyncMode]]) -> Configu return catalog_builder.build() +def _create_catalog_with_configured_schema(names_and_sync_modes: List[tuple[str, SyncMode, str: Any]]) -> ConfiguredAirbyteCatalog: + catalog_builder = CatalogBuilder() + for stream_name, sync_mode, json_schema in names_and_sync_modes: + catalog_builder.with_stream(name=stream_name, sync_mode=sync_mode, json_schema=json_schema) + return catalog_builder.build() + def _create_justice_songs_request() -> RequestBuilder: return RequestBuilder.justice_songs_endpoint() @@ -157,6 +163,43 @@ def test_resumable_full_refresh_sync(self, http_mocker): assert actual_messages.state_messages[3].state.stream.stream_state == AirbyteStateBlob() assert actual_messages.state_messages[3].state.sourceStats.recordCount == 0.0 + @HttpMocker() + def test_resumable_full_refresh_sync_with_configured_schema(self, http_mocker): + config = {} + + http_mocker.get( + _create_justice_songs_request().build(), + _create_response(pagination_has_more=False).with_record(record=_create_record("justice_songs")).build(), + ) + + + source = SourceFixture() + json_schema: Dict[str:any] = { + "properties": + {"id": + { + "description" : "id", + "type" : "string" + } + } + } + catalog = _create_catalog_with_configured_schema([("justice_songs", SyncMode.full_refresh, json_schema)]) + actual_messages = read(source, config=config, catalog=catalog) + + assert emits_successful_sync_status_messages(actual_messages.get_stream_statuses("justice_songs")) + assert len(actual_messages.records) == 1 + assert len(actual_messages.state_messages) == 2 + validate_message_order([Type.RECORD, Type.STATE, Type.STATE], actual_messages.records_and_state_messages) + assert actual_messages.state_messages[0].state.stream.stream_descriptor.name == "justice_songs" + assert actual_messages.state_messages[0].state.stream.stream_state == AirbyteStateBlob() + assert actual_messages.state_messages[0].state.sourceStats.recordCount == 1.0 + assert actual_messages.state_messages[1].state.stream.stream_descriptor.name == "justice_songs" + assert actual_messages.state_messages[1].state.stream.stream_state == AirbyteStateBlob() + assert actual_messages.state_messages[1].state.sourceStats.recordCount == 0.0 + + entrypoint_catalog = get_catalog(config=config ,catalog=catalog) + assert entrypoint_catalog["streams"][0]["stream"]["json_schema"] == json_schema + @HttpMocker() def test_resumable_full_refresh_second_attempt(self, http_mocker): config = {} From a5a6382b2ef2be96573c11d7e1094177c0cf4c8b Mon Sep 17 00:00:00 2001 From: Aldo Gonzalez Date: Thu, 4 Jul 2024 13:52:12 -0600 Subject: [PATCH 2/9] Airbyte CDK: fix typing --- airbyte-cdk/python/airbyte_cdk/test/catalog_builder.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/airbyte-cdk/python/airbyte_cdk/test/catalog_builder.py b/airbyte-cdk/python/airbyte_cdk/test/catalog_builder.py index 7fee5f4c4908f..d8dac96fb0cc2 100644 --- a/airbyte-cdk/python/airbyte_cdk/test/catalog_builder.py +++ b/airbyte-cdk/python/airbyte_cdk/test/catalog_builder.py @@ -53,7 +53,10 @@ def with_stream(self, name: str, sync_mode: SyncMode, json_schema: Dict[str, Any ... def with_stream( - self, name: Union[str, ConfiguredAirbyteStreamBuilder], sync_mode: Union[SyncMode, None] = None, json_schema: Dict[str, Any] = None + self, + name: Union[str, ConfiguredAirbyteStreamBuilder], + sync_mode: Union[SyncMode, None] = None, + json_schema: Union[Dict[str, Any], None] = None, ) -> "CatalogBuilder": # As we are introducing a fully fledge ConfiguredAirbyteStreamBuilder, we would like to deprecate the previous interface # with_stream(str, SyncMode) From 64d5089c0eae8151672254efb0a6cf2ecddfd2ef Mon Sep 17 00:00:00 2001 From: Aldo Gonzalez Date: Thu, 4 Jul 2024 14:14:02 -0600 Subject: [PATCH 3/9] Airbyte CDK: remove functionality to CatalogBuilder as this will be deprecated --- .../python/airbyte_cdk/test/catalog_builder.py | 11 +++-------- .../mock_server_tests/test_resumable_full_refresh.py | 10 ++++++---- 2 files changed, 9 insertions(+), 12 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/test/catalog_builder.py b/airbyte-cdk/python/airbyte_cdk/test/catalog_builder.py index d8dac96fb0cc2..ce8ec4ed6c7a8 100644 --- a/airbyte-cdk/python/airbyte_cdk/test/catalog_builder.py +++ b/airbyte-cdk/python/airbyte_cdk/test/catalog_builder.py @@ -49,15 +49,10 @@ def with_stream(self, name: ConfiguredAirbyteStreamBuilder) -> "CatalogBuilder": ... @overload - def with_stream(self, name: str, sync_mode: SyncMode, json_schema: Dict[str, Any]) -> "CatalogBuilder": + def with_stream(self, name: str, sync_mode: SyncMode) -> "CatalogBuilder": ... - def with_stream( - self, - name: Union[str, ConfiguredAirbyteStreamBuilder], - sync_mode: Union[SyncMode, None] = None, - json_schema: Union[Dict[str, Any], None] = None, - ) -> "CatalogBuilder": + def with_stream(self, name: Union[str, ConfiguredAirbyteStreamBuilder], sync_mode: Union[SyncMode, None] = None) -> "CatalogBuilder": # As we are introducing a fully fledge ConfiguredAirbyteStreamBuilder, we would like to deprecate the previous interface # with_stream(str, SyncMode) @@ -66,7 +61,7 @@ def with_stream( builder = ( name_or_builder if isinstance(name_or_builder, ConfiguredAirbyteStreamBuilder) - else ConfiguredAirbyteStreamBuilder().with_name(name_or_builder).with_sync_mode(sync_mode).with_json_schema(json_schema or {}) + else ConfiguredAirbyteStreamBuilder().with_name(name_or_builder).with_sync_mode(sync_mode) ) self._streams.append(builder) return self diff --git a/airbyte-cdk/python/unit_tests/sources/mock_server_tests/test_resumable_full_refresh.py b/airbyte-cdk/python/unit_tests/sources/mock_server_tests/test_resumable_full_refresh.py index 36a341bcee214..8269ac4600ff0 100644 --- a/airbyte-cdk/python/unit_tests/sources/mock_server_tests/test_resumable_full_refresh.py +++ b/airbyte-cdk/python/unit_tests/sources/mock_server_tests/test_resumable_full_refresh.py @@ -8,7 +8,7 @@ import freezegun from airbyte_cdk.models import AirbyteStateBlob, ConfiguredAirbyteCatalog, SyncMode, Type -from airbyte_cdk.test.catalog_builder import CatalogBuilder +from airbyte_cdk.test.catalog_builder import CatalogBuilder, ConfiguredAirbyteStreamBuilder from airbyte_cdk.test.entrypoint_wrapper import get_catalog, read from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest from airbyte_cdk.test.mock_http.response_builder import ( @@ -59,10 +59,12 @@ def _create_catalog(names_and_sync_modes: List[tuple[str, SyncMode]]) -> Configu def _create_catalog_with_configured_schema(names_and_sync_modes: List[tuple[str, SyncMode, str: Any]]) -> ConfiguredAirbyteCatalog: - catalog_builder = CatalogBuilder() + stream_builder = ConfiguredAirbyteStreamBuilder() + streams = [] for stream_name, sync_mode, json_schema in names_and_sync_modes: - catalog_builder.with_stream(name=stream_name, sync_mode=sync_mode, json_schema=json_schema) - return catalog_builder.build() + streams.append(stream_builder.with_name(stream_name).with_sync_mode(sync_mode).with_json_schema(json_schema or {})) + + return ConfiguredAirbyteCatalog(streams=list(map(lambda builder: builder.build(), streams))) def _create_justice_songs_request() -> RequestBuilder: return RequestBuilder.justice_songs_endpoint() From 54301c4cfb7c8e0b52c401716e45aa78ce3daf8c Mon Sep 17 00:00:00 2001 From: Aldo Gonzalez Date: Thu, 4 Jul 2024 14:40:11 -0600 Subject: [PATCH 4/9] Airbyte CDK: remove redundant check on catalog --- .../airbyte_cdk/test/entrypoint_wrapper.py | 25 ------------------- .../test_resumable_full_refresh.py | 5 ++-- 2 files changed, 2 insertions(+), 28 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/test/entrypoint_wrapper.py b/airbyte-cdk/python/airbyte_cdk/test/entrypoint_wrapper.py index 8b3a0fd123b36..35f68af210f07 100644 --- a/airbyte-cdk/python/airbyte_cdk/test/entrypoint_wrapper.py +++ b/airbyte-cdk/python/airbyte_cdk/test/entrypoint_wrapper.py @@ -22,7 +22,6 @@ from pathlib import Path from typing import Any, List, Mapping, Optional, Union -from airbyte_cdk.connector import BaseConnector from airbyte_cdk.entrypoint import AirbyteEntrypoint from airbyte_cdk.exception_handler import assemble_uncaught_exception from airbyte_cdk.logger import AirbyteLogFormatter @@ -146,30 +145,6 @@ def _run_command(source: Source, args: List[str], expecting_exception: bool = Fa return EntrypointOutput(messages + captured_logs, uncaught_exception) -def _get_catalog(args: List[str]) -> Mapping[str, Any]: - try: - catalog = AirbyteEntrypoint.extract_catalog(args) - return BaseConnector.read_config(catalog) - except Exception as exception: - print("Printing unexpected error from entrypoint_wrapper") - print("".join(traceback.format_exception(None, exception, exception.__traceback__))) - - -def get_catalog(config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog): - with tempfile.TemporaryDirectory() as tmp_directory: - tmp_directory_path = Path(tmp_directory) - config_file = make_file(tmp_directory_path / "config.json", config) - catalog_file = make_file(tmp_directory_path / "catalog.json", catalog.json()) - args = [ - "read", - "--config", - config_file, - "--catalog", - catalog_file, - ] - return _get_catalog(args) - - def discover( source: Source, config: Mapping[str, Any], diff --git a/airbyte-cdk/python/unit_tests/sources/mock_server_tests/test_resumable_full_refresh.py b/airbyte-cdk/python/unit_tests/sources/mock_server_tests/test_resumable_full_refresh.py index 8269ac4600ff0..0a58e54c056ff 100644 --- a/airbyte-cdk/python/unit_tests/sources/mock_server_tests/test_resumable_full_refresh.py +++ b/airbyte-cdk/python/unit_tests/sources/mock_server_tests/test_resumable_full_refresh.py @@ -9,7 +9,7 @@ import freezegun from airbyte_cdk.models import AirbyteStateBlob, ConfiguredAirbyteCatalog, SyncMode, Type from airbyte_cdk.test.catalog_builder import CatalogBuilder, ConfiguredAirbyteStreamBuilder -from airbyte_cdk.test.entrypoint_wrapper import get_catalog, read +from airbyte_cdk.test.entrypoint_wrapper import read from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest from airbyte_cdk.test.mock_http.response_builder import ( FieldPath, @@ -186,6 +186,7 @@ def test_resumable_full_refresh_sync_with_configured_schema(self, http_mocker): } } catalog = _create_catalog_with_configured_schema([("justice_songs", SyncMode.full_refresh, json_schema)]) + assert catalog.streams[0].stream.json_schema == json_schema actual_messages = read(source, config=config, catalog=catalog) assert emits_successful_sync_status_messages(actual_messages.get_stream_statuses("justice_songs")) @@ -199,8 +200,6 @@ def test_resumable_full_refresh_sync_with_configured_schema(self, http_mocker): assert actual_messages.state_messages[1].state.stream.stream_state == AirbyteStateBlob() assert actual_messages.state_messages[1].state.sourceStats.recordCount == 0.0 - entrypoint_catalog = get_catalog(config=config ,catalog=catalog) - assert entrypoint_catalog["streams"][0]["stream"]["json_schema"] == json_schema @HttpMocker() def test_resumable_full_refresh_second_attempt(self, http_mocker): From cd92a16aca4e38d0002ed3f7e47103c32978719d Mon Sep 17 00:00:00 2001 From: Aldo Gonzalez Date: Thu, 4 Jul 2024 14:54:41 -0600 Subject: [PATCH 5/9] AirbyteCDK: fix typing --- airbyte-cdk/python/airbyte_cdk/test/catalog_builder.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-cdk/python/airbyte_cdk/test/catalog_builder.py b/airbyte-cdk/python/airbyte_cdk/test/catalog_builder.py index ce8ec4ed6c7a8..c3e3578f34941 100644 --- a/airbyte-cdk/python/airbyte_cdk/test/catalog_builder.py +++ b/airbyte-cdk/python/airbyte_cdk/test/catalog_builder.py @@ -7,7 +7,7 @@ class ConfiguredAirbyteStreamBuilder: def __init__(self) -> None: - self._stream = { + self._stream: Dict[str, Any] = { "stream": { "name": "any name", "json_schema": {}, From 3cb755e6ace5ce04510742e887a56b13473e0ff0 Mon Sep 17 00:00:00 2001 From: Aldo Gonzalez Date: Thu, 4 Jul 2024 15:08:41 -0600 Subject: [PATCH 6/9] Airbyte CDK:update _create_catalog to fully rely on ConfiguredAirbyteCatalog instead of CatalogBuilder --- .../test_resumable_full_refresh.py | 51 ++----------------- 1 file changed, 4 insertions(+), 47 deletions(-) diff --git a/airbyte-cdk/python/unit_tests/sources/mock_server_tests/test_resumable_full_refresh.py b/airbyte-cdk/python/unit_tests/sources/mock_server_tests/test_resumable_full_refresh.py index 0a58e54c056ff..666fff3e0f094 100644 --- a/airbyte-cdk/python/unit_tests/sources/mock_server_tests/test_resumable_full_refresh.py +++ b/airbyte-cdk/python/unit_tests/sources/mock_server_tests/test_resumable_full_refresh.py @@ -51,14 +51,7 @@ def build(self) -> HttpRequest: ) -def _create_catalog(names_and_sync_modes: List[tuple[str, SyncMode]]) -> ConfiguredAirbyteCatalog: - catalog_builder = CatalogBuilder() - for stream_name, sync_mode in names_and_sync_modes: - catalog_builder.with_stream(name=stream_name, sync_mode=sync_mode) - return catalog_builder.build() - - -def _create_catalog_with_configured_schema(names_and_sync_modes: List[tuple[str, SyncMode, str: Any]]) -> ConfiguredAirbyteCatalog: +def _create_catalog(names_and_sync_modes: List[tuple[str, SyncMode, str: Any]]) -> ConfiguredAirbyteCatalog: stream_builder = ConfiguredAirbyteStreamBuilder() streams = [] for stream_name, sync_mode, json_schema in names_and_sync_modes: @@ -146,7 +139,7 @@ def test_resumable_full_refresh_sync(self, http_mocker): ) source = SourceFixture() - actual_messages = read(source, config=config, catalog=_create_catalog([("justice_songs", SyncMode.full_refresh)])) + actual_messages = read(source, config=config, catalog=_create_catalog([("justice_songs", SyncMode.full_refresh, {})])) assert emits_successful_sync_status_messages(actual_messages.get_stream_statuses("justice_songs")) assert len(actual_messages.records) == 5 @@ -165,42 +158,6 @@ def test_resumable_full_refresh_sync(self, http_mocker): assert actual_messages.state_messages[3].state.stream.stream_state == AirbyteStateBlob() assert actual_messages.state_messages[3].state.sourceStats.recordCount == 0.0 - @HttpMocker() - def test_resumable_full_refresh_sync_with_configured_schema(self, http_mocker): - config = {} - - http_mocker.get( - _create_justice_songs_request().build(), - _create_response(pagination_has_more=False).with_record(record=_create_record("justice_songs")).build(), - ) - - - source = SourceFixture() - json_schema: Dict[str:any] = { - "properties": - {"id": - { - "description" : "id", - "type" : "string" - } - } - } - catalog = _create_catalog_with_configured_schema([("justice_songs", SyncMode.full_refresh, json_schema)]) - assert catalog.streams[0].stream.json_schema == json_schema - actual_messages = read(source, config=config, catalog=catalog) - - assert emits_successful_sync_status_messages(actual_messages.get_stream_statuses("justice_songs")) - assert len(actual_messages.records) == 1 - assert len(actual_messages.state_messages) == 2 - validate_message_order([Type.RECORD, Type.STATE, Type.STATE], actual_messages.records_and_state_messages) - assert actual_messages.state_messages[0].state.stream.stream_descriptor.name == "justice_songs" - assert actual_messages.state_messages[0].state.stream.stream_state == AirbyteStateBlob() - assert actual_messages.state_messages[0].state.sourceStats.recordCount == 1.0 - assert actual_messages.state_messages[1].state.stream.stream_descriptor.name == "justice_songs" - assert actual_messages.state_messages[1].state.stream.stream_state == AirbyteStateBlob() - assert actual_messages.state_messages[1].state.sourceStats.recordCount == 0.0 - - @HttpMocker() def test_resumable_full_refresh_second_attempt(self, http_mocker): config = {} @@ -229,7 +186,7 @@ def test_resumable_full_refresh_second_attempt(self, http_mocker): ) source = SourceFixture() - actual_messages = read(source, config=config, catalog=_create_catalog([("justice_songs", SyncMode.full_refresh)]), state=state) + actual_messages = read(source, config=config, catalog=_create_catalog([("justice_songs", SyncMode.full_refresh, {})]), state=state) assert emits_successful_sync_status_messages(actual_messages.get_stream_statuses("justice_songs")) assert len(actual_messages.records) == 8 @@ -265,7 +222,7 @@ def test_resumable_full_refresh_failure(self, http_mocker): http_mocker.get(_create_justice_songs_request().with_page(2).build(), _create_response().with_status_code(status_code=400).build()) source = SourceFixture() - actual_messages = read(source, config=config, catalog=_create_catalog([("justice_songs", SyncMode.full_refresh)]), expecting_exception=True) + actual_messages = read(source, config=config, catalog=_create_catalog([("justice_songs", SyncMode.full_refresh, {})]), expecting_exception=True) status_messages = actual_messages.get_stream_statuses("justice_songs") assert status_messages[-1] == AirbyteStreamStatus.INCOMPLETE From c64b3649f93af0e0366d175bb79648be918186fc Mon Sep 17 00:00:00 2001 From: Aldo Gonzalez Date: Fri, 5 Jul 2024 06:13:10 -0600 Subject: [PATCH 7/9] Airbyte CDK: fix styles --- .../sources/mock_server_tests/test_resumable_full_refresh.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/airbyte-cdk/python/unit_tests/sources/mock_server_tests/test_resumable_full_refresh.py b/airbyte-cdk/python/unit_tests/sources/mock_server_tests/test_resumable_full_refresh.py index 666fff3e0f094..937ef1043cfee 100644 --- a/airbyte-cdk/python/unit_tests/sources/mock_server_tests/test_resumable_full_refresh.py +++ b/airbyte-cdk/python/unit_tests/sources/mock_server_tests/test_resumable_full_refresh.py @@ -3,12 +3,12 @@ # from datetime import datetime, timezone -from typing import Any, Dict, List, Optional +from typing import Any, List, Optional from unittest import TestCase import freezegun from airbyte_cdk.models import AirbyteStateBlob, ConfiguredAirbyteCatalog, SyncMode, Type -from airbyte_cdk.test.catalog_builder import CatalogBuilder, ConfiguredAirbyteStreamBuilder +from airbyte_cdk.test.catalog_builder import ConfiguredAirbyteStreamBuilder from airbyte_cdk.test.entrypoint_wrapper import read from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest from airbyte_cdk.test.mock_http.response_builder import ( @@ -59,6 +59,7 @@ def _create_catalog(names_and_sync_modes: List[tuple[str, SyncMode, str: Any]]) return ConfiguredAirbyteCatalog(streams=list(map(lambda builder: builder.build(), streams))) + def _create_justice_songs_request() -> RequestBuilder: return RequestBuilder.justice_songs_endpoint() From ea75fc7ff232cc2ba1ffdbc96b99dd6af554b048 Mon Sep 17 00:00:00 2001 From: Aldo Gonzalez Date: Fri, 5 Jul 2024 06:19:04 -0600 Subject: [PATCH 8/9] Airbyte CDK: fix typing --- .../sources/mock_server_tests/test_resumable_full_refresh.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-cdk/python/unit_tests/sources/mock_server_tests/test_resumable_full_refresh.py b/airbyte-cdk/python/unit_tests/sources/mock_server_tests/test_resumable_full_refresh.py index 937ef1043cfee..e3f40d2867301 100644 --- a/airbyte-cdk/python/unit_tests/sources/mock_server_tests/test_resumable_full_refresh.py +++ b/airbyte-cdk/python/unit_tests/sources/mock_server_tests/test_resumable_full_refresh.py @@ -3,7 +3,7 @@ # from datetime import datetime, timezone -from typing import Any, List, Optional +from typing import Dict, Any, List, Optional from unittest import TestCase import freezegun @@ -51,7 +51,7 @@ def build(self) -> HttpRequest: ) -def _create_catalog(names_and_sync_modes: List[tuple[str, SyncMode, str: Any]]) -> ConfiguredAirbyteCatalog: +def _create_catalog(names_and_sync_modes: List[tuple[str, SyncMode, Dict[str, Any]]]) -> ConfiguredAirbyteCatalog: stream_builder = ConfiguredAirbyteStreamBuilder() streams = [] for stream_name, sync_mode, json_schema in names_and_sync_modes: From b8e67949cb0cf9cbac26d82118eab1f6b1012cd7 Mon Sep 17 00:00:00 2001 From: Aldo Gonzalez Date: Fri, 5 Jul 2024 06:27:15 -0600 Subject: [PATCH 9/9] Airbyte CDK: fix format --- .../sources/mock_server_tests/test_resumable_full_refresh.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-cdk/python/unit_tests/sources/mock_server_tests/test_resumable_full_refresh.py b/airbyte-cdk/python/unit_tests/sources/mock_server_tests/test_resumable_full_refresh.py index e3f40d2867301..cd303f34a92ff 100644 --- a/airbyte-cdk/python/unit_tests/sources/mock_server_tests/test_resumable_full_refresh.py +++ b/airbyte-cdk/python/unit_tests/sources/mock_server_tests/test_resumable_full_refresh.py @@ -3,7 +3,7 @@ # from datetime import datetime, timezone -from typing import Dict, Any, List, Optional +from typing import Any, Dict, List, Optional from unittest import TestCase import freezegun