From 3219f73bcfb868228f1e5f109761a6eeb16bb473 Mon Sep 17 00:00:00 2001 From: Michael Schlenker Date: Thu, 8 Jun 2017 11:34:23 +0200 Subject: [PATCH] Fix import sort order Make the isort check happy. --- src/oic/utils/aes.py | 402 ++++++------ src/oic/utils/http_util.py | 1197 ++++++++++++++++++------------------ tests/test_aes.py | 121 ++-- tests/test_http_util.py | 321 +++++----- 4 files changed, 1022 insertions(+), 1019 deletions(-) diff --git a/src/oic/utils/aes.py b/src/oic/utils/aes.py index ef8c9d615..51f6910dc 100644 --- a/src/oic/utils/aes.py +++ b/src/oic/utils/aes.py @@ -1,200 +1,202 @@ -#!/usr/bin/env python -from future.utils import tobytes - -import os -from base64 import b64decode -from base64 import b64encode - -from Cryptodome import Random -from Cryptodome.Cipher import AES -from six import indexbytes, binary_type, text_type - -__author__ = 'rolandh' - -POSTFIX_MODE = { - "cbc": AES.MODE_CBC, - "cfb": AES.MODE_CFB, - "ecb": AES.MODE_CFB, -} - -BLOCK_SIZE = 16 - - -class AESError(Exception): - pass - - -def build_cipher(key, iv, alg="aes_128_cbc"): - """ - :param key: encryption key - :param iv: init vector - :param alg: cipher algorithm - :return: A Cipher instance - """ - typ, bits, cmode = alg.split("_") - - if not iv: - iv = Random.new().read(AES.block_size) - else: - assert len(iv) == AES.block_size - - if bits not in ["128", "192", "256"]: - raise AESError("Unsupported key length") - try: - assert len(key) == int(bits) >> 3 - except AssertionError: - raise AESError("Wrong Key length") - - try: - return AES.new(tobytes(key), POSTFIX_MODE[cmode], tobytes(iv)), iv - except KeyError: - raise AESError("Unsupported chaining mode") - - -def encrypt(key, msg, iv=None, alg="aes_128_cbc", padding="PKCS#7", - b64enc=True, block_size=BLOCK_SIZE): - """ - :param key: The encryption key - :param iv: init vector - :param msg: Message to be encrypted - :param padding: Which padding that should be used - :param b64enc: Whether the result should be base64encoded - :param block_size: If PKCS#7 padding which block size to use - :return: The encrypted message - """ - - if padding == "PKCS#7": - _block_size = block_size - elif padding == "PKCS#5": - _block_size = 8 - else: - _block_size = 0 - - if _block_size: - plen = _block_size - (len(msg) % _block_size) - c = chr(plen) - msg += (c * plen) - - cipher, iv = build_cipher(tobytes(key), iv, alg) - cmsg = iv + cipher.encrypt(tobytes(msg)) - if b64enc: - return b64encode(cmsg) - else: - return cmsg - - -def decrypt(key, msg, iv=None, padding="PKCS#7", b64dec=True): - """ - :param key: The encryption key - :param iv: init vector - :param msg: Base64 encoded message to be decrypted - :return: The decrypted message - """ - if b64dec: - data = b64decode(msg) - else: - data = msg - - _iv = data[:AES.block_size] - if iv: - assert iv == _iv - cipher, iv = build_cipher(key, iv) - res = cipher.decrypt(data)[AES.block_size:] - if padding in ["PKCS#5", "PKCS#7"]: - res = res[:-indexbytes(res, -1)] - return res.decode("utf-8") - - -class AEAD(object): - """ - Authenticated Encryption with Associated Data Wrapper - - This does encrypts and does an integrity check in one - operation, so you do not need to combine HMAC + encryption - yourself. - - :param key: The key to use for encryption. - :type key: bytes - :param iv: The initialization vector. - :type iv: bytes - :param mode: One of the AEAD available modes. - - Your key and initialization vectors should be created from random bytes - of sufficient length. - - For the default SIV mode, you need one of: - - - 256-bit key, 128-bit IV to use AES-128 - - 384-bit key, 192-bit IV to use AES-192 - - 512-bit key, 256-bit IV to use AES-256 - - """ - def __init__(self, key, iv, mode=AES.MODE_SIV): - assert isinstance(key, binary_type) - assert isinstance(iv, binary_type) - self.key = key - self.mode = mode - self.iv = iv - self.kernel = AES.new(self.key, self.mode, self.iv) - - def add_associated_data(self, data): - """ - Add data to include in the MAC - - This data is protected by the MAC but not encrypted. - - :param data: data to add in the MAC calculation - :type data: bytes - """ - if isinstance(data, text_type): - data = data.encode('utf-8') - self.kernel.update(data) - - def encrypt_and_tag(self, cleardata): - """ - Encrypt the given data - - Encrypts the given data and returns the encrypted - data and the MAC to later verify and decrypt the data. - - :param cleardata: data to encrypt - :type cleardata: bytes - - :returns: 2-tuple of encrypted data and MAC - """ - assert isinstance(cleardata, binary_type) - return self.kernel.encrypt_and_digest(cleardata) - - def decrypt_and_verify(self, cipherdata, tag): - """ - Decrypt and verify - - Checks the integrity against the tag and decrypts the - data. Any associated data used during encryption - needs to be added before calling this too. - - :param cipherdata: The encrypted data - :type cipherdata: bytes - :param tag: The MAC tag - :type tag: bytes - """ - assert isinstance(cipherdata, binary_type) - assert isinstance(tag, binary_type) - try: - return self.kernel.decrypt_and_verify(cipherdata, tag) - except ValueError: - raise AESError("Failed to verify data") - - -if __name__ == "__main__": - key_ = "1234523451234545" # 16 byte key - # Iff padded the message doesn't have to be multiple of 16 in length - msg_ = "ToBeOrNotTobe W.S." - iv_ = os.urandom(16) - encrypted_msg = encrypt(key_, msg_, iv_) - txt = decrypt(key_, encrypted_msg, iv_) - assert txt == msg_ - - encrypted_msg = encrypt(key_, msg_, 0) - txt = decrypt(key_, encrypted_msg, 0) - assert txt == msg_ +#!/usr/bin/env python +from future.utils import tobytes + +import os +from base64 import b64decode +from base64 import b64encode + +from Cryptodome import Random +from Cryptodome.Cipher import AES +from six import binary_type +from six import indexbytes +from six import text_type + +__author__ = 'rolandh' + +POSTFIX_MODE = { + "cbc": AES.MODE_CBC, + "cfb": AES.MODE_CFB, + "ecb": AES.MODE_CFB, +} + +BLOCK_SIZE = 16 + + +class AESError(Exception): + pass + + +def build_cipher(key, iv, alg="aes_128_cbc"): + """ + :param key: encryption key + :param iv: init vector + :param alg: cipher algorithm + :return: A Cipher instance + """ + typ, bits, cmode = alg.split("_") + + if not iv: + iv = Random.new().read(AES.block_size) + else: + assert len(iv) == AES.block_size + + if bits not in ["128", "192", "256"]: + raise AESError("Unsupported key length") + try: + assert len(key) == int(bits) >> 3 + except AssertionError: + raise AESError("Wrong Key length") + + try: + return AES.new(tobytes(key), POSTFIX_MODE[cmode], tobytes(iv)), iv + except KeyError: + raise AESError("Unsupported chaining mode") + + +def encrypt(key, msg, iv=None, alg="aes_128_cbc", padding="PKCS#7", + b64enc=True, block_size=BLOCK_SIZE): + """ + :param key: The encryption key + :param iv: init vector + :param msg: Message to be encrypted + :param padding: Which padding that should be used + :param b64enc: Whether the result should be base64encoded + :param block_size: If PKCS#7 padding which block size to use + :return: The encrypted message + """ + + if padding == "PKCS#7": + _block_size = block_size + elif padding == "PKCS#5": + _block_size = 8 + else: + _block_size = 0 + + if _block_size: + plen = _block_size - (len(msg) % _block_size) + c = chr(plen) + msg += (c * plen) + + cipher, iv = build_cipher(tobytes(key), iv, alg) + cmsg = iv + cipher.encrypt(tobytes(msg)) + if b64enc: + return b64encode(cmsg) + else: + return cmsg + + +def decrypt(key, msg, iv=None, padding="PKCS#7", b64dec=True): + """ + :param key: The encryption key + :param iv: init vector + :param msg: Base64 encoded message to be decrypted + :return: The decrypted message + """ + if b64dec: + data = b64decode(msg) + else: + data = msg + + _iv = data[:AES.block_size] + if iv: + assert iv == _iv + cipher, iv = build_cipher(key, iv) + res = cipher.decrypt(data)[AES.block_size:] + if padding in ["PKCS#5", "PKCS#7"]: + res = res[:-indexbytes(res, -1)] + return res.decode("utf-8") + + +class AEAD(object): + """ + Authenticated Encryption with Associated Data Wrapper + + This does encrypts and does an integrity check in one + operation, so you do not need to combine HMAC + encryption + yourself. + + :param key: The key to use for encryption. + :type key: bytes + :param iv: The initialization vector. + :type iv: bytes + :param mode: One of the AEAD available modes. + + Your key and initialization vectors should be created from random bytes + of sufficient length. + + For the default SIV mode, you need one of: + + - 256-bit key, 128-bit IV to use AES-128 + - 384-bit key, 192-bit IV to use AES-192 + - 512-bit key, 256-bit IV to use AES-256 + + """ + def __init__(self, key, iv, mode=AES.MODE_SIV): + assert isinstance(key, binary_type) + assert isinstance(iv, binary_type) + self.key = key + self.mode = mode + self.iv = iv + self.kernel = AES.new(self.key, self.mode, self.iv) + + def add_associated_data(self, data): + """ + Add data to include in the MAC + + This data is protected by the MAC but not encrypted. + + :param data: data to add in the MAC calculation + :type data: bytes + """ + if isinstance(data, text_type): + data = data.encode('utf-8') + self.kernel.update(data) + + def encrypt_and_tag(self, cleardata): + """ + Encrypt the given data + + Encrypts the given data and returns the encrypted + data and the MAC to later verify and decrypt the data. + + :param cleardata: data to encrypt + :type cleardata: bytes + + :returns: 2-tuple of encrypted data and MAC + """ + assert isinstance(cleardata, binary_type) + return self.kernel.encrypt_and_digest(cleardata) + + def decrypt_and_verify(self, cipherdata, tag): + """ + Decrypt and verify + + Checks the integrity against the tag and decrypts the + data. Any associated data used during encryption + needs to be added before calling this too. + + :param cipherdata: The encrypted data + :type cipherdata: bytes + :param tag: The MAC tag + :type tag: bytes + """ + assert isinstance(cipherdata, binary_type) + assert isinstance(tag, binary_type) + try: + return self.kernel.decrypt_and_verify(cipherdata, tag) + except ValueError: + raise AESError("Failed to verify data") + + +if __name__ == "__main__": + key_ = "1234523451234545" # 16 byte key + # Iff padded the message doesn't have to be multiple of 16 in length + msg_ = "ToBeOrNotTobe W.S." + iv_ = os.urandom(16) + encrypted_msg = encrypt(key_, msg_, iv_) + txt = decrypt(key_, encrypted_msg, iv_) + assert txt == msg_ + + encrypted_msg = encrypt(key_, msg_, 0) + txt = decrypt(key_, encrypted_msg, 0) + assert txt == msg_ diff --git a/src/oic/utils/http_util.py b/src/oic/utils/http_util.py index 77369dfe1..5e7467e7a 100644 --- a/src/oic/utils/http_util.py +++ b/src/oic/utils/http_util.py @@ -1,598 +1,599 @@ -from future.backports.http.cookies import SimpleCookie -from future.backports.urllib.parse import quote - -import base64 -import cgi -import hashlib -import hmac -import logging -import os -import time - -from jwkest import as_unicode -from six import PY2 -from six import text_type, binary_type - -from oic import rndstr -from oic.exception import ImproperlyConfigured -from oic.exception import UnsupportedMethod -from oic.utils import time_util - -from oic.utils.aes import AEAD, AESError - -__author__ = 'rohe0002' - -logger = logging.getLogger(__name__) - -SUCCESSFUL = [200, 201, 202, 203, 204, 205, 206] - -CORS_HEADERS = [ - ("Access-Control-Allow-Origin", "*"), - ("Access-Control-Allow-Methods", "GET"), - ("Access-Control-Allow-Headers", "Authorization") -] - - -class Response(object): - _template = None - _status = '200 OK' - _content_type = 'text/html' - _mako_template = None - _mako_lookup = None - - def __init__(self, message=None, **kwargs): - self.status = kwargs.get("status", self._status) - self.response = kwargs.get("response", self._response) - self.template = kwargs.get("template", self._template) - self.mako_template = kwargs.get("mako_template", self._mako_template) - self.mako_lookup = kwargs.get("template_lookup", self._mako_lookup) - - self.message = message - - self.headers = [] - self.headers.extend(kwargs.get("headers", [])) - _content_type = kwargs.get("content", self._content_type) - - self.headers.append(("Content-type", _content_type)) - - def __call__(self, environ, start_response, **kwargs): - start_response(self.status, self.headers) - return self.response(self.message, **kwargs) - - def _response(self, message="", **argv): - # Have to be more specific, this might be a bit to much. - if message: - try: - if '', '</script>') - except TypeError: - if b'', b'</script>') - - if self.template: - if ("Content-type", "application/json") in self.headers: - return [message.encode("utf-8")] - else: - return [str(self.template % message).encode("utf-8")] - elif self.mako_lookup and self.mako_template: - argv["message"] = message - mte = self.mako_lookup.get_template(self.mako_template) - return [mte.render(**argv)] - else: - if [x for x in self._c_types() if x.startswith('image/')]: - return [message] - elif [x for x in self._c_types() if x == 'application/x-gzip']: - return [message] - - try: - return [message.encode("utf-8")] - except AttributeError: - return [message] - - def info(self): - return {'status': self.status, 'headers': self.headers, - 'message': self.message} - - def add_header(self, ava): - self.headers.append(ava) - - def reply(self, **kwargs): - return self.response(self.message, **kwargs) - - def _c_types(self): - return [y for x, y in self.headers if x == "Content-type"] - - -class Created(Response): - _status = "201 Created" - - -class Accepted(Response): - _status = "202 Accepted" - - -class NonAuthoritativeInformation(Response): - _status = "203 Non Authoritative Information" - - -class NoContent(Response): - _status = "204 No Content" - - -class Redirect(Response): - _template = '\nRedirecting to %s\n' \ - '\nYou are being redirected to %s\n' \ - '\n' - _status = '302 Found' - - def __call__(self, environ, start_response, **kwargs): - location = self.message - self.headers.append(('location', location)) - start_response(self.status, self.headers) - return self.response((location, location, location)) - - -class SeeOther(Response): - _template = '\nRedirecting to %s\n' \ - '\nYou are being redirected to %s\n' \ - '\n' - _status = '303 See Other' - - def __call__(self, environ, start_response, **kwargs): - location = self.message - if PY2: - try: - location = location.encode('utf8') - except UnicodeDecodeError: - pass - self.headers.append(('location', location)) - start_response(self.status, self.headers) - return self.response((location, location, location)) - - -class Forbidden(Response): - _status = '403 Forbidden' - _template = "Not allowed to mess with: '%s'" - - -class BadRequest(Response): - _status = "400 Bad Request" - _template = "%s" - - -class Unauthorized(Response): - _status = "401 Unauthorized" - _template = "%s" - - -class NotFound(Response): - _status = '404 NOT FOUND' - - -class NotSupported(Response): - _status = '405 Not Support' - - -class NotAcceptable(Response): - _status = '406 Not Acceptable' - - -class ServiceError(Response): - _status = '500 Internal Service Error' - - -class InvalidCookieSign(Exception): - pass - - -R2C = { - 200: Response, - 201: Created, - 202: Accepted, - 203: NonAuthoritativeInformation, - 204: NoContent, - 302: Redirect, - 303: SeeOther, - 400: BadRequest, - 401: Unauthorized, - 403: Forbidden, - 404: NotFound, - 405: NotSupported, - 406: NotAcceptable, - 500: ServiceError, -} - - -def factory(code, message, **kwargs): - return R2C[code](message, **kwargs) - - -def extract(environ, empty=False, err=False): - """Extracts strings in form data and returns a dict. - - :param environ: WSGI environ - :param empty: Stops on empty fields (default: Fault) - :param err: Stops on errors in fields (default: Fault) - """ - formdata = cgi.parse(environ['wsgi.input'], environ, empty, err) - # Remove single entries from lists - for key, value in formdata.iteritems(): - if len(value) == 1: - formdata[key] = value[0] - return formdata - - -def geturl(environ, query=True, path=True): - """Rebuilds a request URL (from PEP 333). - - :param query: Is QUERY_STRING included in URI (default: True) - :param path: Is path included in URI (default: True) - """ - url = [environ['wsgi.url_scheme'] + '://'] - if environ.get('HTTP_HOST'): - url.append(environ['HTTP_HOST']) - else: - url.append(environ['SERVER_NAME']) - if environ['wsgi.url_scheme'] == 'https': - if environ['SERVER_PORT'] != '443': - url.append(':' + environ['SERVER_PORT']) - else: - if environ['SERVER_PORT'] != '80': - url.append(':' + environ['SERVER_PORT']) - if path: - url.append(getpath(environ)) - if query and environ.get('QUERY_STRING'): - url.append('?' + environ['QUERY_STRING']) - return ''.join(url) - - -def getpath(environ): - """Builds a path.""" - return ''.join([quote(environ.get('SCRIPT_NAME', '')), - quote(environ.get('PATH_INFO', ''))]) - - -def _expiration(timeout, time_format=None): - if timeout == "now": - return time_util.instant(time_format) - else: - # validity time should match lifetime of assertions - return time_util.in_a_while(minutes=timeout, time_format=time_format) - - -def cookie_signature(key, *parts): - """Generates a cookie signature. - - :param key: The HMAC key to use. - :type key: bytes - :param parts: List of parts to include in the MAC - :type parts: list of bytes or strings - :returns: hexdigest of the HMAC - """ - assert isinstance(key, binary_type) - sha1 = hmac.new(key, digestmod=hashlib.sha1) - for part in parts: - if part: - if isinstance(part, text_type): - sha1.update(part.encode('utf-8')) - else: - sha1.update(part) - return text_type(sha1.hexdigest()) - - -def verify_cookie_signature(sig, key, *parts): - """Constant time verifier for signatures - - :param sig: The signature hexdigest to check - :type sig: text_type - :param key: The HMAC key to use. - :type key: bytes - :param parts: List of parts to include in the MAC - :type parts: list of bytes or strings - :raises: `InvalidCookieSign` when the signature is wrong - """ - assert isinstance(sig, text_type) - return hmac.compare_digest(sig, cookie_signature(key, *parts)) - - -def _make_hashed_key(parts, hashfunc='sha256'): - """ - Construct a key via hashing the parts - - If the parts do not have enough entropy of their - own, this doesn't help. - - The size of the hash digest determines the size. - """ - h = hashlib.new(hashfunc) - for part in parts: - if isinstance(part, text_type): - part = part.encode('utf-8') - if part: - h.update(part) - return h.digest() - - -def make_cookie(name, load, seed, expire=0, domain="", path="", timestamp="", - enc_key=None): - """ - Create and return a cookie - - :param name: Cookie name - :param load: Cookie load - :param seed: A seed key for the HMAC function - :param expire: Number of minutes before this cookie goes stale - :param domain: The domain of the cookie - :param path: The path specification for the cookie - :param timestamp: A time stamp - :param enc_key: The key to use for cookie encryption. - :return: A tuple to be added to headers - """ - cookie = SimpleCookie() - if not timestamp: - timestamp = str(int(time.time())) - - bytes_load = load.encode("utf-8") - bytes_timestamp = timestamp.encode("utf-8") - - # If we have an encryption key, we use an AEAD cipher instead of - # building our own encrypt-and-mac scheme badly. - if enc_key: - # Make sure the key is 256-bit long, for AES-128-SIV - # - # This should go away once we push the keysize requirements up - # to the top level APIs. - key = _make_hashed_key((enc_key, seed)) - - # Random 128-Bit IV - iv = os.urandom(16) - - crypt = AEAD(key, iv) - - # timestamp does not need to be encrypted, just MAC'ed, - # so we add it to 'Associated Data' only. - crypt.add_associated_data(bytes_timestamp) - - ciphertext, tag = crypt.encrypt_and_tag(bytes_load) - cookie_payload = [bytes_timestamp, - base64.b64encode(iv), - base64.b64encode(ciphertext), - base64.b64encode(tag)] - else: - cookie_payload = [ - bytes_load, bytes_timestamp, - cookie_signature(seed, load, timestamp).encode('utf-8')] - - cookie[name] = (b"|".join(cookie_payload)).decode('utf-8') - if path: - cookie[name]["path"] = path - if domain: - cookie[name]["domain"] = domain - if expire: - cookie[name]["expires"] = _expiration(expire, - "%a, %d-%b-%Y %H:%M:%S GMT") - - return tuple(cookie.output().split(": ", 1)) - - -def parse_cookie(name, seed, kaka, enc_key=None): - """Parses and verifies a cookie value - - :param seed: A seed used for the HMAC signature - :param kaka: The cookie - :return: A tuple consisting of (payload, timestamp) - """ - if not kaka: - return None - cookie_obj = SimpleCookie(text_type(kaka)) - morsel = cookie_obj.get(name) - - if isinstance(seed, text_type): - seed = seed.encode('utf-8') - - if morsel: - parts = morsel.value.split("|") - if len(parts) == 3: - # verify the cookie signature - cleartext, timestamp, sig = parts - if not verify_cookie_signature(sig, seed, cleartext, timestamp): - raise InvalidCookieSign() - return cleartext, timestamp - elif len(parts) == 4: - # encrypted and signed - timestamp = parts[0] - iv = base64.b64decode(parts[1]) - ciphertext = base64.b64decode(parts[2]) - tag = base64.b64decode(parts[3]) - - # Make sure the key is 32-Bytes long - key = _make_hashed_key((enc_key, seed)) - - crypt = AEAD(key, iv) - # timestamp does not need to be encrypted, just MAC'ed, - # so we add it to 'Associated Data' only. - crypt.add_associated_data(timestamp.encode('utf-8')) - try: - cleartext = crypt.decrypt_and_verify(ciphertext, tag) - except AESError: - raise InvalidCookieSign() - return cleartext.decode('utf-8'), timestamp - else: - return None - else: - return None - - -def cookie_parts(name, kaka): - cookie_obj = SimpleCookie(text_type(kaka)) - morsel = cookie_obj.get(name) - if morsel: - return morsel.value.split("|") - else: - return None - - -def get_post(environ): - # the environment variable CONTENT_LENGTH may be empty or missing - try: - request_body_size = int(environ.get('CONTENT_LENGTH', 0)) - except ValueError: - request_body_size = 0 - - # When the method is POST the query string will be sent - # in the HTTP request body which is passed by the WSGI server - # in the file like wsgi.input environment variable. - text = environ['wsgi.input'].read(request_body_size) - try: - text = text.decode("utf-8") - except AttributeError: - pass - return text - - -def get_or_post(environ): - _method = environ["REQUEST_METHOD"] - - if _method == "GET": - data = environ.get("QUERY_STRING", "") - elif _method == "POST": - data = get_post(environ) - else: - raise UnsupportedMethod(_method) - - return data - - -def extract_from_request(environ, kwargs=None): - if kwargs is None: - kwargs = {} - - request = None - try: - request = environ["QUERY_STRING"] - except KeyError: - pass - if not request: - try: - request = as_unicode(get_post(environ)) - except KeyError: - pass - kwargs["request"] = request - # authentication information - try: - kwargs["authn"] = environ["HTTP_AUTHORIZATION"] - except KeyError: - pass - try: - kwargs["cookie"] = environ["HTTP_COOKIE"] - except KeyError: - pass - - # intended audience - kwargs["requrl"] = geturl(environ) - kwargs["url"] = geturl(environ, query=False) - kwargs["baseurl"] = geturl(environ, query=False, path=False) - kwargs["path"] = getpath(environ) - return kwargs - - -def wsgi_wrapper(environ, start_response, func, **kwargs): - kwargs = extract_from_request(environ, kwargs) - args = func(**kwargs) - - try: - resp, state = args - return resp(environ, start_response) - except TypeError: - resp = args - return resp(environ, start_response) - except Exception as err: - logger.error("%s" % err) - raise - - -class CookieDealer(object): - def getServer(self): - return self._srv - - def setServer(self, server): - self._srv = server - - srv = property(getServer, setServer) - - def __init__(self, srv, ttl=5): - self.srv = None - self.init_srv(srv) - # minutes before the interaction should be completed - self.cookie_ttl = ttl # N minutes - - def init_srv(self, srv): - if not srv: - return - self.srv = srv - - symkey = getattr(self.srv, 'symkey', None) - if symkey is not None and symkey == "": - msg = "CookieDealer.srv.symkey cannot be an empty value" - raise ImproperlyConfigured(msg) - - if not getattr(srv, 'seed', None): - setattr(srv, 'seed', rndstr().encode("utf-8")) - - def delete_cookie(self, cookie_name=None): - if cookie_name is None: - cookie_name = self.srv.cookie_name - return self.create_cookie("", "", cookie_name=cookie_name, ttl=-1, - kill=True) - - def create_cookie(self, value, typ, cookie_name=None, ttl=-1, kill=False): - if kill: - ttl = -1 - elif ttl < 0: - ttl = self.cookie_ttl - if cookie_name is None: - cookie_name = self.srv.cookie_name - timestamp = str(int(time.time())) - try: - _msg = "::".join([value, timestamp, typ]) - except TypeError: - _msg = "::".join([value[0], timestamp, typ]) - - cookie = make_cookie(cookie_name, _msg, self.srv.seed, - expire=ttl, domain="", path="", - timestamp=timestamp, - enc_key=self.srv.symkey) - if PY2: - return str(cookie[0]), str(cookie[1]) - else: - return cookie - - def getCookieValue(self, cookie=None, cookie_name=None): - return self.get_cookie_value(cookie, cookie_name) - - def get_cookie_value(self, cookie=None, cookie_name=None): - """ - Return information stored in the Cookie - - :param cookie: - :param cookie_name: The name of the cookie I'm looking for - :return: tuple (value, timestamp, type) - """ - if cookie is None or cookie_name is None: - return None - else: - try: - info, timestamp = parse_cookie(cookie_name, - self.srv.seed, cookie, - self.srv.symkey) - except (TypeError, AssertionError): - return None - else: - value, _ts, typ = info.split("::") - if timestamp == _ts: - return value, _ts, typ - return None +from future.backports.http.cookies import SimpleCookie +from future.backports.urllib.parse import quote + +import base64 +import cgi +import hashlib +import hmac +import logging +import os +import time + +from jwkest import as_unicode +from six import PY2 +from six import binary_type +from six import text_type + +from oic import rndstr +from oic.exception import ImproperlyConfigured +from oic.exception import UnsupportedMethod +from oic.utils import time_util +from oic.utils.aes import AEAD +from oic.utils.aes import AESError + +__author__ = 'rohe0002' + +logger = logging.getLogger(__name__) + +SUCCESSFUL = [200, 201, 202, 203, 204, 205, 206] + +CORS_HEADERS = [ + ("Access-Control-Allow-Origin", "*"), + ("Access-Control-Allow-Methods", "GET"), + ("Access-Control-Allow-Headers", "Authorization") +] + + +class Response(object): + _template = None + _status = '200 OK' + _content_type = 'text/html' + _mako_template = None + _mako_lookup = None + + def __init__(self, message=None, **kwargs): + self.status = kwargs.get("status", self._status) + self.response = kwargs.get("response", self._response) + self.template = kwargs.get("template", self._template) + self.mako_template = kwargs.get("mako_template", self._mako_template) + self.mako_lookup = kwargs.get("template_lookup", self._mako_lookup) + + self.message = message + + self.headers = [] + self.headers.extend(kwargs.get("headers", [])) + _content_type = kwargs.get("content", self._content_type) + + self.headers.append(("Content-type", _content_type)) + + def __call__(self, environ, start_response, **kwargs): + start_response(self.status, self.headers) + return self.response(self.message, **kwargs) + + def _response(self, message="", **argv): + # Have to be more specific, this might be a bit to much. + if message: + try: + if '', '</script>') + except TypeError: + if b'', b'</script>') + + if self.template: + if ("Content-type", "application/json") in self.headers: + return [message.encode("utf-8")] + else: + return [str(self.template % message).encode("utf-8")] + elif self.mako_lookup and self.mako_template: + argv["message"] = message + mte = self.mako_lookup.get_template(self.mako_template) + return [mte.render(**argv)] + else: + if [x for x in self._c_types() if x.startswith('image/')]: + return [message] + elif [x for x in self._c_types() if x == 'application/x-gzip']: + return [message] + + try: + return [message.encode("utf-8")] + except AttributeError: + return [message] + + def info(self): + return {'status': self.status, 'headers': self.headers, + 'message': self.message} + + def add_header(self, ava): + self.headers.append(ava) + + def reply(self, **kwargs): + return self.response(self.message, **kwargs) + + def _c_types(self): + return [y for x, y in self.headers if x == "Content-type"] + + +class Created(Response): + _status = "201 Created" + + +class Accepted(Response): + _status = "202 Accepted" + + +class NonAuthoritativeInformation(Response): + _status = "203 Non Authoritative Information" + + +class NoContent(Response): + _status = "204 No Content" + + +class Redirect(Response): + _template = '\nRedirecting to %s\n' \ + '\nYou are being redirected to %s\n' \ + '\n' + _status = '302 Found' + + def __call__(self, environ, start_response, **kwargs): + location = self.message + self.headers.append(('location', location)) + start_response(self.status, self.headers) + return self.response((location, location, location)) + + +class SeeOther(Response): + _template = '\nRedirecting to %s\n' \ + '\nYou are being redirected to %s\n' \ + '\n' + _status = '303 See Other' + + def __call__(self, environ, start_response, **kwargs): + location = self.message + if PY2: + try: + location = location.encode('utf8') + except UnicodeDecodeError: + pass + self.headers.append(('location', location)) + start_response(self.status, self.headers) + return self.response((location, location, location)) + + +class Forbidden(Response): + _status = '403 Forbidden' + _template = "Not allowed to mess with: '%s'" + + +class BadRequest(Response): + _status = "400 Bad Request" + _template = "%s" + + +class Unauthorized(Response): + _status = "401 Unauthorized" + _template = "%s" + + +class NotFound(Response): + _status = '404 NOT FOUND' + + +class NotSupported(Response): + _status = '405 Not Support' + + +class NotAcceptable(Response): + _status = '406 Not Acceptable' + + +class ServiceError(Response): + _status = '500 Internal Service Error' + + +class InvalidCookieSign(Exception): + pass + + +R2C = { + 200: Response, + 201: Created, + 202: Accepted, + 203: NonAuthoritativeInformation, + 204: NoContent, + 302: Redirect, + 303: SeeOther, + 400: BadRequest, + 401: Unauthorized, + 403: Forbidden, + 404: NotFound, + 405: NotSupported, + 406: NotAcceptable, + 500: ServiceError, +} + + +def factory(code, message, **kwargs): + return R2C[code](message, **kwargs) + + +def extract(environ, empty=False, err=False): + """Extracts strings in form data and returns a dict. + + :param environ: WSGI environ + :param empty: Stops on empty fields (default: Fault) + :param err: Stops on errors in fields (default: Fault) + """ + formdata = cgi.parse(environ['wsgi.input'], environ, empty, err) + # Remove single entries from lists + for key, value in formdata.iteritems(): + if len(value) == 1: + formdata[key] = value[0] + return formdata + + +def geturl(environ, query=True, path=True): + """Rebuilds a request URL (from PEP 333). + + :param query: Is QUERY_STRING included in URI (default: True) + :param path: Is path included in URI (default: True) + """ + url = [environ['wsgi.url_scheme'] + '://'] + if environ.get('HTTP_HOST'): + url.append(environ['HTTP_HOST']) + else: + url.append(environ['SERVER_NAME']) + if environ['wsgi.url_scheme'] == 'https': + if environ['SERVER_PORT'] != '443': + url.append(':' + environ['SERVER_PORT']) + else: + if environ['SERVER_PORT'] != '80': + url.append(':' + environ['SERVER_PORT']) + if path: + url.append(getpath(environ)) + if query and environ.get('QUERY_STRING'): + url.append('?' + environ['QUERY_STRING']) + return ''.join(url) + + +def getpath(environ): + """Builds a path.""" + return ''.join([quote(environ.get('SCRIPT_NAME', '')), + quote(environ.get('PATH_INFO', ''))]) + + +def _expiration(timeout, time_format=None): + if timeout == "now": + return time_util.instant(time_format) + else: + # validity time should match lifetime of assertions + return time_util.in_a_while(minutes=timeout, time_format=time_format) + + +def cookie_signature(key, *parts): + """Generates a cookie signature. + + :param key: The HMAC key to use. + :type key: bytes + :param parts: List of parts to include in the MAC + :type parts: list of bytes or strings + :returns: hexdigest of the HMAC + """ + assert isinstance(key, binary_type) + sha1 = hmac.new(key, digestmod=hashlib.sha1) + for part in parts: + if part: + if isinstance(part, text_type): + sha1.update(part.encode('utf-8')) + else: + sha1.update(part) + return text_type(sha1.hexdigest()) + + +def verify_cookie_signature(sig, key, *parts): + """Constant time verifier for signatures + + :param sig: The signature hexdigest to check + :type sig: text_type + :param key: The HMAC key to use. + :type key: bytes + :param parts: List of parts to include in the MAC + :type parts: list of bytes or strings + :raises: `InvalidCookieSign` when the signature is wrong + """ + assert isinstance(sig, text_type) + return hmac.compare_digest(sig, cookie_signature(key, *parts)) + + +def _make_hashed_key(parts, hashfunc='sha256'): + """ + Construct a key via hashing the parts + + If the parts do not have enough entropy of their + own, this doesn't help. + + The size of the hash digest determines the size. + """ + h = hashlib.new(hashfunc) + for part in parts: + if isinstance(part, text_type): + part = part.encode('utf-8') + if part: + h.update(part) + return h.digest() + + +def make_cookie(name, load, seed, expire=0, domain="", path="", timestamp="", + enc_key=None): + """ + Create and return a cookie + + :param name: Cookie name + :param load: Cookie load + :param seed: A seed key for the HMAC function + :param expire: Number of minutes before this cookie goes stale + :param domain: The domain of the cookie + :param path: The path specification for the cookie + :param timestamp: A time stamp + :param enc_key: The key to use for cookie encryption. + :return: A tuple to be added to headers + """ + cookie = SimpleCookie() + if not timestamp: + timestamp = str(int(time.time())) + + bytes_load = load.encode("utf-8") + bytes_timestamp = timestamp.encode("utf-8") + + # If we have an encryption key, we use an AEAD cipher instead of + # building our own encrypt-and-mac scheme badly. + if enc_key: + # Make sure the key is 256-bit long, for AES-128-SIV + # + # This should go away once we push the keysize requirements up + # to the top level APIs. + key = _make_hashed_key((enc_key, seed)) + + # Random 128-Bit IV + iv = os.urandom(16) + + crypt = AEAD(key, iv) + + # timestamp does not need to be encrypted, just MAC'ed, + # so we add it to 'Associated Data' only. + crypt.add_associated_data(bytes_timestamp) + + ciphertext, tag = crypt.encrypt_and_tag(bytes_load) + cookie_payload = [bytes_timestamp, + base64.b64encode(iv), + base64.b64encode(ciphertext), + base64.b64encode(tag)] + else: + cookie_payload = [ + bytes_load, bytes_timestamp, + cookie_signature(seed, load, timestamp).encode('utf-8')] + + cookie[name] = (b"|".join(cookie_payload)).decode('utf-8') + if path: + cookie[name]["path"] = path + if domain: + cookie[name]["domain"] = domain + if expire: + cookie[name]["expires"] = _expiration(expire, + "%a, %d-%b-%Y %H:%M:%S GMT") + + return tuple(cookie.output().split(": ", 1)) + + +def parse_cookie(name, seed, kaka, enc_key=None): + """Parses and verifies a cookie value + + :param seed: A seed used for the HMAC signature + :param kaka: The cookie + :return: A tuple consisting of (payload, timestamp) + """ + if not kaka: + return None + cookie_obj = SimpleCookie(text_type(kaka)) + morsel = cookie_obj.get(name) + + if isinstance(seed, text_type): + seed = seed.encode('utf-8') + + if morsel: + parts = morsel.value.split("|") + if len(parts) == 3: + # verify the cookie signature + cleartext, timestamp, sig = parts + if not verify_cookie_signature(sig, seed, cleartext, timestamp): + raise InvalidCookieSign() + return cleartext, timestamp + elif len(parts) == 4: + # encrypted and signed + timestamp = parts[0] + iv = base64.b64decode(parts[1]) + ciphertext = base64.b64decode(parts[2]) + tag = base64.b64decode(parts[3]) + + # Make sure the key is 32-Bytes long + key = _make_hashed_key((enc_key, seed)) + + crypt = AEAD(key, iv) + # timestamp does not need to be encrypted, just MAC'ed, + # so we add it to 'Associated Data' only. + crypt.add_associated_data(timestamp.encode('utf-8')) + try: + cleartext = crypt.decrypt_and_verify(ciphertext, tag) + except AESError: + raise InvalidCookieSign() + return cleartext.decode('utf-8'), timestamp + else: + return None + else: + return None + + +def cookie_parts(name, kaka): + cookie_obj = SimpleCookie(text_type(kaka)) + morsel = cookie_obj.get(name) + if morsel: + return morsel.value.split("|") + else: + return None + + +def get_post(environ): + # the environment variable CONTENT_LENGTH may be empty or missing + try: + request_body_size = int(environ.get('CONTENT_LENGTH', 0)) + except ValueError: + request_body_size = 0 + + # When the method is POST the query string will be sent + # in the HTTP request body which is passed by the WSGI server + # in the file like wsgi.input environment variable. + text = environ['wsgi.input'].read(request_body_size) + try: + text = text.decode("utf-8") + except AttributeError: + pass + return text + + +def get_or_post(environ): + _method = environ["REQUEST_METHOD"] + + if _method == "GET": + data = environ.get("QUERY_STRING", "") + elif _method == "POST": + data = get_post(environ) + else: + raise UnsupportedMethod(_method) + + return data + + +def extract_from_request(environ, kwargs=None): + if kwargs is None: + kwargs = {} + + request = None + try: + request = environ["QUERY_STRING"] + except KeyError: + pass + if not request: + try: + request = as_unicode(get_post(environ)) + except KeyError: + pass + kwargs["request"] = request + # authentication information + try: + kwargs["authn"] = environ["HTTP_AUTHORIZATION"] + except KeyError: + pass + try: + kwargs["cookie"] = environ["HTTP_COOKIE"] + except KeyError: + pass + + # intended audience + kwargs["requrl"] = geturl(environ) + kwargs["url"] = geturl(environ, query=False) + kwargs["baseurl"] = geturl(environ, query=False, path=False) + kwargs["path"] = getpath(environ) + return kwargs + + +def wsgi_wrapper(environ, start_response, func, **kwargs): + kwargs = extract_from_request(environ, kwargs) + args = func(**kwargs) + + try: + resp, state = args + return resp(environ, start_response) + except TypeError: + resp = args + return resp(environ, start_response) + except Exception as err: + logger.error("%s" % err) + raise + + +class CookieDealer(object): + def getServer(self): + return self._srv + + def setServer(self, server): + self._srv = server + + srv = property(getServer, setServer) + + def __init__(self, srv, ttl=5): + self.srv = None + self.init_srv(srv) + # minutes before the interaction should be completed + self.cookie_ttl = ttl # N minutes + + def init_srv(self, srv): + if not srv: + return + self.srv = srv + + symkey = getattr(self.srv, 'symkey', None) + if symkey is not None and symkey == "": + msg = "CookieDealer.srv.symkey cannot be an empty value" + raise ImproperlyConfigured(msg) + + if not getattr(srv, 'seed', None): + setattr(srv, 'seed', rndstr().encode("utf-8")) + + def delete_cookie(self, cookie_name=None): + if cookie_name is None: + cookie_name = self.srv.cookie_name + return self.create_cookie("", "", cookie_name=cookie_name, ttl=-1, + kill=True) + + def create_cookie(self, value, typ, cookie_name=None, ttl=-1, kill=False): + if kill: + ttl = -1 + elif ttl < 0: + ttl = self.cookie_ttl + if cookie_name is None: + cookie_name = self.srv.cookie_name + timestamp = str(int(time.time())) + try: + _msg = "::".join([value, timestamp, typ]) + except TypeError: + _msg = "::".join([value[0], timestamp, typ]) + + cookie = make_cookie(cookie_name, _msg, self.srv.seed, + expire=ttl, domain="", path="", + timestamp=timestamp, + enc_key=self.srv.symkey) + if PY2: + return str(cookie[0]), str(cookie[1]) + else: + return cookie + + def getCookieValue(self, cookie=None, cookie_name=None): + return self.get_cookie_value(cookie, cookie_name) + + def get_cookie_value(self, cookie=None, cookie_name=None): + """ + Return information stored in the Cookie + + :param cookie: + :param cookie_name: The name of the cookie I'm looking for + :return: tuple (value, timestamp, type) + """ + if cookie is None or cookie_name is None: + return None + else: + try: + info, timestamp = parse_cookie(cookie_name, + self.srv.seed, cookie, + self.srv.symkey) + except (TypeError, AssertionError): + return None + else: + value, _ts, typ = info.split("::") + if timestamp == _ts: + return value, _ts, typ + return None diff --git a/tests/test_aes.py b/tests/test_aes.py index f4b8816b8..bf4fc3a1a 100644 --- a/tests/test_aes.py +++ b/tests/test_aes.py @@ -1,61 +1,60 @@ -import os -import pytest - -from oic.utils.aes import decrypt -from oic.utils.aes import encrypt - -from oic.utils.aes import AEAD -from oic.utils.aes import AESError - - -def test_encrypt_decrypt(): - key_ = b"1234523451234545" # 16 byte key - # Iff padded the message doesn't have to be multiple of 16 in length - msg_ = "ToBeOrNotTobe W.S." - iv_ = os.urandom(16) - encrypted_msg = encrypt(key_, msg_, iv_) - txt = decrypt(key_, encrypted_msg, iv_) - assert txt == msg_ - - encrypted_msg = encrypt(key_, msg_, 0) - txt = decrypt(key_, encrypted_msg, 0) - assert txt == msg_ - - -def test_AEAD_good(): - key = os.urandom(32) - iv = os.urandom(16) - cleartext = b"secret sauce" - extra = ["some", "extra", "data"] - k = AEAD(key, iv) - for d in extra: - k.add_associated_data(d) - ciphertext, tag = k.encrypt_and_tag(cleartext) - - # get a fresh AEAD object - c = AEAD(key, iv) - for d in extra: - c.add_associated_data(d) - cleartext2 = c.decrypt_and_verify(ciphertext, tag) - assert cleartext2 == cleartext - - -def test_AEAD_bad_aad(): - key = os.urandom(32) - iv = os.urandom(16) - cleartext = b"secret sauce" - extra = ["some", "extra", "data"] - k = AEAD(key, iv) - for d in extra: - k.add_associated_data(d) - ciphertext, tag = k.encrypt_and_tag(cleartext) - - # get a fresh AEAD object - c = AEAD(key, iv) - # skip one aad item, MAC is wrong now - for d in extra[:1]: - c.add_associated_data(d) - - with pytest.raises(AESError): - c.decrypt_and_verify(ciphertext, tag) - +import os + +import pytest + +from oic.utils.aes import AEAD +from oic.utils.aes import AESError +from oic.utils.aes import decrypt +from oic.utils.aes import encrypt + + +def test_encrypt_decrypt(): + key_ = b"1234523451234545" # 16 byte key + # Iff padded the message doesn't have to be multiple of 16 in length + msg_ = "ToBeOrNotTobe W.S." + iv_ = os.urandom(16) + encrypted_msg = encrypt(key_, msg_, iv_) + txt = decrypt(key_, encrypted_msg, iv_) + assert txt == msg_ + + encrypted_msg = encrypt(key_, msg_, 0) + txt = decrypt(key_, encrypted_msg, 0) + assert txt == msg_ + + +def test_AEAD_good(): + key = os.urandom(32) + iv = os.urandom(16) + cleartext = b"secret sauce" + extra = ["some", "extra", "data"] + k = AEAD(key, iv) + for d in extra: + k.add_associated_data(d) + ciphertext, tag = k.encrypt_and_tag(cleartext) + + # get a fresh AEAD object + c = AEAD(key, iv) + for d in extra: + c.add_associated_data(d) + cleartext2 = c.decrypt_and_verify(ciphertext, tag) + assert cleartext2 == cleartext + + +def test_AEAD_bad_aad(): + key = os.urandom(32) + iv = os.urandom(16) + cleartext = b"secret sauce" + extra = ["some", "extra", "data"] + k = AEAD(key, iv) + for d in extra: + k.add_associated_data(d) + ciphertext, tag = k.encrypt_and_tag(cleartext) + + # get a fresh AEAD object + c = AEAD(key, iv) + # skip one aad item, MAC is wrong now + for d in extra[:1]: + c.add_associated_data(d) + + with pytest.raises(AESError): + c.decrypt_and_verify(ciphertext, tag) diff --git a/tests/test_http_util.py b/tests/test_http_util.py index 11f31490c..694dff68b 100644 --- a/tests/test_http_util.py +++ b/tests/test_http_util.py @@ -1,160 +1,161 @@ -import datetime - -import pytest - -from six import binary_type -from oic.exception import ImproperlyConfigured -from oic.utils.http_util import CookieDealer -from oic.utils.http_util import Response -from oic.utils.http_util import cookie_parts -from oic.utils.http_util import getpath -from oic.utils.http_util import geturl -from oic.utils.http_util import parse_cookie -from oic.utils.http_util import InvalidCookieSign -from oic.utils.http_util import cookie_signature, verify_cookie_signature - -__author__ = 'roland' - - -class TestResponse(object): - def test_response(self): - response_header = ("X-Test", "foobar") - message = "foo bar" - - def start_response(status, headers): - assert status == "200 OK" - assert response_header in headers - - resp = Response(message, headers=[response_header]) - result = resp({}, start_response) - assert result == [message.encode('utf8')] - - def test_escaped(self): - template = '%s' - response_header = ("XSS-Test", "script") - message = '' - - def start_response(status, headers): - assert status == "200 OK" - assert response_header in headers - - resp = Response(message=message, headers=[response_header], template=template) - assert resp({}, start_response) == ['<script>alert("hi");</script>'.encode('utf8')] - - -@pytest.fixture -def cookie_dealer(): - class DummyServer(): - def __init__(self): - self.symkey = b"0123456789012345" - - return CookieDealer(DummyServer()) - - -class TestCookieDealer(object): - def test_create_cookie_value(self, cookie_dealer): - cookie_value = "Something to pass along" - cookie_typ = "sso" - cookie_name = "Foobar" - - kaka = cookie_dealer.create_cookie(cookie_value, cookie_typ, - cookie_name) - result = cookie_dealer.get_cookie_value(kaka[1], "Foobar") - - value, timestamp, typ = cookie_dealer.get_cookie_value(kaka[1], - "Foobar") - - assert (value, typ) == (cookie_value, cookie_typ) - - def test_delete_cookie(self, cookie_dealer): - cookie_name = "Foobar" - kaka = cookie_dealer.delete_cookie(cookie_name) - cookie_expiration = kaka[1].split(";")[1].split("=")[1] - - now = datetime.datetime.utcnow() # - cookie_timestamp = datetime.datetime.strptime( - cookie_expiration, "%a, %d-%b-%Y %H:%M:%S GMT") - assert cookie_timestamp < now - - def test_cookie_dealer_improperly_configured(self): - class BadServer(): - def __init__(self): - self.symkey = "" - with pytest.raises(ImproperlyConfigured) as err: - CookieDealer(BadServer()) - expected_msg = "CookieDealer.srv.symkey cannot be an empty value" - assert expected_msg in str(err.value) - - -def test_cookie_signature(): - key = b'1234567890abcdef' - parts = ['abc', 'def'] - sig = cookie_signature(key, *parts) - assert verify_cookie_signature(sig, key, *parts) - - -def test_broken_cookie_signature(): - key = b'1234567890abcdef' - parts = ['abc', 'def'] - sig = cookie_signature(key, *parts) - parts.reverse() - assert not verify_cookie_signature(sig, key, *parts) - - -def test_parse_cookie(): - kaka = ('pyoidc=bjmc::1463043535::upm|' - '1463043535|18a201305fa15a96ce4048e1fbb03f7715f86499') - seed = '' - name = 'pyoidc' - result = parse_cookie(name, seed, kaka) - assert result == ('bjmc::1463043535::upm', '1463043535') - - -def test_parse_manipulated_cookie_payload(): - kaka = ('pyoidc=bjmc::1463043536::upm|' - '1463043535|18a201305fa15a96ce4048e1fbb03f7715f86499') - seed = '' - name = 'pyoidc' - with pytest.raises(InvalidCookieSign) as err: - parse_cookie(name, seed, kaka) - - -def test_parse_manipulated_cookie_timestamp(): - kaka = ('pyoidc=bjmc::1463043535::upm|' - '1463043537|18a201305fa15a96ce4048e1fbb03f7715f86499') - seed = '' - name = 'pyoidc' - with pytest.raises(InvalidCookieSign) as err: - parse_cookie(name, seed, kaka) - - -def test_cookie_parts(): - name = 'pyoidc' - kaka = ('pyoidc=bjmc::1463043535::upm|' - '1463043535|18a201305fa15a96ce4048e1fbb03f7715f86499') - result = cookie_parts(name, kaka) - assert result == ['bjmc::1463043535::upm', - '1463043535', - '18a201305fa15a96ce4048e1fbb03f7715f86499'] - - -def test_geturl(): - environ = { - "wsgi.url_scheme": "http", - "SERVER_NAME": "example.com", - "SERVER_PORT": "80", - "SCRIPT_NAME": "/foo", - "PATH_INFO": "/bar", - "QUERY_STRING": "baz=xyz" - } - - assert geturl(environ) == "http://example.com/foo/bar?baz=xyz" - - -def test_getpath(): - environ = { - "SCRIPT_NAME": "/foo", - "PATH_INFO": "/bar", - } - - assert getpath(environ) == "/foo/bar" +import datetime + +import pytest +from six import binary_type + +from oic.exception import ImproperlyConfigured +from oic.utils.http_util import CookieDealer +from oic.utils.http_util import InvalidCookieSign +from oic.utils.http_util import Response +from oic.utils.http_util import cookie_parts +from oic.utils.http_util import cookie_signature +from oic.utils.http_util import getpath +from oic.utils.http_util import geturl +from oic.utils.http_util import parse_cookie +from oic.utils.http_util import verify_cookie_signature + +__author__ = 'roland' + + +class TestResponse(object): + def test_response(self): + response_header = ("X-Test", "foobar") + message = "foo bar" + + def start_response(status, headers): + assert status == "200 OK" + assert response_header in headers + + resp = Response(message, headers=[response_header]) + result = resp({}, start_response) + assert result == [message.encode('utf8')] + + def test_escaped(self): + template = '%s' + response_header = ("XSS-Test", "script") + message = '' + + def start_response(status, headers): + assert status == "200 OK" + assert response_header in headers + + resp = Response(message=message, headers=[response_header], template=template) + assert resp({}, start_response) == ['<script>alert("hi");</script>'.encode('utf8')] + + +@pytest.fixture +def cookie_dealer(): + class DummyServer(): + def __init__(self): + self.symkey = b"0123456789012345" + + return CookieDealer(DummyServer()) + + +class TestCookieDealer(object): + def test_create_cookie_value(self, cookie_dealer): + cookie_value = "Something to pass along" + cookie_typ = "sso" + cookie_name = "Foobar" + + kaka = cookie_dealer.create_cookie(cookie_value, cookie_typ, + cookie_name) + result = cookie_dealer.get_cookie_value(kaka[1], "Foobar") + + value, timestamp, typ = cookie_dealer.get_cookie_value(kaka[1], + "Foobar") + + assert (value, typ) == (cookie_value, cookie_typ) + + def test_delete_cookie(self, cookie_dealer): + cookie_name = "Foobar" + kaka = cookie_dealer.delete_cookie(cookie_name) + cookie_expiration = kaka[1].split(";")[1].split("=")[1] + + now = datetime.datetime.utcnow() # + cookie_timestamp = datetime.datetime.strptime( + cookie_expiration, "%a, %d-%b-%Y %H:%M:%S GMT") + assert cookie_timestamp < now + + def test_cookie_dealer_improperly_configured(self): + class BadServer(): + def __init__(self): + self.symkey = "" + with pytest.raises(ImproperlyConfigured) as err: + CookieDealer(BadServer()) + expected_msg = "CookieDealer.srv.symkey cannot be an empty value" + assert expected_msg in str(err.value) + + +def test_cookie_signature(): + key = b'1234567890abcdef' + parts = ['abc', 'def'] + sig = cookie_signature(key, *parts) + assert verify_cookie_signature(sig, key, *parts) + + +def test_broken_cookie_signature(): + key = b'1234567890abcdef' + parts = ['abc', 'def'] + sig = cookie_signature(key, *parts) + parts.reverse() + assert not verify_cookie_signature(sig, key, *parts) + + +def test_parse_cookie(): + kaka = ('pyoidc=bjmc::1463043535::upm|' + '1463043535|18a201305fa15a96ce4048e1fbb03f7715f86499') + seed = '' + name = 'pyoidc' + result = parse_cookie(name, seed, kaka) + assert result == ('bjmc::1463043535::upm', '1463043535') + + +def test_parse_manipulated_cookie_payload(): + kaka = ('pyoidc=bjmc::1463043536::upm|' + '1463043535|18a201305fa15a96ce4048e1fbb03f7715f86499') + seed = '' + name = 'pyoidc' + with pytest.raises(InvalidCookieSign) as err: + parse_cookie(name, seed, kaka) + + +def test_parse_manipulated_cookie_timestamp(): + kaka = ('pyoidc=bjmc::1463043535::upm|' + '1463043537|18a201305fa15a96ce4048e1fbb03f7715f86499') + seed = '' + name = 'pyoidc' + with pytest.raises(InvalidCookieSign) as err: + parse_cookie(name, seed, kaka) + + +def test_cookie_parts(): + name = 'pyoidc' + kaka = ('pyoidc=bjmc::1463043535::upm|' + '1463043535|18a201305fa15a96ce4048e1fbb03f7715f86499') + result = cookie_parts(name, kaka) + assert result == ['bjmc::1463043535::upm', + '1463043535', + '18a201305fa15a96ce4048e1fbb03f7715f86499'] + + +def test_geturl(): + environ = { + "wsgi.url_scheme": "http", + "SERVER_NAME": "example.com", + "SERVER_PORT": "80", + "SCRIPT_NAME": "/foo", + "PATH_INFO": "/bar", + "QUERY_STRING": "baz=xyz" + } + + assert geturl(environ) == "http://example.com/foo/bar?baz=xyz" + + +def test_getpath(): + environ = { + "SCRIPT_NAME": "/foo", + "PATH_INFO": "/bar", + } + + assert getpath(environ) == "/foo/bar"