import typing import psycopg2 from psycopg2 import sql def debugPrint(msg: str) -> None: print(msg) def debugPrintNotice(dbConnection: psycopg2.extensions.connection, i: int) -> None: print("(DB HANDLER) " + dbConnection.notices[i]) def connect(databaseOption: str, hostOption: str, userOption: str, passwordOption: str, portOption: str) -> psycopg2.extensions.connection: return psycopg2.connect(database=databaseOption, host=hostOption, user=userOption, password=passwordOption, port=portOption) def initTable(dbConnection: psycopg2.extensions.connection, tableName: str, tableFormat: str): dbCursor = dbConnection.cursor() dbCursor.execute(""" DO $$ BEGIN IF (EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = '""" + tableName.lower() + """')) THEN RAISE NOTICE '""" + tableName + """ Table does exist! Skipping creating table.'; ELSE RAISE NOTICE '""" + tableName + """ Table does not exist! Creating table.'; CREATE TABLE """ + tableName.lower() + """ ( """ + tableFormat + """ ); END IF; END; $$""") debugPrintNotice(dbConnection, -1) dbConnection.commit() dbCursor.close() def insertRow(dbConnection: psycopg2.extensions.connection, tableName: str, tableFormat: list[str], tableValues: list): debugPrint("Attempting to insert new row...") sanitisedQuery = sql.SQL(""" INSERT INTO {table} ({format}) VALUES ({values}) RETURNING *; """).format( table=sql.Identifier(tableName), format=sql.SQL(", ").join( sql.Identifier(value) for value in tableFormat ), values=sql.SQL(", ").join( sql.Literal(value) for value in tableValues ) ) debugPrint(sanitisedQuery.as_string(dbConnection)) commitQuery(dbConnection, sanitisedQuery) def commitQuery(dbConnection: psycopg2.extensions.connection, query: sql.Composable) -> list: debugPrint("Commit query executing...") dbCursor = dbConnection.cursor() dbCursor.execute(query) dbConnection.commit() dbResults = dbCursor.fetchall() dbCursor.close() return dbResults def execQuery(dbConnection: psycopg2.extensions.connection, query: sql.Composable) -> list: debugPrint("Exec query executing...") dbCursor = dbConnection.cursor() dbCursor.execute(query) dbResults = dbCursor.fetchall() dbCursor.close() return dbResults