Skip to content

Commit

Permalink
feat(ingest): snowflake - config variable for specifying a direct pri…
Browse files Browse the repository at this point in the history
…vate key (datahub-project#6609)
  • Loading branch information
mayurinehate authored and cccs-Dustin committed Feb 1, 2023
1 parent 18a5b56 commit 7a0c870
Showing 1 changed file with 30 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -105,15 +105,21 @@ class BaseSnowflakeConfig(BaseTimeWindowConfig):
password: Optional[pydantic.SecretStr] = pydantic.Field(
default=None, exclude=True, description="Snowflake password."
)
private_key: Optional[str] = pydantic.Field(
default=None,
description="Private key in a form of '-----BEGIN PRIVATE KEY-----\\nprivate-key\\n-----END PRIVATE KEY-----\\n' if using key pair authentication. Encrypted version of private key will be in a form of '-----BEGIN ENCRYPTED PRIVATE KEY-----\\nencrypted-private-key\\n-----END ECNCRYPTED PRIVATE KEY-----\\n' See: https://docs.snowflake.com/en/user-guide/key-pair-auth.html",
)

private_key_path: Optional[str] = pydantic.Field(
default=None,
description="The path to the private key if using key pair authentication. See: https://docs.snowflake.com/en/user-guide/key-pair-auth.html",
description="The path to the private key if using key pair authentication. Ignored if `private_key` is set. See: https://docs.snowflake.com/en/user-guide/key-pair-auth.html",
)
private_key_password: Optional[pydantic.SecretStr] = pydantic.Field(
default=None,
exclude=True,
description="Password for your private key if using key pair authentication.",
description="Password for your private key. Required if using key pair authentication with encrypted private key.",
)

oauth_config: Optional[OauthConfiguration] = pydantic.Field(
default=None,
description="oauth configuration - https://docs.snowflake.com/en/user-guide/python-connector-example.html#connecting-with-oauth",
Expand Down Expand Up @@ -182,10 +188,13 @@ def authenticator_type_is_valid(cls, v, values, field):
)
if v == "KEY_PAIR_AUTHENTICATOR":
# If we are using key pair auth, we need the private key path and password to be set
if values.get("private_key_path") is None:
if (
values.get("private_key") is None
and values.get("private_key_path") is None
):
raise ValueError(
f"'private_key_path' was none "
f"but should be set when using {v} authentication"
f"Both `private_key` and `private_key_path` are none. "
f"At least one should be set when using {v} authentication"
)
elif v == "OAUTH_AUTHENTICATOR":
if values.get("oauth_config") is None:
Expand Down Expand Up @@ -275,16 +284,22 @@ def get_sql_alchemy_connect_args(self) -> dict:
if self.authentication_type != "KEY_PAIR_AUTHENTICATOR":
return {}
if self.connect_args is None:
if self.private_key_path is None:
raise ValueError("missing required private key path to read key from")
if self.private_key_password is None:
raise ValueError("missing required private key password")
with open(self.private_key_path, "rb") as key:
p_key = serialization.load_pem_private_key(
key.read(),
password=self.private_key_password.get_secret_value().encode(),
backend=default_backend(),
)
if self.private_key is not None:
pkey_bytes = self.private_key.replace("\\n", "\n").encode()
else:
assert (
self.private_key_path
), "missing required private key path to read key from"
with open(self.private_key_path, "rb") as key:
pkey_bytes = key.read()

p_key = serialization.load_pem_private_key(
pkey_bytes,
password=self.private_key_password.get_secret_value().encode()
if self.private_key_password is not None
else None,
backend=default_backend(),
)

pkb = p_key.private_bytes(
encoding=serialization.Encoding.DER,
Expand Down

0 comments on commit 7a0c870

Please sign in to comment.