Skip to content

Commit

Permalink
Rename CustomTrustedRoot to TrustedRoot
Browse files Browse the repository at this point in the history
All other modules should be using our customized version so
the rename should be ok: only trustroot module needs a single
"import as" shenanigan.

Signed-off-by: Jussi Kukkonen <[email protected]>
  • Loading branch information
jku committed Jan 3, 2024
1 parent 650a857 commit a314bbc
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 35 deletions.
6 changes: 3 additions & 3 deletions sigstore/_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
RekorClient,
RekorKeyring,
)
from sigstore._internal.trustroot import CustomTrustedRoot
from sigstore._internal.trustroot import TrustedRoot
from sigstore._utils import PEMCert
from sigstore.errors import Error
from sigstore.oidc import (
Expand Down Expand Up @@ -651,7 +651,7 @@ def _sign(args: argparse.Namespace) -> None:
signing_ctx = SigningContext.production()
else:
# Assume "production" trust root if no keys are given as arguments
trusted_root = CustomTrustedRoot.production()
trusted_root = TrustedRoot.production()
if args.ctfe_pem is not None:
ctfe_keys = [args.ctfe_pem.read()]
else:
Expand Down Expand Up @@ -828,7 +828,7 @@ def _collect_verification_state(
if args.rekor_root_pubkey is not None:
rekor_keys = [args.rekor_root_pubkey.read()]
else:
trusted_root = CustomTrustedRoot.production()
trusted_root = TrustedRoot.production()
rekor_keys = trusted_root.get_rekor_keys()

verifier = Verifier(
Expand Down
10 changes: 5 additions & 5 deletions sigstore/_internal/rekor/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

from sigstore._internal.ctfe import CTKeyring
from sigstore._internal.keyring import Keyring
from sigstore._internal.trustroot import CustomTrustedRoot
from sigstore._internal.trustroot import TrustedRoot
from sigstore.transparency import LogEntry

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -232,11 +232,11 @@ def __del__(self) -> None:
self.session.close()

@classmethod
def production(cls, trust_root: CustomTrustedRoot) -> RekorClient:
def production(cls, trust_root: TrustedRoot) -> RekorClient:
"""
Returns a `RekorClient` populated with the default Rekor production instance.
trust_root must be a `CustomTrustedRoot` for the production TUF repository.
trust_root must be a `TrustedRoot` for the production TUF repository.
"""
rekor_keys = trust_root.get_rekor_keys()
ctfe_keys = trust_root.get_ctfe_keys()
Expand All @@ -248,11 +248,11 @@ def production(cls, trust_root: CustomTrustedRoot) -> RekorClient:
)

@classmethod
def staging(cls, trust_root: CustomTrustedRoot) -> RekorClient:
def staging(cls, trust_root: TrustedRoot) -> RekorClient:
"""
Returns a `RekorClient` populated with the default Rekor staging instance.
trust_root must be a `CustomTrustedRoot` for the staging TUF repository.
trust_root must be a `TrustedRoot` for the staging TUF repository.
"""
rekor_keys = trust_root.get_rekor_keys()
ctfe_keys = trust_root.get_ctfe_keys()
Expand Down
16 changes: 9 additions & 7 deletions sigstore/_internal/trustroot.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
from sigstore_protobuf_specs.dev.sigstore.trustroot.v1 import (
CertificateAuthority,
TransparencyLogInstance,
TrustedRoot,
)
from sigstore_protobuf_specs.dev.sigstore.trustroot.v1 import (
TrustedRoot as _TrustedRoot,
)

from sigstore._internal.tuf import DEFAULT_TUF_URL, STAGING_TUF_URL, TrustUpdater
Expand Down Expand Up @@ -56,17 +58,17 @@ def _is_timerange_valid(period: TimeRange | None, *, allow_expired: bool) -> boo
return allow_expired or (period.end is None or now <= period.end)


class CustomTrustedRoot(TrustedRoot):
class TrustedRoot(_TrustedRoot):
"""Complete set of trusted entities for a Sigstore client"""

@classmethod
def from_file(cls, path: str) -> "CustomTrustedRoot":
def from_file(cls, path: str) -> "TrustedRoot":
"""Create a new trust root from file"""
tr: CustomTrustedRoot = cls().from_json(Path(path).read_bytes())
tr: TrustedRoot = cls().from_json(Path(path).read_bytes())
return tr

@classmethod
def from_tuf(cls, url: str, offline: bool = False) -> "CustomTrustedRoot":
def from_tuf(cls, url: str, offline: bool = False) -> "TrustedRoot":
"""Create a new trust root from a TUF repository.
If `offline`, will use trust root in local TUF cache. Otherwise will
Expand All @@ -76,7 +78,7 @@ def from_tuf(cls, url: str, offline: bool = False) -> "CustomTrustedRoot":
return cls.from_file(path)

@classmethod
def production(cls, offline: bool = False) -> "CustomTrustedRoot":
def production(cls, offline: bool = False) -> "TrustedRoot":
"""Create new trust root from Sigstore production TUF repository.
If `offline`, will use trust root in local TUF cache. Otherwise will
Expand All @@ -85,7 +87,7 @@ def production(cls, offline: bool = False) -> "CustomTrustedRoot":
return cls.from_tuf(DEFAULT_TUF_URL, offline)

@classmethod
def staging(cls, offline: bool = False) -> "CustomTrustedRoot":
def staging(cls, offline: bool = False) -> "TrustedRoot":
"""Create new trust root from Sigstore staging TUF repository.
If `offline`, will use trust root in local TUF cache. Otherwise will
Expand Down
6 changes: 3 additions & 3 deletions sigstore/sign.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@
)
from sigstore._internal.rekor.client import RekorClient
from sigstore._internal.sct import verify_sct
from sigstore._internal.trustroot import CustomTrustedRoot
from sigstore._internal.trustroot import TrustedRoot
from sigstore._utils import B64Str, HexStr, PEMCert, sha256_streaming
from sigstore.oidc import ExpiredIdentity, IdentityToken
from sigstore.transparency import LogEntry
Expand Down Expand Up @@ -271,7 +271,7 @@ def production(cls) -> SigningContext:
"""
Return a `SigningContext` instance configured against Sigstore's production-level services.
"""
trust_root = CustomTrustedRoot.production()
trust_root = TrustedRoot.production()
rekor = RekorClient.production(trust_root)
return cls(
fulcio=FulcioClient.production(),
Expand All @@ -283,7 +283,7 @@ def staging(cls) -> SigningContext:
"""
Return a `SignerContext` instance configured against Sigstore's staging-level services.
"""
trust_root = CustomTrustedRoot.staging()
trust_root = TrustedRoot.staging()
rekor = RekorClient.staging(trust_root)
return cls(
fulcio=FulcioClient.staging(),
Expand Down
6 changes: 3 additions & 3 deletions sigstore/verify/verifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
)
from sigstore._internal.rekor.client import RekorClient
from sigstore._internal.set import InvalidSETError, verify_set
from sigstore._internal.trustroot import CustomTrustedRoot
from sigstore._internal.trustroot import TrustedRoot
from sigstore._utils import B64Str, HexStr
from sigstore.verify.models import InvalidRekorEntry as InvalidRekorEntryError
from sigstore.verify.models import RekorEntryMissing as RekorEntryMissingError
Expand Down Expand Up @@ -126,7 +126,7 @@ def production(cls) -> Verifier:
"""
Return a `Verifier` instance configured against Sigstore's production-level services.
"""
trust_root = CustomTrustedRoot.production()
trust_root = TrustedRoot.production()
return cls(
rekor=RekorClient.production(trust_root),
fulcio_certificate_chain=trust_root.get_fulcio_certs(),
Expand All @@ -137,7 +137,7 @@ def staging(cls) -> Verifier:
"""
Return a `Verifier` instance configured against Sigstore's staging-level services.
"""
trust_root = CustomTrustedRoot.staging()
trust_root = TrustedRoot.staging()
return cls(
rekor=RekorClient.staging(trust_root),
fulcio_certificate_chain=trust_root.get_fulcio_certs(),
Expand Down
24 changes: 12 additions & 12 deletions test/unit/internal/test_trust_root.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from cryptography.x509 import load_pem_x509_certificate
from sigstore_protobuf_specs.dev.sigstore.common.v1 import TimeRange

from sigstore._internal.trustroot import CustomTrustedRoot, _is_timerange_valid
from sigstore._internal.trustroot import TrustedRoot, _is_timerange_valid
from sigstore._utils import load_der_public_key, load_pem_public_key
from sigstore.errors import RootError

Expand All @@ -30,10 +30,10 @@ def test_trust_root_tuf_caches_and_requests(mock_staging_tuf, tuf_dirs):
# start with empty target cache, empty local metadata dir
data_dir, cache_dir = tuf_dirs

# keep track of requests the TrustUpdater in CustomTrustedRoot makes
# keep track of requests the TrustUpdater invoked by TrustedRoot makes
reqs, fail_reqs = mock_staging_tuf

trust_root = CustomTrustedRoot.staging()
trust_root = TrustedRoot.staging()
# metadata was "downloaded" from staging
expected = ["root.json", "snapshot.json", "targets.json", "timestamp.json"]
assert sorted(os.listdir(data_dir)) == expected
Expand All @@ -58,7 +58,7 @@ def test_trust_root_tuf_caches_and_requests(mock_staging_tuf, tuf_dirs):
assert fail_reqs == expected_fail_reqs

# New trust root (and TrustUpdater instance), same cache dirs
trust_root = CustomTrustedRoot.staging()
trust_root = TrustedRoot.staging()

# Expect new timestamp and root requests
expected_requests["timestamp.json"] += 1
Expand All @@ -77,10 +77,10 @@ def test_trust_root_tuf_offline(mock_staging_tuf, tuf_dirs):
# start with empty target cache, empty local metadata dir
data_dir, cache_dir = tuf_dirs

# keep track of requests the TrustUpdater in CustomTrustedRoot makes
# keep track of requests the TrustUpdater invoked by TrustedRoot makes
reqs, fail_reqs = mock_staging_tuf

trust_root = CustomTrustedRoot.staging(offline=True)
trust_root = TrustedRoot.staging(offline=True)

# Only the embedded root is in local TUF metadata, nothing is downloaded
expected = ["root.json"]
Expand Down Expand Up @@ -161,39 +161,39 @@ def _pem_keys(keys):
]

# Assert that trust root from TUF contains the expected keys/certs
trust_root = CustomTrustedRoot.staging()
trust_root = TrustedRoot.staging()
assert _der_keys(trust_root.get_ctfe_keys()) == ctfe_keys
assert _der_keys(trust_root.get_rekor_keys()) == rekor_keys
assert trust_root.get_fulcio_certs() == fulcio_certs

# Assert that trust root from offline TUF contains the expected keys/certs
trust_root = CustomTrustedRoot.staging(offline=True)
trust_root = TrustedRoot.staging(offline=True)
assert _der_keys(trust_root.get_ctfe_keys()) == ctfe_keys
assert _der_keys(trust_root.get_rekor_keys()) == rekor_keys
assert trust_root.get_fulcio_certs() == fulcio_certs

# Assert that trust root from file contains the expected keys/certs
path = tuf_asset.target_path("trusted_root.json")
trust_root = CustomTrustedRoot.from_file(path)
trust_root = TrustedRoot.from_file(path)
assert _der_keys(trust_root.get_ctfe_keys()) == ctfe_keys
assert _der_keys(trust_root.get_rekor_keys()) == rekor_keys
assert trust_root.get_fulcio_certs() == fulcio_certs


def test_trust_root_tuf_instance_error():
with pytest.raises(RootError):
CustomTrustedRoot.from_tuf("foo.bar")
TrustedRoot.from_tuf("foo.bar")


def test_trust_root_tuf_ctfe_keys_error(monkeypatch):
trust_root = CustomTrustedRoot.staging(offline=True)
trust_root = TrustedRoot.staging(offline=True)
monkeypatch.setattr(trust_root, "ctlogs", [])
with pytest.raises(Exception, match="Active CTFE keys not found in trusted root"):
trust_root.get_ctfe_keys()


def test_trust_root_fulcio_certs_error(tuf_asset, monkeypatch):
trust_root = CustomTrustedRoot.staging(offline=True)
trust_root = TrustedRoot.staging(offline=True)
monkeypatch.setattr(trust_root, "certificate_authorities", [])
with pytest.raises(
Exception, match="Fulcio certificates not found in trusted root"
Expand Down
4 changes: 2 additions & 2 deletions test/unit/verify/test_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import pytest

from sigstore._internal.rekor.client import RekorClient
from sigstore._internal.trustroot import CustomTrustedRoot
from sigstore._internal.trustroot import TrustedRoot
from sigstore.verify.models import (
InvalidMaterials,
InvalidRekorEntry,
Expand Down Expand Up @@ -45,7 +45,7 @@ def test_verification_materials_retrieves_rekor_entry(self, signing_materials):
materials = signing_materials("a.txt")
assert materials._rekor_entry is None

trust_root = CustomTrustedRoot.staging()
trust_root = TrustedRoot.staging()
client = RekorClient.staging(trust_root)
entry = materials.rekor_entry(client)
assert entry is not None
Expand Down

0 comments on commit a314bbc

Please sign in to comment.