✨ New routes file for images (WIP)
continuous-integration/drone/push Build is failing
Details
continuous-integration/drone/push Build is failing
Details
parent
f6d747814b
commit
caa705546a
@ -0,0 +1,216 @@
|
||||
import os
|
||||
import hashlib
|
||||
from typing import Optional
|
||||
from fastapi import APIRouter, HTTPException, UploadFile, File, Form, Depends, status
|
||||
from fastapi.responses import FileResponse
|
||||
from PIL import Image as PILImage
|
||||
import magic
|
||||
from bson import ObjectId
|
||||
import bson
|
||||
from datetime import datetime
|
||||
|
||||
from app.models import HTTPError, User
|
||||
from app.models.image import Image
|
||||
from app.dto.image import ImageUploadDTO, ImageUpdateCaptionDTO
|
||||
from .utils import get_current_user, objectid_misformatted
|
||||
import app.config as config
|
||||
|
||||
# Database setup
|
||||
from pymongo import MongoClient
|
||||
client = MongoClient(config.MONGODB_URL, username=config.MONGODB_USERNAME, password=config.MONGODB_PASSWORD)
|
||||
db = client[config.MONGODB_DATABASE]
|
||||
|
||||
images_collection = db["images"]
|
||||
pins_collection = db["pins"]
|
||||
pin_permissions_collection = db["pin_permissions"]
|
||||
|
||||
# Configuration
|
||||
UPLOAD_DIR = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "images")
|
||||
MAX_IMAGE_SIZE = 8 * 1024 * 1024 # 8 Mo
|
||||
ALLOWED_MIME_TYPES = [
|
||||
"image/jpeg",
|
||||
"image/png",
|
||||
"image/heic"
|
||||
]
|
||||
|
||||
# Créer le dossier images s'il n'existe pas
|
||||
os.makedirs(UPLOAD_DIR, exist_ok=True)
|
||||
|
||||
images_router = APIRouter(
|
||||
prefix="/image",
|
||||
tags=["Images"]
|
||||
)
|
||||
|
||||
def check_image_permissions(image_id: str, current_user: User):
|
||||
"""Vérifie si l'utilisateur a les permissions pour accéder à l'image"""
|
||||
try:
|
||||
image = images_collection.find_one({"_id": ObjectId(image_id)})
|
||||
if not image:
|
||||
raise HTTPException(status_code=404, detail="Image not found")
|
||||
|
||||
# Si l'image n'est pas associée à un pin, n'importe qui peut y accéder
|
||||
if not image.get("pin_id"):
|
||||
return image
|
||||
|
||||
# Récupérer le pin associé
|
||||
pin = pins_collection.find_one({"_id": ObjectId(image["pin_id"])})
|
||||
if not pin:
|
||||
raise HTTPException(status_code=404, detail="Associated pin not found")
|
||||
|
||||
# Vérifier si l'utilisateur est le propriétaire du pin
|
||||
if pin["user_id"] == current_user.uid:
|
||||
return image
|
||||
|
||||
# Vérifier si le pin est partagé avec l'utilisateur
|
||||
permission = pin_permissions_collection.find_one({
|
||||
"pin_id": ObjectId(image["pin_id"]),
|
||||
"user_id": current_user.uid
|
||||
})
|
||||
|
||||
if not permission:
|
||||
raise HTTPException(status_code=403, detail="You don't have permission to access this image")
|
||||
|
||||
return image
|
||||
|
||||
except bson.errors.InvalidId:
|
||||
objectid_misformatted()
|
||||
|
||||
def process_image(file_path: str) -> tuple[str, str]:
|
||||
"""Traite et optimise l'image"""
|
||||
with PILImage.open(file_path) as img:
|
||||
# Convertir en RGB si nécessaire (pour les images HEIC par exemple)
|
||||
if img.mode in ('RGBA', 'LA') or (img.mode == 'P' and 'transparency' in img.info):
|
||||
bg = PILImage.new('RGB', img.size, (255, 255, 255))
|
||||
bg.paste(img, mask=img.split()[-1])
|
||||
img = bg
|
||||
elif img.mode != 'RGB':
|
||||
img = img.convert('RGB')
|
||||
|
||||
# Redimensionner si nécessaire pour le Full HD (1920x1080)
|
||||
if img.size[0] > 1920 or img.size[1] > 1080:
|
||||
img.thumbnail((1920, 1080), PILImage.Resampling.LANCZOS)
|
||||
|
||||
# Sauvegarder avec compression
|
||||
img.save(file_path, 'JPEG', quality=85, optimize=True)
|
||||
|
||||
# Calculer le hash du fichier optimisé
|
||||
with open(file_path, 'rb') as f:
|
||||
file_hash = hashlib.sha256(f.read()).hexdigest()
|
||||
|
||||
return file_hash, 'jpg'
|
||||
|
||||
@images_router.post(
|
||||
path="/pin/{pin_id}/add",
|
||||
responses={
|
||||
401: {"model": HTTPError},
|
||||
404: {"model": HTTPError},
|
||||
413: {"model": HTTPError},
|
||||
415: {"model": HTTPError}
|
||||
}
|
||||
)
|
||||
async def add_image(
|
||||
pin_id: Optional[str],
|
||||
image: UploadFile = File(...),
|
||||
metadata: ImageUploadDTO = Depends(),
|
||||
current_user: User = Depends(get_current_user)
|
||||
):
|
||||
# Vérifier la taille du fichier
|
||||
image_data = await image.read()
|
||||
if len(image_data) > MAX_IMAGE_SIZE:
|
||||
raise HTTPException(status_code=413, detail="Image too large (max 8MB)")
|
||||
|
||||
# Vérifier le type MIME
|
||||
mime_type = magic.from_buffer(image_data, mime=True)
|
||||
if mime_type not in ALLOWED_MIME_TYPES:
|
||||
raise HTTPException(status_code=415, detail="Unsupported image type")
|
||||
|
||||
# Sauvegarder temporairement le fichier
|
||||
temp_path = os.path.join(UPLOAD_DIR, f"temp_{image.filename}")
|
||||
with open(temp_path, "wb") as f:
|
||||
f.write(image_data)
|
||||
|
||||
try:
|
||||
# Traiter l'image et obtenir son hash
|
||||
file_hash, extension = process_image(temp_path)
|
||||
final_path = os.path.join(UPLOAD_DIR, f"{file_hash}.{extension}")
|
||||
|
||||
# Si l'image n'existe pas déjà physiquement, la déplacer
|
||||
if not os.path.exists(final_path):
|
||||
os.rename(temp_path, final_path)
|
||||
else:
|
||||
os.remove(temp_path)
|
||||
|
||||
# Créer l'entrée en base de données
|
||||
image_doc = {
|
||||
"pin_id": ObjectId(pin_id) if pin_id != 'null' else None,
|
||||
"image_hash": file_hash,
|
||||
"metadata": {
|
||||
"created_at": metadata.exif_date or datetime.now().isoformat(),
|
||||
"original_filename": image.filename,
|
||||
"mime_type": mime_type,
|
||||
"size": len(image_data)
|
||||
},
|
||||
"caption": metadata.caption
|
||||
}
|
||||
|
||||
image_id = images_collection.insert_one(image_doc).inserted_id
|
||||
|
||||
return {"id": str(image_id)}
|
||||
|
||||
except Exception as e:
|
||||
# Nettoyer en cas d'erreur
|
||||
if os.path.exists(temp_path):
|
||||
os.remove(temp_path)
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
|
||||
@images_router.get(
|
||||
path="/{id}",
|
||||
responses={401: {"model": HTTPError}, 404: {"model": HTTPError}}
|
||||
)
|
||||
async def get_image(id: str, current_user: User = Depends(get_current_user)):
|
||||
image = check_image_permissions(id, current_user)
|
||||
image_path = os.path.join(UPLOAD_DIR, f"{image['image_hash']}.jpg")
|
||||
|
||||
if not os.path.exists(image_path):
|
||||
raise HTTPException(status_code=404, detail="Image file not found")
|
||||
|
||||
return FileResponse(image_path)
|
||||
|
||||
@images_router.delete(
|
||||
path="/{id}",
|
||||
responses={401: {"model": HTTPError}, 404: {"model": HTTPError}}
|
||||
)
|
||||
async def delete_image(id: str, current_user: User = Depends(get_current_user)):
|
||||
image = check_image_permissions(id, current_user)
|
||||
|
||||
# Supprimer l'entrée en base
|
||||
images_collection.delete_one({"_id": ObjectId(id)})
|
||||
|
||||
# Vérifier si d'autres images utilisent le même hash
|
||||
other_images = images_collection.find_one({"image_hash": image["image_hash"]})
|
||||
|
||||
# Si plus personne n'utilise l'image, supprimer le fichier
|
||||
if not other_images:
|
||||
image_path = os.path.join(UPLOAD_DIR, f"{image['image_hash']}.jpg")
|
||||
if os.path.exists(image_path):
|
||||
os.remove(image_path)
|
||||
|
||||
return {"message": "Image deleted successfully"}
|
||||
|
||||
@images_router.patch(
|
||||
path="/{id}/caption",
|
||||
responses={401: {"model": HTTPError}, 404: {"model": HTTPError}}
|
||||
)
|
||||
async def update_caption(
|
||||
id: str,
|
||||
caption_data: ImageUpdateCaptionDTO,
|
||||
current_user: User = Depends(get_current_user)
|
||||
):
|
||||
image = check_image_permissions(id, current_user)
|
||||
|
||||
images_collection.update_one(
|
||||
{"_id": ObjectId(id)},
|
||||
{"$set": {"caption": caption_data.caption}}
|
||||
)
|
||||
|
||||
return {"message": "Caption updated successfully"}
|
Loading…
Reference in new issue