Skip to content
This repository has been archived by the owner on Jan 4, 2022. It is now read-only.

Commit

Permalink
Merge branch 'revoke-keys' of github.com:solvebio/django-oauth-toolki…
Browse files Browse the repository at this point in the history
…t into davecap-revoke-keys
  • Loading branch information
synasius committed Jul 3, 2014
2 parents c3ffc81 + 5c60bb3 commit da56a7b
Show file tree
Hide file tree
Showing 7 changed files with 187 additions and 1 deletion.
15 changes: 15 additions & 0 deletions oauth2_provider/oauth2_backends.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,21 @@ def create_token_response(self, request):

return uri, headers, body, status

def create_revocation_response(self, request):
"""
A wrapper method that calls create_revocation_response on a
`server_class` instance.
:param request: The current django.http.HttpRequest object
"""
uri, http_method, body, headers = self._extract_params(request)

headers, body, status = self.server.create_revocation_response(
uri, http_method, body, headers)
uri = headers.get("Location", None)

return uri, headers, body, status

def verify_request(self, request, scopes):
"""
A wrapper method that calls verify_request on `server_class` instance.
Expand Down
25 changes: 25 additions & 0 deletions oauth2_provider/oauth2_validators.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,31 @@ def save_bearer_token(self, token, request, *args, **kwargs):
# TODO check out a more reliable way to communicate expire time to oauthlib
token['expires_in'] = oauth2_settings.ACCESS_TOKEN_EXPIRE_SECONDS

def revoke_token(self, token, token_type_hint, request, *args, **kwargs):
"""
Revoke an access or refresh token.
:param token: The token string.
:param token_type_hint: access_token or refresh_token.
:param request: The HTTP Request (oauthlib.common.Request)
"""
if token_type_hint not in [None, 'access_token', 'refresh_token']:
token_type_hint = None

if token_type_hint in [None, 'access_token']:
try:
AccessToken.objects.get(token=token).delete()
return
except AccessToken.DoesNotExist:
pass

if token_type_hint in [None, 'refresh_token']:
try:
RefreshToken.objects.get(token=token).delete()
return
except RefreshToken.DoesNotExist:
pass

def validate_user(self, username, password, client, request, *args, **kwargs):
"""
Check username and password correspond to a valid and active User
Expand Down
119 changes: 119 additions & 0 deletions oauth2_provider/tests/test_token_revocation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
from __future__ import unicode_literals

import datetime

from django.test import TestCase, RequestFactory
from django.core.urlresolvers import reverse
from django.utils import timezone

from ..compat import urlencode, get_user_model
from ..models import get_application_model, AccessToken, RefreshToken
from ..settings import oauth2_settings

from .test_utils import TestCaseUtils


Application = get_application_model()
UserModel = get_user_model()


class BaseTest(TestCaseUtils, TestCase):
def setUp(self):
self.factory = RequestFactory()
self.test_user = UserModel.objects.create_user("test_user", "[email protected]", "123456")
self.dev_user = UserModel.objects.create_user("dev_user", "[email protected]", "123456")

self.application = Application(
name="Test Application",
redirect_uris="http://localhost http://example.com http://example.it",
user=self.dev_user,
client_type=Application.CLIENT_CONFIDENTIAL,
authorization_grant_type=Application.GRANT_AUTHORIZATION_CODE,
)
self.application.save()

oauth2_settings._SCOPES = ['read', 'write']

def tearDown(self):
self.application.delete()
self.test_user.delete()
self.dev_user.delete()


class TestRevocationView(BaseTest):
def test_revoke_access_token(self):
"""
"""
tok = AccessToken.objects.create(user=self.test_user, token='1234567890',
application=self.application,
expires=timezone.now()+datetime.timedelta(days=1),
scope='read write')
query_string = urlencode({
'client_id': self.application.client_id,
'client_secret': self.application.client_secret,
'token': tok.token,
})
url = "{url}?{qs}".format(url=reverse('oauth2_provider:revoke-token'), qs=query_string)
response = self.client.post(url)
self.assertEqual(response.status_code, 200)
self.assertFalse(AccessToken.objects.filter(id=tok.id).exists())

