Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GCPSigner import #480

Merged
merged 11 commits into from
Jan 11, 2023
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ Issues = "https://github.com/secure-systems-lab/securesystemslib/issues"

[project.optional-dependencies]
crypto = ["cryptography>=37.0.0"]
gcpkms = ["google-cloud-kms"]
gcpkms = ["google-cloud-kms", "cryptography>=37.0.0"]
pynacl = ["pynacl>1.2.0"]
PySPX = ["PySPX==0.5.0"]
asn1 = ["asn1crypto"]
Expand Down
95 changes: 90 additions & 5 deletions securesystemslib/signer/_gcp_signer.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,26 @@
"""Signer implementation for Google Cloud KMS"""

import logging
from typing import Optional
from typing import Optional, Tuple
from urllib import parse

import securesystemslib.hash as sslib_hash
from securesystemslib import exceptions
from securesystemslib.keys import _get_keyid
from securesystemslib.signer._key import Key
from securesystemslib.signer._signer import SecretsHandler, Signature, Signer
from securesystemslib.signer._signer import (
SecretsHandler,
Signature,
Signer,
SSlibKey,
)

logger = logging.getLogger(__name__)

GCP_IMPORT_ERROR = None
try:
from google.cloud import kms
from google.cloud.kms_v1.types import CryptoKeyVersion
except ImportError:
GCP_IMPORT_ERROR = (
"google-cloud-kms library required to sign with Google Cloud keys."
Expand All @@ -29,9 +36,14 @@ class GCPSigner(Signer):
The signer uses "ambient" credentials: typically environment var
GOOGLE_APPLICATION_CREDENTIALS that points to a file with valid
credentials. These will be found by google.cloud.kms, see
https://cloud.google.com/docs/authentication/getting-started
(and https://github.com/google-github-actions/auth for the relevant
GitHub action).
https://cloud.google.com/docs/authentication/getting-started.
Some practical authentication options include:
* GitHub Action: https://github.com/google-github-actions/auth
* gcloud CLI: https://cloud.google.com/sdk/gcloud

The specific permissions that GCPSigner needs are:
* roles/cloudkms.signer for sign()
* roles/cloudkms.publicKeyViewer for import()

Arguments:
gcp_keyid: Fully qualified GCP KMS key name, like
Expand Down Expand Up @@ -71,6 +83,79 @@ def from_priv_key_uri(

return cls(uri.path, public_key)

@classmethod
def import_(cls, gcp_keyid: str) -> Tuple[str, Key]:
"""Load key and signer details from KMS

Returns the private key uri and the public key. This method should only
be called once per key: the uri and Key should be stored for later use.
"""
if GCP_IMPORT_ERROR:
raise exceptions.UnsupportedLibraryError(GCP_IMPORT_ERROR)

client = kms.KeyManagementServiceClient()
request = {"name": gcp_keyid}
kms_pubkey = client.get_public_key(request)
try:
keytype, scheme = cls._get_keytype_and_scheme(kms_pubkey.algorithm)
except KeyError as e:
raise exceptions.UnsupportedAlgorithmError(
f"{kms_pubkey.algorithm} is not a supported signing algorithm"
) from e

keyval = {"public": kms_pubkey.pem}
keyid = _get_keyid(keytype, scheme, keyval)
public_key = SSlibKey(keyid, keytype, scheme, keyval)

return f"{cls.SCHEME}:{gcp_keyid}", public_key

@staticmethod
def _get_keytype_and_scheme(algorithm: int) -> Tuple[str, str]:
"""Return keytype and scheme for the KMS algorithm enum"""
keytypes_and_schemes = {
CryptoKeyVersion.CryptoKeyVersionAlgorithm.EC_SIGN_P256_SHA256: (
"ecdsa",
"ecdsa-sha2-nistp256",
),
CryptoKeyVersion.CryptoKeyVersionAlgorithm.EC_SIGN_P384_SHA384: (
"ecdsa",
"ecdsa-sha2-nistp384",
),
CryptoKeyVersion.CryptoKeyVersionAlgorithm.RSA_SIGN_PSS_2048_SHA256: (
"rsa",
"rsassa-pss-sha256",
),
CryptoKeyVersion.CryptoKeyVersionAlgorithm.RSA_SIGN_PSS_3072_SHA256: (
"rsa",
"rsassa-pss-sha256",
),
CryptoKeyVersion.CryptoKeyVersionAlgorithm.RSA_SIGN_PSS_4096_SHA256: (
"rsa",
"rsassa-pss-sha256",
),
CryptoKeyVersion.CryptoKeyVersionAlgorithm.RSA_SIGN_PSS_4096_SHA512: (
"rsa",
"rsassa-pss-sha512",
),
CryptoKeyVersion.CryptoKeyVersionAlgorithm.RSA_SIGN_PKCS1_2048_SHA256: (
"rsa",
"rsa-pkcs1v15-sha256",
),
CryptoKeyVersion.CryptoKeyVersionAlgorithm.RSA_SIGN_PKCS1_3072_SHA256: (
"rsa",
"rsa-pkcs1v15-sha256",
),
CryptoKeyVersion.CryptoKeyVersionAlgorithm.RSA_SIGN_PKCS1_4096_SHA256: (
"rsa",
"rsa-pkcs1v15-sha256",
),
CryptoKeyVersion.CryptoKeyVersionAlgorithm.RSA_SIGN_PKCS1_4096_SHA512: (
"rsa",
"rsa-pkcs1v15-sha512",
),
}
return keytypes_and_schemes[algorithm]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it worth creating a ticket to test these all schemes?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose that would be nice -- it will require creating all of these keys in the KMS though


@staticmethod
def _get_hash_algorithm(public_key: Key) -> str:
"""Helper function to return payload hash algorithm used for this key"""
Expand Down
24 changes: 13 additions & 11 deletions securesystemslib/signer/_hsm_signer.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

from securesystemslib import KEY_TYPE_ECDSA
from securesystemslib.exceptions import UnsupportedLibraryError
from securesystemslib.keys import _get_keyid
from securesystemslib.signer._key import Key, SSlibKey
from securesystemslib.signer._signature import Signature
from securesystemslib.signer._signer import SecretsHandler, Signer
Expand Down Expand Up @@ -191,13 +192,14 @@ def _find_key_values(
return ECDomainParameters.load(bytes(params)), bytes(point)

@classmethod
def pubkey_from_hsm(
cls, sslib_keyid: str, hsm_keyid: Optional[int] = None
) -> SSlibKey:
"""Export public key from HSM.
def import_(cls, hsm_keyid: Optional[int] = None) -> Tuple[str, SSlibKey]:
"""Import public key and signer details from HSM.

Returns a private key URI (for Signer.from_priv_key_uri()) and a public
key. import_() should be called once and the returned URI and public
key should be stored for later use.

Arguments:
sslib_keyid: Key identifier that is unique within the metadata it is used in.
hsm_keyid: Key identifier on the token. Default is 2 (meaning PIV key slot 9c).

Raises:
Expand Down Expand Up @@ -240,12 +242,12 @@ def pubkey_from_hsm(
.decode()
)

return SSlibKey(
sslib_keyid,
KEY_TYPE_ECDSA,
_SCHEME_FOR_CURVE[curve],
{"public": public_pem},
)
keyval = {"public": public_pem}
scheme = _SCHEME_FOR_CURVE[curve]
keyid = _get_keyid(KEY_TYPE_ECDSA, scheme, keyval)
key = SSlibKey(keyid, KEY_TYPE_ECDSA, scheme, keyval)

return "hsm:", key

@classmethod
def from_priv_key_uri(
Expand Down
48 changes: 31 additions & 17 deletions tests/check_kms_signers.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,25 @@
import unittest

from securesystemslib.exceptions import UnverifiedSignatureError
from securesystemslib.signer import Key, Signer
from securesystemslib.signer import GCPSigner, Key, Signer


class TestKMSKeys(unittest.TestCase):
"""Test that KMS keys can be used to sign."""

def test_gcp(self):
pubkey = Key.from_dict(
"218611b80052667026c221f8774249b0f6b8b310d30a5c45a3b878aa3a02f39e",
{
"keytype": "ecdsa",
"scheme": "ecdsa-sha2-nistp256",
"keyval": {
"public": "-----BEGIN PUBLIC KEY-----\nMFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAE/ptvrXYuUc2ZaKssHhtg/IKNbO1X\ncDWlbKqLNpaK62MKdOwDz1qlp5AGHZkTY9tO09iq1F16SvVot1BQ9FJ2dw==\n-----END PUBLIC KEY-----\n"
},
},
)
gcp_id = "projects/python-tuf-kms/locations/global/keyRings/securesystemslib-tests/cryptoKeys/ecdsa-sha2-nistp256/cryptoKeyVersions/1"

def test_gcp_sign(self):
"""Test that GCP KMS key works for signing

NOTE: The KMS account is setup to only accept requests from the
Expand All @@ -35,25 +47,27 @@ def test_gcp(self):
"""

data = "data".encode("utf-8")
pubkey = Key.from_dict(
"abcd",
{
"keyid": "abcd",
"keytype": "ecdsa",
"scheme": "ecdsa-sha2-nistp256",
"keyval": {
"public": "-----BEGIN PUBLIC KEY-----\nMFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAE/ptvrXYuUc2ZaKssHhtg/IKNbO1X\ncDWlbKqLNpaK62MKdOwDz1qlp5AGHZkTY9tO09iq1F16SvVot1BQ9FJ2dw==\n-----END PUBLIC KEY-----\n"
},
},
)
gcp_id = "projects/python-tuf-kms/locations/global/keyRings/securesystemslib-tests/cryptoKeys/ecdsa-sha2-nistp256/cryptoKeyVersions/1"

signer = Signer.from_priv_key_uri(f"gcpkms:{gcp_id}", pubkey)
signer = Signer.from_priv_key_uri(f"gcpkms:{self.gcp_id}", self.pubkey)
sig = signer.sign(data)

pubkey.verify_signature(sig, data)
self.pubkey.verify_signature(sig, data)
with self.assertRaises(UnverifiedSignatureError):
pubkey.verify_signature(sig, b"NOT DATA")
self.pubkey.verify_signature(sig, b"NOT DATA")

def test_gcp_import(self):
"""Test that GCP KMS key can be imported

NOTE: The KMS account is setup to only accept requests from the
Securesystemslib GitHub Action environment: test cannot pass elsewhere.

In case of problems with KMS account, please file an issue and
assign @jku.
"""

uri, key = GCPSigner.import_(self.gcp_id)
self.assertEqual(key, self.pubkey)
self.assertEqual(uri, f"gcpkms:{self.gcp_id}")


if __name__ == "__main__":
Expand Down
9 changes: 3 additions & 6 deletions tests/test_hsm_signer.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ class TestHSM(unittest.TestCase):
See .github/workflows/hsm.yml for how this can be done on Linux, macOS and Windows.
"""

sslib_keyid = "a" * 64 # Mock SSlibKey conform sha256 hex digest keyid
hsm_keyid = 1
hsm_keyid_default = 2
hsm_user_pin = "123456"
Expand Down Expand Up @@ -139,7 +138,7 @@ def test_hsm(self):
"""Test HSM key export and signing."""

for hsm_keyid in [self.hsm_keyid, self.hsm_keyid_default]:
key = HSMSigner.pubkey_from_hsm(self.sslib_keyid, hsm_keyid)
_, key = HSMSigner.import_(hsm_keyid)
signer = HSMSigner(hsm_keyid, key, lambda sec: self.hsm_user_pin)
sig = signer.sign(b"DATA")
key.verify_signature(sig, b"DATA")
Expand All @@ -150,11 +149,9 @@ def test_hsm(self):
def test_hsm_uri(self):
"""Test HSM default key export and signing from URI."""

key = HSMSigner.pubkey_from_hsm(
self.sslib_keyid, self.hsm_keyid_default
)
uri, key = HSMSigner.import_(self.hsm_keyid_default)
signer = Signer.from_priv_key_uri(
"hsm:", key, lambda sec: self.hsm_user_pin
uri, key, lambda sec: self.hsm_user_pin
)
sig = signer.sign(b"DATA")
key.verify_signature(sig, b"DATA")
Expand Down