More helper functions related to token handler, implement API endpoint to check userID belonging to a valid token
This commit is contained in:
parent
63bc8b2b69
commit
180cfba7c1
42
main.py
42
main.py
|
|
@ -1,11 +1,11 @@
|
|||
import sys
|
||||
import atexit
|
||||
import signal
|
||||
from typing import Union
|
||||
from typing import Union, Annotated
|
||||
|
||||
from contextlib import asynccontextmanager
|
||||
|
||||
from fastapi import FastAPI, Request
|
||||
from fastapi import FastAPI, Request, Header
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
|
@ -110,16 +110,46 @@ def postlogin(body: loginBody, request: Request):
|
|||
try:
|
||||
if userHandler.checkUserExistence(dbConnection, body.username):
|
||||
userID = userHandler.getIDByUsername(dbConnection, body.username)
|
||||
if securityHandler.handlePassword(dbConnection, body.password, userID):
|
||||
if securityHandler.handlePasswordVerification(dbConnection, body.password, userID):
|
||||
return {"success": True, "authToken": tokenHandler.createToken(dbConnection, userID, body.rememberMe, request.client.host), "message": "User login success!"}
|
||||
else:
|
||||
return {"success": False, "authToken": "none", "message": "User login failed! Please check your password."}
|
||||
return {"success": False, "authToken": None, "message": "User login failed! Please check your password."}
|
||||
else:
|
||||
return {"success": False, "authToken": "none", "message": "User login failed! User does not exist."}
|
||||
return {"success": False, "authToken": None, "message": "User login failed! User does not exist."}
|
||||
except Exception as error:
|
||||
msg = "User login failed! Unexpected server error. " + repr(error)
|
||||
print(msg)
|
||||
return {"success": False, "authToken": "none", "message": msg}
|
||||
return {"success": False, "authToken": None, "message": msg}
|
||||
|
||||
@app.get("/api/userIDByAuthToken")
|
||||
def getuserIDByAuthToken(authToken: Annotated[str | None, Header()] = None):
|
||||
try:
|
||||
if tokenHandler.validateTokenExistence(dbConnection, authToken):
|
||||
userID = userHandler.getIDByAuthToken(dbConnection, authToken)
|
||||
return {"success": True, "userID": userID, "message": "Get userID by authToken succeeded!"}
|
||||
else:
|
||||
return {"success": False, "userID": None, "message": "Get userID by authToken failed! authToken provided is not valid."}
|
||||
except Exception as error:
|
||||
msg = "Get userID by authToken failed! Unexpected server error. " + repr(error)
|
||||
print(msg)
|
||||
return {"success": False, "authToken": None, "message": msg}
|
||||
|
||||
|
||||
# GET
|
||||
# /api/userByAuthToken
|
||||
# - userID
|
||||
# /api/publicInfo/{userID}
|
||||
# - username
|
||||
# - firstname
|
||||
# - lastname
|
||||
# - profile picture
|
||||
# - location
|
||||
# - public email (For contact)
|
||||
# /api/privateInfo/{userID}
|
||||
# - private email (For authentication/login)
|
||||
|
||||
# POST
|
||||
# /api/changeInfo/{infotype}
|
||||
|
||||
@app.get("/api")
|
||||
def getapi():
|
||||
|
|
|
|||
|
|
@ -1,10 +1,14 @@
|
|||
annotated-types==0.6.0
|
||||
anyio==4.3.0
|
||||
argon2-cffi==23.1.0
|
||||
argon2-cffi-bindings==21.2.0
|
||||
cffi==1.16.0
|
||||
click==8.1.7
|
||||
fastapi==0.110.1
|
||||
h11==0.14.0
|
||||
idna==3.7
|
||||
psycopg2-binary==2.9.9
|
||||
pycparser==2.22
|
||||
pydantic==2.7.0
|
||||
pydantic_core==2.18.1
|
||||
sniffio==1.3.1
|
||||
|
|
|
|||
|
|
@ -34,7 +34,7 @@ def verifyRehash(hash: str) -> bool:
|
|||
except:
|
||||
return False
|
||||
|
||||
def handlePassword(dbConnection: psycopg2.extensions.connection, password: str, userID: int) -> bool:
|
||||
def handlePasswordVerification(dbConnection: psycopg2.extensions.connection, password: str, userID: int) -> bool:
|
||||
hash = userHandler.getHashValueByUserID(dbConnection, userID)
|
||||
userIDstr = str(userID)
|
||||
debugPrint("Now verifying password against hash for user ID " + userIDstr + "...")
|
||||
|
|
|
|||
|
|
@ -11,6 +11,9 @@ def debugPrint(msg: str) -> None:
|
|||
if debug:
|
||||
print("(TOKEN HANDLER) PRINT: " + msg)
|
||||
|
||||
def validateTokenExistence(dbConnection: psycopg2.extensions.connection, authToken: str) -> bool:
|
||||
return dbHandler.checkFieldValueExistence(dbConnection, "authtokens", "token", authToken)
|
||||
|
||||
def createToken(dbConnection: psycopg2.extensions.connection, userID: int, rememberMe: bool, locationIP: str) -> str:
|
||||
debugPrint("Now initialising new token with following attributes : userID = " + str(userID) + ", rememberMe = " + str(rememberMe) + ", locationIP = " + locationIP + "...")
|
||||
randToken = secrets.token_hex(1023)
|
||||
|
|
|
|||
|
|
@ -31,3 +31,12 @@ def getIDByUsername(dbConnection: psycopg2.extensions.connection, username: str)
|
|||
username=sql.Literal(username)
|
||||
)
|
||||
return int(dbHandler._execQuery(dbConnection, sanitisedQuery)[0][0])
|
||||
|
||||
def getIDByAuthToken(dbConnection: psycopg2.extensions.connection, authToken: str) -> int:
|
||||
debugPrint("Attempting to get ID from authToken...")
|
||||
sanitisedQuery = sql.SQL("""
|
||||
SELECT ownerid FROM authtokens WHERE "token" = {authToken}
|
||||
""").format(
|
||||
authToken=sql.Literal(authToken)
|
||||
)
|
||||
return int(dbHandler._execQuery(dbConnection, sanitisedQuery)[0][0])
|
||||
|
|
|
|||
Loading…
Reference in New Issue