def test_revoke_access_token_with_hint(self):
"""
"""
tok = AccessToken.objects.create(user=self.test_user, token='1234567890',
application=self.application,
expires=timezone.now()+datetime.timedelta(days=1),
scope='read write')
query_string = urlencode({
'client_id': self.application.client_id,
'client_secret': self.application.client_secret,
'token': tok.token,
'token_type_hint': 'access_token'
})
url = "{url}?{qs}".format(url=reverse('oauth2_provider:revoke-token'), qs=query_string)
response = self.client.post(url)
self.assertEqual(response.status_code, 200)
self.assertFalse(AccessToken.objects.filter(id=tok.id).exists())

def test_revoke_access_token_with_invalid_hint(self):
"""
"""
tok = AccessToken.objects.create(user=self.test_user, token='1234567890',
application=self.application,
expires=timezone.now()+datetime.timedelta(days=1),
scope='read write')
# invalid hint should have no effect
query_string = urlencode({
'client_id': self.application.client_id,
'client_secret': self.application.client_secret,
'token': tok.token,
'token_type_hint': 'bad_hint'
})
url = "{url}?{qs}".format(url=reverse('oauth2_provider:revoke-token'), qs=query_string)
response = self.client.post(url)
self.assertEqual(response.status_code, 200)
self.assertFalse(AccessToken.objects.filter(id=tok.id).exists())

def test_revoke_refresh_token(self):
"""
"""
tok = AccessToken.objects.create(user=self.test_user, token='1234567890',
application=self.application,
expires=timezone.now()+datetime.timedelta(days=1),
scope='read write')
rtok = RefreshToken.objects.create(user=self.test_user, token='999999999',
application=self.application, access_token=tok)
query_string = urlencode({
'client_id': self.application.client_id,
'client_secret': self.application.client_secret,
'token': rtok.token,
})
url = "{url}?{qs}".format(url=reverse('oauth2_provider:revoke-token'), qs=query_string)
response = self.client.post(url)
self.assertEqual(response.status_code, 200)
self.assertFalse(RefreshToken.objects.filter(id=rtok.id).exists())
1 change: 1 addition & 0 deletions oauth2_provider/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
'',
url(r'^authorize/$', views.AuthorizationView.as_view(), name="authorize"),
url(r'^token/$', views.TokenView.as_view(), name="token"),
url(r'^revoke_token/$', views.RevokeTokenView.as_view(), name="revoke-token"),
)

# Application management views
Expand Down
2 changes: 1 addition & 1 deletion oauth2_provider/views/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from .base import AuthorizationView, TokenView
from .base import AuthorizationView, TokenView, RevokeTokenView
from .application import ApplicationRegistration, ApplicationDetail, ApplicationList, \
ApplicationDelete, ApplicationUpdate
from .generic import ProtectedResourceView, ScopedProtectedResourceView, ReadWriteScopedResourceView
16 changes: 16 additions & 0 deletions oauth2_provider/views/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,3 +157,19 @@ def post(self, request, *args, **kwargs):
for k, v in headers.items():
response[k] = v
return response


class RevokeTokenView(CsrfExemptMixin, OAuthLibMixin, View):
"""
Implements an endpoint to revoke access or refresh tokens
"""
server_class = Server
validator_class = oauth2_settings.OAUTH2_VALIDATOR_CLASS

def post(self, request, *args, **kwargs):
url, headers, body, status = self.create_revocation_response(request)
response = HttpResponse(content=body, status=status)

for k, v in headers.items():
response[k] = v
return response
10 changes: 10 additions & 0 deletions oauth2_provider/views/mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,16 @@ def create_token_response(self, request):
core = self.get_oauthlib_core()
return core.create_token_response(request)

def create_revocation_response(self, request):
"""
A wrapper method that calls create_revocation_response on the
`server_class` instance.
:param request: The current django.http.HttpRequest object
"""
core = self.get_oauthlib_core()
return core.create_revocation_response(request)

def verify_request(self, request):
"""
A wrapper method that calls verify_request on `server_class` instance.
Expand Down

0 comments on commit da56a7b

Please sign in to comment.