Skip to content

Commit

Permalink
Merge pull request #1779 from explody/improved_verify
Browse files Browse the repository at this point in the history
Improved verify
  • Loading branch information
castrapel authored Oct 4, 2018
2 parents eb7eb6a + 79033f4 commit 3185272
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 36 deletions.
5 changes: 4 additions & 1 deletion lemur/certificates/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,10 @@ def check_revoked():
else:
status = verify_string(cert.body, "")

cert.status = 'valid' if status else 'revoked'
if status is None:
cert.status = 'unknown'
else:
cert.status = 'valid' if status else 'revoked'

except Exception as e:
sentry.captureException()
Expand Down
93 changes: 63 additions & 30 deletions lemur/certificates/verify.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,24 @@
"""
import requests
import subprocess
from flask import current_app
from requests.exceptions import ConnectionError, InvalidSchema
from cryptography import x509
from cryptography.hazmat.backends import default_backend

from lemur.utils import mktempfile
from lemur.common.utils import parse_certificate

crl_cache = {}

def ocsp_verify(cert_path, issuer_chain_path):

def ocsp_verify(cert, cert_path, issuer_chain_path):
"""
Attempts to verify a certificate via OCSP. OCSP is a more modern version
of CRL in that it will query the OCSP URI in order to determine if the
certificate has been revoked
:param cert:
:param cert_path:
:param issuer_chain_path:
:return bool: True if certificate is valid, False otherwise
Expand All @@ -29,8 +33,14 @@ def ocsp_verify(cert_path, issuer_chain_path):
p1 = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
url, err = p1.communicate()

if not url:
current_app.logger.debug("No OCSP URL in certificate {}".format(cert.serial_number))
return None

p2 = subprocess.Popen(['openssl', 'ocsp', '-issuer', issuer_chain_path,
'-cert', cert_path, "-url", url.strip()], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
'-cert', cert_path, "-url", url.strip()],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)

message, err = p2.communicate()

Expand All @@ -40,55 +50,69 @@ def ocsp_verify(cert_path, issuer_chain_path):
raise Exception("Got error when parsing OCSP url")

elif 'revoked' in p_message:
return
current_app.logger.debug("OCSP reports certificate revoked: {}".format(cert.serial_number))
return False

elif 'good' not in p_message:
raise Exception("Did not receive a valid response")

return True


def crl_verify(cert_path):
def crl_verify(cert, cert_path):
"""
Attempts to verify a certificate using CRL.
:param cert:
:param cert_path:
:return: True if certificate is valid, False otherwise
:raise Exception: If certificate does not have CRL
"""
with open(cert_path, 'rt') as c:
cert = parse_certificate(c.read())

distribution_points = cert.extensions.get_extension_for_oid(x509.OID_CRL_DISTRIBUTION_POINTS).value
try:
distribution_points = cert.extensions.get_extension_for_oid(
x509.OID_CRL_DISTRIBUTION_POINTS
).value
except x509.ExtensionNotFound:
current_app.logger.debug("No CRLDP extension in certificate {}".format(cert.serial_number))
return None

for p in distribution_points:
point = p.full_name[0].value

try:
response = requests.get(point)

if response.status_code != 200:
if point not in crl_cache:
current_app.logger.debug("Retrieving CRL: {}".format(point))
try:
response = requests.get(point)

if response.status_code != 200:
raise Exception("Unable to retrieve CRL: {0}".format(point))
except InvalidSchema:
# Unhandled URI scheme (like ldap://); skip this distribution point.
continue
except ConnectionError:
raise Exception("Unable to retrieve CRL: {0}".format(point))
except InvalidSchema:
# Unhandled URI scheme (like ldap://); skip this distribution point.
continue
except ConnectionError:
raise Exception("Unable to retrieve CRL: {0}".format(point))

crl = x509.load_der_x509_crl(response.content, backend=default_backend())
crl_cache[point] = x509.load_der_x509_crl(response.content,
backend=default_backend())
else:
current_app.logger.debug("CRL point is cached {}".format(point))

for r in crl:
if cert.serial == r.serial_number:
for r in crl_cache[point]:
if cert.serial_number == r.serial_number:
try:
reason = r.extensions.get_extension_for_class(x509.CRLReason).value
# Handle "removeFromCRL" revoke reason as unrevoked; continue with the next distribution point.
# Per RFC 5280 section 6.3.3 (k): https://tools.ietf.org/html/rfc5280#section-6.3.3
# Handle "removeFromCRL" revoke reason as unrevoked;
# continue with the next distribution point.
# Per RFC 5280 section 6.3.3 (k):
# https://tools.ietf.org/html/rfc5280#section-6.3.3
if reason == x509.ReasonFlags.remove_from_crl:
break
except x509.ExtensionNotFound:
pass

return
current_app.logger.debug("CRL reports certificate "
"revoked: {}".format(cert.serial_number))
return False

return True

Expand All @@ -101,15 +125,24 @@ def verify(cert_path, issuer_chain_path):
:param issuer_chain_path:
:return: True if valid, False otherwise
"""
with open(cert_path, 'rt') as c:
try:
cert = parse_certificate(c.read())
except ValueError as e:
current_app.logger.error(e)
return None

# OCSP is our main source of truth, in a lot of cases CRLs
# have been deprecated and are no longer updated
try:
return ocsp_verify(cert_path, issuer_chain_path)
except Exception as e:
try:
return crl_verify(cert_path)
except Exception as e:
raise Exception("Failed to verify")
verify_result = ocsp_verify(cert, cert_path, issuer_chain_path)

if verify_result is None:
verify_result = crl_verify(cert, cert_path)

if verify_result is None:
current_app.logger.debug("Failed to verify {}".format(cert.serial_number))

return verify_result


def verify_string(cert_string, issuer_string):
Expand Down
9 changes: 4 additions & 5 deletions lemur/tests/test_verify.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,8 @@

def test_verify_simple_cert():
"""Simple certificate without CRL or OCSP."""
# Verification raises an exception for "unknown" if there are no means to verify it
with pytest.raises(Exception, match="Failed to verify"):
verify_string(INTERMEDIATE_CERT_STR, '')
# Verification returns None if there are no means to verify a cert
assert verify_string(INTERMEDIATE_CERT_STR, '') is None


def test_verify_crl_unknown_scheme(cert_builder, private_key):
Expand All @@ -31,7 +30,7 @@ def test_verify_crl_unknown_scheme(cert_builder, private_key):
f.write(cert.public_bytes(serialization.Encoding.PEM))

# Must not raise exception
crl_verify(cert_tmp)
crl_verify(cert, cert_tmp)


def test_verify_crl_unreachable(cert_builder, private_key):
Expand All @@ -48,4 +47,4 @@ def test_verify_crl_unreachable(cert_builder, private_key):
f.write(cert.public_bytes(serialization.Encoding.PEM))

with pytest.raises(Exception, match="Unable to retrieve CRL:"):
crl_verify(cert_tmp)
crl_verify(cert, cert_tmp)

0 comments on commit 3185272

Please sign in to comment.