Skip to content

Commit

Permalink
adding decorator for rbac
Browse files Browse the repository at this point in the history
  • Loading branch information
sanudutta45 committed Nov 9, 2024
1 parent a3d410d commit 74b4bc9
Showing 1 changed file with 27 additions and 19 deletions.
46 changes: 27 additions & 19 deletions backend/api/UsersView.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,47 +4,55 @@
from backend.managers.CasbinRoleManager import CasbinRoleManager
from backend.pagination import parse_pagination_params
from aiosqlite import IntegrityError
from backend.schemas import UserSchema
from functools import wraps
from connexion.exceptions import Forbidden
from connexion import context

def check_permission(action, resourceId="user"):
def decorator(f):
@wraps(f)
async def decorated_function(*args, **kwargs):
cb = CasbinRoleManager()
token_info = context.context['token_info']
if not cb.check_permissions(token_info["role"], resourceId, action):
raise Forbidden(description="Insufficient permissions")

return await f(*args, **kwargs)

return decorated_function
return decorator

class UsersView:
def __init__(self):
self.um = UsersManager()
self.cb = CasbinRoleManager()

async def get(self, token_info, id: str):
if not self.cb.check_permissions(token_info["role"], 'user', 'show'):
return JSONResponse({"message": "Permission denied"}, status_code=403)

@check_permission("show")
async def get(self, id: str):
user = await self.um.retrieve_user(id)
if user is None:
return JSONResponse(status_code=404, headers={"error": "User not found"})
return JSONResponse(user.model_dump(), status_code=200)

async def post(self,token_info, body: dict):
if not self.cb.check_permissions(token_info["role"], 'user', 'create'):
return JSONResponse({"message": "Permission denied"}, status_code=403)
@check_permission("create")
async def post(self, body: dict):
try:
id = await self.um.create_user(body['name'], body['email'])
return JSONResponse({"id": id}, status_code=201, headers={'Location': f'{api_base_url}/users/{id}'})
except IntegrityError:
return JSONResponse({"message": "A user with the provided details already exists."}, status_code=400)

async def put(self, token_info, id: str, body: dict):
if not self.cb.check_permissions(token_info["role"], 'user', 'edit'):
return JSONResponse({"message": "Permission denied"}, status_code=403)

@check_permission("edit")
async def put(self, id: str, body: dict):
await self.um.update_user(id, body['name'], body['email'])
return JSONResponse({"message": "User updated successfully"}, status_code=200)

async def delete(self, token_info, id: str):
if not self.cb.check_permissions(token_info["role"], 'user', 'delete'):
return JSONResponse({"message": "Permission denied"}, status_code=403)
@check_permission("delete")
async def delete(self, id: str):
await self.um.delete_user(id)
return Response(status_code=204)

async def search(self, token_info, filter: str = None, range: str = None, sort: str = None):
if not self.cb.check_permissions(token_info["role"], 'user', 'list'):
return JSONResponse({"message": "Permission denied"}, status_code=403)
@check_permission("list")
async def search(self, filter: str = None, range: str = None, sort: str = None):
result = parse_pagination_params(filter, range, sort)
if isinstance(result, JSONResponse):
return result
Expand Down

0 comments on commit 74b4bc9

Please sign in to comment.