You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

307 lines
10 KiB

import bson
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from datetime import datetime, timedelta
from joserfc import jwt
from joserfc.errors import JoseError
from joserfc.jwk import OctKey
from bson.objectid import ObjectId
from app.utils import get_password_hash, verify_password
# Best workaround found for _id being typed as ObjectId (it raises an exception because JSON, contrary to the BSON used by MongoDB, does not support custom types).
# It also allows creating DTOs at the same time, though not in the best way (the project structure is chaotic for the moment).
import app.serializers as serializers # Import all serializers (detailed in __init__.py)
# Import models
from app.models import User, Friend, Token, TokenData, HTTPError
# Import all DTOs (detailed in __init__.py)
from app.dto import FriendAddDTO, FriendListDTO, UserDTO, UserRegisterDTO, PinDTO
# Contains all constants
import app.config as config
import pymongo
# Database setup
# One MongoClient is created at import time from the connection settings in
# app.config; `db` is the database handle the collections below are taken from.
client = pymongo.MongoClient(config.MONGODB_URL, username=config.MONGODB_USERNAME, password=config.MONGODB_PASSWORD)
db = client[config.MONGODB_DATABASE]
# FastAPI app instance
# The API is mounted under the /api/v1 root path. The OpenAPI "servers" list
# is declared explicitly (dev and production URLs), and
# root_path_in_servers=False asks FastAPI not to add the root path to that
# list automatically.
app = FastAPI(
    servers=[
        {"url": "http://127.0.0.1:8000/api/v1", "description": "Dev environment"},
        {"url": "https://api.memorymap.fr/api/v1", "description": "Production environment"}
    ],
    root_path="/api/v1",
    root_path_in_servers=False
)
# CORS configuration: every origin, method and header is accepted, and
# credentialed requests are allowed.
# NOTE(review): FastAPI's CORS documentation says wildcard origins/methods/
# headers should not be combined with allow_credentials=True. Since this API
# authenticates with bearer tokens, consider listing the real front-end
# origins here (or disabling allow_credentials) - confirm with the front-end.
origins = [
    "*", # Allow all origins
]
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# OAuth2 scheme
# Used as a dependency by get_current_user to obtain the bearer token of the
# request; the token endpoint URL comes from app.config.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl=config.TOKEN_URL)
# Collections
# Handles to the MongoDB collections queried by the route handlers below.
users_collection = db["users"]
pins_collection = db["pins"]
friends_collection = db["friends"]
# Token management
def create_access_token(data: dict, expires_delta: timedelta):
    """Build a signed JWT carrying ``data`` plus an ``exp`` expiry claim.

    Args:
        data: Claims to embed in the token (callers in this module pass
            ``{"sub": username}``). The dict is copied, never mutated.
        expires_delta: How long from now the token stays valid.

    Returns:
        The encoded token as produced by ``jwt.encode``, signed with
        ``config.SECRET_KEY`` using ``config.ALGORITHM``.
    """
    from datetime import timezone  # local import: only needed here

    to_encode = data.copy()
    # Compute the expiry from an aware UTC instant and store it as integer
    # epoch seconds. A naive local datetime leaves the conversion to the JWT
    # library and can be read as if it were UTC, which skews the validity
    # window by the server's UTC offset when it is later compared against
    # real epoch seconds.
    expire = datetime.now(timezone.utc) + expires_delta
    to_encode["exp"] = int(expire.timestamp())
    header = {"alg": config.ALGORITHM}
    return jwt.encode(header, to_encode, OctKey.import_key(config.SECRET_KEY))
