diff --git a/backend/api/UsersView.py b/backend/api/UsersView.py index 189b1b7f..a043e4e5 100644 --- a/backend/api/UsersView.py +++ b/backend/api/UsersView.py @@ -4,47 +4,55 @@ from backend.managers.CasbinRoleManager import CasbinRoleManager from backend.pagination import parse_pagination_params from aiosqlite import IntegrityError -from backend.schemas import UserSchema +from functools import wraps +from connexion.exceptions import Forbidden +from connexion import context + +def check_permission(action, resourceId="user"): + def decorator(f): + @wraps(f) + async def decorated_function(*args, **kwargs): + cb = CasbinRoleManager() + token_info = context.context['token_info'] + if not cb.check_permissions(token_info["role"], resourceId, action): + raise Forbidden(description="Insufficient permissions") + + return await f(*args, **kwargs) + + return decorated_function + return decorator class UsersView: def __init__(self): self.um = UsersManager() - self.cb = CasbinRoleManager() - async def get(self, token_info, id: str): - if not self.cb.check_permissions(token_info["role"], 'user', 'show'): - return JSONResponse({"message": "Permission denied"}, status_code=403) - + @check_permission("show") + async def get(self, id: str): user = await self.um.retrieve_user(id) if user is None: return JSONResponse(status_code=404, headers={"error": "User not found"}) return JSONResponse(user.model_dump(), status_code=200) - async def post(self,token_info, body: dict): - if not self.cb.check_permissions(token_info["role"], 'user', 'create'): - return JSONResponse({"message": "Permission denied"}, status_code=403) + @check_permission("create") + async def post(self, body: dict): try: id = await self.um.create_user(body['name'], body['email']) return JSONResponse({"id": id}, status_code=201, headers={'Location': f'{api_base_url}/users/{id}'}) except IntegrityError: return JSONResponse({"message": "A user with the provided details already exists."}, status_code=400) - async def put(self, token_info, id: str, body: dict): - if not self.cb.check_permissions(token_info["role"], 'user', 'edit'): - return JSONResponse({"message": "Permission denied"}, status_code=403) - + @check_permission("edit") + async def put(self, id: str, body: dict): await self.um.update_user(id, body['name'], body['email']) return JSONResponse({"message": "User updated successfully"}, status_code=200) - async def delete(self, token_info, id: str): - if not self.cb.check_permissions(token_info["role"], 'user', 'delete'): - return JSONResponse({"message": "Permission denied"}, status_code=403) + @check_permission("delete") + async def delete(self, id: str): await self.um.delete_user(id) return Response(status_code=204) - async def search(self, token_info, filter: str = None, range: str = None, sort: str = None): - if not self.cb.check_permissions(token_info["role"], 'user', 'list'): - return JSONResponse({"message": "Permission denied"}, status_code=403) + @check_permission("list") + async def search(self, filter: str = None, range: str = None, sort: str = None): result = parse_pagination_params(filter, range, sort) if isinstance(result, JSONResponse): return result