You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
api/app/routes/images.py

230 lines
8.1 KiB

import os
import hashlib
from typing import Optional
from fastapi import APIRouter, HTTPException, UploadFile, File, Form, Depends, status
from fastapi.responses import FileResponse
from PIL import Image as PILImage
import magic
from bson import ObjectId
import bson
from datetime import datetime
from app.models import HTTPError, User
from app.models.image import Image
from app.dto.image import ImageUploadDTO, ImageCaptionDTO, ImageMetadataDTO
from .utils import get_current_user, objectid_misformatted
import app.config as config
# Database setup
from pymongo import MongoClient
client = MongoClient(config.MONGODB_URL, username=config.MONGODB_USERNAME, password=config.MONGODB_PASSWORD)
db = client[config.MONGODB_DATABASE]
images_collection = db["images"]
pins_collection = db["pins"]
pin_permissions_collection = db["pin_permissions"]
# Créer le dossier images s'il n'existe pas
os.makedirs(config.UPLOAD_DIR, exist_ok=True)
images_router = APIRouter(
prefix="/image",
tags=["Images"]
)
def check_image_permissions(image_id: str, current_user: User):
"""Vérifie si l'utilisateur a les permissions pour accéder à l'image"""
try:
image = images_collection.find_one({"_id": ObjectId(image_id)})
if not image:
raise HTTPException(status_code=404, detail="Image not found")
# Si l'image n'est pas associée à un pin, personne ne peut y accéder
if not image.get("pin_id"):
raise HTTPException(status_code=403, detail="Image is not associated with any pin")
# Récupérer le pin associé
pin = pins_collection.find_one({"_id": ObjectId(image["pin_id"])})
if not pin:
raise HTTPException(status_code=404, detail="Associated pin not found")
# Vérifier si l'utilisateur est le propriétaire du pin
if pin["user_id"] == current_user.uid:
return image
# Vérifier si le pin est partagé avec l'utilisateur
permission = pin_permissions_collection.find_one({
"pin_id": ObjectId(image["pin_id"]),
"user_id": current_user.uid
})
if not permission:
raise HTTPException(status_code=403, detail="You don't have permission to access this image")
return image
except bson.errors.InvalidId:
objectid_misformatted()
def process_image(file_path: str) -> tuple[str, str]:
"""Traite et optimise l'image"""
with PILImage.open(file_path) as img:
# Convertir en RGB si nécessaire (pour les images HEIC par exemple)
if img.mode in ('RGBA', 'LA') or (img.mode == 'P' and 'transparency' in img.info):
bg = PILImage.new('RGB', img.size, (255, 255, 255))
bg.paste(img, mask=img.split()[-1])
img = bg
elif img.mode != 'RGB':
img = img.convert('RGB')
# Redimensionner si nécessaire pour le Full HD (1920x1080)
if img.size[0] > 1920 or img.size[1] > 1080:
img.thumbnail((1920, 1080), PILImage.Resampling.LANCZOS)
# Sauvegarder avec compression
img.save(file_path, 'JPEG', quality=85, optimize=True)
# Calculer le hash du fichier optimisé
with open(file_path, 'rb') as f:
file_hash = hashlib.sha256(f.read()).hexdigest()
return file_hash, 'jpg'
@images_router.post(
path="/pin/{pin_id}/add",
responses={
401: {"model": HTTPError},
404: {"model": HTTPError},
413: {"model": HTTPError},
415: {"model": HTTPError}
}
)
async def add_image(
pin_id: Optional[str],
image: UploadFile = File(...),
metadata: ImageUploadDTO = Depends(),
current_user: User = Depends(get_current_user)
):
# Vérifier la taille du fichier
image_data = await image.read()
if len(image_data) > config.MAX_IMAGE_SIZE:
raise HTTPException(status_code=413, detail="Image too large (max 8MB)")
# Vérifier le type MIME
mime_type = magic.from_buffer(image_data, mime=True)
if mime_type not in config.ALLOWED_MIME_TYPES:
raise HTTPException(status_code=415, detail="Unsupported image type")
# Sauvegarder temporairement le fichier
temp_path = os.path.join(config.UPLOAD_DIR, f"temp_{image.filename}")
with open(temp_path, "wb") as f:
f.write(image_data)
try:
# Traiter l'image et obtenir son hash
file_hash, extension = process_image(temp_path)
final_path = os.path.join(config.UPLOAD_DIR, f"{file_hash}.{extension}")
# Si l'image n'existe pas déjà physiquement, la déplacer
if not os.path.exists(final_path):
os.rename(temp_path, final_path)
else:
os.remove(temp_path)
# Créer l'entrée en base de données
image_doc = {
"pin_id": ObjectId(pin_id) if pin_id != 'null' else None,
"image_hash": file_hash,
"metadata": {
"created_at": metadata.exif_date or datetime.now().isoformat(),
"original_filename": image.filename,
"mime_type": mime_type,
"size": len(image_data)
},
"caption": metadata.caption
}
image_id = images_collection.insert_one(image_doc).inserted_id
return {"id": str(image_id)}
except Exception as e:
# Nettoyer en cas d'erreur
if os.path.exists(temp_path):
os.remove(temp_path)
raise HTTPException(status_code=500, detail=str(e))
@images_router.get(
path="/{id}",
responses={401: {"model": HTTPError}, 403: {"model": HTTPError}, 404: {"model": HTTPError}}
)
async def get_image(id: str, current_user: User = Depends(get_current_user)):
image = check_image_permissions(id, current_user)
image_path = os.path.join(config.UPLOAD_DIR, f"{image['image_hash']}.jpg")
if not os.path.exists(image_path):
raise HTTPException(status_code=404, detail="Image file not found")
return FileResponse(image_path)
@images_router.delete(
path="/{id}",
responses={401: {"model": HTTPError}, 404: {"model": HTTPError}}
)
async def delete_image(id: str, current_user: User = Depends(get_current_user)):
image = check_image_permissions(id, current_user)
# Supprimer l'entrée en base
images_collection.delete_one({"_id": ObjectId(id)})
# Vérifier si d'autres images utilisent le même hash
other_images = images_collection.find_one({"image_hash": image["image_hash"]})
# Si plus personne n'utilise l'image, supprimer le fichier
if not other_images:
image_path = os.path.join(config.UPLOAD_DIR, f"{image['image_hash']}.jpg")
if os.path.exists(image_path):
os.remove(image_path)
return {"message": "Image deleted successfully"}
@images_router.patch(
path="/{id}/caption",
responses={401: {"model": HTTPError}, 404: {"model": HTTPError}}
)
async def update_caption(
id: str,
caption_data: ImageCaptionDTO,
current_user: User = Depends(get_current_user)
):
check_image_permissions(id, current_user)
images_collection.update_one(
{"_id": ObjectId(id)},
{"$set": {"caption": caption_data.caption}}
)
return {"message": "Caption updated successfully"}
@images_router.get(
path="/{id}/caption",
response_model=ImageCaptionDTO,
responses={401: {"model": HTTPError}, 403: {"model": HTTPError}, 404: {"model": HTTPError}}
)
async def get_caption(id: str, current_user: User = Depends(get_current_user)):
image = check_image_permissions(id, current_user)
return ImageCaptionDTO(caption=image.get("caption", ""))
@images_router.get(
path="/{id}/metadata",
response_model=ImageMetadataDTO,
responses={401: {"model": HTTPError}, 403: {"model": HTTPError}, 404: {"model": HTTPError}}
)
async def get_metadata(id: str, current_user: User = Depends(get_current_user)):
image = check_image_permissions(id, current_user)
return ImageMetadataDTO(
metadata=image.get("metadata", {}),
pin_id=str(image.get("pin_id")) if image.get("pin_id") else None
)