Blorg-Backend/main.py

400 lines
14 KiB
Python

import os
import sys
import atexit
import signal
import base64
from typing import Union, Annotated
from contextlib import asynccontextmanager
from fastapi import FastAPI, Request, Header
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import dbHandler
import userHandler
import securityHandler
import tokenHandler
import blogHandler
blogDir = "./blogs"
debug: bool = True
def debugPrint(msg: str) -> None:
if debug:
print("(MAIN) PRINT: " + msg)
def debugInitPrint(msg: str) -> None:
if debug:
print("(MAIN INIT) PRINT: " + msg)
dbConnection = dbHandler.connect("blorgdb",
"172.20.0.10",
"dev",
"dev",
"5432")
def apiInit():
dbHandler.initTable(dbConnection, "Users", """
ID SERIAL PRIMARY KEY,
Username VARCHAR(255),
Email VARCHAR(255),
FirstName VARCHAR(255),
LastName VARCHAR(255),
Description VARCHAR(255),
Country VARCHAR(255),
Theme VARCHAR(255),
AccentColor VARCHAR(255),
PasswordHash VARCHAR(255)
""")
dbHandler.initTable(dbConnection, "SignOns", """
ID SERIAL PRIMARY KEY,
UserID INTEGER,
LoginSuccess BOOLEAN,
DateAttempted TIMESTAMP,
IPLocationAttempted VARCHAR(255)
""")
dbHandler.initTable(dbConnection, "AuthTokens", """
ID SERIAL PRIMARY KEY,
Token VARCHAR(2048),
OwnerID INTEGER,
DateCreated TIMESTAMP,
DateExpiry TIMESTAMP,
IPLocationCreated VARCHAR(255)
""")
dbHandler.initTable(dbConnection, "Blogs", """
ID SERIAL PRIMARY KEY,
AuthorID INTEGER,
CategoryID INTEGER,
DatePosted TIMESTAMP,
Title VARCHAR(255),
Description VARCHAR(255)
""")
dbHandler.initTable(dbConnection, "Categories", """
ID SERIAL PRIMARY KEY,
Name VARCHAR(255)
""")
if not os.path.exists(blogDir):
debugInitPrint("Blog data directory does not exist! Creating blog data directory at " + blogDir + "...")
os.mkdir(blogDir)
else:
debugInitPrint("Blog data directory already exists! Skipping directory creation...")
# userHandler.createUser(dbConnection, "testuser", "Test", "User", "A test user", "TestCountry", "TestTheme", "TestColor", "testuser")
def apiCleanup():
dbHandler.disconnect(dbConnection)
@asynccontextmanager
async def apiLifespan(app: FastAPI):
# API Init
apiInit()
# API Clean up
yield
apiCleanup()
app = FastAPI(lifespan=apiLifespan)
origins = [
"http://localhost",
"http://localhost:8080",
]
app.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.get("/")
def getroot():
return {"Hello": "World"}
class ApiBody(BaseModel):
username: str
password: str
@app.post("/api")
def postapi(body: ApiBody):
debugPrint(body.username)
debugPrint(body.password)
return body
class postuserLoginBody(BaseModel):
username: str
password: str
rememberMe: bool
@app.post("/api/user/login")
def postuserLogin(body: postuserLoginBody, request: Request):
try:
if userHandler.checkUserExistence(dbConnection, body.username):
userID = userHandler.getIDByUsername(dbConnection, body.username)
if securityHandler.handlePasswordVerification(dbConnection, body.password, userID):
return {"success": True, "authToken": tokenHandler.createToken(dbConnection, userID, body.rememberMe, request.client.host), "message": "User login success!"}
else:
return {"success": False, "authToken": None, "message": "User login failed! Please check your password."}
else:
return {"success": False, "authToken": None, "message": "User login failed! User does not exist."}
except Exception as error:
msg = "User login failed! Unexpected server error. " + repr(error)
debugPrint(msg)
return {"success": False, "authToken": None, "message": msg}
class postuserCreate(BaseModel):
username: str
password: str
rememberMe: bool
@app.post("/api/user/create")
def postuserCreate(body: postuserCreate, request: Request):
try:
if not userHandler.checkUserExistence(dbConnection, body.username):
userHandler.createUser(dbConnection, body.username, "Test", "User", "A test user", "TestCountry", "TestTheme", "TestColor", body.password)
userID = userHandler.getIDByUsername(dbConnection, body.username)
return {"success": True, "authToken": tokenHandler.createToken(dbConnection, userID, body.rememberMe, request.client.host), "message": "User login success!"}
else:
return {"success": False, "authToken": None, "message": "User login failed! User already exists."}
except Exception as error:
msg = "User login failed! Unexpected server error. " + repr(error)
debugPrint(msg)
return {"success": False, "authToken": None, "message": msg}
@app.get("/api/user/IDByAuthToken")
def getuserIDByAuthToken(authToken: Annotated[str | None, Header()] = None):
try:
if tokenHandler.validateTokenExistence(dbConnection, authToken):
userID = userHandler.getIDByAuthToken(dbConnection, authToken)
return {"success": True, "userID": userID, "message": "Get userID by authToken succeeded!"}
else:
return {"success": False, "userID": None, "message": "Get userID by authToken failed! authToken provided is not valid."}
except Exception as error:
msg = "Get userID by authToken failed! Unexpected server error. " + repr(error)
debugPrint(msg)
return {"success": False, "authToken": None, "message": msg}
@app.get("/api/user/publicInfo/{userID}")
def getuserPublicInfo(userID: int):
try:
if userHandler.checkIDExistence(dbConnection, userID):
return {
"success": True,
"username": userHandler.getUserInfoByID(dbConnection, userID, "username"),
"firstName": userHandler.getUserInfoByID(dbConnection, userID, "firstname"),
"lastName": userHandler.getUserInfoByID(dbConnection, userID, "lastname"),
"message": "Get public info succeeded!"
}
else:
return {
"success": False,
"username": None,
"firstName": None,
"lastName": None,
"message": "Get public info failed! UserID provided does not exist."
}
except Exception as error:
msg = "Get public info failed! Unexpected server error. " + repr(error)
debugPrint(msg)
return {
"success": False,
"username": None,
"firstName": None,
"lastName": None,
"message": msg
}
@app.get("/api/user/settings/account")
def getuserSettingsAccount(authToken: Annotated[str | None, Header()] = None):
try:
if tokenHandler.validateTokenExistence(dbConnection, authToken):
userID = userHandler.getIDByAuthToken(dbConnection, authToken)
return {
"success": True,
"username": userHandler.getUserInfoByID(dbConnection, userID, "username"),
"firstName": userHandler.getUserInfoByID(dbConnection, userID, "firstname"),
"lastName": userHandler.getUserInfoByID(dbConnection, userID, "lastname"),
"message": "Get user settings succeeded!"
}
else:
return {
"success": False,
"username": None,
"firstName": None,
"lastName": None,
"message": "Get user settings failed! authToken provided is not valid."
}
except Exception as error:
msg = "Get user settings failed! Unexpected server error. " + repr(error)
debugPrint(msg)
return {
"success": False,
"username": None,
"firstName": None,
"lastName": None,
"message": msg
}
@app.get("/api/user/blog/cardInfo/range")
def getblogCardInfoRange(rangeStart: int = 1, rangeEnd: int = 25, sortByLatest: bool = True):
try:
blogCardData = blogHandler.getBlogCardRange(dbConnection, rangeStart, rangeEnd, sortByLatest)
blogCardData[0] = {
"success": True,
"message": "Get card info succeeded!"
}
return blogCardData
except Exception as error:
blogCardData = {}
msg = "Get card info failed! Unexpected server error. " + repr(error)
debugPrint(msg)
blogCardData[0] = {
"success": False,
"message": msg
}
return blogCardData
@app.get("/api/blog/contents/HTML/{blogID}")
def getblogContentsHTML(blogID: int):
try:
if blogHandler.checkIDExistence(dbConnection, blogID):
return {
"success": True,
"contents": blogHandler.getBlogContentsHTML(blogID),
"message": "Get blog contents in HTML succeeded!"
}
else:
return {
"success": False,
"contents": None,
"message": "Get blog contents in HTML failed! BlogID provided does not exist."
}
except Exception as error:
msg = "Get blog contents in HTML failed! Unexpected server error. " + repr(error)
debugPrint(msg)
return {
"success": False,
"contents": None,
"message": msg
}
class postblogCreateBody(BaseModel):
authToken: str
orgContents: str
@app.post("/api/blog/create")
def postblogCreate(body: postblogCreateBody):
try:
if tokenHandler.validateTokenExistence(dbConnection, body.authToken):
decodedOrgContents = base64.b32decode(body.orgContents).decode()
userID = userHandler.getIDByAuthToken(dbConnection, body.authToken)
newBlog = blogHandler.uploadBlog(dbConnection, userID, decodedOrgContents)
if newBlog:
return {
"success": True,
"blogID": newBlog,
"message": "Create blog succeeded!"
}
else:
return {
"success": False,
"blogID": newBlog,
"message": "Create blog failed! Unknown error parsing org data."
}
else:
return {
"success": False,
"blogID": None,
"message": "Create blog failed! authToken provided is not valid."
}
except Exception as error:
msg = "Create blog failed! Unexpected server error. " + repr(error)
debugPrint(msg)
return {
"success": False,
"blogID": None,
"message": msg
}
class putuserSettingsChange(BaseModel):
authToken: str
newValue: str
@app.put("/api/user/settings/change/{settingName}")
def putuserSettingsChange(body: putuserSettingsChange, settingName: str):
try:
if tokenHandler.validateTokenExistence(dbConnection, body.authToken):
userID = userHandler.getIDByAuthToken(dbConnection, body.authToken)
settingNameLowercase = settingName.lower()
if userHandler.checkUserSettingExistence(dbConnection, settingNameLowercase):
oldValue = userHandler.getUserInfoByID(dbConnection, userID, settingNameLowercase)
changedValue = userHandler.changeUserSettingValue(dbConnection, userID, settingNameLowercase, body.newValue)
return {
"success": True,
"message": "Change user settings succeeded! changed " + settingNameLowercase + " from " + oldValue + " to " + body.newValue + "."
}
else:
return {
"success": False,
"message": "Change user settings failed! Setting " + settingName + " does not exist."
}
else:
return {
"success": False,
"message": "Change user settings failed! authToken provided is not valid."
}
except Exception as error:
msg = "Change user settings failed! Unexpected server error. " + repr(error)
debugPrint(msg)
return {
"success": False,
"message": msg
}
# GET
# /api/user/IDByAuthToken
# - userID
# Could be merged into a single endpoint which you can optionally query what you need from
# /api/user/publicInfo/{userID}
# - username
# - firstname
# - lastname
# - profile picture
# - location
# - public email (For contact)
# Could be merged into a single endpoint which you can optionally query what you need from
# /api/user/privateInfo/{userID}
# - private email (For authentication/login)
# Could be merged into a single endpoint which you can optionally query what you need from
# /api/blog/title/{blogID}
# /api/blog/authorID/{blogID}
# /api/blog/categoryID/{blogID}
# /api/blog/pictureURL/{blogID}
# /api/blog/description/{blogID}
# /api/blog/datePosted/{blogID}
# /api/blog/contents/HTML/{blogID}
# /api/blog/contents/org/{blogID}
# POST
# /api/user/changeInfo
# TODO: Need to overhaul this to accept an input file which we can read in chunks to ensure a too big file isn't sent, frontend will convert textbox/form data appropriately
# https://github.com/steinnes/content-size-limit-asgi
# https://github.com/tiangolo/fastapi/issues/362
# /api/blog/create
# /api/blog/edit
@app.get("/api/debug")
def getdebug(request: Request):
return request.json()
@app.get("/api")
def getapi():
return {"Hello": "API!"}