Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow the CA and CRL to be file paths #484

Merged
merged 2 commits into from
Oct 9, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 68 additions & 48 deletions fedmsg/crypto/utils.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import logging
import os
import requests
import time


# A simple dictionary to cache certificates in
_cached_certificates = dict()

_log = logging.getLogger(__name__)


Expand Down Expand Up @@ -98,56 +99,75 @@ def validate_policy(topic, signer, routing_policy, nitpicky=False):
return True


def _load_remote_cert(location, cache, cache_expiry, tries=3, **config):
"""Get a fresh copy from fp.o/fedmsg/crl.pem if ours is getting stale.

Return the local filename.
def load_certificates(ca_location, crl_location=None, invalidate_cache=False):
"""
Load the CA certificate and CRL, caching it for future use.

.. note:: This is not a public API and is subject to change.
.. note::
Providing the location of the CA and CRL as an HTTPS URL is deprecated
and will be removed in a future release.

Args:
location (str): The URL where the certificate is hosted.
cache (str): The absolute path where the certificate should be stored.
cache_expiry (int): How long the cache should be considered fresh, in seconds.
tries (int): The number of times to attempt to retry downloading the certificate.
ca_location (str): The location of the Certificate Authority certificate. This should
be the absolute path to a PEM-encoded file. It can also be an HTTPS url, but this
is deprecated and will be removed in a future release.
crl_location (str): The location of the Certificate Revocation List. This should
be the absolute path to a PEM-encoded file. It can also be an HTTPS url, but
this is deprecated and will be removed in a future release.
invalidate_cache (bool): Whether or not to invalidate the certificate cache.

Returns:
tuple: A tuple of the (CA certificate, CRL) as unicode strings.

Raises:
requests.exception.RequestException: Any exception requests could raise.
IOError: If the location provided could not be opened and read.
"""
alternative_cache = os.path.expanduser("~/.local" + cache)
if crl_location is None:
crl_location = ''

try:
modtime = os.stat(cache).st_mtime
except OSError:
# File does not exist yet.
try:
# Try alternative location.
modtime = os.stat(alternative_cache).st_mtime
# It worked! Use the alternative location
cache = alternative_cache
except OSError:
# Neither file exists
modtime = 0

if (
(not modtime and not cache_expiry) or
(cache_expiry and time.time() - modtime > cache_expiry)
):
try:
with requests.Session() as session:
session.mount('http://', requests.adapters.HTTPAdapter(max_retries=tries))
session.mount('https://', requests.adapters.HTTPAdapter(max_retries=tries))
response = session.get(location, timeout=30)
with open(cache, 'w') as f:
f.write(response.text)
except IOError:
# If we couldn't write to the specified cache location, try a
# similar place but inside our home directory instead.
cache = alternative_cache
usr_dir = '/'.join(cache.split('/')[:-1])

if not os.path.isdir(usr_dir):
os.makedirs(usr_dir)

with open(cache, 'w') as f:
f.write(response.text)

return cache
if invalidate_cache:
del _cached_certificates[ca_location + crl_location]
else:
return _cached_certificates[ca_location + crl_location]
except KeyError:
pass

ca, crl = None, None
if ca_location:
ca = _load_certificate(ca_location)
if crl_location:
crl = _load_certificate(crl_location)

_cached_certificates[ca_location + crl_location] = ca, crl
return ca, crl


def _load_certificate(location):
"""
Load a certificate from the given location.

Args:
location (str): The location to load. This can either be an HTTPS URL or an absolute file
path. This is intended to be used with PEM-encoded certificates and therefore assumes
ASCII encoding.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be good to mention the deprecation here too.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made a note of it in the public interface


Returns:
str: The PEM-encoded certificate as a unicode string.

