You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
95 lines
2.8 KiB
95 lines
2.8 KiB
from datetime import timedelta
|
|
|
|
from fastapi.security import OAuth2PasswordRequestForm
|
|
from fastapi import APIRouter, Depends, HTTPException, status
|
|
import pymongo
|
|
|
|
import app.config as config
|
|
from app.models import User, Token, HTTPError
|
|
from app.dto import UserRegisterDTO
|
|
from app.routes.utils import get_current_user, create_access_token
|
|
from app.utils import get_password_hash, verify_password
|
|
|
|
# MongoDB connection, created once at import time from the app config.
client = pymongo.MongoClient(
    config.MONGODB_URL,
    username=config.MONGODB_USERNAME,
    password=config.MONGODB_PASSWORD,
)
db = client[config.MONGODB_DATABASE]

# Collection of user documents queried and written by the auth routes.
users_collection = db["users"]
|
|
|
|
# Router collecting the authentication endpoints under the "Auth" tag.
auth_router = APIRouter(tags=["Auth"])
|
|
|
|
@auth_router.post(
    path="/register",
    response_model=Token,
    responses={409: {"model": HTTPError}},
)
async def register(user: UserRegisterDTO):
    """Create a new non-admin user and return an access token for it.

    Args:
        user: Registration payload; its ``username`` and ``password``
            attributes are read.

    Returns:
        A dict with ``access_token``, ``token_type`` (``"bearer"``), the
        inserted document's id as ``user_id`` (string) and ``is_admin``
        (always ``False`` for newly registered users).

    Raises:
        HTTPException: 409 when the username is already present in the
            users collection.
    """
    # pymongo is a blocking driver: run its calls in the threadpool so this
    # coroutine does not stall the event loop while waiting on MongoDB.
    from fastapi.concurrency import run_in_threadpool

    username_conflict = HTTPException(
        status_code=status.HTTP_409_CONFLICT,
        detail="Username already used",
    )

    existing_user = await run_in_threadpool(
        users_collection.find_one, {"username": user.username}
    )
    if existing_user:
        raise username_conflict

    hashed_password = get_password_hash(user.password)
    try:
        insert_result = await run_in_threadpool(
            users_collection.insert_one,
            {
                "username": user.username,
                "password": hashed_password,
                "is_admin": False,
            },
        )
    except pymongo.errors.DuplicateKeyError:
        # A concurrent request can register the same username between the
        # lookup above and this insert. NOTE(review): this only fires if the
        # collection enforces uniqueness (index not visible in this file).
        raise username_conflict from None

    access_token_expires = timedelta(minutes=config.ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={
            "sub": user.username,
            "is_admin": False,
        },
        expires_delta=access_token_expires,
    )

    return {
        "access_token": access_token,
        "token_type": "bearer",
        "user_id": str(insert_result.inserted_id),
        "is_admin": False,
    }
|
|
|
|
@auth_router.post(
    path="/login",
    response_model=Token,
    responses={401: {"model": HTTPError}},
)
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    """Authenticate a user by username/password and return an access token.

    Args:
        form_data: OAuth2 password form; its ``username`` and ``password``
            fields are read.

    Returns:
        A dict with ``access_token``, ``token_type`` (``"bearer"``), the
        user's document id as ``user_id`` (string) and ``is_admin`` (taken
        from the stored document, ``False`` when the field is absent).

    Raises:
        HTTPException: 401 when no user with that username exists or the
            password does not verify against the stored hash.
    """
    # pymongo is a blocking driver: run the lookup in the threadpool so this
    # coroutine does not stall the event loop while waiting on MongoDB.
    from fastapi.concurrency import run_in_threadpool

    user = await run_in_threadpool(
        users_collection.find_one, {"username": form_data.username}
    )
    if not user or not verify_password(form_data.password, user["password"]):
        # Same response for "unknown user" and "wrong password".
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # Documents without the field are treated as non-admin.
    is_admin = user.get("is_admin", False)

    access_token_expires = timedelta(minutes=config.ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={
            "sub": form_data.username,
            "is_admin": is_admin,
        },
        expires_delta=access_token_expires,
    )

    return {
        "access_token": access_token,
        "token_type": "bearer",
        "user_id": str(user["_id"]),
        "is_admin": is_admin,
    }
|
|
|
|
@auth_router.get(
    path="/logout",
    responses={401: {"model": HTTPError}},
)
async def logout(current_user: User = Depends(get_current_user)):
    """Acknowledge a logout request.

    The ``get_current_user`` dependency has to resolve before this handler
    runs; the resolved user is not used here and the handler only returns a
    fixed confirmation message.
    """
    confirmation = {"message": "Logged out"}
    return confirmation
|