⬆️ Upgraded pydantic to latest version & migrated python-jose to joserfc - from authlib, a more secure lib

master
Alix JEUDI--LEMOINE 4 months ago
parent 26061d7366
commit 9903a78d0e

@ -4,7 +4,9 @@ from fastapi.responses import JSONResponse
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from typing import Optional
from datetime import datetime, timedelta
from jose import JWTError, jwt
from joserfc import jwt
from joserfc.errors import JoseError
from joserfc.jwk import OctKey
from bson.objectid import ObjectId
from app.utils import get_password_hash, verify_password
@ -52,7 +54,8 @@ def create_access_token(data: dict, expires_delta: timedelta):
expire = datetime.now() + expires_delta
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, config.SECRET_KEY, algorithm=config.ALGORITHM)
header = {"alg": config.ALGORITHM}
encoded_jwt = jwt.encode(header, to_encode, OctKey.import_key(config.SECRET_KEY))
return encoded_jwt
@ -64,12 +67,12 @@ async def get_current_user(token: str = Depends(oauth2_scheme)) -> User:
)
try:
payload = jwt.decode(token, config.SECRET_KEY, algorithms=[config.ALGORITHM])
username: str = payload.get("sub")
payload = jwt.decode(token, OctKey.import_key(config.SECRET_KEY))
username: str = payload.claims["sub"]
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except JWTError:
except JoseError:
raise credentials_exception
user = users_collection.find_one({"username": token_data.username})
@ -78,6 +81,18 @@ async def get_current_user(token: str = Depends(oauth2_scheme)) -> User:
return serializers.user_serialize(user)
# Exceptions
def friend_not_found():
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Friend not found"
)
def objectid_misformatted():
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="The ObjectID is misformatted"
)
# Routes
@app.post(
@ -137,10 +152,7 @@ async def get_pin(id: str, current_user: User = Depends(get_current_user)):
try:
pin = pins_collection.find_one({"_id": ObjectId(id)})
except bson.errors.InvalidId:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="The ObjectID is misformatted"
)
objectid_misformatted()
if pin is None:
raise HTTPException(status_code=404, detail="Pin not found")
@ -155,10 +167,7 @@ async def update_pin(id: str, pin: PinDTO, current_user: User = Depends(get_curr
try:
result = pins_collection.update_one({"_id": ObjectId(id)}, {"$set": pin.model_dump()})
except bson.errors.InvalidId:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="The ObjectID is misformatted"
)
objectid_misformatted()
if result.matched_count == 0:
raise HTTPException(status_code=404, detail="Pin not found")
@ -181,11 +190,6 @@ async def list_pins(current_user: User = Depends(get_current_user)):
pins = serializers.pins_serialize(pins_collection.find().to_list())
return pins
def friend_not_found():
raise HTTPException(status_code=404, detail="Friend not found")
@app.get(
path="/friend/{id}",
responses={401: {"model": HTTPError}, 404: {"model": HTTPError}, 422: {"model": HTTPError}}
@ -194,10 +198,7 @@ async def get_friend(id: str, current_user: User = Depends(get_current_user)):
try:
friend = friends_collection.find_one({"_id": ObjectId(id)})
except bson.errors.InvalidId:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="The ObjectID is misformatted"
)
objectid_misformatted()
if friend is None: friend_not_found()
@ -229,10 +230,7 @@ async def delete_friend(id: str, current_user: User = Depends(get_current_user))
try:
result = friends_collection.delete_one({"_id": ObjectId(id)})
except bson.errors.InvalidId:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="The ObjectID is misformatted"
)
objectid_misformatted()
if result.deleted_count == 0: friend_not_found()
@ -246,10 +244,7 @@ async def accept_friend(id: str, current_user: User = Depends(get_current_user))
try:
result = friends_collection.update_one({"_id": ObjectId(id)}, {"$set": {"status": "accepted"}})
except bson.errors.InvalidId:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="The ObjectID is misformatted"
)
objectid_misformatted()
if result.matched_count == 0: friend_not_found()
@ -263,10 +258,7 @@ async def deny_friend(id: str, current_user: User = Depends(get_current_user)):
try:
result = friends_collection.update_one({"_id": ObjectId(id)}, {"$set": {"status": "denied"}})
except bson.errors.InvalidId:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="The ObjectID is misformatted"
)
objectid_misformatted()
if result.matched_count == 0: friend_not_found()

@ -1,5 +1,5 @@
fastapi[standard]==0.115.5
pydantic==2.9.2
pydantic==2.10.3
pymongo==4.10.1
uvicorn==0.32.0
python-jose==3.3.0
joserfc==1.0.1

@ -1,7 +1,7 @@
fastapi[standard]==0.115.5
pydantic==2.9.2
pydantic==2.10.3
pymongo==4.10.1
uvicorn==0.32.0
python-jose==3.3.0
joserfc==1.0.1
pytest==8.3.4
pytest-cov==6.0.0
Loading…
Cancel
Save