Added refresh token handling + updated models and authentication routes to include the refresh token.

master
Alix JEUDI--LEMOINE 2 days ago
parent 373d7aebe6
commit b9a7822d5c

@ -1,3 +1,4 @@
from .FriendAddDTO import FriendAddDTO from .FriendAddDTO import FriendAddDTO
from .user import UserDTO, UserRegisterDTO, UserAdminDTO from .user import UserDTO, UserRegisterDTO, UserAdminDTO
from .pin import PinDTO, PinShareDTO from .pin import PinDTO, PinShareDTO
from .token import RefreshTokenDTO

@ -0,0 +1,4 @@
from pydantic import BaseModel
class RefreshTokenDTO(BaseModel):
refresh_token: str

@ -1,6 +1,7 @@
from pydantic import BaseModel from pydantic import BaseModel
class Token(BaseModel): class Token(BaseModel):
refresh_token: str
access_token: str access_token: str
token_type: str token_type: str
user_id: str user_id: str

@ -6,8 +6,8 @@ import pymongo
import app.config as config import app.config as config
from app.models import User, Token, HTTPError from app.models import User, Token, HTTPError
from app.dto import UserRegisterDTO from app.dto import UserRegisterDTO, RefreshTokenDTO
from app.routes.utils import get_current_user, create_access_token from app.routes.utils import get_current_user, create_access_token, create_refresh_token, get_current_user_from_refresh_token
from app.utils import get_password_hash, verify_password from app.utils import get_password_hash, verify_password
# Database setup # Database setup
@ -20,6 +20,26 @@ auth_router = APIRouter(
tags=["Auth"] tags=["Auth"]
) )
def create_tokens(user: User):
access_token_expires = timedelta(minutes=config.ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={
"sub": user["username"],
"is_admin": user["is_admin"]
},
expires_delta=access_token_expires
)
refresh_token_expires = timedelta(minutes=config.REFRESH_TOKEN_EXPIRE_MINUTES)
refresh_token = create_refresh_token(
data={
"sub": user["username"],
"is_admin": user["is_admin"]
},
expires_delta=refresh_token_expires
)
return access_token, refresh_token
@auth_router.post( @auth_router.post(
path="/register", path="/register",
response_model=Token, response_model=Token,
@ -34,23 +54,18 @@ async def register(user: UserRegisterDTO):
) )
hashed_password = get_password_hash(user.password) hashed_password = get_password_hash(user.password)
user_id = users_collection.insert_one({ user = {
"username": user.username, "username": user.username,
"password": hashed_password, "password": hashed_password,
"is_admin": False "is_admin": False
}) }
user_id = users_collection.insert_one(user)
access_token_expires = timedelta(minutes=config.ACCESS_TOKEN_EXPIRE_MINUTES) access_token, refresh_token = create_tokens(user)
access_token = create_access_token(
data={
"sub": user.username,
"is_admin": False
},
expires_delta=access_token_expires
)
return { return {
"access_token": access_token, "access_token": access_token,
"refresh_token": refresh_token,
"token_type": "bearer", "token_type": "bearer",
"user_id": str(user_id.inserted_id), "user_id": str(user_id.inserted_id),
"is_admin": False "is_admin": False
@ -70,25 +85,37 @@ async def login(form_data: OAuth2PasswordRequestForm = Depends()):
headers={"WWW-Authenticate": "Bearer"}, headers={"WWW-Authenticate": "Bearer"},
) )
access_token_expires = timedelta(minutes=config.ACCESS_TOKEN_EXPIRE_MINUTES) access_token, refresh_token = create_tokens(user)
access_token = create_access_token(
data={
"sub": form_data.username,
"is_admin": user.get("is_admin", False)
},
expires_delta=access_token_expires
)
return { return {
"access_token": access_token, "access_token": access_token,
"refresh_token": refresh_token,
"token_type": "bearer", "token_type": "bearer",
"user_id": str(user["_id"]), "user_id": str(user["_id"]),
"is_admin": user.get("is_admin", False) "is_admin": user.get("is_admin", False)
} }
@auth_router.post(
path="/refresh-token",
response_model=Token,
responses={401: {"model": HTTPError}}
)
async def refresh(refresh_data: RefreshTokenDTO):
current_user = get_current_user_from_refresh_token(refresh_data.refresh_token)
new_access_token, new_refresh_token = create_tokens(current_user)
return {
"access_token": new_access_token,
"refresh_token": new_refresh_token,
"token_type": "bearer",
"user_id": str(current_user["_id"]),
"is_admin": current_user["is_admin"]
}
@auth_router.get( @auth_router.get(
path="/logout", path="/logout",
responses={401: {"model": HTTPError}} responses={401: {"model": HTTPError}}
) )
async def logout(current_user: User = Depends(get_current_user)): async def logout(current_user: User = Depends(get_current_user)):
return {"message": "Logged out"} return {"message": "Logged out"}

Loading…
Cancel
Save