You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
254 lines
8.8 KiB
254 lines
8.8 KiB
from fastapi import FastAPI, Depends, HTTPException, status
|
|
from fastapi.responses import JSONResponse
|
|
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
|
|
from typing import Optional
|
|
from datetime import datetime, timedelta
|
|
from jose import JWTError, jwt
|
|
from bson.objectid import ObjectId
|
|
from app.utils import get_password_hash, verify_password
|
|
|
|
# Best workaround found for _id being typed as ObjectId (which raises an exception because JSON, contrary to BSON as used by Mongo, doesn't support custom types).
# It also allows creating DTOs at the same time, though not in the best way (the project structure is chaotic for the moment).
|
|
from app.serializers import * # Import all serializers (detailed in __init__.py)
|
|
|
|
# Import all models (detailed in __init__.py)
|
|
from app.models import *
|
|
|
|
# Import all DTOs (detailed in __init__.py)
|
|
from app.dto import *
|
|
|
|
# Contains all constants
|
|
from app.config import *
|
|
|
|
import pymongo
|
|
|
|
# Database setup: MongoDB client and the database handle used by this module.
client = pymongo.MongoClient(
    host=MONGODB_URL,
    username=MONGODB_USERNAME,
    password=MONGODB_PASSWORD,
)
db = client[MONGODB_DATABASE]
|
|
|
|
# FastAPI app instance. The advertised servers are listed explicitly, and
# root_path is not added to that list (root_path_in_servers=False).
app = FastAPI(
    root_path="/api/v1",
    root_path_in_servers=False,
    servers=[
        dict(url="http://127.0.0.1:8000/api/v1", description="Dev environment"),
        dict(url="https://api.memorymap.fr/api/v1", description="Production environment"),
    ],
)
|
|
|
|
# OAuth2 bearer scheme; clients obtain their token from TOKEN_URL.
oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl=TOKEN_URL,
)
|
|
|
|
# Collection handles shared by the dependency and route handlers in this module.
users_collection, pins_collection, friends_collection = (
    db[name] for name in ("users", "pins", "friends")
)
|
|
|
|
# Token management
|
|
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    """Build a signed JWT carrying ``data`` plus an ``exp`` expiry claim.

    Args:
        data: Claims to embed in the token. The dict is copied, not mutated.
        expires_delta: Lifetime of the token. When omitted (or falsy), the
            token expires 15 minutes after creation.

    Returns:
        The token encoded with ``SECRET_KEY`` using ``ALGORITHM``.
    """
    # Local import: the module header only imports datetime and timedelta.
    from datetime import timezone

    lifetime = expires_delta or timedelta(minutes=15)

    to_encode = data.copy()
    # Use a timezone-aware UTC timestamp. A naive local time would be taken
    # as UTC when the claim is converted to a Unix timestamp, shifting the
    # expiry by the server's UTC offset.
    to_encode.update({"exp": datetime.now(timezone.utc) + lifetime})

    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
