From a9188ce9a7e3210b11d01be649f4fcc93d500ff0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez-Mondrag=C3=B3n?= Date: Tue, 16 Jul 2024 14:05:31 -0600 Subject: [PATCH] feat: Emit Postgres schema as `_sdc_postgres_schema` --- tap_postgres/client.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/tap_postgres/client.py b/tap_postgres/client.py index b21e77c9..390a46d1 100644 --- a/tap_postgres/client.py +++ b/tap_postgres/client.py @@ -264,6 +264,13 @@ class PostgresStream(SQLStream): # JSONB Objects won't be selected without type_conformance_level to ROOT_ONLY TYPE_CONFORMANCE_LEVEL = TypeConformanceLevel.ROOT_ONLY + @cached_property + def schema(self) -> dict: + """Override schema adding _sdc columns.""" + schema_dict = self._singer_catalog_entry.schema.to_dict() + schema_dict["properties"]["_sdc_postgres_schema"] = {"type": ["string", "null"]} + return schema_dict + def max_record_count(self) -> int | None: """Return the maximum number of records to fetch in a single query.""" return self.config.get("max_record_count") @@ -326,6 +333,7 @@ def get_records(self, context: dict | None) -> t.Iterable[dict[str, t.Any]]: # TODO: Standardize record mapping type # https://github.com/meltano/sdk/issues/2096 transformed_record = self.post_process(dict(record)) + transformed_record["_sdc_postgres_schema"] = table.schema if transformed_record is None: # Record filtered out during post_process() continue