Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Per domain authorization for certificate issuance #3889

Merged
merged 10 commits into from
Jan 10, 2022
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions docs/administration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1025,6 +1025,23 @@ For more information about how to use social logins, see: `Satellizer <https://g

USER_MEMBERSHIP_PROVIDER = "<yourmembershippluginslug>"

Authorization Providers
~~~~~~~~~~~~~~~~~~~~~~~


If you are not using a custom authorization provider you do not need to configure any of these options

.. data:: USER_DOMAIN_AUTHORIZATION_PROVIDER
:noindex:

An optional plugin to perform authorization of domains for which certificate is being issued. Provide plugin slug here.
charhate marked this conversation as resolved.
Show resolved Hide resolved
Plugin is used to check if caller is authorized to issue a certificate for a given Common Name and Subject Alternative
Name (SAN) of type DNSName. Plugin shall be an implementation of DomainAuthorizationPlugin.

::

USER_DOMAIN_AUTHORIZATION_PROVIDER = "<yourauthorizationpluginslug>"

Metric Providers
~~~~~~~~~~~~~~~~

Expand Down Expand Up @@ -2015,6 +2032,8 @@ get it added.
Want to create your own extension? See :doc:`../developer/plugins/index` to get started.


.. _iam_target:

Identity and Access Management
==============================

Expand All @@ -2041,3 +2060,30 @@ These permissions are applied to the user upon login and refreshed on every requ
.. seealso::

`Flask-Principal <https://pythonhosted.org/Flask-Principal>`_

To allow integration with external access/membership management tools that may exist in your organization, lemur offers
below plugins in addition to it's own RBAC implementation.

Membership Plugin
-----------------

:Authors:
Sayali Charhate <[email protected]>
:Type:
User Membership
:Description:
Adds support to learn and validate user membership details from an external service. User memberships are used to
create user roled dynamically as described in :ref:`iam_target`. Configure this plugin slug as `USER_MEMBERSHIP_PROVIDER`

Authorization Plugins
---------------------

:Authors:
Sayali Charhate <[email protected]>
:Type:
External Authorization
:Description:
Adds support to implement custom authorization logic that is best suited for your enterprise. Lemur offers `AuthorizationPlugin`
and its extended version `DomainAuthorizationPlugin`. One can implement `DomainAuthorizationPlugin` and configure its
slug as `USER_DOMAIN_AUTHORIZATION_PROVIDER` to check if caller is authorized to issue a certificate for a given Common
Name and Subject Alternative Name (SAN) of type DNSName
1 change: 1 addition & 0 deletions lemur/api_keys/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class ApiKey(db.Model):
ttl = Column(BigInteger)
issued_at = Column(BigInteger)
revoked = Column(Boolean)
application_name = Column(String, nullable=True)

