From 826d689abb1e841722d9eee8173a65aa4a95b7fa Mon Sep 17 00:00:00 2001 From: Sukanta Roy <96112956+sukantaroy01@users.noreply.github.com> Date: Sat, 7 Sep 2024 06:07:41 +0530 Subject: [PATCH] Fix: Resolve issue where PyAirbyte would fail if property names contain the dot character (`'.'`), e.g. with `source-google-ads` (#343) Co-authored-by: Ajit Pratap Singh <18012955+ajitpratap0@users.noreply.github.com> Co-authored-by: uditchaudhary --- .gitignore | 3 ++ airbyte/shared/catalog_providers.py | 24 ++++++++++---- .../fixtures/source-test/source_test/run.py | 32 +++++++++++++++++++ tests/unit_tests/test_text_normalization.py | 1 + 4 files changed, 54 insertions(+), 6 deletions(-) diff --git a/.gitignore b/.gitignore index 9b752740..1666fe96 100644 --- a/.gitignore +++ b/.gitignore @@ -130,3 +130,6 @@ dmypy.json # Cython debug symbols cython_debug/ + +# PyCharm +.idea \ No newline at end of file diff --git a/airbyte/shared/catalog_providers.py b/airbyte/shared/catalog_providers.py index 6887f1e6..bf22a76b 100644 --- a/airbyte/shared/catalog_providers.py +++ b/airbyte/shared/catalog_providers.py @@ -16,6 +16,7 @@ ) from airbyte import exceptions as exc +from airbyte._util.name_normalizers import LowerCaseNormalizer from airbyte.strategies import WriteMethod, WriteStrategy @@ -149,13 +150,24 @@ def get_primary_keys( if not pks: return [] - joined_pks = [".".join(pk) for pk in pks] - for pk in joined_pks: - if "." in pk: - msg = f"Nested primary keys are not yet supported. Found: {pk}" - raise NotImplementedError(msg) + normalized_pks: list[list[str]] = [ + [LowerCaseNormalizer.normalize(c) for c in pk] for pk in pks + ] - return joined_pks + for pk_nodes in normalized_pks: + if len(pk_nodes) != 1: + raise exc.AirbyteError( + message=( + "Nested primary keys are not supported. " + "Each PK column should have exactly one node. 
" + ), + context={ + "stream_name": stream_name, + "primary_key_nodes": pk_nodes, + }, + ) + + return [pk_nodes[0] for pk_nodes in normalized_pks] def get_cursor_key( self, diff --git a/tests/integration_tests/fixtures/source-test/source_test/run.py b/tests/integration_tests/fixtures/source-test/source_test/run.py index 5455a5c3..bcf2b77e 100644 --- a/tests/integration_tests/fixtures/source-test/source_test/run.py +++ b/tests/integration_tests/fixtures/source-test/source_test/run.py @@ -65,6 +65,23 @@ }, }, }, + { + "name": "primary-key-with-dot", + "description": "This stream has a primary key with dot similar what is there in GAds.", + "source_defined_primary_key": [["table1.Column1"]], + "source_defined_cursor": False, + "supported_sync_modes": ["full_refresh"], + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "table1.Column1": {"type": "string"}, + "table1.Column2": {"type": "number"}, + "table1.empty_column": {"type": "string"}, + "table1.big_number": {"type": "number"}, + }, + }, + }, ] }, } @@ -137,6 +154,19 @@ "emitted_at": 1704067200, }, } +sample_record_primary_key_with_dot = { + "type": "RECORD", + "record": { + "data": { + "table1.Column1": "value1", + "table1.Column2": 1, + "table1.empty_column": None, + "table1.big_number": 1234567890123456, + }, + "stream": "primary-key-with-dot", + "emitted_at": 1704067200, + }, +} def parse_args(): @@ -184,3 +214,5 @@ def run(): print(json.dumps(sample_record2_stream1)) elif stream["stream"]["name"] == "stream2": print(json.dumps(sample_record_stream2)) + elif stream["stream"]["name"] == "primary-key-with-dot": + print(json.dumps(sample_record_primary_key_with_dot)) diff --git a/tests/unit_tests/test_text_normalization.py b/tests/unit_tests/test_text_normalization.py index 8c0cd2ea..b246c65c 100644 --- a/tests/unit_tests/test_text_normalization.py +++ b/tests/unit_tests/test_text_normalization.py @@ -211,6 +211,7 @@ def 
test_case_insensitive_w_pretty_keys( ("", "", True), ("*", "", True), ("!@$", "", True), + ("some.col", "some_col", False), ], ) def test_lower_case_normalizer(