Commit

Merge branch 'master' into daryna/source-tik-tok-marketing/migartion-…
…to-low-code
darynaishchenko committed Jul 1, 2024
2 parents 381c17a + 34e92b4 commit 076ca01
Showing 1,488 changed files with 21,035 additions and 18,075 deletions.
2 changes: 1 addition & 1 deletion .bumpversion.cfg
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.63.2
current_version = 0.63.3
commit = False
tag = False
parse = (?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)(\-[a-z]+)?
2 changes: 1 addition & 1 deletion .github/workflows/connectors_insights.yml
@@ -48,4 +48,4 @@ jobs:
run: echo "GOOGLE_APPLICATION_CREDENTIALS=$HOME/gcp-sa-key.json" >> $GITHUB_ENV
- name: Run connectors insights
run: |
poetry -C airbyte-ci/connectors/connectors_insights run connectors-insights generate --gcs-uri=gs://prod-airbyte-cloud-connector-metadata-service/connector_insights --connector-directory airbyte-integrations/connectors/ --concurrency 10 ${{ inputs.rewrite == true && '--rewrite' || ''}}
poetry -C airbyte-ci/connectors/connectors_insights run connectors-insights generate --gcs-uri=gs://prod-airbyte-cloud-connector-metadata-service/connector_insights --connector-directory airbyte-integrations/connectors/ --concurrency 10 ${{ inputs.rewrite == 'true' && '--rewrite' || ''}}
4 changes: 2 additions & 2 deletions .github/workflows/connectors_up_to_date.yml
@@ -5,8 +5,8 @@ concurrency:

on:
schedule:
# Runs everyday at 12:00 UTC
- cron: "0 12 * * *"
# Runs every Saturday at 12:00 UTC
- cron: "0 12 * * 6"
workflow_dispatch:
inputs:
connectors-options:
37 changes: 20 additions & 17 deletions airbyte-cdk/java/airbyte-cdk/README.md
@@ -2,22 +2,22 @@

This page will walk through the process of developing with the Java CDK.

- [Developing with the Java CDK](#developing-with-the-java-cdk)
- [Intro to the Java CDK](#intro-to-the-java-cdk)
- [What is included in the Java CDK?](#what-is-included-in-the-java-cdk)
- [How is the CDK published?](#how-is-the-cdk-published)
- [Using the Java CDK](#using-the-java-cdk)
- [Building the CDK](#building-the-cdk)
- [Bumping the CDK version](#bumping-the-cdk-version)
- [Publishing the CDK](#publishing-the-cdk)
- [Developing Connectors with the Java CDK](#developing-connectors-with-the-java-cdk)
- [Referencing the CDK from Java connectors](#referencing-the-cdk-from-java-connectors)
- [Developing a connector alongside the CDK](#developing-a-connector-alongside-the-cdk)
- [Publishing the CDK and switching to a pinned CDK reference](#publishing-the-cdk-and-switching-to-a-pinned-cdk-reference)
- [Troubleshooting CDK Dependency Caches](#troubleshooting-cdk-dependency-caches)
- [Developing a connector against a pinned CDK version](#developing-a-connector-against-a-pinned-cdk-version)
- [Changelog](#changelog)
- [Java CDK](#java-cdk)
* [Developing with the Java CDK](#developing-with-the-java-cdk)
* [Intro to the Java CDK](#intro-to-the-java-cdk)
* [What is included in the Java CDK?](#what-is-included-in-the-java-cdk)
* [How is the CDK published?](#how-is-the-cdk-published)
* [Using the Java CDK](#using-the-java-cdk)
* [Building the CDK](#building-the-cdk)
* [Bumping the CDK version](#bumping-the-cdk-version)
* [Publishing the CDK](#publishing-the-cdk)
* [Developing Connectors with the Java CDK](#developing-connectors-with-the-java-cdk)
* [Referencing the CDK from Java connectors](#referencing-the-cdk-from-java-connectors)
* [Developing a connector alongside the CDK](#developing-a-connector-alongside-the-cdk)
* [Publishing the CDK and switching to a pinned CDK reference](#publishing-the-cdk-and-switching-to-a-pinned-cdk-reference)
* [Troubleshooting CDK Dependency Caches](#troubleshooting-cdk-dependency-caches)
* [Developing a connector against a pinned CDK version](#developing-a-connector-against-a-pinned-cdk-version)
* [Changelog](#changelog)
* [Java CDK](#java-cdk)

## Intro to the Java CDK

@@ -173,7 +173,10 @@ corresponds to that version.
### Java CDK

| Version | Date | Pull Request | Subject |
|:--------| :--------- | :--------------------------------------------------------- | :------------------------------------------------------------------------------------------------------------------------------------------------------------- |
|:--------| :--------- | :--------------------------------------------------------- |:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.40.5 | 2024-06-26 | [\#40517](https://github.com/airbytehq/airbyte/pull/40517) | JdbcDatabase.executeWithinTransaction allows disabling SQL statement logging |
| 0.35.16 | 2024-06-25 | [\#40517](https://github.com/airbytehq/airbyte/pull/40517) | (backport) JdbcDatabase.executeWithinTransaction allows disabling SQL statement logging |
| 0.40.4 | 2024-06-18 | [\#40254](https://github.com/airbytehq/airbyte/pull/40254) | Destinations: Do not throw on unrecognized airbyte message type (ignore message instead) |
| 0.40.3 | 2024-06-18 | [\#39526](https://github.com/airbytehq/airbyte/pull/39526) | Destinations: INCOMPLETE stream status is a TRANSIENT error rather than SYSTEM |
| 0.40.2 | 2024-06-18 | [\#39552](https://github.com/airbytehq/airbyte/pull/39552) | Destinations: Throw error if the ConfiguredCatalog has no streams |
| 0.40.1 | 2024-06-14 | [\#39349](https://github.com/airbytehq/airbyte/pull/39349) | Source stats for full refresh streams |
@@ -39,13 +39,17 @@ abstract class JdbcDatabase(protected val sourceOperations: JdbcCompatibleSource
}

@Throws(SQLException::class)
fun executeWithinTransaction(queries: List<String>) {
fun executeWithinTransaction(queries: List<String>, logStatements: Boolean = true) {
execute { connection: Connection ->
connection.autoCommit = false
for (s in queries) {
LOGGER.info("executing query within transaction: $s")
if (logStatements) {
LOGGER.info("executing query within transaction: $s")
}
connection.createStatement().execute(s)
LOGGER.info("done executing query within transaction: $s")
if (logStatements) {
LOGGER.info("done executing query within transaction: $s")
}
}
connection.commit()
connection.autoCommit = true
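For readers more at home in Python, the hunk above can be approximated as follows. This is only a sketch of the same idea (run every statement in one transaction, with a flag that suppresses per-statement logging), written against the standard-library `sqlite3` module rather than JDBC. The function name `execute_within_transaction`, the `log_statements` parameter, and the rollback on failure are assumptions made for illustration; the Kotlin code in this commit does not show a rollback.

```python
import logging
import sqlite3

logger = logging.getLogger(__name__)


def execute_within_transaction(connection, queries, log_statements=True):
    """Run all queries in a single transaction.

    Hypothetical analogue of JdbcDatabase.executeWithinTransaction.
    The connection is expected to be in autocommit mode
    (sqlite3.connect(..., isolation_level=None)) so that BEGIN/COMMIT
    are controlled explicitly here.
    """
    connection.execute("BEGIN")
    try:
        for query in queries:
            # Callers pass log_statements=False when the SQL text may
            # contain sensitive values that should not reach the logs.
            if log_statements:
                logger.info("executing query within transaction: %s", query)
            connection.execute(query)
            if log_statements:
                logger.info("done executing query within transaction: %s", query)
        connection.execute("COMMIT")
    except Exception:
        # Assumption: undo partial work on failure (not shown in the diff).
        connection.execute("ROLLBACK")
        raise
```

Defaulting the flag to `True` mirrors the Kotlin signature, so existing callers keep their current logging behavior and only opt out explicitly.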
@@ -103,9 +103,15 @@ constructor(
* do it without touching buffer manager.
*/
val partialAirbyteMessage =
airbyteMessageDeserializer.deserializeAirbyteMessage(
message,
)
try {
airbyteMessageDeserializer.deserializeAirbyteMessage(
message,
)
} catch (e: AirbyteMessageDeserializer.UnrecognizedAirbyteMessageTypeException) {
logger.warn { "Ignoring unrecognized message type: ${e.message}" }
return
}

when (partialAirbyteMessage.type) {
AirbyteMessage.Type.RECORD -> {
validateRecord(partialAirbyteMessage)
@@ -3,6 +3,7 @@
*/
package io.airbyte.cdk.integrations.destination.async.deser

import com.fasterxml.jackson.databind.exc.ValueInstantiationException
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
import io.airbyte.commons.json.Jsons
import io.airbyte.protocol.models.v0.AirbyteMessage
@@ -13,16 +14,27 @@ private val logger = KotlinLogging.logger {}
class AirbyteMessageDeserializer(
private val dataTransformer: StreamAwareDataTransformer = IdentityDataTransformer(),
) {
class UnrecognizedAirbyteMessageTypeException(private val unrecognizedType: String) :
Exception(unrecognizedType) {
override fun toString(): String {
return "Could not deserialize AirbyteMessage: unrecognized type: $unrecognizedType"
}
}

/**
* Deserializes to a [PartialAirbyteMessage] which can represent both a Record or a State
* Message
* Deserializes to a [PartialAirbyteMessage] which can represent a Record, State, or Trace
* Message.
*
* Throws on deserialization errors, obfuscating the error message to avoid data leakage. In
* recoverable cases (currently only when the top-level message type is unrecognized), throws a
* dedicated exception.
*
* PartialAirbyteMessage holds either:
* * entire serialized message string when message is a valid State Message
* * serialized AirbyteRecordMessage when message is a valid Record Message
*
* @param message the string to deserialize
* @return PartialAirbyteMessage if the message is valid, empty otherwise
* @return PartialAirbyteMessage if the message is valid
*/
fun deserializeAirbyteMessage(
message: String?,
@@ -32,8 +44,29 @@ class AirbyteMessageDeserializer(
// Use JsonSubTypes and extend StdDeserializer to properly handle this.
// Make immutability a first class citizen in the PartialAirbyteMessage class.
val partial =
Jsons.tryDeserializeExact(message, PartialAirbyteMessage::class.java).orElseThrow {
RuntimeException("Unable to deserialize PartialAirbyteMessage.")
try {
Jsons.deserializeExactUnchecked(message, PartialAirbyteMessage::class.java)
} catch (e: ValueInstantiationException) {
// This is a hack to catch unrecognized message types. Jackson supports
// the equivalent via annotations, but we cannot use them because the
// AirbyteMessage
// is generated from json-schema.
val pat =
Regex("Cannot construct instance of .*AirbyteMessage.Type., problem: ([_A-Z]+)")
val match = pat.find(e.message!!)
if (match != null) {
val unrecognized = match.groups[1]?.value
logger.warn { "Unrecognized message type: $unrecognized" }
throw UnrecognizedAirbyteMessageTypeException(unrecognized!!)
} else {
val obfuscated = Jsons.obfuscateDeserializationException(e)
throw RuntimeException(
"ValueInstantiationException when deserializing PartialAirbyteMessage: $obfuscated"
)
}
} catch (e: Exception) {
val obfuscated = Jsons.obfuscateDeserializationException(e)
throw RuntimeException("Could not deserialize PartialAirbyteMessage: $obfuscated")
}

val msgType = partial.type
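The error-classification logic in this hunk can be sketched in Python to make the two outcomes easier to see. The regex is copied from the diff. Everything else is an assumption for illustration: the names `UnrecognizedMessageTypeError` and `classify_deserialization_error` are hypothetical, and the sample error text is a guess at the message shape the regex is written for, not a quoted Jackson message.

```python
import re


class UnrecognizedMessageTypeError(Exception):
    """Hypothetical stand-in for UnrecognizedAirbyteMessageTypeException."""


# Same pattern as in the diff: capture the offending enum value.
_UNRECOGNIZED_TYPE = re.compile(
    r"Cannot construct instance of .*AirbyteMessage.Type., problem: ([_A-Z]+)"
)


def classify_deserialization_error(error_message: str) -> Exception:
    """Map a raw deserialization error message to the exception to raise.

    Recoverable case: the top-level message type is unknown, so the
    caller can skip the message. Anything else becomes a generic error
    that deliberately omits the raw message text.
    """
    match = _UNRECOGNIZED_TYPE.search(error_message)
    if match is not None:
        return UnrecognizedMessageTypeError(match.group(1))
    return RuntimeError("Could not deserialize PartialAirbyteMessage")


# Assumed message shape, for illustration only.
sample = (
    "Cannot construct instance of "
    "`io.airbyte.protocol.models.v0.AirbyteMessage$Type`, "
    "problem: __UNKNOWN_TYPE_OF_MESSAGE__"
)
recoverable = classify_deserialization_error(sample)
fatal = classify_deserialization_error("unexpected token near password=hunter2")
```

Matching on exception text is brittle, as the in-code comment in the diff already concedes, so a sketch like this is only as stable as the wording of the underlying library's error message.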
@@ -1 +1 @@
version=0.40.3
version=0.40.5
@@ -42,6 +42,7 @@ import java.util.function.Consumer
import java.util.stream.Stream
import org.apache.commons.lang3.RandomStringUtils
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertFalse
import org.junit.jupiter.api.Assertions.assertThrows
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.BeforeEach
@@ -732,4 +733,38 @@ class AsyncStreamConsumerTest {
}
assertEquals(expRecords, actualRecords)
}

@Test
internal fun deserializeAirbyteMessageWithUnrecognizedType() {
val airbyteMessage = AirbyteMessage().withType(AirbyteMessage.Type.RECORD)
val serialized = Jsons.serialize(airbyteMessage)
// Fake an upstream protocol change
val retyped =
serialized.replace(AirbyteMessage.Type.RECORD.toString(), "__UNKNOWN_TYPE_OF_MESSAGE__")
// Assert that this doesn't throw an exception
consumer.start()
assertDoesNotThrow { consumer.accept(retyped, retyped.length) }
}

@Test
internal fun deserializeAirbyteMessageWithUnrecognizedNonTypeEnum() {
// NOTE: We are only guaranteeing failure on the top-level message type. Anything else
// should break deserialization and result in an *obfuscated* error message.
val airbyteMessage =
AirbyteMessage()
.withType(AirbyteMessage.Type.RECORD)
.withState(
AirbyteStateMessage().withType(AirbyteStateMessage.AirbyteStateType.STREAM)
)
val serialized = Jsons.serialize(airbyteMessage)
// Fake an upstream protocol change
val offender = "__UNKNOWN_NONTYPE_ENUM__"
val retyped = serialized.replace("STREAM", offender)
// Assert that this throws a RuntimeException
consumer.start()
val throwable =
assertThrows(RuntimeException::class.java) { consumer.accept(retyped, retyped.length) }
// Ensure that the offending data has been scrubbed from the error message
assertFalse(throwable.message!!.contains(offender))
}
}
@@ -139,6 +139,14 @@ object Jsons {
}
}

// WARNING: This method throws bare exceptions on parse failure which might
// leak sensitive data. Use obfuscateDeserializationException() to strip
// the sensitive data before logging.
@JvmStatic
fun <T : Any> deserializeExactUnchecked(jsonString: String?, klass: Class<T>?): T {
return OBJECT_MAPPER_EXACT.readValue(jsonString, klass)
}

@JvmStatic
fun <T : Any> tryDeserialize(jsonString: String, klass: Class<T>): Optional<T> {
return try {
@@ -425,9 +433,17 @@ object Jsons {
* potentially-sensitive information. </snip...>
*/
private fun <T : Any> handleDeserThrowable(throwable: Throwable): Optional<T> {
// Manually build the stacktrace, excluding the top-level exception object
// so that we don't accidentally include the exception message.
// Otherwise we could just do ExceptionUtils.getStackTrace(t).
val obfuscated = obfuscateDeserializationException(throwable)
LOGGER.warn { "Failed to deserialize json due to $obfuscated" }
return Optional.empty()
}

/**
* Build a stacktrace from the given throwable, enabling us to log or rethrow without leaking
* sensitive information in the exception message (which would be exposed with eg,
* ExceptionUtils.getStackTrace(t).)
*/
fun obfuscateDeserializationException(throwable: Throwable): String {
var t: Throwable = throwable
val sb = StringBuilder()
sb.append(t.javaClass)
@@ -444,8 +460,7 @@ object Jsons {
sb.append(traceElement.toString())
}
}
LOGGER.warn { "Failed to deserialize json due to $sb" }
return Optional.empty()
return sb.toString()
}
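The idea behind `obfuscateDeserializationException` (keep the exception class and stack frames, drop the message) translates to Python roughly as below. This is a sketch under assumptions: the function name `obfuscate_exception` is hypothetical, and walking the `__cause__` chain is a guess at what the elided middle of the Kotlin function does.

```python
import traceback


def obfuscate_exception(exc: BaseException) -> str:
    """Render class names and frames only, never the exception message.

    The message may echo the payload that failed to parse, so it is
    left out. Source-line text is also skipped, since it is not needed
    to locate the failure.
    """
    lines = []
    prefix = ""
    current = exc
    while current is not None:
        cls = type(current)
        lines.append(f"{prefix}{cls.__module__}.{cls.__qualname__}")
        for frame in traceback.extract_tb(current.__traceback__):
            lines.append(f"    at {frame.name} ({frame.filename}:{frame.lineno})")
        # Assumption: follow explicitly chained causes ("raise ... from ...").
        current = current.__cause__
        prefix = "Caused by: "
    return "\n".join(lines)
```

Returning the string instead of logging it, as the refactor in this hunk does, lets callers choose between logging the obfuscated trace and embedding it in a rethrown exception.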

/**
@@ -1076,6 +1076,8 @@ def create_simple_retriever(
)
url_base = model.requester.url_base if hasattr(model.requester, "url_base") else requester.get_url_base()
stream_slicer = stream_slicer or SinglePartitionRouter(parameters={})

# Define cursor only if per partition or common incremental support is needed
cursor = stream_slicer if isinstance(stream_slicer, DeclarativeCursor) else None

cursor_used_for_stop_condition = cursor if stop_condition_on_cursor else None
@@ -22,6 +22,7 @@
from airbyte_cdk.sources.file_based.stream.cursor import AbstractFileBasedCursor
from airbyte_cdk.sources.file_based.types import StreamSlice
from airbyte_cdk.sources.message import MessageRepository
from airbyte_cdk.sources.source import ExperimentalClassWarning
from airbyte_cdk.sources.streams.concurrent.abstract_stream_facade import AbstractStreamFacade
from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
from airbyte_cdk.sources.streams.concurrent.exceptions import ExceptionWithDisplayMessage
@@ -42,7 +43,7 @@
"""


@deprecated("This class is experimental. Use at your own risk.")
@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
class FileBasedStreamFacade(AbstractStreamFacade[DefaultStream], AbstractFileBasedStream):
@classmethod
def create_from_stream(
4 changes: 4 additions & 0 deletions airbyte-cdk/python/airbyte_cdk/sources/source.py
@@ -15,6 +15,10 @@
TCatalog = TypeVar("TCatalog")


class ExperimentalClassWarning(DeprecationWarning):
pass


class BaseSource(BaseConnector[TConfig], ABC, Generic[TConfig, TState, TCatalog]):
@abstractmethod
def read_state(self, state_path: str) -> TState:
@@ -6,13 +6,14 @@
from typing import Any, Iterable, Mapping, Optional

from airbyte_cdk.models import AirbyteStream
from airbyte_cdk.sources.source import ExperimentalClassWarning
from airbyte_cdk.sources.streams.concurrent.availability_strategy import StreamAvailability
from airbyte_cdk.sources.streams.concurrent.cursor import Cursor
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
from deprecated.classic import deprecated


@deprecated("This class is experimental. Use at your own risk.")
@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
class AbstractStream(ABC):
"""
AbstractStream is an experimental interface for streams developed as part of the Concurrent CDK.
@@ -12,6 +12,7 @@
from airbyte_cdk.sources import AbstractSource, Source
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
from airbyte_cdk.sources.message import MessageRepository
from airbyte_cdk.sources.source import ExperimentalClassWarning
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy
from airbyte_cdk.sources.streams.concurrent.abstract_stream_facade import AbstractStreamFacade
@@ -38,7 +39,7 @@
"""


@deprecated("This class is experimental. Use at your own risk.")
@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
class StreamFacade(AbstractStreamFacade[DefaultStream], Stream):
"""
The StreamFacade is a Stream that wraps an AbstractStream and exposes it as a Stream.
@@ -326,7 +327,7 @@ def generate(self) -> Iterable[Partition]:
)


@deprecated("This class is experimental. Use at your own risk.")
@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
class AvailabilityStrategyFacade(AvailabilityStrategy):
def __init__(self, abstract_availability_strategy: AbstractAvailabilityStrategy):
self._abstract_availability_strategy = abstract_availability_strategy
@@ -6,6 +6,7 @@
from abc import ABC, abstractmethod
from typing import Optional

from airbyte_cdk.sources.source import ExperimentalClassWarning
from deprecated.classic import deprecated


@@ -46,7 +47,7 @@ def message(self) -> Optional[str]:
STREAM_AVAILABLE = StreamAvailable()


@deprecated("This class is experimental. Use at your own risk.")
@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
class AbstractAvailabilityStrategy(ABC):
"""
AbstractAvailabilityStrategy is an experimental interface developed as part of the Concurrent CDK.
2 changes: 2 additions & 0 deletions airbyte-cdk/python/pytest.ini
@@ -3,3 +3,5 @@ log_cli = 1
log_cli_level = INFO
log_cli_format = %(asctime)s [%(levelname)8s] %(message)s (%(filename)s:%(lineno)s)
log_cli_date_format=%Y-%m-%d %H:%M:%S
filterwarnings =
ignore::airbyte_cdk.sources.source.ExperimentalClassWarning
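The point of giving the experimental classes their own warning category is that it can be filtered without hiding other deprecation warnings. A minimal standard-library sketch of that mechanism follows. It calls `warnings.warn` directly instead of the third-party `deprecated` decorator used in the diff, and `make_experimental_stream` and `count_experimental_warnings` are hypothetical helpers that exist only for this example.

```python
import warnings


class ExperimentalClassWarning(DeprecationWarning):
    """Same shape as the class added to airbyte_cdk/sources/source.py."""


def make_experimental_stream():
    # Stand-in for constructing a class decorated with
    # @deprecated(..., category=ExperimentalClassWarning).
    warnings.warn(
        "This class is experimental. Use at your own risk.",
        category=ExperimentalClassWarning,
        stacklevel=2,
    )
    return object()


def count_experimental_warnings(silence: bool) -> int:
    """Return how many warnings are recorded while building one stream."""
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        if silence:
            # Programmatic counterpart of the pytest.ini filterwarnings
            # entry: ignore only this category.
            warnings.filterwarnings("ignore", category=ExperimentalClassWarning)
        make_experimental_stream()
    return len(caught)
```

Because `ExperimentalClassWarning` subclasses `DeprecationWarning`, filters that target `DeprecationWarning` still catch it, while a filter on the subclass leaves ordinary deprecation warnings untouched.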