diff --git a/superset/security/manager.py b/superset/security/manager.py index 38b74d91a3fd7..a8ffb7d03b2f6 100644 --- a/superset/security/manager.py +++ b/superset/security/manager.py @@ -24,13 +24,11 @@ Any, Callable, cast, - DefaultDict, Dict, List, NamedTuple, Optional, Set, - Tuple, TYPE_CHECKING, Union, ) @@ -2125,41 +2123,3 @@ def is_admin(self) -> bool: return current_app.config["AUTH_ROLE_ADMIN"] in [ role.name for role in self.get_user_roles() ] - - def get_permissions( - self, - user: User, - ) -> Tuple[Dict[str, List[List[str]]], DefaultDict[str, List[str]]]: - if not user.roles: - raise AttributeError("User object does not have roles") - - roles = defaultdict(list) - permissions = defaultdict(set) - - query = ( - self.get_session.query(Role.name, Permission.name, ViewMenu.name) - .join(assoc_user_role, assoc_user_role.c.role_id == Role.id) - .join(Role.permissions) - .join(PermissionView.view_menu) - .join(PermissionView.permission) - ) - - if user.is_anonymous: - public_role = current_app.config.get("AUTH_ROLE_PUBLIC") - query = query.filter(Role.name == public_role) - elif self.is_guest_user(user): - guest_role = current_app.config.get("GUEST_ROLE_NAME") - query = query.filter(Role.name == guest_role) - else: - query = query.filter(assoc_user_role.c.user_id == user.id) - - rows = query.all() - for role, permission, view_menu in rows: - if permission in ("datasource_access", "database_access"): - permissions[permission].add(view_menu) - roles[role].append([permission, view_menu]) - - transformed_permissions = defaultdict(list) - for perm in permissions: - transformed_permissions[perm] = list(permissions[perm]) - return roles, transformed_permissions diff --git a/superset/views/utils.py b/superset/views/utils.py index aa4d634b87f29..6b6d5a0fb8579 100644 --- a/superset/views/utils.py +++ b/superset/views/utils.py @@ -15,8 +15,9 @@ # specific language governing permissions and limitations # under the License. import logging +from collections import defaultdict from functools import wraps -from typing import Any, Callable, Dict, List, Optional, Tuple, Union +from typing import Any, Callable, DefaultDict, Dict, List, Optional, Tuple, Union from urllib import parse import msgpack @@ -93,13 +94,34 @@ def bootstrap_user_data(user: User, include_perms: bool = False) -> Dict[str, An } if include_perms: - roles, permissions = security_manager.get_permissions(user) + roles, permissions = get_permissions(user) payload["roles"] = roles payload["permissions"] = permissions return payload +def get_permissions( + user: User, +) -> Tuple[Dict[str, List[List[str]]], DefaultDict[str, List[str]]]: + if not user.roles: + raise AttributeError("User object does not have roles") + + roles = defaultdict(list) + permissions = defaultdict(set) + + for role in user.roles: + permissions_ = security_manager.get_role_permissions(role) + for permission in permissions_: + if permission[0] in ("datasource_access", "database_access"): + permissions[permission[0]].add(permission[1]) + roles[role.name].append([permission[0], permission[1]]) + transformed_permissions = defaultdict(list) + for perm in permissions: + transformed_permissions[perm] = list(permissions[perm]) + return roles, transformed_permissions + + def get_viz( form_data: FormData, datasource_type: str, diff --git a/tests/integration_tests/security_tests.py b/tests/integration_tests/security_tests.py index d55f9a003447d..bba5cd2f0ac38 100644 --- a/tests/integration_tests/security_tests.py +++ b/tests/integration_tests/security_tests.py @@ -60,7 +60,6 @@ load_world_bank_dashboard_with_slices, load_world_bank_data, ) -from .dashboard_utils import get_table NEW_SECURITY_CONVERGE_VIEWS = ( "Annotation", @@ -1757,54 +1756,6 @@ def test_views_are_secured(self): view_str = "\n".join([str(v) for v in unsecured_views]) raise Exception(f"Some views are not secured:\n{view_str}") - @patch("superset.utils.core.g") - @patch("superset.security.manager.g") - def test_get_permissions_gamma_user(self, mock_sm_g, mock_g): - session = db.session - role_name = "dummy_role" - gamma_user = security_manager.find_user(username="gamma") - security_manager.add_role(role_name) - dummy_role = security_manager.find_role(role_name) - gamma_user.roles.append(dummy_role) - - table = ( - db.session.query(SqlaTable) - .filter_by(table_name="wb_health_population") - .one() - ) - table_perm = table.perm - security_manager.add_permission_role( - dummy_role, - security_manager.find_permission_view_menu("datasource_access", table_perm), - ) - security_manager.add_permission_role( - dummy_role, - security_manager.find_permission_view_menu( - "database_access", table.database.perm - ), - ) - - session.commit() - - mock_g.user = mock_sm_g.user = security_manager.find_user("gamma") - with self.client.application.test_request_context(): - roles, permissions = security_manager.get_permissions(mock_g.user) - assert "dummy_role" in roles - assert "Gamma" in roles - assert sorted(roles["Gamma"]) == sorted(GAMMA_ROLE_PERMISSIONS["Gamma"]) - assert sorted(roles["schema_access_role"]) == sorted( - GAMMA_ROLE_PERMISSIONS["schema_access_role"] - ) - - assert len(permissions) == 2 - assert "[examples].(id:" in permissions["database_access"][0] - assert "[examples].[" in permissions["datasource_access"][0] - - # cleanup - gamma_user = security_manager.find_user(username="gamma") - gamma_user.roles.remove(security_manager.find_role(role_name)) - session.commit() - class TestSecurityManager(SupersetTestCase): """