from fastapi import Depends, FastAPI, Request, Form
from fastapi.templating import Jinja2Templates
from fastapi.staticfiles import StaticFiles
from fastapi.responses import HTMLResponse, RedirectResponse, Response
from models import User
from schema import users, database
from security import AuthHandler, RequiresLoginException
# Application object; the middleware, exception handler and routes below are
# all registered on it.
app = FastAPI()
# Serve everything under the local "static" directory at the /static URL
# prefix. These files are served without going through any route dependency
# in this file, so nothing sensitive belongs in that directory.
app.mount("/static", StaticFiles(directory="static"), name="static")
# Jinja2 templates are loaded from the local "templates" directory.
templates = Jinja2Templates(directory="templates")
# Single shared AuthHandler (from the project's security module). This file
# uses it for password hashing, credential checks, access-token creation and
# as the dependency guarding /private/.
auth_handler = AuthHandler()
# redirection block
@app.exception_handler(RequiresLoginException)
async def exception_handler(request: Request, exc: RequiresLoginException) -> Response:
    """Redirect a request that raised ``RequiresLoginException`` to ``/``.

    The redirect target is a fixed path and is never derived from the
    incoming request, so the handler cannot be steered to another site.
    """
    login_redirect = RedirectResponse(url='/')
    return login_redirect
@app.middleware("http")
async def create_auth_header(request: Request, call_next):
    """Synthesize an ``Authorization`` header for requests that lack one.

    If the request already carries an ``Authorization`` header it is left
    untouched. Otherwise the header is built from the ``Authorization``
    cookie when that cookie exists, and from a fixed placeholder value when
    it does not. The request is then passed down the chain.

    Review notes:
      * The header is injected by appending to the private ``_list`` of the
        headers object via ``__dict__``. That is not a public API, so it
        depends on framework internals. Re-check on every framework upgrade.
      * Requests with neither header nor cookie receive the literal
        ``Bearer 12345``. Presumably this lets the bearer dependency fail on
        token validation instead of on a missing header. Confirm against
        ``AuthHandler.auth_wrapper``, which is not shown here. Nothing
        downstream may ever accept that value as a valid token.
      * The credential comes from a cookie, which the browser attaches
        automatically. Any state-changing route guarded this way needs CSRF
        protection.
    """
    # A client-supplied header always wins; nothing is rewritten in that case.
    if "Authorization" in request.headers:
        return await call_next(request)

    if "Authorization" in request.cookies:
        cookie_token = request.cookies["Authorization"]
        header_value = f"Bearer {cookie_token}"
    else:
        # Placeholder credential, not a real token (see review notes above).
        header_value = "Bearer 12345"

    # Header names are stored lower-cased as bytes in the raw header list.
    raw_headers = request.headers.__dict__["_list"]
    raw_headers.append(("authorization".encode(), header_value.encode()))

    return await call_next(request)
@app.get("/", response_class=HTMLResponse)
async def root(request: Request):
    """Render ``index.html``, the page login redirects land on."""
    context = {"request": request}
    return templates.TemplateResponse("index.html", context)
@app.get("/register/", response_class=HTMLResponse)
async def registration(request: Request):
    """Render the registration form (``register.html``)."""
    context = {"request": request}
    return templates.TemplateResponse("register.html", context)
@app.post("/register/", response_class=HTMLResponse)
async def register(request: Request, email: str = Form(...), password: str = Form(...)):
    """Create a user row from the submitted form and render the success page.

    The form values are passed through the ``User`` model. The password is
    then hashed with ``auth_handler.get_hash_password`` before the insert, so
    only the hash is written to the ``users`` table.

    Review notes:
      * No error handling surrounds the insert. Any exception raised by
        ``database.execute`` (for example a rejected duplicate, if the schema
        enforces uniqueness; not visible here) propagates to the framework.
      * Any input constraints (e-mail format, password strength) would have
        to come from the ``User`` model; none are applied in this function.
    """
    new_user = User(email=email, password=password)
    password_hash = auth_handler.get_hash_password(new_user.password)
    insert_stmt = users.insert().values(email=new_user.email, password=password_hash)
    await database.execute(insert_stmt)
    # TODO verify success and handle errors
    context = {
        "request": request,
        "success_msg": "Registration Successful!",
        "path_route": '/',
        "path_msg": "Click here to login!",
    }
    return templates.TemplateResponse("success.html", context)
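@app.post("/login/")
async def sign_in(request: Request, response: Response,
                  email: str = Form(...), password: str = Form(...)):
    """Check the submitted credentials and start a cookie-based session.

    On success an access token is created for the e-mail address and stored
    in an ``HttpOnly`` cookie named ``Authorization``, and the success page
    is rendered. On any failure ``error.html`` is rendered with one generic
    message, so the response does not reveal whether the account exists.

    Changes from the previous version:
      * Failed logins now carry HTTP status 401. Before, the page was sent
        with the default 200, and the code shown in the page was 404 in one
        branch and 401 in the other. Proxies, rate limiters and log-based
        alerting can now tell failed attempts from successful ones.
      * Unexpected errors are no longer swallowed silently. The exception
        type is logged server-side; the user still sees the generic message.

    Args:
        request: Incoming request, forwarded to the templates.
        response: Injected by the framework; unused, kept for compatibility.
        email: E-mail address from the login form.
        password: Plain-text password from the login form.

    Returns:
        The rendered success page with the auth cookie set, or the rendered
        error page with status 401.
    """
    import logging  # function-scope so this block needs no file-level change

    failure_context = {"request": request,
                       'detail': 'Incorrect Username or Password',
                       'status_code': 401}
    try:
        user = User(email=email, password=password)
        if await auth_handler.authenticate_user(user.email, user.password):
            atoken = auth_handler.create_access_token(user.email)
            response = templates.TemplateResponse(
                "success.html",
                {"request": request, "USERNAME": user.email,
                 "success_msg": "Welcome back! ",
                 "path_route": '/private/',
                 "path_msg": "Go to your private page!"})
            # NOTE(review): the cookie is HttpOnly but not marked Secure, so
            # it is also sent over plain HTTP. Set secure=True once the
            # deployment is HTTPS-only.
            response.set_cookie(key="Authorization", value=f"{atoken}", httponly=True)
            return response
        return templates.TemplateResponse("error.html", failure_context,
                                          status_code=401)
    except Exception as err:
        # Log only the exception type: validation or driver error messages
        # can echo the submitted form values, which include the password.
        logging.getLogger(__name__).warning(
            "login failed with unexpected %s", type(err).__name__)
        return templates.TemplateResponse("error.html", failure_context,
                                          status_code=401)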
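@app.get("/private/", response_class=HTMLResponse)
async def private(request: Request, email=Depends(auth_handler.auth_wrapper)):
    """Render ``private.html`` for an authenticated caller.

    Access control is the ``auth_handler.auth_wrapper`` dependency. The
    framework resolves it before this body runs, so the ``try`` below does
    not take part in authentication. It only wraps creation of the template
    response.

    Change from the previous version: the bare ``except:`` is now
    ``except Exception``, so ``BaseException`` subclasses such as
    ``KeyboardInterrupt`` and ``SystemExit`` are no longer turned into a
    login redirect. The original error is chained for debugging.

    NOTE(review): a template failure is still reported as "login required".
    Consider letting it surface as a server error so it is not mistaken for
    an authentication problem.

    Args:
        request: Incoming request, forwarded to the template.
        email: Value returned by ``auth_wrapper``; unused here, but declaring
            it is what enforces the dependency.

    Raises:
        RequiresLoginException: If building the template response fails.
    """
    try:
        return templates.TemplateResponse("private.html", {"request": request})
    except Exception as err:
        raise RequiresLoginException() from err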