from datetime import timedelta from fastapi.security import OAuth2PasswordRequestForm from fastapi import APIRouter, Depends, HTTPException, status import pymongo import app.config as config from app.models import User, Token, HTTPError from app.dto import UserRegisterDTO, RefreshTokenDTO from app.routes.utils import get_current_user, create_access_token, create_refresh_token, get_current_user_from_refresh_token from app.utils import get_password_hash, verify_password # Database setup client = pymongo.MongoClient(config.MONGODB_URL, username=config.MONGODB_USERNAME, password=config.MONGODB_PASSWORD) db = client[config.MONGODB_DATABASE] users_collection = db["users"] auth_router = APIRouter( tags=["Auth"] ) def create_tokens(user: User): access_token_expires = timedelta(minutes=config.ACCESS_TOKEN_EXPIRE_MINUTES) access_token = create_access_token( data={ "sub": user["username"], "is_admin": user["is_admin"] }, expires_delta=access_token_expires ) refresh_token_expires = timedelta(minutes=config.REFRESH_TOKEN_EXPIRE_MINUTES) refresh_token = create_refresh_token( data={ "sub": user["username"], "is_admin": user["is_admin"] }, expires_delta=refresh_token_expires ) return access_token, refresh_token @auth_router.post( path="/register", response_model=Token, responses={409: {"model": HTTPError}} ) async def register(user: UserRegisterDTO): user_exists = users_collection.find_one({"username": user.username}) if user_exists: raise HTTPException( status_code=status.HTTP_409_CONFLICT, detail="Username already used" ) hashed_password = get_password_hash(user.password) user = { "username": user.username, "password": hashed_password, "is_admin": False } user_id = users_collection.insert_one(user) access_token, refresh_token = create_tokens(user) return { "access_token": access_token, "refresh_token": refresh_token, "token_type": "bearer", "user_id": str(user_id.inserted_id), "is_admin": False } @auth_router.post( path="/login", response_model=Token, responses={401: {"model": HTTPError}} ) async def login(form_data: OAuth2PasswordRequestForm = Depends()): user = users_collection.find_one({"username": form_data.username}) if not user or not verify_password(form_data.password, user["password"]): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password", headers={"WWW-Authenticate": "Bearer"}, ) access_token, refresh_token = create_tokens(user) return { "access_token": access_token, "refresh_token": refresh_token, "token_type": "bearer", "user_id": str(user["_id"]), "is_admin": user.get("is_admin", False) } @auth_router.post( path="/refresh-token", response_model=Token, responses={401: {"model": HTTPError}} ) async def refresh(refresh_data: RefreshTokenDTO): current_user = get_current_user_from_refresh_token(refresh_data.refresh_token) new_access_token, new_refresh_token = create_tokens(current_user) return { "access_token": new_access_token, "refresh_token": new_refresh_token, "token_type": "bearer", "user_id": str(current_user["_id"]), "is_admin": current_user["is_admin"] } @auth_router.get( path="/logout", responses={401: {"model": HTTPError}} ) async def logout(current_user: User = Depends(get_current_user)): return {"message": "Logged out"}