Skip to content

Commit

Permalink
feat: Break out host, user, port, etc. into individual config v…
Browse files Browse the repository at this point in the history
…alues (#121)

Closes #30, #5

- [x] Need to implement SSL as a configuration option as well (Going to
do separately see #117
)

---------

Co-authored-by: Edgar R. M <[email protected]>
Co-authored-by: Pat Nadolny <[email protected]>
  • Loading branch information
3 people authored Jun 23, 2023
1 parent 1a12789 commit 4482e4e
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 6 deletions.
92 changes: 86 additions & 6 deletions tap_postgres/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,31 +5,91 @@
import io
import signal
from functools import cached_property
from typing import TYPE_CHECKING, Any
from typing import Any, Mapping, cast

import paramiko
from singer_sdk import SQLTap, Stream
from singer_sdk import typing as th # JSON schema typing helpers
from sqlalchemy.engine import URL
from sqlalchemy.engine.url import make_url
from sshtunnel import SSHTunnelForwarder

from tap_postgres.client import PostgresConnector, PostgresStream

if TYPE_CHECKING:
from sqlalchemy.engine.url import URL


class TapPostgres(SQLTap):
"""Singer tap for Postgres."""

name = "tap-postgres"
default_stream_class = PostgresStream

def __init__(
self,
*args,
**kwargs,
) -> None:
"""Constructor.
Should use JSON Schema instead
See https://github.com/MeltanoLabs/tap-postgres/issues/141
"""
super().__init__(*args, **kwargs)
assert (self.config.get("sqlalchemy_url") is not None) or (
self.config.get("host") is not None
and self.config.get("port") is not None
and self.config.get("user") is not None
and self.config.get("password") is not None
), (
"Need either the sqlalchemy_url to be set or host, port, user,"
+ " and password to be set"
)

config_jsonschema = th.PropertiesList(
th.Property(
"host",
th.StringType,
description=(
"Hostname for postgres instance. "
+ "Note if sqlalchemy_url is set this will be ignored."
),
),
th.Property(
"port",
th.IntegerType,
default=5432,
description=(
"The port on which postgres is awaiting connection. "
+ "Note if sqlalchemy_url is set this will be ignored."
),
),
th.Property(
"user",
th.StringType,
description=(
"User name used to authenticate. "
+ "Note if sqlalchemy_url is set this will be ignored."
),
),
th.Property(
"password",
th.StringType,
secret=True,
description=(
"Password used to authenticate. "
"Note if sqlalchemy_url is set this will be ignored."
),
),
th.Property(
"database",
th.StringType,
description=(
"Database name. "
+ "Note if sqlalchemy_url is set this will be ignored."
),
),
th.Property(
"sqlalchemy_url",
th.StringType,
required=True,
secret=True,
description=(
"Example postgresql://[username]:[password]@localhost:5432/[db_name]"
Expand Down Expand Up @@ -93,6 +153,26 @@ class TapPostgres(SQLTap):
),
).to_dict()

def get_sqlalchemy_url(self, config: Mapping[str, Any]) -> str:
"""Generate a SQLAlchemy URL.
Args:
config: The configuration for the connector.
"""
if config.get("sqlalchemy_url"):
return cast(str, config["sqlalchemy_url"])

else:
sqlalchemy_url = URL.create(
drivername="postgresql+psycopg2",
username=config["user"],
password=config["password"],
host=config["host"],
port=config["port"],
database=config["database"],
)
return cast(str, sqlalchemy_url)

@cached_property
def connector(self) -> PostgresConnector:
"""Get a configured connector for this Tap.
Expand All @@ -101,7 +181,7 @@ def connector(self) -> PostgresConnector:
"""
# We mutate this url to use the ssh tunnel if enabled
url = make_url(self.config["sqlalchemy_url"])
url = make_url(self.get_sqlalchemy_url(config=self.config))
ssh_config = self.config.get("ssh_tunnel", {})

if ssh_config.get("enable", False):
Expand Down
26 changes: 26 additions & 0 deletions tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,15 @@
"sqlalchemy_url": "postgresql://postgres:postgres@localhost:5432/postgres",
}

NO_SQLALCHEMY_CONFIG = {
"start_date": pendulum.datetime(2022, 11, 1).to_iso8601_string(),
"host": "localhost",
"port": 5432,
"user": "postgres",
"password": "postgres",
"database": "postgres",
}


def setup_test_table(table_name, sqlalchemy_url):
"""setup any state specific to the execution of the given module."""
Expand Down Expand Up @@ -69,6 +78,13 @@ def teardown_test_table(table_name, sqlalchemy_url):
custom_suites=[custom_test_replication_key],
)

TapPostgresTestNOSQLALCHEMY = get_tap_test_class(
tap_class=TapPostgres,
config=NO_SQLALCHEMY_CONFIG,
catalog="tests/resources/data.json",
custom_suites=[custom_test_replication_key],
)


# creating testing instance for isolated table in postgres
TapPostgresTestSelectedColumnsOnly = get_tap_test_class(
Expand All @@ -90,6 +106,16 @@ def resource(self):
yield
teardown_test_table(self.table_name, self.sqlalchemy_url)

class TestTapPostgres_NOSQLALCHMY(TapPostgresTestNOSQLALCHEMY):

table_name = TABLE_NAME
sqlalchemy_url = SAMPLE_CONFIG["sqlalchemy_url"]

@pytest.fixture(scope="class")
def resource(self):
setup_test_table(self.table_name, self.sqlalchemy_url)
yield
teardown_test_table(self.table_name, self.sqlalchemy_url)

class TestTapPostgresSelectedColumnsOnly(TapPostgresTestSelectedColumnsOnly):

Expand Down

0 comments on commit 4482e4e

Please sign in to comment.