Skip to content

Commit

Permalink
Separate TUF and trusted root management code
Browse files Browse the repository at this point in the history
The purpose of this is to later enable both "--trust-root <FILE>" and
some sort of offline functionality.
 * Trusted root can now be initialized from tuf, offline tuf or
   from a file
 * _internal.tuf module is now used only from the new trustroot module
 * Tests are modified to use the CustomTrustRoot API now but they still
   (also) test the internal TUF implementation details
 * The new functionality (offline & from_file) is tested but is not
   exposed to UI
 * TrustUpdater now updates metadata when it is created (if not offline):
   This does not change application functionality as a online
   TrustUpdater is only created if a TUF update is needed

Signed-off-by: Jussi Kukkonen <[email protected]>
  • Loading branch information
jku committed Dec 19, 2023
1 parent 0883808 commit 2c23aa8
Show file tree
Hide file tree
Showing 9 changed files with 287 additions and 221 deletions.
14 changes: 7 additions & 7 deletions sigstore/_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
RekorClient,
RekorKeyring,
)
from sigstore._internal.tuf import TrustUpdater
from sigstore._internal.trustroot import CustomTrustedRoot
from sigstore._utils import PEMCert
from sigstore.errors import Error
from sigstore.oidc import (
Expand Down Expand Up @@ -650,16 +650,16 @@ def _sign(args: argparse.Namespace) -> None:
elif args.fulcio_url == DEFAULT_FULCIO_URL and args.rekor_url == DEFAULT_REKOR_URL:
signing_ctx = SigningContext.production()
else:
# Assume "production" keys if none are given as arguments
updater = TrustUpdater.production()
# Assume "production" trust root if no keys are given as arguments
trusted_root = CustomTrustedRoot.production()
if args.ctfe_pem is not None:
ctfe_keys = [args.ctfe_pem.read()]
else:
ctfe_keys = updater.get_ctfe_keys()
ctfe_keys = trusted_root.get_ctfe_keys()
if args.rekor_root_pubkey is not None:
rekor_keys = [args.rekor_root_pubkey.read()]
else:
rekor_keys = updater.get_rekor_keys()
rekor_keys = trusted_root.get_rekor_keys()

ct_keyring = CTKeyring(Keyring(ctfe_keys))
rekor_keyring = RekorKeyring(Keyring(rekor_keys))
Expand Down Expand Up @@ -828,8 +828,8 @@ def _collect_verification_state(
if args.rekor_root_pubkey is not None:
rekor_keys = [args.rekor_root_pubkey.read()]
else:
updater = TrustUpdater.production()
rekor_keys = updater.get_rekor_keys()
trusted_root = CustomTrustedRoot.production()
rekor_keys = trusted_root.get_rekor_keys()

verifier = Verifier(
rekor=RekorClient(
Expand Down
18 changes: 9 additions & 9 deletions sigstore/_internal/rekor/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

from sigstore._internal.ctfe import CTKeyring
from sigstore._internal.keyring import Keyring
from sigstore._internal.tuf import TrustUpdater
from sigstore._internal.trustroot import CustomTrustedRoot
from sigstore.transparency import LogEntry

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -232,14 +232,14 @@ def __del__(self) -> None:
self.session.close()

@classmethod
def production(cls, updater: TrustUpdater) -> RekorClient:
def production(cls, trust_root: CustomTrustedRoot) -> RekorClient:
"""
Returns a `RekorClient` populated with the default Rekor production instance.
updater must be a `TrustUpdater` for the production TUF repository.
trust_root must be a `CustomTrustedRoot` for the production TUF repository.
"""
rekor_keys = updater.get_rekor_keys()
ctfe_keys = updater.get_ctfe_keys()
rekor_keys = trust_root.get_rekor_keys()
ctfe_keys = trust_root.get_ctfe_keys()

return cls(
DEFAULT_REKOR_URL,
Expand All @@ -248,14 +248,14 @@ def production(cls, updater: TrustUpdater) -> RekorClient:
)

@classmethod
def staging(cls, updater: TrustUpdater) -> RekorClient:
def staging(cls, trust_root: CustomTrustedRoot) -> RekorClient:
"""
Returns a `RekorClient` populated with the default Rekor staging instance.
updater must be a `TrustUpdater` for the staging TUF repository.
trust_root must be a `CustomTrustedRoot` for the staging TUF repository.
"""
rekor_keys = updater.get_rekor_keys()
ctfe_keys = updater.get_ctfe_keys()
rekor_keys = trust_root.get_rekor_keys()
ctfe_keys = trust_root.get_ctfe_keys()

return cls(
STAGING_REKOR_URL,
Expand Down
148 changes: 148 additions & 0 deletions sigstore/_internal/trustroot.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
# Copyright 2023 The Sigstore Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Trust root management for sigstore-python.
"""

from __future__ import annotations

from datetime import datetime, timezone
from pathlib import Path
from typing import Iterable

from cryptography.x509 import Certificate, load_der_x509_certificate
from sigstore_protobuf_specs.dev.sigstore.common.v1 import TimeRange
from sigstore_protobuf_specs.dev.sigstore.trustroot.v1 import (
CertificateAuthority,
TransparencyLogInstance,
TrustedRoot,
)

from sigstore._internal.tuf import DEFAULT_TUF_URL, STAGING_TUF_URL, TrustUpdater
from sigstore.errors import MetadataError


def _is_timerange_valid(period: TimeRange | None, *, allow_expired: bool) -> bool:
"""
Given a `period`, checks that the the current time is not before `start`. If
`allow_expired` is `False`, also checks that the current time is not after
`end`.
"""
now = datetime.now(timezone.utc)

# If there was no validity period specified, the key is always valid.
if not period:
return True

# Active: if the current time is before the starting period, we are not yet
# valid.
if now < period.start:
return False

# If we want Expired keys, the key is valid at this point. Otherwise, check
# that we are within range.
return allow_expired or (period.end is None or now <= period.end)


class CustomTrustedRoot(TrustedRoot):
"""Complete set of trusted entities for a Sigstore client"""

@classmethod
def from_file(cls, path: str) -> "CustomTrustedRoot":
"""Create a new trust root from file"""
tr: CustomTrustedRoot = cls().from_json(Path(path).read_bytes())
return tr

@classmethod
def from_tuf(cls, url: str, offline: bool = False) -> "CustomTrustedRoot":
"""Create a new trust root from a TUF repository.
If `offline`, will use trust root in local TUF cache. Otherwise will
update the trust root from remote TUF repository.
"""
path = TrustUpdater(url, offline).get_trusted_root_path()
return cls.from_file(path)

@classmethod
def production(cls, offline: bool = False) -> "CustomTrustedRoot":
"""Create new trust root from Sigstore production TUF repository.
If `offline`, will use trust root in local TUF cache. Otherwise will
update the trust root from remote TUF repository.
"""
return cls.from_tuf(DEFAULT_TUF_URL, offline)

@classmethod
def staging(cls, offline: bool = False) -> "CustomTrustedRoot":
"""Create new trust root from Sigstore staging TUF repository.
If `offline`, will use trust root in local TUF cache. Otherwise will
update the trust root from remote TUF repository.
"""
return cls.from_tuf(STAGING_TUF_URL, offline)

@staticmethod
def _get_tlog_keys(tlogs: list[TransparencyLogInstance]) -> Iterable[bytes]:
"""Return public key contents given transparency log instances."""

for key in tlogs:
if not _is_timerange_valid(key.public_key.valid_for, allow_expired=False):
continue
key_bytes = key.public_key.raw_bytes
if key_bytes:
yield key_bytes

@staticmethod
def _get_ca_keys(
cas: list[CertificateAuthority], *, allow_expired: bool
) -> Iterable[bytes]:
"""Return public key contents given certificate authorities."""

for ca in cas:
if not _is_timerange_valid(ca.valid_for, allow_expired=allow_expired):
continue
for cert in ca.cert_chain.certificates:
yield cert.raw_bytes

def get_ctfe_keys(self) -> list[bytes]:
"""Return the active CTFE public keys contents."""
ctfes: list[bytes] = list(self._get_tlog_keys(self.ctlogs))
if not ctfes:
raise MetadataError("Active CTFE keys not found in trusted root")
return ctfes

def get_rekor_keys(self) -> list[bytes]:
"""Return the rekor public key content."""
keys: list[bytes] = list(self._get_tlog_keys(self.tlogs))

if len(keys) != 1:
raise MetadataError("Did not find one active Rekor key in trusted root")
return keys

def get_fulcio_certs(self) -> list[Certificate]:
"""Return the Fulcio certificates."""

certs: list[Certificate]

# Return expired certificates too: they are expired now but may have
# been active when the certificate was used to sign.
certs = [
load_der_x509_certificate(c)
for c in self._get_ca_keys(self.certificate_authorities, allow_expired=True)
]

if not certs:
raise MetadataError("Fulcio certificates not found in trusted root")
return certs
Loading

0 comments on commit 2c23aa8

Please sign in to comment.