Raises:
requests.exception.RequestException: Any exception requests could raise.
IOError: If the location provided could not be opened and read.
"""
if location.startswith('https://'):
_log.info('Downloading x509 certificate from %s', location)
with requests.Session() as session:
session.mount('https://', requests.adapters.HTTPAdapter(max_retries=3))
response = session.get(location, timeout=30)
response.raise_for_status()
return response.text
else:
_log.info('Loading local x509 certificate from %s', location)
with open(location, 'rb') as fd:
return fd.read().decode('ascii')
61 changes: 38 additions & 23 deletions fedmsg/crypto/x509.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@
""" ``fedmsg.crypto.x509`` - X.509 backend for :mod:`fedmsg.crypto`. """

import logging
import os
import tempfile
import warnings

from requests.exceptions import RequestException
import six
try:
# Else we need M2Crypto and m2ext
Expand All @@ -31,7 +34,7 @@
except ImportError:
_m2crypto = False

from .utils import _load_remote_cert, validate_policy
from . import utils
from .x509_ng import _cryptography, sign as _crypto_sign, validate as _crypto_validate
import fedmsg.crypto # noqa: E402
import fedmsg.encoding # noqa: E402
Expand Down Expand Up @@ -136,28 +139,40 @@ def fail(reason):
# validate_certificate will one day be a part of M2Crypto.SSL.Context
# https://bugzilla.osafoundation.org/show_bug.cgi?id=11690

default_ca_cert_loc = 'https://fedoraproject.org/fedmsg/ca.crt'
cafile = _load_remote_cert(
config.get('ca_cert_location', default_ca_cert_loc),
config.get('ca_cert_cache', '/etc/pki/fedmsg/ca.crt'),
config.get('ca_cert_cache_expiry', 0),
**config)

ctx = m2ext.SSL.Context()
ctx.load_verify_locations(cafile=cafile)
if not ctx.validate_certificate(cert):
return fail("X509 certificate is not valid.")

# Load and check against the CRL
crl = None
if 'crl_location' in config and 'crl_cache' in config:
crl = _load_remote_cert(
config.get('crl_location', 'https://fedoraproject.org/fedmsg/crl.pem'),
config.get('crl_cache', '/var/cache/fedmsg/crl.pem'),
config.get('crl_cache_expiry', 1800),
**config)
ca_location = config.get('ca_cert_location', 'https://fedoraproject.org/fedmsg/ca.crt')
crl_location = config.get('crl_location', 'https://fedoraproject.org/fedmsg/crl.pem')
fd, cafile = tempfile.mkstemp()
try:
ca_certificate, crl = utils.load_certificates(ca_location, crl_location)
os.write(fd, ca_certificate.encode('ascii'))
os.fsync(fd)
ctx = m2ext.SSL.Context()
ctx.load_verify_locations(cafile=cafile)
if not ctx.validate_certificate(cert):
ca_certificate, crl = utils.load_certificates(
ca_location, crl_location, invalidate_cache=True)
with open(cafile, 'w') as f:
f.write(ca_certificate)
ctx = m2ext.SSL.Context()
ctx.load_verify_locations(cafile=cafile)
if not ctx.validate_certificate(cert):
return fail("X509 certificate is not valid.")
except (IOError, RequestException) as e:
_log.error(str(e))
return False
finally:
os.close(fd)
os.remove(cafile)

if crl:
crl = M2Crypto.X509.load_crl(crl)
try:
fd, crlfile = tempfile.mkstemp(text=True)
os.write(fd, crl.encode('ascii'))
os.fsync(fd)
crl = M2Crypto.X509.load_crl(crlfile)
finally:
os.close(fd)
os.remove(crlfile)
# FIXME -- We need to check that the CRL is signed by our own CA.
# See https://bugzilla.osafoundation.org/show_bug.cgi?id=12954#c2
# if not ctx.validate_certificate(crl):
Expand Down Expand Up @@ -206,7 +221,7 @@ def fail(reason):
signer = subject.get_entries_by_nid(subject.nid['CN'])[0]\
.get_data().as_text()

return validate_policy(
return utils.validate_policy(
message.get('topic'), signer, routing_policy, config.get('routing_nitpicky', False))


Expand Down
42 changes: 15 additions & 27 deletions fedmsg/crypto/x509_ng.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
_cryptography = True
except ImportError: # pragma: no cover
_cryptography = False
from requests.exceptions import RequestException
import six

from . import utils
Expand Down Expand Up @@ -164,35 +165,22 @@ def validate(message, ssldir=None, **config):
certificate = base64.b64decode(message['certificate'])
message = fedmsg.crypto.strip_credentials(message)

crl_file = None
if 'crl_location' in config and 'crl_cache' in config:
crl_file = utils._load_remote_cert(
config.get('crl_location', 'https://fedoraproject.org/fedmsg/crl.pem'),
config.get('crl_cache', '/var/cache/fedmsg/crl.pem'),
config.get('crl_cache_expiry', 1800),
**config
)

ca_file = utils._load_remote_cert(
config.get('ca_cert_location', 'https://fedoraproject.org/fedmsg/ca.crt'),
config.get('ca_cert_cache', '/etc/pki/fedmsg/ca.crt'),
config.get('ca_cert_cache_expiry', 0),
**config
)

with open(ca_file, 'rb') as fd:
ca_certificate = fd.read()

crl = None
if crl_file:
with open(crl_file, 'rb') as fd:
crl = fd.read()

# Unfortunately we can't change this defaulting to Fedora behavior until
# fedmsg-2.0
ca_location = config.get('ca_cert_location', 'https://fedoraproject.org/fedmsg/ca.crt')
crl_location = config.get('crl_location', 'https://fedoraproject.org/fedmsg/crl.pem')
try:
ca_certificate, crl = utils.load_certificates(ca_location, crl_location)
_validate_signing_cert(ca_certificate, certificate, crl)
except X509StoreContextError as e:
_log.error(str(e))
return False
except (IOError, RequestException, X509StoreContextError) as e:
# Maybe the CA/CRL is expired or just rotated, so invalidate the cache and try again
try:
ca_certificate, crl = utils.load_certificates(
ca_location, crl_location, invalidate_cache=True)
_validate_signing_cert(ca_certificate, certificate, crl)
except (IOError, RequestException, X509StoreContextError) as e:
_log.error(str(e))
return False

# Validate the signature of the message itself
try:
Expand Down
8 changes: 2 additions & 6 deletions fedmsg/tests/consumers/test_consumers.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,8 @@ def setUp(self):
'dummy': True,
'ssldir': SSLDIR,
'certname': 'shell-app01.phx2.fedoraproject.org',
'ca_cert_cache': os.path.join(SSLDIR, 'ca.crt'),
'ca_cert_cache_expiry': 1497618475, # Stop fedmsg overwriting my CA, See Issue 420

'crl_location': "http://threebean.org/fedmsg-tests/crl.pem",
'crl_cache': os.path.join(SSLDIR, 'crl.pem'),
'crl_cache_expiry': 1497618475,
'ca_cert_location': os.path.join(SSLDIR, 'ca.crt'),
'crl_location': os.path.join(SSLDIR, 'crl.pem'),
'crypto_validate_backends': ['x509'],
}
self.hub = mock.Mock(config=self.config)
Expand Down
64 changes: 34 additions & 30 deletions fedmsg/tests/crypto/test_utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import os
import shutil
import tempfile
import time
import unittest

import mock
Expand Down Expand Up @@ -105,43 +104,48 @@ def test_topic_missing_not_nitpicky(self):
self.assertTrue(result)


class LoadRemoteCertTests(base.FedmsgTestCase):
class LoadCertificateTests(base.FedmsgTestCase):
"""Tests for :func:`utils._load_remote_cert`."""

def setUp(self):
super(LoadRemoteCertTests, self).setUp()
super(LoadCertificateTests, self).setUp()

self.cache_dir = tempfile.mkdtemp()
self.cache_file = os.path.join(self.cache_dir, 'ca.crt')
self.addCleanup(shutil.rmtree, self.cache_dir, True)

def test_remote_cert(self):
"""Assert downloading a certificate to a cache location works."""
"""Assert requesting a remote certificate works and is cached."""
location = 'https://fedoraproject.org/fedmsg/ca.crt'
with open(os.path.join(base.SSLDIR, 'fedora_ca.crt'), 'r') as fd:
expected_cert = fd.read()
utils._load_remote_cert('https://fedoraproject.org/fedmsg/ca.crt', self.cache_file, 0)

self.assertTrue(os.path.exists(self.cache_file))
with open(self.cache_file, 'r') as fd:
actual_cert = fd.read()
self.assertEqual(expected_cert, actual_cert)

@mock.patch('fedmsg.crypto.utils.os.stat')
def test_valid_cache(self, mock_stat):
"""Assert when the primary cache is valid it's used."""
mock_stat.return_value.st_mtime = time.time()
cache = utils._load_remote_cert('https://example.com/ca.crt', '/my/ca.crt', 60)

