You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
70 lines
2.4 KiB
70 lines
2.4 KiB
from datetime import timedelta
|
|
|
|
from fastapi.security import OAuth2PasswordRequestForm
|
|
from fastapi import APIRouter, Depends, HTTPException, status
|
|
import pymongo
|
|
|
|
import app.config as config
|
|
from app.models import User, Token, HTTPError
|
|
from app.dto import UserRegisterDTO
|
|
from app.routes.utils import get_current_user, create_access_token
|
|
from app.utils import get_password_hash, verify_password
|
|
|
|
# Database setup
|
|
client = pymongo.MongoClient(config.MONGODB_URL, username=config.MONGODB_USERNAME, password=config.MONGODB_PASSWORD)
|
|
db = client[config.MONGODB_DATABASE]
|
|
|
|
users_collection = db["users"]
|
|
|
|
auth_router = APIRouter(
|
|
tags=["Auth"]
|
|
)
|
|
|
|
@auth_router.post(
|
|
path="/register",
|
|
response_model=Token,
|
|
responses={409: {"model": HTTPError}}
|
|
)
|
|
async def register(user: UserRegisterDTO):
|
|
user_exists = users_collection.find_one({"username": user.username})
|
|
if user_exists:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_409_CONFLICT,
|
|
detail="Username already used"
|
|
)
|
|
|
|
hashed_password = get_password_hash(user.password)
|
|
user_id = users_collection.insert_one({"username": user.username, "password": hashed_password})
|
|
|
|
access_token_expires = timedelta(minutes=config.ACCESS_TOKEN_EXPIRE_MINUTES)
|
|
access_token = create_access_token(data={"sub": user.username}, expires_delta=access_token_expires)
|
|
|
|
return {"access_token": access_token, "token_type": "bearer", "user_id": str(user_id.inserted_id)}
|
|
|
|
@auth_router.post(
|
|
path="/login",
|
|
response_model=Token,
|
|
responses={401: {"model": HTTPError}}
|
|
)
|
|
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
|
|
user = users_collection.find_one({"username": form_data.username})
|
|
if not user or not verify_password(form_data.password, user["password"]):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Incorrect username or password",
|
|
headers={"WWW-Authenticate": "Bearer"},
|
|
)
|
|
|
|
access_token_expires = timedelta(minutes=config.ACCESS_TOKEN_EXPIRE_MINUTES)
|
|
access_token = create_access_token(data={"sub": form_data.username}, expires_delta=access_token_expires)
|
|
|
|
return {"access_token": access_token, "token_type": "bearer", "user_id": str(user["_id"])}
|
|
|
|
|
|
@auth_router.get(
|
|
path="/logout",
|
|
responses={401: {"model": HTTPError}}
|
|
)
|
|
async def logout(current_user: User = Depends(get_current_user)):
|
|
return {"message": "Logged out"}
|