Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Filter schemas based on user-provided config #218

Merged
merged 2 commits into from
Aug 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ Built with the [Meltano Singer SDK](https://sdk.meltano.com).
| password | False | None | Password used to authenticate. Note if sqlalchemy_url is set this will be ignored. |
| database | False | None | Database name. Note if sqlalchemy_url is set this will be ignored. |
| sqlalchemy_url | False | None | Example postgresql://[username]:[password]@localhost:5432/[db_name] |
| filter_schemas | False | None | If an array of schema names is provided, the tap will only process the specified Postgres schemas and ignore others. If left blank, the tap automatically determines ALL available Postgres schemas. |
| ssh_tunnel | False | None | SSH Tunnel Configuration, this is a json object |
| ssh_tunnel.enable | True (if ssh_tunnel set) | False | Enable an ssh tunnel (also known as bastion host), see the other ssh_tunnel.* properties for more details. |
| ssh_tunnel.host | True (if ssh_tunnel set) | False | Host of the bastion host, this is the host we'll connect to via ssh. |
Expand Down
16 changes: 16 additions & 0 deletions tap_postgres/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
from singer_sdk import SQLConnector, SQLStream
from singer_sdk import typing as th
from singer_sdk.helpers._typing import TypeConformanceLevel
from sqlalchemy.engine import Engine
from sqlalchemy.engine.reflection import Inspector

if TYPE_CHECKING:
from sqlalchemy.dialects import postgresql
Expand Down Expand Up @@ -165,6 +167,20 @@ def sdk_typing_object(

return sqltype_lookup["string"] # safe failover to str

def get_schema_names(self, engine: Engine, inspected: Inspector) -> list[str]:
    """Return the schema names to process, honoring ``filter_schemas``.

    When the ``filter_schemas`` config value is a non-empty list it is
    returned as-is, without calling the parent implementation. When the
    setting is missing, null, or empty, the parent implementation decides.

    Args:
        engine: SQLAlchemy engine
        inspected: SQLAlchemy inspector instance for engine

    Returns:
        List of schema names
    """
    # ``.get`` plus a truthiness check treats an absent key, an explicit null
    # and an empty list alike; calling ``len()`` on a null value would raise
    # TypeError instead of falling back to the default behavior.
    filter_schemas = self.config.get("filter_schemas")
    if filter_schemas:
        return filter_schemas
    return super().get_schema_names(engine, inspected)


class PostgresStream(SQLStream):
"""Stream class for Postgres streams."""
Expand Down
9 changes: 9 additions & 0 deletions tap_postgres/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,15 @@ def __init__(
"Example postgresql://[username]:[password]@localhost:5432/[db_name]"
),
),
th.Property(
"filter_schemas",
th.ArrayType(th.StringType),
description=(
"If an array of schema names is provided, the tap will only process "
"the specified Postgres schemas and ignore others. If left blank, the "
"tap automatically determines ALL available Postgres schemas."
),
),
th.Property(
"ssh_tunnel",
th.ObjectType(
Expand Down
26 changes: 25 additions & 1 deletion tests/test_core.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import copy
import datetime
import decimal
import json
Expand All @@ -9,7 +10,7 @@
from singer_sdk.testing import get_tap_test_class, suites
from singer_sdk.testing.runners import TapTestRunner
from sqlalchemy import Column, DateTime, Integer, MetaData, Numeric, String, Table
from sqlalchemy.dialects.postgresql import DATE, JSONB, TIME, TIMESTAMP, JSON
from sqlalchemy.dialects.postgresql import BIGINT, DATE, JSON, JSONB, TIME, TIMESTAMP
from test_replication_key import TABLE_NAME, TapTestReplicationKey
from test_selected_columns_only import (
TABLE_NAME_SELECTED_COLUMNS_ONLY,
Expand Down Expand Up @@ -290,6 +291,29 @@ def test_decimal():
assert "number" in schema_message["schema"]["properties"]["column"]["type"]


def test_filter_schemas():
    """Check that ``filter_schemas`` limits the catalog to the listed schema.

    Creates the table ``new_schema.test_filter_schemas`` in the database that
    ``SAMPLE_CONFIG`` points to, builds a tap configured with
    ``filter_schemas=["new_schema"]``, and asserts that the resulting catalog
    contains exactly that one stream.
    """
    table_name = "test_filter_schemas"
    engine = sqlalchemy.create_engine(SAMPLE_CONFIG["sqlalchemy_url"])

    metadata_obj = MetaData()
    table = Table(table_name, metadata_obj, Column("id", BIGINT), schema="new_schema")

    # ``engine.begin()`` commits on exit, so the DDL does not depend on the
    # implicit autocommit that SQLAlchemy 2.0 removed.
    with engine.begin() as conn:
        # Raw SQL strings are no longer accepted by ``execute``; wrap in text().
        conn.execute(sqlalchemy.text("CREATE SCHEMA IF NOT EXISTS new_schema"))
        # ``checkfirst=True`` replaces the deprecated ``Table.exists()`` probe.
        table.drop(conn, checkfirst=True)
        metadata_obj.create_all(conn)

    filter_schemas_config = copy.deepcopy(SAMPLE_CONFIG)
    filter_schemas_config["filter_schemas"] = ["new_schema"]
    tap = TapPostgres(config=filter_schemas_config)
    tap_catalog = json.loads(tap.catalog_json_text)
    altered_table_name = f"new_schema-{table_name}"
    # Check that the only stream in the catalog is the one table put into new_schema
    assert len(tap_catalog["streams"]) == 1
    assert tap_catalog["streams"][0]["stream"] == altered_table_name


class PostgresTestRunner(TapTestRunner):
def run_sync_dry_run(self) -> bool:
"""
Expand Down