self.assertEqual('/my/ca.crt', cache)
mock_stat.assert_called_once_with('/my/ca.crt')

@mock.patch('fedmsg.crypto.utils.os.stat')
def test_valid_alternate_cache(self, mock_stat):
"""Assert when the alternate cache is valid it's used."""
mock_stat.side_effect = [OSError, mock.Mock(st_mtime=time.time())]
cache = utils._load_remote_cert('https://example.com/ca.crt', '/my/ca.crt', 60)

self.assertEqual(os.path.expanduser('~/.local/my/ca.crt'), cache)
self.assertEqual('/my/ca.crt', mock_stat.call_args_list[0][0][0])
self.assertEqual(
os.path.expanduser('~/.local/my/ca.crt'), mock_stat.call_args_list[1][0][0])

with mock.patch.dict('fedmsg.crypto.utils._cached_certificates', clear=True):
ca, crl = utils.load_certificates(location)
self.assertEqual((expected_cert, None), utils._cached_certificates[location])
self.assertEqual(expected_cert, ca)
self.assertTrue(crl is None)

@mock.patch('fedmsg.crypto.utils._load_certificate')
def test_valid_cache(self, mock_load_cert):
"""Assert when the cache is present it is used."""
location = '/crt'

with mock.patch.dict('fedmsg.crypto.utils._cached_certificates', {'/crt': ('crt', None)}):
ca, crl = utils.load_certificates(location)
self.assertEqual('crt', ca)
self.assertTrue(crl is None)
self.assertEqual(0, mock_load_cert.call_count)

@mock.patch('fedmsg.crypto.utils._load_certificate')
def test_invalidate_cache(self, mock_load_cert):
"""Assert when the cache is present it is used."""
location = '/crt'
mock_load_cert.return_value = 'fresh_ca'

with mock.patch.dict('fedmsg.crypto.utils._cached_certificates', {'/crt': ('crt', None)}):
ca, crl = utils.load_certificates(location, invalidate_cache=True)
self.assertEqual(('fresh_ca', None), utils._cached_certificates[location])
self.assertEqual('fresh_ca', ca)
self.assertTrue(crl is None)
mock_load_cert.called_once_with('/crt')
Loading