Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingest): Support config variable for specifying a direct privat… #6609

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -105,15 +105,21 @@ class BaseSnowflakeConfig(BaseTimeWindowConfig):
password: Optional[pydantic.SecretStr] = pydantic.Field(
default=None, exclude=True, description="Snowflake password."
)
private_key: Optional[str] = pydantic.Field(
default=None,
description="Private key in a form of '-----BEGIN PRIVATE KEY-----\\nprivate-key\\n-----END PRIVATE KEY-----\\n' if using key pair authentication. Encrypted version of private key will be in a form of '-----BEGIN ENCRYPTED PRIVATE KEY-----\\nencrypted-private-key\\n-----END ECNCRYPTED PRIVATE KEY-----\\n' See: https://docs.snowflake.com/en/user-guide/key-pair-auth.html",
)

private_key_path: Optional[str] = pydantic.Field(
default=None,
description="The path to the private key if using key pair authentication. See: https://docs.snowflake.com/en/user-guide/key-pair-auth.html",
description="The path to the private key if using key pair authentication. Ignored if `private_key` is set. See: https://docs.snowflake.com/en/user-guide/key-pair-auth.html",
)
private_key_password: Optional[pydantic.SecretStr] = pydantic.Field(
default=None,
exclude=True,
description="Password for your private key if using key pair authentication.",
description="Password for your private key. Required if using key pair authentication with encrypted private key.",
)

oauth_config: Optional[OauthConfiguration] = pydantic.Field(
default=None,
description="oauth configuration - https://docs.snowflake.com/en/user-guide/python-connector-example.html#connecting-with-oauth",
Expand Down Expand Up @@ -182,10 +188,13 @@ def authenticator_type_is_valid(cls, v, values, field):
)
if v == "KEY_PAIR_AUTHENTICATOR":
# If we are using key pair auth, we need the private key path and password to be set
if values.get("private_key_path") is None:
if (
values.get("private_key") is None
and values.get("private_key_path") is None
):
raise ValueError(
f"'private_key_path' was none "
f"but should be set when using {v} authentication"
f"Both `private_key` and `private_key_path` are none. "
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: i see that we're throwing the error below, but ideally we should just be doing the error checking in one place

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. This is the main place I intend to keep error checking at. The assertion in below method was added primarily for linter.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To remove the assert, we can add if.. elif.. , however assert seemed cleaner to me.

f"At least one should be set when using {v} authentication"
)
elif v == "OAUTH_AUTHENTICATOR":
if values.get("oauth_config") is None:
Expand Down Expand Up @@ -275,16 +284,22 @@ def get_sql_alchemy_connect_args(self) -> dict:
if self.authentication_type != "KEY_PAIR_AUTHENTICATOR":
return {}
if self.connect_args is None:
if self.private_key_path is None:
raise ValueError("missing required private key path to read key from")
if self.private_key_password is None:
raise ValueError("missing required private key password")
with open(self.private_key_path, "rb") as key:
p_key = serialization.load_pem_private_key(
key.read(),
password=self.private_key_password.get_secret_value().encode(),
backend=default_backend(),
)
if self.private_key is not None:
pkey_bytes = self.private_key.replace("\\n", "\n").encode()
else:
assert (
self.private_key_path
), "missing required private key path to read key from"
with open(self.private_key_path, "rb") as key:
pkey_bytes = key.read()

p_key = serialization.load_pem_private_key(
pkey_bytes,
password=self.private_key_password.get_secret_value().encode()
if self.private_key_password is not None
else None,
backend=default_backend(),
)

pkb = p_key.private_bytes(
encoding=serialization.Encoding.DER,
Expand Down