Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Chore: Refactor new "Writers" interface for destinations and caches #326

Merged
merged 10 commits into from
Sep 2, 2024
69 changes: 69 additions & 0 deletions airbyte/_future_cdk/catalog_providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@

from __future__ import annotations

import copy
from typing import TYPE_CHECKING, Any, final

from airbyte_protocol.models import (
ConfiguredAirbyteCatalog,
)

from airbyte import exceptions as exc
from airbyte.strategies import WriteMethod, WriteStrategy


if TYPE_CHECKING:
Expand Down Expand Up @@ -135,3 +137,70 @@ def from_read_result(
]
)
)

def get_primary_keys(
self,
stream_name: str,
) -> list[str]:
pks = self.get_configured_stream_info(stream_name).primary_key
if not pks:
return []

joined_pks = [".".join(pk) for pk in pks]
for pk in joined_pks:
if "." in pk:
msg = f"Nested primary keys are not yet supported. Found: {pk}"
raise NotImplementedError(msg)

return joined_pks

def get_cursor_key(
self,
stream_name: str,
) -> str | None:
return self.get_configured_stream_info(stream_name).cursor_field

def resolve_write_method(
self,
stream_name: str,
write_strategy: WriteStrategy,
) -> WriteMethod:
"""Return the write method for the given stream."""
has_pks: bool = bool(self.get_primary_keys(stream_name))
has_incremental_key: bool = bool(self.get_cursor_key(stream_name))
if write_strategy == WriteStrategy.MERGE and not has_pks:
raise exc.PyAirbyteInputError(
message="Cannot use merge strategy on a stream with no primary keys.",
context={
"stream_name": stream_name,
},
)

if write_strategy != WriteStrategy.AUTO:
return WriteMethod(write_strategy)

if has_pks:
return WriteMethod.MERGE

if has_incremental_key:
return WriteMethod.APPEND

return WriteMethod.REPLACE
aaronsteers marked this conversation as resolved.
Show resolved Hide resolved

def with_write_strategy(
self,
write_strategy: WriteStrategy,
) -> CatalogProvider:
"""Return a new catalog provider with the specified write strategy applied.

The original catalog provider is not modified.
"""
new_catalog: ConfiguredAirbyteCatalog = copy.deepcopy(self.configured_catalog)
for stream in new_catalog.streams:
write_method = self.resolve_write_method(
stream_name=stream.stream.name,
write_strategy=write_strategy,
)
stream.destination_sync_mode = write_method.destination_sync_mode

return CatalogProvider(new_catalog)
aaronsteers marked this conversation as resolved.
Show resolved Hide resolved
300 changes: 0 additions & 300 deletions airbyte/_future_cdk/record_processor.py

This file was deleted.

Loading
Loading