diff --git a/pshtt/__init__.py b/pshtt/__init__.py index 19283fe5..ef7eb44d 100644 --- a/pshtt/__init__.py +++ b/pshtt/__init__.py @@ -1 +1 @@ -__version__ = '0.5.4' +__version__ = '0.6.0' diff --git a/pshtt/cli.py b/pshtt/cli.py index fbfc3375..f840438c 100755 --- a/pshtt/cli.py +++ b/pshtt/cli.py @@ -3,7 +3,7 @@ """pshtt ("pushed") is a tool to test domains for HTTPS best practices. Usage: - pshtt (INPUT ...) [--output OUTFILE] [--sorted] [--json] [--markdown] [--debug] [--timeout TIMEOUT] [--user-agent AGENT] [--cache-third-parties DIR] [--ca-file PATH] + pshtt (INPUT ...) [--output OUTFILE] [--sorted] [--json] [--markdown] [--debug] [--timeout TIMEOUT] [--user-agent AGENT] [--cache-third-parties DIR] [--ca-file PATH] [--pt-int-ca-file PATH] pshtt (-h | --help) Options: @@ -17,6 +17,7 @@ -t --timeout=TIMEOUT Override timeout (in seconds). -c --cache-third-parties=DIR Cache third party data, and what directory to cache it in. -f --ca-file=PATH Specify custom CA bundle (PEM format) + -p --pt-int-ca-file=PATH Specify public trust CA bundle with intermediates (PEM format) Notes: If the first INPUT ends with .csv, domains will be read from CSV. @@ -49,7 +50,7 @@ def to_csv(results, out_filename): row = [result[header] for header in pshtt.HEADERS] writer.writerow(row) - logging.warn("Wrote results to %s.", out_filename) + logging.warning("Wrote results to %s.", out_filename) def to_json(results, out_filename): @@ -62,7 +63,7 @@ def to_json(results, out_filename): out_file.write(json_content + '\n') if out_file is not sys.stdout: - logging.warn("Wrote results to %s.", out_filename) + logging.warning("Wrote results to %s.", out_filename) def to_markdown(results, out_filename): @@ -106,7 +107,8 @@ def main(): 'user_agent': args['--user-agent'], 'timeout': args['--timeout'], 'cache-third-parties': args['--cache-third-parties'], - 'ca_file': args['--ca-file'] + 'ca_file': args['--ca-file'], + 'pt_int_ca_file': args['--pt-int-ca-file'] } # Do the domain inspections diff --git a/pshtt/models.py b/pshtt/models.py index 1ae71a39..d72dd5f7 100644 --- a/pshtt/models.py +++ b/pshtt/models.py @@ -36,8 +36,12 @@ def __init__(self, protocol, host, base_domain): self.headers = {} # will be replaced with a requests.structures.CaseInsensitiveDict self.status = None self.live = None + self.ip = None self.redirect = None + self.server_header = None + self.server_version = None self.unknown_error = False + self.notes = "" # If an endpoint redirects, characterize the redirect behavior self.redirect_immediately_to = None @@ -55,11 +59,17 @@ def __init__(self, protocol, host, base_domain): # Only HTTPS endpoints have these. # Initialize all of them to None, so that it's # discernible if they don't get explicitly set. + self.https_full_connection = None + self.https_client_auth_required = False self.https_valid = None + self.https_public_trusted = None + self.https_custom_trusted = None self.https_bad_chain = None self.https_bad_hostname = None self.https_expired_cert = None self.https_self_signed_cert = None + self.https_cert_chain_len = None + self.https_missing_intermediate_cert = None self.hsts = None self.hsts_header = None self.hsts_max_age = None @@ -81,6 +91,7 @@ def to_object(self): 'url': self.url, 'headers': dict(self.headers), 'status': self.status, + 'ip': self.ip, 'live': self.live, 'redirect': self.redirect, 'redirect_eventually_to': self.redirect_eventually_to, @@ -94,15 +105,24 @@ def to_object(self): 'redirect_eventually_to_http': self.redirect_eventually_to_http, 'redirect_eventually_to_external': self.redirect_eventually_to_external, 'redirect_eventually_to_subdomain': self.redirect_eventually_to_subdomain, + 'server_header': self.server_header, + 'server_version': self.server_version, + 'notes': self.notes, 'unknown_error': self.unknown_error, } if self.protocol == "https": + obj['https_full_connection'] = self.https_full_connection + obj['https_client_auth_required'] = self.https_client_auth_required obj['https_valid'] = self.https_valid + obj['https_public_trusted'] = self.https_public_trusted + obj['https_custom_trusted'] = self.https_custom_trusted obj['https_bad_chain'] = self.https_bad_chain obj['https_bad_hostname'] = self.https_bad_hostname obj['https_expired_cert'] = self.https_expired_cert obj['https_self_signed_cert'] = self.https_self_signed_cert + obj['https_cert_chain_len'] = self.https_cert_chain_len + obj['https_missing_intermediate_cert'] = self.https_missing_intermediate_cert obj['hsts'] = self.hsts obj['hsts_header'] = self.hsts_header obj['hsts_max_age'] = self.hsts_max_age diff --git a/pshtt/pshtt.py b/pshtt/pshtt.py index f1292583..bb7c6788 100644 --- a/pshtt/pshtt.py +++ b/pshtt/pshtt.py @@ -29,26 +29,35 @@ from sslyze.server_connectivity_tester import ServerConnectivityTester, ServerConnectivityError import sslyze.synchronous_scanner -# We're going to be making requests with certificate validation disabled. -requests.packages.urllib3.disable_warnings() +# We're going to be making requests with certificate validation +# disabled. Commented next line due to pylint warning that urllib3 is +# not in requests.packages +# requests.packages.urllib3.disable_warnings() +import urllib3 +urllib3.disable_warnings() # Default, overrideable via --user-agent USER_AGENT = "pshtt, https scanning" -# Defaults to 10 second, overrideable via --timeout -TIMEOUT = 10 +# Defaults to 5 second, overrideable via --timeout +TIMEOUT = 5 # The fields we're collecting, will be keys in JSON and # column headers in CSV. HEADERS = [ - "Domain", "Base Domain", "Canonical URL", "Live", "Redirect", "Redirect To", - "Valid HTTPS", "Defaults to HTTPS", "Downgrades HTTPS", "Strictly Forces HTTPS", + "Domain", "Base Domain", "Canonical URL", "Live", + "HTTPS Live", "HTTPS Full Connection", "HTTPS Client Auth Required", + "Redirect", "Redirect To", + "Valid HTTPS", "HTTPS Publicly Trusted", "HTTPS Custom Truststore Trusted", + "Defaults to HTTPS", "Downgrades HTTPS", "Strictly Forces HTTPS", "HTTPS Bad Chain", "HTTPS Bad Hostname", "HTTPS Expired Cert", "HTTPS Self Signed Cert", "HSTS", "HSTS Header", "HSTS Max Age", "HSTS Entire Domain", "HSTS Preload Ready", "HSTS Preload Pending", "HSTS Preloaded", "Base Domain HSTS Preloaded", "Domain Supports HTTPS", - "Domain Enforces HTTPS", "Domain Uses Strong HSTS", "Unknown Error", + "Domain Enforces HTTPS", "Domain Uses Strong HSTS", "IP", + "Server Header", "Server Version", "HTTPS Cert Chain Length", + "HTTPS Probably Missing Intermediate Cert", "Notes", "Unknown Error", ] # Used for caching the HSTS preload list from Chromium's source. @@ -69,6 +78,7 @@ # Set if user wants to use a custom CA bundle CA_FILE = None STORE = "Mozilla" +PT_INT_CA_FILE = None def inspect(base_domain): @@ -105,10 +115,16 @@ def result_for(domain): 'Base Domain': parent_domain_for(domain.domain), 'Canonical URL': domain.canonical.url, 'Live': is_live(domain), - 'Redirect': is_redirect(domain), + 'Redirect': is_redirect_domain(domain), 'Redirect To': redirects_to(domain), + 'HTTPS Live': is_https_live(domain), + 'HTTPS Full Connection': is_full_connection(domain), + 'HTTPS Client Auth Required': is_client_auth_required(domain), + 'Valid HTTPS': is_valid_https(domain), + 'HTTPS Publicly Trusted': is_publicly_trusted(domain), + 'HTTPS Custom Truststore Trusted': is_custom_trusted(domain), 'Defaults to HTTPS': is_defaults_to_https(domain), 'Downgrades HTTPS': is_downgrades_https(domain), 'Strictly Forces HTTPS': is_strictly_forces_https(domain), @@ -117,6 +133,8 @@ def result_for(domain): 'HTTPS Bad Hostname': is_bad_hostname(domain), 'HTTPS Expired Cert': is_expired_cert(domain), 'HTTPS Self Signed Cert': is_self_signed_cert(domain), + 'HTTPS Cert Chain Length': cert_chain_length(domain), + 'HTTPS Probably Missing Intermediate Cert': is_missing_intermediate_cert(domain), 'HSTS': is_hsts(domain), 'HSTS Header': hsts_header(domain), @@ -131,12 +149,31 @@ def result_for(domain): 'Domain Enforces HTTPS': is_domain_enforces_https(domain), 'Domain Uses Strong HSTS': is_domain_strong_hsts(domain), + 'IP': get_domain_ip(domain), + 'Server Header': get_domain_server_header(domain), + 'Server Version': get_domain_server_version(domain), + 'Notes': get_domain_notes(domain), 'Unknown Error': did_domain_error(domain), } # But also capture the extended data for those who want it. result['endpoints'] = domain.to_object() + # This bit is complicated because of the continue statements, + # perhaps overly so. For instance, the continue statement + # following the "if header in ..." statement after "if not + # result['HTTPS Full Connection]" means that the final if + # statement that sets None values to False does not apply to those + # fields. This code should be rewritten to more clear, or at + # least commented so that it is clearer what is happening to the + # various fields. There is some implied logic due to the continue + # statements that is tricky, at least at first glance. + # + # Also, the comment before "for header in HEADERS" is not accurate + # for the same reason. + # + # - jsf9k + # Convert Header fields from None to False, except for: # - "HSTS Header" # - "HSTS Max Age" @@ -145,6 +182,18 @@ def result_for(domain): if header in ("HSTS Header", "HSTS Max Age", "Redirect To"): continue + if not result['HTTPS Full Connection']: + if header in ('HSTS', 'HSTS Header', 'HSTS Max Age', 'HSTS Entire Domain', 'HSTS Preload Ready', 'Domain Uses Strong HSTS'): + continue + + if header in ('IP', 'Server Header', 'Server Version', 'HTTPS Cert Chain Length') and result[header] is None: + continue + + if header in ('Valid HTTPS', 'HTTPS Publicly Trusted', 'HTTPS Custom Truststore Trusted'): + if not result['HTTPS Live']: + result[header] = False + continue + if result[header] is None: result[header] = False @@ -217,75 +266,147 @@ def basic_check(endpoint): utils.debug("Pinging %s..." % endpoint.url, divider=True) + req = None + try: with ping(endpoint.url) as req: endpoint.live = True if endpoint.protocol == "https": + endpoint.https_full_connection = True endpoint.https_valid = True except requests.exceptions.SSLError as err: - logging.warn("Error validating certificate.") - utils.debug("{0}".format(err)) - - # Retry with certificate validation disabled. - try: - with ping(endpoint.url, verify=False) as req: - pass - except requests.exceptions.SSLError as err: - # If it's a protocol error or other, it's not live. - endpoint.live = False - logging.warn("Unexpected SSL protocol (or other) error during retry.") - utils.debug("{0}".format(err)) - return - except requests.exceptions.RequestException as err: - endpoint.live = False - logging.warn("Unexpected requests exception during retry.") - utils.debug("{0}".format(err)) - return - except OpenSSL.SSL.Error as err: - endpoint.live = False - logging.warn("Unexpected OpenSSL exception during retry.") - utils.debug("{0}".format(err)) - return - except Exception as err: - endpoint.unknown_error = True - logging.warn("Unexpected other unknown exception during requests retry.") - utils.debug("{0}".format(err)) - return + if ( + "bad handshake" in str(err) and ( + "sslv3 alert handshake failure" in str(err) or ( + "Unexpected EOF" in str(err) + ) + ) + ): + logging.warning("{}: Error completing TLS handshake usually due to required client authentication.".format(endpoint.url)) + utils.debug("{}: {}".format(endpoint.url, err)) + endpoint.live = True + if endpoint.protocol == "https": + # The https can still be valid with a handshake error, + # sslyze will run later and check if it is not valid + endpoint.https_valid = True + endpoint.https_full_connection = False - # If it was a certificate error of any kind, it's live. + else: + logging.warning("{}: Error connecting over SSL/TLS or validating certificate.".format(endpoint.url)) + utils.debug("{}: {}".format(endpoint.url, err)) + # Retry with certificate validation disabled. + try: + with ping(endpoint.url, verify=False) as req: + endpoint.live = True + if endpoint.protocol == "https": + endpoint.https_full_connection = True + # sslyze later will actually check if the cert is valid + endpoint.https_valid = True + except requests.exceptions.SSLError as err: + # If it's a protocol error or other, it's not a full connection, + # but it is live. + endpoint.live = True + if endpoint.protocol == "https": + endpoint.https_full_connection = False + # HTTPS may still be valid, sslyze will double-check later + endpoint.https_valid = True + logging.warning("{}: Unexpected SSL protocol (or other) error during retry.".format(endpoint.url)) + utils.debug("{}: {}".format(endpoint.url, err)) + # continue on to SSLyze to check the connection + except requests.exceptions.RequestException as err: + endpoint.live = False + logging.warning("{}: Unexpected requests exception during retry.".format(endpoint.url)) + utils.debug("{}: {}".format(endpoint.url, err)) + return + except OpenSSL.SSL.Error as err: + endpoint.live = False + logging.warning("{}: Unexpected OpenSSL exception during retry.".format(endpoint.url)) + utils.debug("{}: {}".format(endpoint.url, err)) + return + except Exception as err: + endpoint.unknown_error = True + logging.warning("{}: Unexpected other unknown exception during requests retry.".format(endpoint.url)) + utils.debug("{}: {}".format(endpoint.url, err)) + return + + # If it was a certificate error of any kind, it's live, + # unless SSLyze encounters a connection error later endpoint.live = True - # Figure out the error(s). - https_check(endpoint) - except requests.exceptions.ConnectionError as err: - endpoint.live = False - utils.debug("{0}".format(err)) - return + # We can get this for some endpoints that are actually live, + # so if it's https let's try sslyze to be sure + if(endpoint.protocol == "https"): + # https check later will set whether the endpoint is live and valid + endpoint.https_full_connection = False + endpoint.https_valid = True + else: + endpoint.live = False + logging.warning("{}: Error connecting.".format(endpoint.url)) + utils.debug("{}: {}".format(endpoint.url, err)) # And this is the parent of ConnectionError and other things. # For example, "too many redirects". # See https://github.com/kennethreitz/requests/blob/master/requests/exceptions.py except requests.exceptions.RequestException as err: endpoint.live = False - logging.warn("Unexpected other requests exception.") - utils.debug("{0}".format(err)) + logging.warning("{}: Unexpected other requests exception.".format(endpoint.url)) + utils.debug("{}: {}".format(endpoint.url, err)) return except Exception as err: endpoint.unknown_error = True - logging.warn("Unexpected other unknown exception during initial request.") - utils.debug("{0}".format(err)) + logging.warning("{}: Unexpected other unknown exception during initial request.".format(endpoint.url)) + utils.debug("{}: {}".format(endpoint.url, err)) + return + + # Run SSLyze to see if there are any errors + if(endpoint.protocol == "https"): + https_check(endpoint) + # Double-check in case sslyze failed the first time, but the regular conneciton succeeded + if(endpoint.live is False and req is not None): + logging.warning("{}: Trying sslyze again since it connected once already.".format(endpoint.url)) + endpoint.live = True + endpoint.https_valid = True + https_check(endpoint) + if(endpoint.live is False): + # sslyze failed so back everything out and don't continue analyzing the existing response + req = None + endpoint.https_valid = False + endpoint.https_full_connection = False + + if req is None: + # Ensure that full_connection is set to False if we didn't get a response + if endpoint.protocol == "https": + endpoint.https_full_connection = False return + # try to get IP address if we can + try: + if req.raw.closed is False: + ip = req.raw._connection.sock.socket.getpeername()[0] + if endpoint.ip is None: + endpoint.ip = ip + else: + if endpoint.ip != ip: + utils.debug("{}: Endpoint IP is already {}, but requests IP is {}.".format(endpoint.url, endpoint.ip, ip)) + except Exception: + # if the socket has already closed, it will throw an exception, but this is just best effort, so ignore it + pass + # Endpoint is live, analyze the response. endpoint.headers = req.headers endpoint.status = req.status_code + if (req.headers.get('Server') is not None): + endpoint.server_header = req.headers.get('Server') + # *** in the future add logic to convert header to server version if known + if (req.headers.get('Location') is not None) and str(endpoint.status).startswith('3'): endpoint.redirect = True + logging.warning("{}: Found redirect.".format(endpoint.url)) if endpoint.redirect: try: @@ -303,9 +424,8 @@ def basic_check(endpoint): ultimate_req = None except Exception as err: endpoint.unknown_error = True - logging.warn("Unexpected other unknown exception when handling Requests Header.") - utils.debug("{0}".format(err)) - pass + logging.warning("{}: Unexpected other unknown exception when handling Requests Header.".format(endpoint.url)) + utils.debug("{} {}".format(endpoint.url, err)) try: with ping(endpoint.url, allow_redirects=True, verify=False) as ultimate_req: @@ -313,10 +433,13 @@ def basic_check(endpoint): except requests.exceptions.RequestException: # Swallow connection errors, but we won't be saving redirect info. pass + except OpenSSL.SSL.Error: + # Swallow connection errors, but we won't be saving redirect info. + pass except Exception as err: endpoint.unknown_error = True - logging.warn("Unexpected other unknown exception when handling redirect.") - utils.debug("{0}".format(err)) + logging.warning("{}: Unexpected other unknown exception when handling redirect.".format(endpoint.url)) + utils.debug("{}: {}".format(endpoint.url, err)) return try: @@ -384,9 +507,8 @@ def basic_check(endpoint): endpoint.redirect_eventually_to_subdomain = endpoint.redirect_immediately_to_subdomain except Exception as err: endpoint.unknown_error = True - logging.warn("Unexpected other unknown exception when establishing redirects.") - utils.debug("{0}".format(err)) - pass + logging.warning("{}: Unexpected other unknown exception when establishing redirects.".format(endpoint.url)) + utils.debug("{}: {}".format(endpoint.url, err)) def hsts_check(endpoint): @@ -422,7 +544,7 @@ def hsts_check(endpoint): if "max-age" in header.lower(): endpoint.hsts_max_age = int(temp[0][len("max-age="):]) - if endpoint.hsts_max_age <= 0: + if endpoint.hsts_max_age is None or endpoint.hsts_max_age <= 0: endpoint.hsts = False return @@ -435,8 +557,8 @@ def hsts_check(endpoint): endpoint.hsts_preload = True except Exception as err: endpoint.unknown_error = True - logging.warn("Unknown exception when handling HSTS check.") - utils.debug("{0}".format(err)) + logging.warning("{}: Unknown exception when handling HSTS check.".format(endpoint.url)) + utils.debug("{}: {}".format(endpoint.url, err)) return @@ -444,42 +566,97 @@ def https_check(endpoint): """ Uses sslyze to figure out the reason the endpoint wouldn't verify. """ - utils.debug("sslyzing %s..." % endpoint.url) + utils.debug("sslyzing {}...".format(endpoint.url)) # remove the https:// from prefix for sslyze try: hostname = endpoint.url[8:] server_tester = ServerConnectivityTester(hostname=hostname, port=443) server_info = server_tester.perform() + endpoint.live = True + ip = server_info.ip_address + if endpoint.ip is None: + endpoint.ip = ip + else: + if endpoint.ip != ip: + utils.debug("{}: Endpoint IP is already {}, but requests IP is {}.".format(endpoint.url, endpoint.ip, ip)) + if server_info.client_auth_requirement.name == 'REQUIRED': + endpoint.https_client_auth_required = True + logging.warning("{}: Client Authentication REQUIRED".format(endpoint.url)) except ServerConnectivityError as err: - logging.warn("Error in sslyze server connectivity check when connecting to {}".format(err.server_info.hostname)) - utils.debug("{0}".format(err)) + endpoint.live = False + endpoint.https_valid = False + logging.warning("{}: Error in sslyze server connectivity check when connecting to {}".format(endpoint.url, err.server_info.hostname)) + utils.debug("{}: {}".format(endpoint.url, err)) return except Exception as err: endpoint.unknown_error = True - logging.warn("Unknown exception in sslyze server connectivity check.") - utils.debug("{0}".format(err)) + logging.warning("{}: Unknown exception in sslyze server connectivity check.".format(endpoint.url)) + utils.debug("{}: {}".format(endpoint.url, err)) return try: + cert_plugin_result = None command = sslyze.plugins.certificate_info_plugin.CertificateInfoScanCommand(ca_file=CA_FILE) scanner = sslyze.synchronous_scanner.SynchronousScanner() cert_plugin_result = scanner.run_scan_command(server_info, command) except Exception as err: - endpoint.unknown_error = True - logging.warn("Unknown exception in sslyze scanner.") - utils.debug("{0}".format(err)) - return + try: + if("timed out" in str(err)): + logging.warning("{}: Retrying sslyze scanner certificate plugin.".format(endpoint.url)) + cert_plugin_result = scanner.run_scan_command(server_info, command) + except Exception: + pass + if(cert_plugin_result is None): + logging.warning("{}: Unknown exception in sslyze scanner certificate plugin.".format(endpoint.url)) + utils.debug("{}: {}".format(endpoint.url, err)) + endpoint.unknown_error = True + endpoint.https_valid = None # could make this False, but there was an error so we don't know + return + + try: + public_trust = True + custom_trust = True + public_not_trusted_string = "" + validation_results = cert_plugin_result.path_validation_result_list + for result in validation_results: + if result.is_certificate_trusted: + # We're assuming that it is trusted to start with + pass + else: + if 'Custom' in result.trust_store.name: + custom_trust = False + else: + public_trust = False + if len(public_not_trusted_string) > 0: + public_not_trusted_string += ", " + public_not_trusted_string += result.trust_store.name + if public_trust: + logging.warning("{}: Publicly trusted by common trust stores.".format(endpoint.url)) + else: + logging.warning("{}: Not publicly trusted - not trusted by {}.".format(endpoint.url, public_not_trusted_string)) + if CA_FILE is not None: + if custom_trust: + logging.warning("{}: Trusted by custom trust store.".format(endpoint.url)) + else: + logging.warning("{}: Not trusted by custom trust store.".format(endpoint.url)) + else: + custom_trust = None + endpoint.https_public_trusted = public_trust + endpoint.https_custom_trusted = custom_trust + except Exception as err: + # Ignore exception + utils.debug("{}: Unknown exception examining trust: {}".format(endpoint.url, err)) try: cert_response = cert_plugin_result.as_text() except AttributeError: - logging.warn("Known error in sslyze 1.X with EC public keys. See https://github.com/nabla-c0d3/sslyze/issues/215") + logging.warning("{}: Known error in sslyze 1.X with EC public keys. See https://github.com/nabla-c0d3/sslyze/issues/215".format(endpoint.url)) return None except Exception as err: endpoint.unknown_error = True - logging.warn("Unknown exception in cert plugin.") - utils.debug("{0}".format(err)) + logging.warning("{}: Unknown exception in cert plugin.".format(endpoint.url)) + utils.debug("{}: {}".format(endpoint.url, err)) return # Debugging @@ -548,6 +725,46 @@ def https_check(endpoint): ): endpoint.https_bad_hostname = True + try: + endpoint.https_cert_chain_len = len(cert_plugin_result.certificate_chain) + if ( + endpoint.https_self_signed_cert is False and ( + len(cert_plugin_result.certificate_chain) < 2 + ) + ): + # *** TODO check that it is not a bad hostname and that the root cert is trusted before suggesting that it is an intermediate cert issue. + endpoint.https_missing_intermediate_cert = True + if(cert_plugin_result.successful_trust_store is None): + logging.warning("{}: Untrusted certificate chain, probably due to missing intermediate certificate.".format(endpoint.url)) + utils.debug("{}: Only {} certificates in certificate chain received.".format(endpoint.url, cert_plugin_result.certificate_chain.__len__())) + elif(custom_trust is True and public_trust is False): + # recheck public trust using custom public trust store with manually added intermediate certificates + if(PT_INT_CA_FILE is not None): + try: + cert_plugin_result = None + command = sslyze.plugins.certificate_info_plugin.CertificateInfoScanCommand(ca_file=PT_INT_CA_FILE) + cert_plugin_result = scanner.run_scan_command(server_info, command) + if(cert_plugin_result.successful_trust_store is not None): + public_trust = True + endpoint.https_public_trusted = public_trust + logging.warning("{}: Trusted by special public trust store with intermediate certificates.".format(endpoint.url)) + except Exception: + pass + else: + endpoint.https_missing_intermediate_cert = False + except Exception: + # Squash exceptions + pass + + # If anything is wrong then https is not valid + if ( + endpoint.https_expired_cert or + endpoint.https_self_signed_cert or + endpoint.https_bad_chain or + endpoint.https_bad_hostname + ): + endpoint.https_valid = False + def canonical_endpoint(http, httpwww, https, httpswww): """ @@ -676,39 +893,92 @@ def is_live(domain): return http.live or httpwww.live or https.live or httpswww.live -def is_redirect(domain): +def is_https_live(domain): + """ + Domain is https live if any https endpoint is live. + """ + https, httpswww = domain.https, domain.httpswww + + return https.live or httpswww.live + + +def is_full_connection(domain): + """ + Domain is "fully connected" if any https endpoint is fully connected. + """ + https, httpswww = domain.https, domain.httpswww + + return https.https_full_connection or httpswww.https_full_connection + + +def is_client_auth_required(domain): + """ + Domain requires client authentication if *any* HTTPS endpoint requires it for full TLS connection. + """ + https, httpswww = domain.https, domain.httpswww + + return https.https_client_auth_required or httpswww.https_client_auth_required + + +def is_redirect_or_down(endpoint): + """ + Endpoint is a redirect or down if it is a redirect to an external site or it is down in any of 3 ways: + it is not live, it is HTTPS and has a bad hostname in the cert, or it responds with a 4xx error code + """ + return ( + endpoint.redirect_eventually_to_external or + (not endpoint.live) or + ( + endpoint.protocol == "https" and + endpoint.https_bad_hostname + ) or + ( + endpoint.status is not None and + endpoint.status >= 400 + ) + ) + + +def is_redirect(endpoint): + """ + Endpoint is a redirect if it is a redirect to an external site + """ + return endpoint.redirect_eventually_to_external + + +def is_redirect_domain(domain): """ Domain is "a redirect domain" if at least one endpoint is a redirect, and all endpoints are either redirects or down. """ http, httpwww, https, httpswww = domain.http, domain.httpwww, domain.https, domain.httpswww - # TODO: make sub-function of the conditional below. - # def is_redirect_or_down(endpoint): - return is_live(domain) and ( ( - https.redirect_eventually_to_external or - (not https.live) or - https.https_bad_hostname or - https.status >= 400 - ) and - ( - httpswww.redirect_eventually_to_external or - (not httpswww.live) or - httpswww.https_bad_hostname or - httpswww.status >= 400 + is_redirect(http) or is_redirect(httpwww) or is_redirect(https) or is_redirect(httpswww) ) and + is_redirect_or_down(https) and + is_redirect_or_down(httpswww) and + is_redirect_or_down(httpwww) and + is_redirect_or_down(http) + ) + + +def is_http_redirect_domain(domain): + """ + Domain is "an http redirect domain" if at least one http endpoint + is a redirect, and all other http endpoints are either redirects + or down. + """ + http, httpwww, = domain.http, domain.httpwww + + return is_live(domain) and ( ( - httpwww.redirect_eventually_to_external or - (not httpwww.live) or - httpwww.status >= 400 + is_redirect(http) or is_redirect(httpwww) ) and - ( - http.redirect_eventually_to_external or - (not http.live) or - http.status >= 400 - )) + is_redirect_or_down(httpwww) and + is_redirect_or_down(http) + ) def redirects_to(domain): @@ -717,7 +987,7 @@ def redirects_to(domain): """ canonical = domain.canonical - if is_redirect(domain): + if is_redirect_domain(domain): return canonical.redirect_eventually_to else: return None @@ -802,9 +1072,43 @@ def down_or_redirects(endpoint): return https_somewhere and all_http_unused +def is_publicly_trusted(domain): + """ + A domain has a "Publicly Trusted" certificate if its canonical + endpoint has a publicly trusted certificate. + """ + canonical, https, httpswww = domain.canonical, domain.https, domain.httpswww + + # Evaluate the HTTPS version of the canonical hostname + if canonical.host == "root": + evaluate = https + else: + evaluate = httpswww + + return evaluate.live and evaluate.https_public_trusted + + +def is_custom_trusted(domain): + """ + A domain has a "Custom Trusted" certificate if its canonical + endpoint has a certificate that is trusted by the custom + truststore. + """ + canonical, https, httpswww = domain.canonical, domain.https, domain.httpswww + + # Evaluate the HTTPS version of the canonical hostname + if canonical.host == "root": + evaluate = https + else: + evaluate = httpswww + + return evaluate.live and evaluate.https_custom_trusted + + def is_bad_chain(domain): """ - Domain has a bad chain if either https endpoints contain a bad chain + Domain has a bad chain if its canonical https endpoint has a bad + chain """ canonical, https, httpswww = domain.canonical, domain.https, domain.httpswww @@ -818,7 +1122,8 @@ def is_bad_chain(domain): def is_bad_hostname(domain): """ - Domain has a bad hostname if either https endpoint fails hostname validation + Domain has a bad hostname if its canonical https endpoint fails + hostname validation """ canonical, https, httpswww = domain.canonical, domain.https, domain.httpswww @@ -832,7 +1137,7 @@ def is_bad_hostname(domain): def is_expired_cert(domain): """ - Returns if the either https endpoint has an expired cert + Returns if its canonical https endpoint has an expired cert """ canonical, https, httpswww = domain.canonical, domain.https, domain.httpswww @@ -846,7 +1151,7 @@ def is_expired_cert(domain): def is_self_signed_cert(domain): """ - Returns if the either https endpoint has a self-signed cert cert + Returns if its canonical https endpoint has a self-signed cert cert """ canonical, https, httpswww = domain.canonical, domain.https, domain.httpswww @@ -858,6 +1163,35 @@ def is_self_signed_cert(domain): return canonical_https.https_self_signed_cert +def cert_chain_length(domain): + """ + Returns the cert chain length for the canonical HTTPS endpoint + """ + canonical, https, httpswww = domain.canonical, domain.https, domain.httpswww + + if canonical.host == "www": + canonical_https = httpswww + else: + canonical_https = https + + return canonical_https.https_cert_chain_len + + +def is_missing_intermediate_cert(domain): + """ + Returns whether the served cert chain is probably missing the + needed intermediate certificate for the canonical HTTPS endpoint + """ + canonical, https, httpswww = domain.canonical, domain.https, domain.httpswww + + if canonical.host == "www": + canonical_https = httpswww + else: + canonical_https = https + + return canonical_https.https_missing_intermediate_cert + + def is_hsts(domain): """ Domain has HSTS if its canonical HTTPS endpoint has HSTS. @@ -1004,7 +1338,10 @@ def is_domain_enforces_https(domain): """ return is_domain_supports_https(domain) and ( is_defaults_to_https(domain) or ( - is_strictly_forces_https(domain) and is_redirect(domain) + is_strictly_forces_https(domain) and ( + is_redirect_domain(domain) or + is_http_redirect_domain(domain) + ) ) ) @@ -1019,6 +1356,66 @@ def is_domain_strong_hsts(domain): return None +def get_domain_ip(domain): + """ + Get the IP for the domain. Any IP that responded is good enough. + """ + if domain.canonical.ip is not None: + return domain.canonical.ip + if domain.https.ip is not None: + return domain.https.ip + if domain.httpswww.ip is not None: + return domain.httpswww.ip + if domain.httpwww.ip is not None: + return domain.httpwww.ip + if domain.http.ip is not None: + return domain.http.ip + return None + + +def get_domain_server_header(domain): + """ + Get the Server header from the response for the domain. + """ + if domain.canonical.server_header is not None: + return domain.canonical.server_header.replace(',', ';') + if domain.https.server_header is not None: + return domain.https.server_header.replace(',', ';') + if domain.httpswww.server_header is not None: + return domain.httpswww.server_header.replace(',', ';') + if domain.httpwww.server_header is not None: + return domain.httpwww.server_header.replace(',', ';') + if domain.http.server_header is not None: + return domain.http.server_header.replace(',', ';') + return None + + +def get_domain_server_version(domain): + """ + Get the Server version based on the Server header for the web server. + """ + if domain.canonical.server_version is not None: + return domain.canonical.server_version + if domain.https.server_version is not None: + return domain.https.server_version + if domain.httpswww.server_version is not None: + return domain.httpswww.server_version + if domain.httpwww.server_version is not None: + return domain.httpwww.server_version + if domain.http.server_version is not None: + return domain.http.server_version + return None + + +def get_domain_notes(domain): + """ + Combine all domain notes if there are any. + """ + all_notes = domain.http.notes + domain.httpwww.notes + domain.https.notes + domain.httpswww.notes + all_notes = all_notes.replace(',', ';') + return all_notes + + def did_domain_error(domain): """ Checks if the domain had an Unknown error somewhere @@ -1044,7 +1441,7 @@ def load_preload_pending(): try: request = requests.get(pending_url) except (requests.exceptions.SSLError, requests.exceptions.ConnectionError) as err: - logging.warn('Failed to fetch pending preload list: {}'.format(pending_url)) + logging.warning('Failed to fetch pending preload list: {}'.format(pending_url)) logging.debug('{}'.format(err)) return [] @@ -1075,7 +1472,7 @@ def load_preload_list(): try: request = requests.get(file_url) except (requests.exceptions.SSLError, requests.exceptions.ConnectionError) as err: - logging.warn('Failed to fetch preload list: {}'.format(file_url)) + logging.warning('Failed to fetch preload list: {}'.format(file_url)) logging.debug('{}'.format(err)) return [] @@ -1111,7 +1508,7 @@ def load_suffix_list(): try: cache_file = fetch() except URLError as err: - logging.warn("Unable to download the Public Suffix List...") + logging.warning("Unable to download the Public Suffix List...") utils.debug("{}".format(err)) return [] content = cache_file.readlines() @@ -1209,7 +1606,7 @@ def initialize_external_data( def inspect_domains(domains, options): # Override timeout, user agent, preload cache, default CA bundle - global TIMEOUT, USER_AGENT, THIRD_PARTIES_CACHE, CA_FILE, STORE + global TIMEOUT, USER_AGENT, THIRD_PARTIES_CACHE, CA_FILE, PT_INT_CA_FILE, STORE if options.get('timeout'): TIMEOUT = int(options['timeout']) @@ -1227,6 +1624,9 @@ def inspect_domains(domains, options): # "Custom" Option from the sslyze output. STORE = "Custom" + if options.get('pt_int_ca_file'): + PT_INT_CA_FILE = options['pt_int_ca_file'] + # If this has been run once already by a Python API client, it # can be safely run without hitting the network or disk again, # and without overriding the data the Python user set for them. diff --git a/tests/test_cli.py b/tests/test_cli.py index 44134717..01d613fc 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -57,9 +57,14 @@ def test_single_result(self): ('Base Domain', 'example.com'), ('Canonical URL', 'http://example.com'), ('Live', 'False'), + ('HTTPS Live', 'False'), + ('HTTPS Full Connection', 'False'), + ('HTTPS Client Auth Required', 'False'), ('Redirect', 'False'), ('Redirect To', ''), ('Valid HTTPS', 'False'), + ('HTTPS Publicly Trusted', 'False'), + ('HTTPS Custom Truststore Trusted', 'False'), ('Defaults to HTTPS', 'False'), ('Downgrades HTTPS', 'False'), ('Strictly Forces HTTPS', 'False'), @@ -67,20 +72,28 @@ def test_single_result(self): ('HTTPS Bad Hostname', 'False'), ('HTTPS Expired Cert', 'False'), ('HTTPS Self Signed Cert', 'False'), - ('HSTS', 'False'), + ('HSTS', ''), ('HSTS Header', ''), ('HSTS Max Age', ''), - ('HSTS Entire Domain', 'False'), + ('HSTS Entire Domain', ''), ('HSTS Preload Ready', 'False'), ('HSTS Preload Pending', 'False'), ('HSTS Preloaded', 'False'), ('Base Domain HSTS Preloaded', 'False'), ('Domain Supports HTTPS', 'False'), ('Domain Enforces HTTPS', 'False'), - ('Domain Uses Strong HSTS', 'False'), + ('Domain Uses Strong HSTS', ''), + ('IP', ''), + ('Server Header', ''), + ('Server Version', ''), + ('HTTPS Cert Chain Length', ''), + ('HTTPS Probably Missing Intermediate Cert', 'False'), + ('Notes', ''), ('Unknown Error', 'False'), ] + self.maxDiff = None + header = ','.join(t[0] for t in domain_data) values = ','.join(t[1] for t in domain_data) expected = header + '\n' + values + '\n'