def __repr__(self):
return "ApiKey(name={name}, user_id={user_id}, ttl={ttl}, issued_at={iat}, revoked={revoked})".format(
Expand Down
2 changes: 2 additions & 0 deletions lemur/auth/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ def decorated_function(*args, **kwargs):
expired_time = datetime.fromtimestamp(access_key.issued_at) + timedelta(days=access_key.ttl)
if current_time >= expired_time:
return dict(message="Token has expired"), 403
if access_key.application_name:
g.caller_application = access_key.application_name

user = user_service.get(payload["sub"])

Expand Down
17 changes: 17 additions & 0 deletions lemur/certificates/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from lemur.constants import SUCCESS_METRIC_STATUS, FAILURE_METRIC_STATUS
from lemur.destinations.models import Destination
from lemur.domains.models import Domain
from lemur.domains.service import is_authorized_for_domain
from lemur.endpoints import service as endpoint_service
from lemur.extensions import metrics, signals
from lemur.notifications.messaging import send_revocation_notification
Expand Down Expand Up @@ -1240,3 +1241,19 @@ def is_valid_owner(email):

# expecting owner to be an existing team DL
return user_membership_provider.does_group_exist(email)


def allowed_issuance_for_domain(common_name, extensions):
check_permission_for_cn = True if common_name else False

# authorize issuance for every x509.DNSName SAN
if extensions and extensions.get("sub_alt_names"):
for san in extensions["sub_alt_names"]["names"]:
if isinstance(san, x509.DNSName):
if san.value == common_name:
check_permission_for_cn = False
is_authorized_for_domain(san.value)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not returning the outcome of the AuthZ evaluation?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It raises UnauthorizedError if authz fails (additional details)


# lemur UI copies CN as SAN (x509.DNSName). Permission check for CN might already be covered above.
if check_permission_for_cn:
is_authorized_for_domain(common_name)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not returning the outcome of the AuthZ evaluation?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above

21 changes: 10 additions & 11 deletions lemur/certificates/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

from flask import Blueprint, make_response, jsonify, g, current_app
from flask_restful import reqparse, Api, inputs
from lemur.plugins.bases.authorization import UnauthorizedError
from sentry_sdk import capture_exception

from lemur.common.schema import validate_schema
Expand Down Expand Up @@ -511,23 +512,21 @@ def post(self, data=None):
roles.append(role)
authority_permission = AuthorityPermission(data["authority"].id, roles)

if authority_permission.can():
data["creator"] = g.user
if not authority_permission.can():
return dict(message=f"You are not authorized to use the authority: {data['authority'].name}"), 403

data["creator"] = g.user
try:
service.allowed_issuance_for_domain(data["common_name"], data["extensions"])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same question as above about the outcome of the authZ eval

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It raises UnauthorizedError if authz fails (additional details), which gets caught below to return 403 along with error message

except UnauthorizedError as e:
return dict(message=str(e)), 403
else:
cert = service.create(**data)
if isinstance(cert, Certificate):
# only log if created, not pending
log_service.create(g.user, "create_cert", certificate=cert)
return cert

return (
dict(
message="You are not authorized to use the authority: {0}".format(
data["authority"].name
)
),
403,
)


class CertificatesUpload(AuthenticatedResource):
""" Defines the 'certificates' upload endpoint """
Expand Down
27 changes: 27 additions & 0 deletions lemur/domains/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@
"""
from sqlalchemy import and_

from flask import current_app, g

from lemur import database
from lemur.certificates.models import Certificate
from lemur.domains.models import Domain
from lemur.plugins.base import plugins


def get(domain_id):
Expand Down Expand Up @@ -57,6 +60,30 @@ def is_domain_sensitive(name):
return database.find_all(query, Domain, {}).all()


def is_authorized_for_domain(name):
"""
If authorization plugin is available, perform the check to see if current user can issue certificate for a give
charhate marked this conversation as resolved.
Show resolved Hide resolved
domain.
Return True if authorized, False otherwise.
If authorization plugin is not available, return true by default

:param name: domain (string) for which authorization check is being done
:return: True/False
"""
if current_app.config.get("USER_DOMAIN_AUTHORIZATION_PROVIDER") is None:
# nothing to check since USER_DOMAIN_AUTHORIZATION_PROVIDER is not configured
return
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would we want to return True here, per above description

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for catching, modified description (additional details)


user_domain_authorization_provider = plugins.get(current_app.config.get("USER_DOMAIN_AUTHORIZATION_PROVIDER"))
caller = g.caller_application if hasattr(g, 'caller_application') else g.user.email
# if the caller can be mapped to an application name, use that to perform authorization
# this could be true when using API key to call lemur (migration script e2d406ada25c_.py)
_, error = user_domain_authorization_provider.is_authorized(domain=name, caller=caller)
charhate marked this conversation as resolved.
Show resolved Hide resolved

if error:
raise error

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we are not returning the outcome if is_authorized()?

Copy link
Contributor Author

@charhate charhate Dec 10, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regarding propagation of authz result, it instead relies on raising UnauthorizedError. Raising error with useful details within custom plugin implementation can be useful to return an actionable message when certificate issuance is denied. The exception will get caught at certificate create Post call to return 403 along with the error message.
I did some minor changes to above method

  1. look at authorized boolean, raise UnauthorizedError() if authz returns False
  2. update description to reflect behavior


def create(name, sensitive):
"""
Create a new domain
Expand Down
22 changes: 22 additions & 0 deletions lemur/migrations/versions/e2d406ada25c_.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
"""Add column application_name to api_keys table. Null values are allowed.

Revision ID: e2d406ada25c
Revises: 189e5fda5bf8
Create Date: 2021-11-24 14:48:18.747487

"""

# revision identifiers, used by Alembic.
revision = 'e2d406ada25c'
down_revision = '189e5fda5bf8'

from alembic import op
import sqlalchemy as sa


def upgrade():
op.add_column("api_keys", sa.Column("application_name", sa.String(256), nullable=True))


def downgrade():
op.drop_column("api_keys", "application_name")
4 changes: 3 additions & 1 deletion lemur/plugins/bases/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,6 @@
from .notification import NotificationPlugin, ExpirationNotificationPlugin # noqa
from .export import ExportPlugin # noqa
from .tls import TLSPlugin # noqa
from .membership import MembershipPlugin # noqa
from .membership import MembershipPlugin # noqa
from .authorization import AuthorizationPlugin # noqa
from .authorization import DomainAuthorizationPlugin # noqa
45 changes: 45 additions & 0 deletions lemur/plugins/bases/authorization.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
"""
.. module: lemur.plugins.bases.authorization
:platform: Unix
:copyright: (c) 2021 by Netflix Inc., see AUTHORS for more
:license: Apache, see LICENSE for more details.

.. moduleauthor:: Sayali Charhate <[email protected]>
"""

from lemur.exceptions import LemurException
from lemur.plugins.base import Plugin


class AuthorizationPlugin(Plugin):
"""
This is the base class for authorization providers. Check if the caller is authorized to access a resource.
"""
type = "authorization"

def is_authorized(self, resource, caller):
raise NotImplementedError


class DomainAuthorizationPlugin(AuthorizationPlugin):
"""
This is the base class for domain authorization providers. Check if the caller can issue certificates for a domain.
"""
type = "domain-authorization"

def is_authorized(self, domain, caller):
raise NotImplementedError


class UnauthorizedError(LemurException):
"""
Raised when user is unauthorized to perform an action on the resource
"""
def __init__(self, user, resource, action, details):
self.user = user
self.resource = resource
self.action = action
self.details = details

def __str__(self):
return repr(f"{self.user} is not authorized to perform {self.action} on {self.resource}: {self.details}")
87 changes: 85 additions & 2 deletions lemur/tests/test_certificates.py
Original file line number Diff line number Diff line change
Expand Up @@ -825,8 +825,8 @@ def test_create_basic_csr(client):
owner="[email protected]",
key_type="RSA2048",
extensions=dict(
names=dict(
sub_alt_names=x509.SubjectAlternativeName(
sub_alt_names=dict(
names=x509.SubjectAlternativeName(
[
x509.DNSName("test.example.com"),
x509.DNSName("test2.example.com"),
Expand Down Expand Up @@ -1578,3 +1578,86 @@ def start_server():
daemon.setDaemon(True) # Set as a daemon so it will be killed once the main thread is dead.
daemon.start()
return daemon


def mocked_is_authorized_for_domain(name):
domain_in_error = "fail.lemur.com"
if name == domain_in_error:
raise UnauthorizedError(user="dummy_user", resource=domain_in_error, action="issue_certificate",
details="unit test, mocked failure")


@pytest.mark.parametrize(
"common_name, extensions, expected_error, authz_check_count",
[
("fail.lemur.com", None, True, 1),
("fail.lemur.com", dict(
sub_alt_names=dict(
names=x509.SubjectAlternativeName(
[
x509.DNSName("test.example.com"),
x509.DNSName("test2.example.com"),
]
)
)
), True, 3), # CN is checked after SAN
("test.example.com", dict(
sub_alt_names=dict(
names=x509.SubjectAlternativeName(
[
x509.DNSName("fail.lemur.com"),
x509.DNSName("test2.example.com"),
]
)
)
), True, 1),
(None, dict(
sub_alt_names=dict(
names=x509.SubjectAlternativeName(
[
x509.DNSName("fail.lemur.com"),
x509.DNSName("test2.example.com"),
]
)
)
), True, 1),
("pass.lemur.com", None, False, 1),
("pass.lemur.com", dict(
sub_alt_names=dict(
names=x509.SubjectAlternativeName(
[
x509.DNSName("test.example.com"),
x509.DNSName("test2.example.com"),
]
)
)
), False, 3),
("pass.lemur.com", dict(
sub_alt_names=dict(
names=x509.SubjectAlternativeName(
[
x509.DNSName("test.example.com"),
x509.DNSName("pass.lemur.com"),
]
)
)
), False, 2), # CN repeated in SAN
],
)
def test_allowed_issuance_for_domain(common_name, extensions, expected_error, authz_check_count):
from lemur.certificates.service import allowed_issuance_for_domain

with patch(
'lemur.certificates.service.is_authorized_for_domain', side_effect=mocked_is_authorized_for_domain
) as wrapper:
try:
allowed_issuance_for_domain(common_name, extensions)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no return values being tested?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

asserting on exception below (additional details)

if expected_error:
assert False, f"UnauthorizedError did not occur, input: CN({common_name}), SAN({extensions})"
except UnauthorizedError as e:
if expected_error:
pass
else:
assert False, f"UnauthorizedError occured, input: CN({common_name}), SAN({extensions})"

assert wrapper.call_count == authz_check_count