Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improved verify #1779

Merged
merged 11 commits into from
Oct 4, 2018
5 changes: 4 additions & 1 deletion lemur/certificates/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,10 @@ def check_revoked():
else:
status = verify_string(cert.body, "")

cert.status = 'valid' if status else 'revoked'
if status is None:
cert.status = 'unknown'
else:
cert.status = 'valid' if status else 'revoked'

except Exception as e:
sentry.captureException()
Expand Down
93 changes: 63 additions & 30 deletions lemur/certificates/verify.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,24 @@
"""
import requests
import subprocess
from flask import current_app
from requests.exceptions import ConnectionError, InvalidSchema
from cryptography import x509
from cryptography.hazmat.backends import default_backend

from lemur.utils import mktempfile
from lemur.common.utils import parse_certificate

crl_cache = {}

def ocsp_verify(cert_path, issuer_chain_path):

def ocsp_verify(cert, cert_path, issuer_chain_path):
"""
Attempts to verify a certificate via OCSP. OCSP is a more modern version
of CRL in that it will query the OCSP URI in order to determine if the
certificate has been revoked

:param cert:
:param cert_path:
:param issuer_chain_path:
:return bool: True if certificate is valid, False otherwise
Expand All @@ -29,8 +33,14 @@ def ocsp_verify(cert_path, issuer_chain_path):
p1 = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
url, err = p1.communicate()

if not url:
current_app.logger.debug("No OCSP URL in certificate {}".format(cert.serial_number))
return None

p2 = subprocess.Popen(['openssl', 'ocsp', '-issuer', issuer_chain_path,
'-cert', cert_path, "-url", url.strip()], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
'-cert', cert_path, "-url", url.strip()],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)

message, err = p2.communicate()

Expand All @@ -40,55 +50,69 @@ def ocsp_verify(cert_path, issuer_chain_path):
raise Exception("Got error when parsing OCSP url")

elif 'revoked' in p_message:
return
current_app.logger.debug("OCSP reports certificate revoked: {}".format(cert.serial_number))
return False

elif 'good' not in p_message:
raise Exception("Did not receive a valid response")

return True


def crl_verify(cert_path):
def crl_verify(cert, cert_path):
"""
Attempts to verify a certificate using CRL.

:param cert:
:param cert_path:
:return: True if certificate is valid, False otherwise
:raise Exception: If certificate does not have CRL
"""
with open(cert_path, 'rt') as c:
cert = parse_certificate(c.read())

distribution_points = cert.extensions.get_extension_for_oid(x509.OID_CRL_DISTRIBUTION_POINTS).value
try:
distribution_points = cert.extensions.get_extension_for_oid(
x509.OID_CRL_DISTRIBUTION_POINTS
).value
except x509.ExtensionNotFound:
current_app.logger.debug("No CRLDP extension in certificate {}".format(cert.serial_number))
return None

for p in distribution_points:
point = p.full_name[0].value

try:
response = requests.get(point)

if response.status_code != 200:
if point not in crl_cache:
current_app.logger.debug("Retrieving CRL: {}".format(point))
try:
response = requests.get(point)

if response.status_code != 200:
raise Exception("Unable to retrieve CRL: {0}".format(point))
except InvalidSchema:
# Unhandled URI scheme (like ldap://); skip this distribution point.
continue
except ConnectionError:
raise Exception("Unable to retrieve CRL: {0}".format(point))
except InvalidSchema:
# Unhandled URI scheme (like ldap://); skip this distribution point.
continue
except ConnectionError:
raise Exception("Unable to retrieve CRL: {0}".format(point))

crl = x509.load_der_x509_crl(response.content, backend=default_backend())
crl_cache[point] = x509.load_der_x509_crl(response.content,
backend=default_backend())
else:
current_app.logger.debug("CRL point is cached {}".format(point))

for r in crl:
if cert.serial == r.serial_number:
for r in crl_cache[point]:
if cert.serial_number == r.serial_number:
try:
reason = r.extensions.get_extension_for_class(x509.CRLReason).value
# Handle "removeFromCRL" revoke reason as unrevoked; continue with the next distribution point.
# Per RFC 5280 section 6.3.3 (k): https://tools.ietf.org/html/rfc5280#section-6.3.3
# Handle "removeFromCRL" revoke reason as unrevoked;
# continue with the next distribution point.
# Per RFC 5280 section 6.3.3 (k):
# https://tools.ietf.org/html/rfc5280#section-6.3.3
if reason == x509.ReasonFlags.remove_from_crl:
break
except x509.ExtensionNotFound:
pass

return
current_app.logger.debug("CRL reports certificate "
"revoked: {}".format(cert.serial_number))
return False

return True

Expand All @@ -101,15 +125,24 @@ def verify(cert_path, issuer_chain_path):
:param issuer_chain_path:
:return: True if valid, False otherwise
"""
with open(cert_path, 'rt') as c:
try:
cert = parse_certificate(c.read())
except ValueError as e:
current_app.logger.error(e)
return None

# OCSP is our main source of truth, in a lot of cases CRLs
# have been deprecated and are no longer updated
try:
return ocsp_verify(cert_path, issuer_chain_path)
except Exception as e:
try:
return crl_verify(cert_path)
except Exception as e:
raise Exception("Failed to verify")
verify_result = ocsp_verify(cert, cert_path, issuer_chain_path)

if verify_result is None:
verify_result = crl_verify(cert, cert_path)

if verify_result is None:
current_app.logger.debug("Failed to verify {}".format(cert.serial_number))

return verify_result


def verify_string(cert_string, issuer_string):
Expand Down
9 changes: 4 additions & 5 deletions lemur/tests/test_verify.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,8 @@

def test_verify_simple_cert():
"""Simple certificate without CRL or OCSP."""
# Verification raises an exception for "unknown" if there are no means to verify it
with pytest.raises(Exception, match="Failed to verify"):
verify_string(INTERMEDIATE_CERT_STR, '')
# Verification returns None if there are no means to verify a cert
assert verify_string(INTERMEDIATE_CERT_STR, '') is None


def test_verify_crl_unknown_scheme(cert_builder, private_key):
Expand All @@ -31,7 +30,7 @@ def test_verify_crl_unknown_scheme(cert_builder, private_key):
f.write(cert.public_bytes(serialization.Encoding.PEM))

# Must not raise exception
crl_verify(cert_tmp)
crl_verify(cert, cert_tmp)


def test_verify_crl_unreachable(cert_builder, private_key):
Expand All @@ -48,4 +47,4 @@ def test_verify_crl_unreachable(cert_builder, private_key):
f.write(cert.public_bytes(serialization.Encoding.PEM))

with pytest.raises(Exception, match="Unable to retrieve CRL:"):
crl_verify(cert_tmp)
crl_verify(cert, cert_tmp)