Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(Airbyte CDK): add with_json_schema method to ConfiguredAirbyteStreamBuilder #40737

Merged
merged 12 commits into from
Jul 5, 2024
Merged
6 changes: 5 additions & 1 deletion airbyte-cdk/python/airbyte_cdk/test/catalog_builder.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.

from typing import List, Union, overload
from typing import Any, Dict, List, Union, overload

from airbyte_protocol.models import ConfiguredAirbyteCatalog, ConfiguredAirbyteStream, SyncMode

Expand Down Expand Up @@ -32,6 +32,10 @@ def with_primary_key(self, pk: List[List[str]]) -> "ConfiguredAirbyteStreamBuild
self._stream["stream"]["source_defined_primary_key"] = pk # type: ignore # we assume that self._stream["stream"] is a Dict[str, Any]
return self

def with_json_schema(self, json_schema: Dict[str, Any]) -> "ConfiguredAirbyteStreamBuilder":
    """Set the JSON schema on the stream under construction.

    The mapping is stored as-is (no copy is made) under
    ``self._stream["stream"]["json_schema"]``.

    :param json_schema: schema mapping to attach to the stream definition
    :return: this builder, so calls can be chained
    """
    stream_definition = self._stream["stream"]
    stream_definition["json_schema"] = json_schema
    return self

def build(self) -> ConfiguredAirbyteStream:
    """Turn the accumulated ``self._stream`` mapping into a ``ConfiguredAirbyteStream`` via ``parse_obj``."""
    configured_stream = ConfiguredAirbyteStream.parse_obj(self._stream)
    return configured_stream

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
#

from datetime import datetime, timezone
from typing import List, Optional
from typing import Any, Dict, List, Optional
from unittest import TestCase

import freezegun
from airbyte_cdk.models import AirbyteStateBlob, ConfiguredAirbyteCatalog, SyncMode, Type
from airbyte_cdk.test.catalog_builder import CatalogBuilder
from airbyte_cdk.test.catalog_builder import CatalogBuilder, ConfiguredAirbyteStreamBuilder
from airbyte_cdk.test.entrypoint_wrapper import read
from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest
from airbyte_cdk.test.mock_http.response_builder import (
Expand Down Expand Up @@ -58,6 +58,14 @@ def _create_catalog(names_and_sync_modes: List[tuple[str, SyncMode]]) -> Configu
return catalog_builder.build()


def _create_catalog_with_configured_schema(
    names_and_sync_modes: List[tuple[str, SyncMode, Optional[Dict[str, Any]]]]
) -> ConfiguredAirbyteCatalog:
    """Build a configured catalog whose streams carry an explicit JSON schema.

    :param names_and_sync_modes: one ``(stream_name, sync_mode, json_schema)`` triple per
        stream; a falsy ``json_schema`` (``None`` or empty) is replaced by ``{}``
    :return: a ``ConfiguredAirbyteCatalog`` containing one configured stream per triple
    """
    # A fresh builder is created for every stream: the builder's with_* methods mutate and
    # return the same instance, so sharing one builder would make every stream in the
    # catalog end up with the name, sync mode and schema of the last triple.
    streams = [
        ConfiguredAirbyteStreamBuilder()
        .with_name(stream_name)
        .with_sync_mode(sync_mode)
        .with_json_schema(json_schema or {})
        .build()
        for stream_name, sync_mode, json_schema in names_and_sync_modes
    ]
    return ConfiguredAirbyteCatalog(streams=streams)

def _create_justice_songs_request() -> RequestBuilder:
    """Return the request builder produced by ``RequestBuilder.justice_songs_endpoint()``."""
    request_builder = RequestBuilder.justice_songs_endpoint()
    return request_builder

Expand Down Expand Up @@ -157,6 +165,42 @@ def test_resumable_full_refresh_sync(self, http_mocker):
assert actual_messages.state_messages[3].state.stream.stream_state == AirbyteStateBlob()
assert actual_messages.state_messages[3].state.sourceStats.recordCount == 0.0

@HttpMocker()
def test_resumable_full_refresh_sync_with_configured_schema(self, http_mocker):
    """Read ``justice_songs`` in full refresh with a catalog that carries an explicit JSON schema.

    Checks that the schema given to the catalog builder ends up on the configured stream,
    and that the read emits one record followed by two state messages.
    """
    config = {}

    # Single page of results: one record and no further pagination.
    http_mocker.get(
        _create_justice_songs_request().build(),
        _create_response(pagination_has_more=False).with_record(record=_create_record("justice_songs")).build(),
    )

    source = SourceFixture()
    json_schema: Dict[str, Any] = {
        "properties": {
            "id": {
                "description": "id",
                "type": "string",
            }
        }
    }
    catalog = _create_catalog_with_configured_schema([("justice_songs", SyncMode.full_refresh, json_schema)])
    # The schema passed to the builder must be the one stored on the configured stream.
    assert catalog.streams[0].stream.json_schema == json_schema
    actual_messages = read(source, config=config, catalog=catalog)

    assert emits_successful_sync_status_messages(actual_messages.get_stream_statuses("justice_songs"))
    assert len(actual_messages.records) == 1
    assert len(actual_messages.state_messages) == 2
    validate_message_order([Type.RECORD, Type.STATE, Type.STATE], actual_messages.records_and_state_messages)
    assert actual_messages.state_messages[0].state.stream.stream_descriptor.name == "justice_songs"
    assert actual_messages.state_messages[0].state.stream.stream_state == AirbyteStateBlob()
    assert actual_messages.state_messages[0].state.sourceStats.recordCount == 1.0
    assert actual_messages.state_messages[1].state.stream.stream_descriptor.name == "justice_songs"
    assert actual_messages.state_messages[1].state.stream.stream_state == AirbyteStateBlob()
    assert actual_messages.state_messages[1].state.sourceStats.recordCount == 0.0


@HttpMocker()
def test_resumable_full_refresh_second_attempt(self, http_mocker):
config = {}
Expand Down
Loading