-
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
113 lines (95 loc) · 3.87 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
from fastapi import FastAPI, Form, HTTPException
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.security import OAuth2PasswordBearer
from pydantic import BaseModel
from datetime import datetime, timedelta
from user import get_user_data, get_password_hash
import jwt
from fastapi import Request
import os
ALGORITHM = "RS256"
# ---------------------------------------------------------------------------- #
# Read Data #
# ---------------------------------------------------------------------------- #
with open("keys/jwt_private_key.pem", "r") as f:
PRIVATE_KEY = f.read()
with open("keys/jwt_public_key.pem", "r") as f:
PUBLIC_KEY = f.read()
with open("templates/sign-in.html", "r") as f:
login_html = f.read()
with open("templates/signed-in.html", "r") as f:
signed_in_html = f.read()
DATABASE_CONNECTION_STRING = os.getenv(
"DATABASE_CONNECTION_STRING",
"http://lectorium:lectorium@database:5984"
)
ACCESS_TOKEN_EXPIRE_MINUTES = int(
os.getenv("ACCESS_TOKEN_EXPIRE_MINUTES", 30)
)
# ---------------------------------------------------------------------------- #
# App #
# ---------------------------------------------------------------------------- #
app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
class Token(BaseModel):
access_token: str
token_type: str
# ---------------------------------------------------------------------------- #
# Helpers #
# ---------------------------------------------------------------------------- #
def create_access_token(data: dict, expires_delta: timedelta = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, PRIVATE_KEY, algorithm=ALGORITHM)
return encoded_jwt
# ---------------------------------------------------------------------------- #
# Endpoints #
# ---------------------------------------------------------------------------- #
@app.get("/auth", response_class=HTMLResponse)
async def login_form():
return login_html
@app.post("/auth/login")
async def login(
request: Request,
username: str = Form(...),
password: str = Form(...),
):
# Check if user exists
user = get_user_data(
server_url=DATABASE_CONNECTION_STRING,
username=username
)
if user is None:
if request.headers.get("accept") == "application/json":
raise HTTPException(status_code=400, detail="Invalid credentials")
else:
return HTMLResponse(content=login_html)
# Check if password is correct
hashed_password = get_password_hash(
pwd=password,
salt=user["salt"],
iterations=user["iterations"],
)
if hashed_password != user["derived_key"]:
if request.headers.get("accept") == "application/json":
raise HTTPException(status_code=400, detail="Invalid credentials")
else:
return HTMLResponse(content=login_html)
# User is authenticated, create JWT token
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={
"sub": username,
"_couchdb.roles": user["roles"]
}, expires_delta=access_token_expires
)
if request.headers.get("accept") == "application/json":
response = JSONResponse(content={"message": "Login successful"})
else:
response = HTMLResponse(content=signed_in_html)
response.set_cookie(key="Authorization", value=access_token, httponly=True)
return response