You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

255 lines
8.9 KiB

from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.responses import JSONResponse
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from typing import Optional
from datetime import datetime, timedelta
from jose import JWTError, jwt
from bson.objectid import ObjectId
from app.utils import get_password_hash, verify_password
# Best workaround found for _id typed as ObjectId (creating Exception bcause JSON doesn't support custom types countrary to BSON, used by Mongo)
# also allows to create DTOs at the time, but not at it's best (project structure is chaotic FTM :s)
import app.serializers as serializers # Import all serializers (detailed in __init__.py)
# Import models
from app.models import User, Pin, Friend, Token, TokenData, HTTPError
# Import all DTOs (detailed in __init__.py)
from app.dto import FriendAddDTO, FriendListDTO, UserDTO, UserRegisterDTO
# Contains all constants
import app.config as config
import pymongo
# Database setup
client = pymongo.MongoClient(config.MONGODB_URL, username=config.MONGODB_USERNAME, password=config.MONGODB_PASSWORD)
db = client[config.MONGODB_DATABASE]
# FastAPI app instance
app = FastAPI(
servers=[
{"url": "http://127.0.0.1:8000/api/v1", "description": "Dev environment"},
{"url": "https://api.memorymap.fr/api/v1", "description": "Production environment"}
],
root_path="/api/v1",
root_path_in_servers=False
)
# OAuth2 scheme
oauth2_scheme = OAuth2PasswordBearer(tokenUrl=config.TOKEN_URL)
# Collections
users_collection = db["users"]
pins_collection = db["pins"]
friends_collection = db["friends"]
# Token management
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.now() + expires_delta
else:
expire = datetime.now() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, config.SECRET_KEY, algorithm=config.ALGORITHM)
return encoded_jwt
async def get_current_user(token: str = Depends(oauth2_scheme)) -> User:
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, config.SECRET_KEY, algorithms=[config.ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except JWTError:
raise credentials_exception
user = users_collection.find_one({"username": token_data.username})
if user is None:
raise credentials_exception
return serializers.user_serialize(user)
# Routes
@app.post(
path="/register",
response_model=Token,
responses={409: {"model": HTTPError}}
)
async def register(user: UserRegisterDTO):
user_exists = users_collection.find_one({"username": user.username})
if user_exists:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="Username already used"
)
hashed_password = get_password_hash(user.password)
user_id = users_collection.insert_one({"username": user.username, "password": hashed_password})
access_token_expires = timedelta(minutes=config.ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(data={"sub": user.username}, expires_delta=access_token_expires)
return {"access_token": access_token, "token_type": "bearer", "user_id": str(user_id)}
@app.post(
path="/login",
response_model=Token,
responses={401: {"model": HTTPError}}
)
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
user = users_collection.find_one({"username": form_data.username})
if not user or not verify_password(form_data.password, user["password"]):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=config.ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(data={"sub": form_data.username}, expires_delta=access_token_expires)
return {"access_token": access_token, "token_type": "bearer", "user_id": str(user["_id"])}
@app.get(
path="/logout",
responses={401: {"model": HTTPError}}
)
async def logout(current_user: User = Depends(get_current_user)):
# TODO: find usecase / what to do ??
return {"message": "Logged out"}
@app.get(
path="/pin/{id}",
responses={401: {"model": HTTPError}, 404: {"model": HTTPError}}
)
async def get_pin(id: str, current_user: User = Depends(get_current_user)):
pin = pins_collection.find_one({"_id": ObjectId(id)})
if pin is None:
raise HTTPException(status_code=404, detail="Pin not found")
return serializers.pin_serialize(pin)
@app.patch(
path="/pin/{id}",
responses={401: {"model": HTTPError}, 404: {"model": HTTPError}}
)
async def update_pin(id: str, pin: Pin, current_user: User = Depends(get_current_user)):
result = pins_collection.update_one({"_id": ObjectId(id)}, {"$set": pin.model_dump()})
if result.matched_count == 0:
raise HTTPException(status_code=404, detail="Pin not found")
return {"message": "Pin updated"}
@app.post(
path="/pin/add",
responses={401: {"model": HTTPError}}
)
async def add_pin(pin: Pin, current_user: User = Depends(get_current_user)):
pin_id = pins_collection.insert_one(pin.model_dump()).inserted_id
return {"id": str(pin_id)}
@app.get(
path="/pins",
responses={401: {"model": HTTPError}}
)
async def list_pins(current_user: User = Depends(get_current_user)):
pins = serializers.pins_serialize(pins_collection.find().to_list())
return pins
def friend_not_found():
raise HTTPException(status_code=404, detail="Friend not found")
@app.get(
path="/friend/{id}",
responses={401: {"model": HTTPError}, 404: {"model": HTTPError}}
)
async def get_friend(id: str, current_user: User = Depends(get_current_user)):
friend = friends_collection.find_one({"_id": ObjectId(id)})
if friend is None: friend_not_found()
return friend
@app.post(
path="/friend/add",
responses={401: {"model": HTTPError}, 409: {"model": HTTPError}}
)
async def add_friend(friend_to_add: FriendAddDTO, current_user: User = Depends(get_current_user)):
# TODO: test if exists
friend: Friend = friend_to_add.model_dump()
print(current_user)
if(current_user.uid == friend["friend_user_id"]):
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="Cannot add yourself as a friend"
)
friend["user_id"] = current_user.uid
friend_id = friends_collection.insert_one(friend).inserted_id
return {"id": str(friend_id)}
@app.delete(
path="/friend/{id}/delete",
responses={401: {"model": HTTPError}, 404: {"model": HTTPError}}
)
async def delete_friend(id: str, current_user: User = Depends(get_current_user)):
result = friends_collection.delete_one({"_id": ObjectId(id)})
if result.deleted_count == 0: friend_not_found()
return {"message": "Friend deleted"}
@app.patch(
path="/friend/{id}/accept",
responses={401: {"model": HTTPError}, 404: {"model": HTTPError}}
)
async def accept_friend(id: str, current_user: User = Depends(get_current_user)):
result = friends_collection.update_one({"_id": ObjectId(id)}, {"$set": {"status": "accepted"}})
if result.matched_count == 0: friend_not_found()
return {"message": "Friend request accepted"}
@app.post(
path="/friend/{id}/deny",
responses={401: {"model": HTTPError}, 404: {"model": HTTPError}}
)
async def deny_friend(id: str, current_user: User = Depends(get_current_user)):
result = friends_collection.update_one({"_id": ObjectId(id)}, {"$set": {"status": "denied"}})
if result.matched_count == 0: friend_not_found()
return {"message": "Friend request denied"}
@app.get(
path="/friends",
response_model=FriendListDTO,
responses={401: {"model": HTTPError}}
)
async def list_friends(current_user: User = Depends(get_current_user)):
return FriendListDTO(**serializers.friends_serialize(friends_collection.find({"user_id": current_user.uid}).to_list(), friends_collection.find({"friend_user_id": current_user.uid}).to_list()))
@app.get(
path="/users",
responses={401: {"model": HTTPError}, 422: {"model": HTTPError}},
response_model=list[UserDTO]
)
async def search_users(name: str, current_user: User = Depends(get_current_user)):
try:
users = serializers.users_serialize(users_collection.find({"username": {"$regex": name, "$options": "i"}}).to_list())
except pymongo.errors.OperationFailure:
raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail="Regex may be wrongly formatted")
return users