async def get_current_user(token: str = Depends(oauth2_scheme)) -> User:
    """Resolve the request's bearer token to the matching stored user.

    Args:
        token: Raw bearer token supplied by the ``oauth2_scheme`` dependency.

    Returns:
        The user document found for the token's ``sub`` claim, passed through
        ``serializers.user_serialize``.

    Raises:
        HTTPException: 401 when the token cannot be decoded, lacks a usable
            ``sub`` or ``exp`` claim, is expired, or names an unknown user.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    # Only the decode call can raise JoseError, so keep the try minimal.
    try:
        payload = jwt.decode(token, OctKey.import_key(config.SECRET_KEY))
    except JoseError:
        raise credentials_exception

    # Read the claims defensively: a validly signed token that lacks "sub" or
    # "exp" (or carries a non-numeric "exp") must yield 401, not a KeyError or
    # TypeError surfacing as a 500.
    claims = payload.claims
    username = claims.get("sub")
    expire_date = claims.get("exp")
    if username is None or not isinstance(expire_date, (int, float)):
        raise credentials_exception
    if int(datetime.now().timestamp()) > expire_date:
        raise credentials_exception
    token_data = TokenData(username=username)

    user = users_collection.find_one({"username": token_data.username})
    if user is None:
        raise credentials_exception
    return serializers.user_serialize(user)
# Exceptions
def friend_not_found():
    """Abort the current request with a 404 "Friend not found" error.

    Raises:
        HTTPException: always.
    """
    not_found = HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Friend not found")
    raise not_found
def objectid_misformatted():
    """Abort the current request with a 422 error for an invalid ObjectId.

    Raises:
        HTTPException: always.
    """
    bad_id = HTTPException(
        status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
        detail="The ObjectID is misformatted",
    )
    raise bad_id
# Routes
@app.post(path="/register", response_model=Token, responses={409: {"model": HTTPError}})
async def register(user: UserRegisterDTO):
    """Create a new account and return an access token for it.

    Args:
        user: Requested username and plain-text password.

    Returns:
        A dict with the bearer ``access_token``, its ``token_type`` and the
        new user's id as a string.

    Raises:
        HTTPException: 409 when the username is already taken.
    """
    if users_collection.find_one({"username": user.username}):
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="Username already used"
        )

    # Only the hash of the password is stored.
    new_account = {"username": user.username, "password": get_password_hash(user.password)}
    insert_result = users_collection.insert_one(new_account)

    token = create_access_token(
        data={"sub": user.username},
        expires_delta=timedelta(minutes=config.ACCESS_TOKEN_EXPIRE_MINUTES),
    )
    return {
        "access_token": token,
        "token_type": "bearer",
        "user_id": str(insert_result.inserted_id),
    }
@app.post(path="/login", response_model=Token, responses={401: {"model": HTTPError}})
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    """Check a username/password pair and return an access token.

    Args:
        form_data: OAuth2 password form carrying ``username`` and ``password``.

    Returns:
        A dict with the bearer ``access_token``, its ``token_type`` and the
        user's id as a string.

    Raises:
        HTTPException: 401 when the user is unknown or the password is wrong.
    """
    account = users_collection.find_one({"username": form_data.username})
    # Same error for "unknown user" and "wrong password".
    if not (account and verify_password(form_data.password, account["password"])):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    token = create_access_token(
        data={"sub": form_data.username},
        expires_delta=timedelta(minutes=config.ACCESS_TOKEN_EXPIRE_MINUTES),
    )
    return {
        "access_token": token,
        "token_type": "bearer",
        "user_id": str(account["_id"]),
    }
@app.get(path="/logout", responses={401: {"model": HTTPError}})
async def logout(current_user: User = Depends(get_current_user)):
    """Acknowledge a logout request from an authenticated user.

    Nothing is stored or invalidated here; the handler only requires a valid
    token (through ``get_current_user``) and returns a confirmation message.
    """
    confirmation = {"message": "Logged out"}
    return confirmation
@app.get(
    path="/pin/{id}",
    responses={401: {"model": HTTPError}, 404: {"model": HTTPError}, 422: {"model": HTTPError}},
)
async def get_pin(id: str, current_user: User = Depends(get_current_user)):
    """Return a single pin by its id.

    Args:
        id: Hex string form of the pin's ObjectId.
        current_user: Authenticated caller (only used to require a token).

    Returns:
        The pin document passed through ``serializers.pin_serialize``.

    Raises:
        HTTPException: 422 when ``id`` is not a valid ObjectId, 404 when no
            pin has that id.
    """
    try:
        pin_oid = ObjectId(id)
    except bson.errors.InvalidId:
        objectid_misformatted()

    found = pins_collection.find_one({"_id": pin_oid})
    if found is None:
        raise HTTPException(status_code=404, detail="Pin not found")
    return serializers.pin_serialize(found)
@app.patch(
    path="/pin/{id}",
    responses={401: {"model": HTTPError}, 404: {"model": HTTPError}, 422: {"model": HTTPError}},
)
async def update_pin(id: str, pin: PinDTO, current_user: User = Depends(get_current_user)):
    """Overwrite the editable fields of an existing pin.

    Args:
        id: Hex string form of the pin's ObjectId.
        pin: New field values for the pin.
        current_user: Authenticated caller (only used to require a token).

    Returns:
        ``{"message": "Pin updated"}`` on success.

    Raises:
        HTTPException: 422 when ``id`` is not a valid ObjectId, 404 when no
            pin has that id.
    """
    try:
        pin_oid = ObjectId(id)
    except bson.errors.InvalidId:
        objectid_misformatted()

    # user_id is assigned server-side when the pin is created. Leave it out
    # of the $set so a PATCH body cannot clear or reassign the pin's owner.
    # NOTE(review): the filter does not check that the caller owns the pin -
    # confirm whether editing other users' pins is intended.
    changes = pin.model_dump(exclude={"user_id"})
    result = pins_collection.update_one({"_id": pin_oid}, {"$set": changes})
    if result.matched_count == 0:
        raise HTTPException(status_code=404, detail="Pin not found")
    return {"message": "Pin updated"}
@app.post(path="/pin/add", responses={401: {"model": HTTPError}})
async def add_pin(pin: PinDTO, current_user: User = Depends(get_current_user)):
    """Store a new pin owned by the authenticated user.

    Args:
        pin: Pin data from the request body; its ``user_id`` is overwritten
            with the caller's id before saving.
        current_user: Authenticated caller.

    Returns:
        ``{"id": <new pin id as string>}``.
    """
    pin.user_id = current_user.uid
    insert_result = pins_collection.insert_one(pin.model_dump())
    return {"id": str(insert_result.inserted_id)}
@app.get(path="/pins", responses={401: {"model": HTTPError}})
async def list_pins(current_user: User = Depends(get_current_user)):
    """Return every stored pin, serialized for the authenticated user.

    The whole pins collection is loaded and handed, together with the
    caller's id, to ``serializers.pins_serialize``.
    """
    all_pins = pins_collection.find().to_list()
    return serializers.pins_serialize(all_pins, current_user.uid)
@app.get(
    path="/friend/{id}",
    responses={401: {"model": HTTPError}, 404: {"model": HTTPError}, 422: {"model": HTTPError}},
)
async def get_friend(id: str, current_user: User = Depends(get_current_user)):
    """Return a single friendship document by its id.

    Args:
        id: Hex string form of the friendship's ObjectId.
        current_user: Authenticated caller (only used to require a token).

    Returns:
        The document passed through ``serializers.friend_serialize``.

    Raises:
        HTTPException: 422 when ``id`` is not a valid ObjectId, 404 when no
            friendship has that id.
    """
    try:
        friend_oid = ObjectId(id)
    except bson.errors.InvalidId:
        objectid_misformatted()

    found = friends_collection.find_one({"_id": friend_oid})
    if found is None:
        friend_not_found()
    return serializers.friend_serialize(found)
@app.post(path="/friend/add", responses={401: {"model": HTTPError}, 409: {"model": HTTPError}})
async def add_friend(friend_to_add: FriendAddDTO, current_user: User = Depends(get_current_user)):
    """Create a pending friend request from the caller to another user.

    Args:
        friend_to_add: Request body naming the target through
            ``friend_user_id``.
        current_user: Authenticated caller; becomes the request's ``user_id``.

    Returns:
        ``{"id": <new friendship id as string>}``.

    Raises:
        HTTPException: 409 when the caller targets themselves.
    """
    # TODO: test if exists
    request_doc = friend_to_add.model_dump()
    if current_user.uid == request_doc["friend_user_id"]:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="Cannot add yourself as a friend"
        )

    request_doc.update(user_id=current_user.uid, status="pending")
    insert_result = friends_collection.insert_one(request_doc)
    return {"id": str(insert_result.inserted_id)}
@app.delete(
    path="/friend/{id}/delete",
    responses={401: {"model": HTTPError}, 404: {"model": HTTPError}, 422: {"model": HTTPError}},
)
async def delete_friend(id: str, current_user: User = Depends(get_current_user)):
    """Delete a friendship the authenticated user takes part in.

    Args:
        id: Hex string form of the friendship's ObjectId.
        current_user: Authenticated caller.

    Returns:
        ``{"message": "Friend deleted"}`` on success.

    Raises:
        HTTPException: 422 when ``id`` is not a valid ObjectId, 404 when no
            friendship with that id involves the caller.
    """
    try:
        friend_oid = ObjectId(id)
    except bson.errors.InvalidId:
        objectid_misformatted()

    # Only the requester (user_id) or the recipient (friend_user_id) may
    # delete the friendship. Without this filter any authenticated user could
    # delete any friendship by guessing its id. Other users' documents are
    # reported as "not found".
    uid = current_user.uid
    result = friends_collection.delete_one({
        "_id": friend_oid,
        "$or": [{"user_id": uid}, {"friend_user_id": uid}],
    })
    if result.deleted_count == 0:
        friend_not_found()
    return {"message": "Friend deleted"}
@app.patch(
    path="/friend/{id}/accept",
    responses={401: {"model": HTTPError}, 404: {"model": HTTPError}, 422: {"model": HTTPError}},
)
async def accept_friend(id: str, current_user: User = Depends(get_current_user)):
    """Accept a pending friend request addressed to the authenticated user.

    Args:
        id: Hex string form of the friendship's ObjectId.
        current_user: Authenticated caller; must be the request's recipient.

    Returns:
        ``{"message": "Friend request accepted"}`` on success.

    Raises:
        HTTPException: 422 when ``id`` is not a valid ObjectId or the request
            is no longer pending, 404 when no request with that id is
            addressed to the caller.
    """
    # Parse the id once; only this conversion can raise InvalidId.
    try:
        friend_oid = ObjectId(id)
    except bson.errors.InvalidId:
        objectid_misformatted()

    # Only the recipient (friend_user_id) may accept. Otherwise the requester,
    # or any other user, could approve the request themselves.
    pending_request = friends_collection.find_one(
        {"_id": friend_oid, "friend_user_id": current_user.uid}
    )
    if pending_request is None:
        friend_not_found()
    if pending_request["status"] != "pending":
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail="Friend request already accepted"
        )

    # Guard the write on the status too, so a concurrent accept/deny between
    # the read above and this update cannot be overwritten.
    friends_collection.update_one(
        {"_id": friend_oid, "status": "pending"},
        {"$set": {"status": "accepted"}},
    )
    return {"message": "Friend request accepted"}
@app.post(
    path="/friend/{id}/deny",
    responses={401: {"model": HTTPError}, 404: {"model": HTTPError}, 422: {"model": HTTPError}},
)
async def deny_friend(id: str, current_user: User = Depends(get_current_user)):
    """Mark a friend request addressed to the authenticated user as denied.

    Args:
        id: Hex string form of the friendship's ObjectId.
        current_user: Authenticated caller; must be the request's recipient.

    Returns:
        ``{"message": "Friend request denied"}`` on success.

    Raises:
        HTTPException: 422 when ``id`` is not a valid ObjectId, 404 when no
            request with that id is addressed to the caller.
    """
    try:
        friend_oid = ObjectId(id)
    except bson.errors.InvalidId:
        objectid_misformatted()

    # Only the recipient (friend_user_id) may deny. Without this filter any
    # authenticated user could flip the status of any friendship.
    # NOTE(review): as before, the current status is not checked, so an
    # already accepted request can still be set to "denied" - confirm intent.
    result = friends_collection.update_one(
        {"_id": friend_oid, "friend_user_id": current_user.uid},
        {"$set": {"status": "denied"}},
    )
    if result.matched_count == 0:
        friend_not_found()
    return {"message": "Friend request denied"}
@app.get(path="/friends", response_model=FriendListDTO, responses={401: {"model": HTTPError}})
async def list_friends(current_user: User = Depends(get_current_user)):
    """Return the caller's friendships, both sent and received.

    Two queries are run: documents where the caller is ``user_id`` and
    documents where the caller is ``friend_user_id``. Both lists go through
    ``serializers.friends_serialize`` and the result is wrapped in a
    ``FriendListDTO``.
    """
    uid = current_user.uid
    as_requester = friends_collection.find({"user_id": uid}).to_list()
    as_recipient = friends_collection.find({"friend_user_id": uid}).to_list()
    serialized = serializers.friends_serialize(as_requester, as_recipient)
    return FriendListDTO(**serialized)
@app.get(
    path="/users",
    responses={401: {"model": HTTPError}, 422: {"model": HTTPError}},
    response_model=list[UserDTO],
)
async def search_users(name: str, current_user: User = Depends(get_current_user)):
    """Search users whose username matches ``name``, case-insensitively.

    Args:
        name: Pattern passed to MongoDB as a ``$regex`` on ``username``.
        current_user: Authenticated caller (only used to require a token).

    Returns:
        The matching users passed through ``serializers.users_serialize``.

    Raises:
        HTTPException: 422 when MongoDB rejects the query (for example an
            invalid regular expression).
    """
    username_filter = {"username": {"$regex": name, "$options": "i"}}
    try:
        matches = users_collection.find(username_filter).to_list()
        users = serializers.users_serialize(matches)
    except pymongo.errors.OperationFailure:
        raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail="Regex may be wrongly formatted")
    return users