|
|
|
|
async def get_current_user(token: str = Depends(oauth2_scheme)) -> User:
    """Resolve the bearer token to the user it identifies.

    The token is decoded with ``SECRET_KEY``/``ALGORITHM``, the username is
    read from its ``sub`` claim, and that username is looked up in the users
    collection. The matching document is returned through ``user_serialize``.

    Raises:
        HTTPException: 401 when the token cannot be decoded, carries no
            ``sub`` claim, or names a user that is not in the collection.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    try:
        claims = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        raise credentials_exception

    subject: str = claims.get("sub")
    if subject is None:
        raise credentials_exception
    token_data = TokenData(username=subject)

    record = users_collection.find_one({"username": token_data.username})
    if record is None:
        raise credentials_exception

    return user_serialize(record)
|
|
|
|
|
|
# Routes - TODO: find workaround to display 401/409/... HTTP error codes in openapi.json
|
|
@app.post(
    path="/register",
    response_model=Token,
    responses={409: {"model": Message}}
)
async def register(user: UserRegisterDTO):
    """Create a user account and return an access token for it.

    Args:
        user: Registration payload; ``username`` and ``password`` are read.

    Returns:
        A dict with ``access_token``, ``token_type`` and the new ``user_id``
        (the inserted document's id as a string).

    Raises:
        HTTPException: 409 when the username is already present.
    """
    # NOTE(review): the existence check and the insert are two separate
    # operations; a unique index on "username" would be needed to close the gap.
    if users_collection.find_one({"username": user.username}):
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="Username already used"
        )

    hashed_password = get_password_hash(user.password)
    # insert_one() returns an InsertOneResult; the new document's id lives on
    # its .inserted_id attribute (stringifying the result itself yields its repr).
    user_id = users_collection.insert_one(
        {"username": user.username, "password": hashed_password}
    ).inserted_id

    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(data={"sub": user.username}, expires_delta=access_token_expires)

    return {"access_token": access_token, "token_type": "bearer", "user_id": str(user_id)}
|
|
|
|
@app.post(path="/login", response_model=Token, responses={401: {"model": Message}})
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    """Check the submitted credentials and issue an access token.

    Returns:
        A dict with ``access_token``, ``token_type`` and ``user_id`` (the
        stored document's ``_id`` as a string).

    Raises:
        HTTPException: 401 when the username is unknown or the password does
            not verify against the stored hash.
    """
    account = users_collection.find_one({"username": form_data.username})

    # Short-circuits: the password is only verified when an account was found.
    if not (account and verify_password(form_data.password, account["password"])):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    token = create_access_token(
        data={"sub": form_data.username},
        expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES),
    )

    return {
        "access_token": token,
        "token_type": "bearer",
        "user_id": str(account["_id"]),
    }
|
|
|
|
@app.get(path="/logout", responses={401: {"model": Message}})
async def logout(current_user: User = Depends(get_current_user)):
    """Acknowledge a logout request from an authenticated user.

    Nothing is changed server-side; only a confirmation message is returned.
    """
    # TODO: find usecase / what to do ??
    return dict(message="Logged out")
|
|
|
|
@app.get(
    path="/pin/{id}",
    responses={401: {"model": Message}, 404: {"model": Message}}
)
async def get_pin(id: str, current_user: User = Depends(get_current_user)):
    """Fetch a single pin by its id.

    Args:
        id: Pin id taken from the URL path.

    Returns:
        The stored pin document, with ``_id`` converted to a string.

    Raises:
        HTTPException: 404 when the id is malformed or no pin matches it.
    """
    # A malformed id cannot match any document: answer 404 instead of letting
    # ObjectId() raise, which would surface as a 500.
    pin = pins_collection.find_one({"_id": ObjectId(id)}) if ObjectId.is_valid(id) else None
    if pin is None:
        raise HTTPException(status_code=404, detail="Pin not found")

    # ObjectId is not JSON-serialisable, so expose the id as a plain string.
    # NOTE(review): switch to a dedicated serializer if app.serializers has one
    # for single pins.
    pin["_id"] = str(pin["_id"])
    return pin
|
|
|
|
@app.patch(
    path="/pin/{id}",
    responses={401: {"model": Message}, 404: {"model": Message}}
)
async def update_pin(id: str, pin: Pin, current_user: User = Depends(get_current_user)):
    """Overwrite the fields of an existing pin with the submitted values.

    Args:
        id: Pin id taken from the URL path.
        pin: New field values; every dumped field is written with ``$set``.

    Returns:
        A confirmation message.

    Raises:
        HTTPException: 404 when the id is malformed or no pin matches it.
    """
    # A malformed id cannot match any document: answer 404 instead of letting
    # ObjectId() raise, which would surface as a 500.
    if not ObjectId.is_valid(id):
        raise HTTPException(status_code=404, detail="Pin not found")

    # NOTE(review): any authenticated user can update any pin - confirm
    # whether an ownership check is expected here.
    result = pins_collection.update_one({"_id": ObjectId(id)}, {"$set": pin.model_dump()})
    if result.matched_count == 0:
        raise HTTPException(status_code=404, detail="Pin not found")

    return {"message": "Pin updated"}
|
|
|
|
@app.post(path="/pin/add", responses={401: {"model": Message}})
async def add_pin(pin: Pin, current_user: User = Depends(get_current_user)):
    """Store a new pin and return the id of the inserted document as a string."""
    document = pin.model_dump()
    outcome = pins_collection.insert_one(document)
    return {"id": str(outcome.inserted_id)}
|
|
|
|
@app.get(path="/pins", responses={401: {"model": Message}})
async def list_pins(current_user: User = Depends(get_current_user)):
    """Return every document of the pins collection, passed through ``pins_serialize``."""
    documents = pins_collection.find().to_list()
    return pins_serialize(documents)
|
|
|
|
@app.get(
    path="/friend/{id}",
    responses={401: {"model": Message}, 404: {"model": Message}}
)
async def get_friend(id: str, current_user: User = Depends(get_current_user)):
    """Fetch a single friend record by its id.

    Args:
        id: Friend record id taken from the URL path.

    Returns:
        The stored friend document, with ``_id`` converted to a string.

    Raises:
        HTTPException: 404 when the id is malformed or no record matches it.
    """
    # A malformed id cannot match any document: answer 404 instead of letting
    # ObjectId() raise, which would surface as a 500.
    friend = friends_collection.find_one({"_id": ObjectId(id)}) if ObjectId.is_valid(id) else None
    if friend is None:
        raise HTTPException(status_code=404, detail="Friend not found")

    # ObjectId is not JSON-serialisable, so expose the id as a plain string.
    # NOTE(review): switch to a dedicated serializer if app.serializers has one
    # for single friend records.
    friend["_id"] = str(friend["_id"])
    return friend
|
|
|
|
@app.post(
    path="/friend/add",
    responses={401: {"model": Message}, 409: {"model": Message}}
)
async def add_friend(friendAdd: FriendAddDTO, current_user: User = Depends(get_current_user)):
    """Create a friend record from the current user towards another user.

    Args:
        friendAdd: Request payload; its ``friend_user_id`` is the target user.

    Returns:
        A dict holding the id of the inserted record as a string.

    Raises:
        HTTPException: 409 when the target is the current user.
    """
    # TODO: test if exists
    friend: dict = friendAdd.model_dump()

    if current_user.uid == friend["friend_user_id"]:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="Cannot add yourself as a friend"
        )

    # The record is owned by the authenticated user.
    friend["user_id"] = current_user.uid
    friend_id = friends_collection.insert_one(friend).inserted_id
    return {"id": str(friend_id)}
|
|
|
|
@app.delete(
    path="/friend/{id}/delete",
    responses={401: {"model": Message}, 404: {"model": Message}}
)
async def delete_friend(id: str, current_user: User = Depends(get_current_user)):
    """Delete a friend record by its id.

    Args:
        id: Friend record id taken from the URL path.

    Returns:
        A confirmation message.

    Raises:
        HTTPException: 404 when the id is malformed or no record matches it.
    """
    # A malformed id cannot match any document: answer 404 instead of letting
    # ObjectId() raise, which would surface as a 500.
    if not ObjectId.is_valid(id):
        raise HTTPException(status_code=404, detail="Friend not found")

    # NOTE(review): the filter does not involve current_user, so any
    # authenticated user can delete any record - confirm whether it should be
    # limited to the record's user_id / friend_user_id.
    result = friends_collection.delete_one({"_id": ObjectId(id)})
    if result.deleted_count == 0:
        raise HTTPException(status_code=404, detail="Friend not found")

    return {"message": "Friend deleted"}
|
|
|
|
@app.patch(
    path="/friend/{id}/accept",
    responses={401: {"model": Message}, 404: {"model": Message}}
)
async def accept_friend(id: str, current_user: User = Depends(get_current_user)):
    """Mark a friend record as accepted (sets ``status`` to ``"accepted"``).

    Args:
        id: Friend record id taken from the URL path.

    Returns:
        A confirmation message.

    Raises:
        HTTPException: 404 when the id is malformed or no record matches it.
    """
    # A malformed id cannot match any document: answer 404 instead of letting
    # ObjectId() raise, which would surface as a 500.
    if not ObjectId.is_valid(id):
        raise HTTPException(status_code=404, detail="Friend not found")

    # NOTE(review): the filter does not involve current_user, so any
    # authenticated user can accept any request - confirm whether it should be
    # limited to records whose friend_user_id is the caller.
    result = friends_collection.update_one({"_id": ObjectId(id)}, {"$set": {"status": "accepted"}})
    if result.matched_count == 0:
        raise HTTPException(status_code=404, detail="Friend not found")

    return {"message": "Friend request accepted"}
|
|
|
|
@app.post(
    path="/friend/{id}/deny",
    responses={401: {"model": Message}, 404: {"model": Message}}
)
async def deny_friend(id: str, current_user: User = Depends(get_current_user)):
    """Mark a friend record as denied (sets ``status`` to ``"denied"``).

    Args:
        id: Friend record id taken from the URL path.

    Returns:
        A confirmation message.

    Raises:
        HTTPException: 404 when the id is malformed or no record matches it.
    """
    # A malformed id cannot match any document: answer 404 instead of letting
    # ObjectId() raise, which would surface as a 500.
    if not ObjectId.is_valid(id):
        raise HTTPException(status_code=404, detail="Friend not found")

    # NOTE(review): the filter does not involve current_user, so any
    # authenticated user can deny any request - confirm whether it should be
    # limited to records whose friend_user_id is the caller.
    result = friends_collection.update_one({"_id": ObjectId(id)}, {"$set": {"status": "denied"}})
    if result.matched_count == 0:
        raise HTTPException(status_code=404, detail="Friend not found")

    return {"message": "Friend request denied"}
|
|
|
|
@app.get(
    path="/friends",
    responses={401: {"model": Message}},
    response_model=FriendListDTO,
)
async def list_friends(current_user: User = Depends(get_current_user)):
    """Return the friend records that involve the current user.

    Two queries are run: records whose ``user_id`` is the current user, and
    records whose ``friend_user_id`` is the current user. Both results go
    through ``friends_serialize`` and are wrapped in a ``FriendListDTO``.
    """
    uid_filter = {"user_id": current_user.uid}
    by_user_id = friends_collection.find(uid_filter).to_list()

    # The second result set is handed over as a cursor, not a list.
    by_friend_user_id = friends_collection.find({"friend_user_id": current_user.uid})

    serialized = friends_serialize(by_user_id, by_friend_user_id)
    return FriendListDTO(**serialized)
|
|
|
|
@app.get(
    path="/users",
    response_model=list[UserDTO],
    responses={401: {"model": Message}, 422: {"model": Message}},
)
async def search_users(name: str, current_user: User = Depends(get_current_user)):
    """Search users whose username matches ``name`` as a case-insensitive regex.

    Args:
        name: Pattern passed to MongoDB's ``$regex`` operator as-is.

    Returns:
        The matching users, passed through ``users_serialize``.

    Raises:
        HTTPException: 422 when MongoDB rejects the query with an
            ``OperationFailure``.
    """
    username_filter = {"username": {"$regex": name, "$options": "i"}}

    try:
        matches = users_collection.find(username_filter).to_list()
        users = users_serialize(matches)
    except pymongo.errors.OperationFailure:
        raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail="Regex may be wrongly formatted")

    return users