Compare commits

..

54 Commits

Author SHA1 Message Date
Alix JEUDI--LEMOINE e2db50f44f 🚑 Fix tests
continuous-integration/drone/push Build is passing Details
10 hours ago
Alix JEUDI--LEMOINE cd99f80d0a Added tests (+mock) for the push service + updated image tests (missing + large image).
continuous-integration/drone/push Build is failing Details
10 hours ago
Alix JEUDI--LEMOINE 43cd1b2a73 🐛 Fixed storage size value
continuous-integration/drone/push Build is passing Details
12 hours ago
Alix JEUDI--LEMOINE 2698bf4ece 🩹 Fixed bad image for pin in stub + renamed principal account
continuous-integration/drone/push Build is passing Details
12 hours ago
Alix JEUDI--LEMOINE e2d870d3d2 Edit stub for more "conventional" descriptions + delete some useless pins
continuous-integration/drone/push Build is passing Details
12 hours ago
Alexis Feron 84426a6678 🏷️ Fix type issue
continuous-integration/drone/push Build is passing Details
15 hours ago
Alexis Feron 2a8b412f23 🔒️ Fix user_id on the pin update
continuous-integration/drone/push Build is failing Details
15 hours ago
Alix JEUDI--LEMOINE d50b25fb26 Added an optional field for push subscriptions in the user model.
continuous-integration/drone/push Build is passing Details
1 day ago
Alix JEUDI--LEMOINE b9a7822d5c Added refresh token handling + updated models and authentication routes to include the refresh token.
1 day ago
Alix JEUDI--LEMOINE 373d7aebe6 🐛 Fixed access to user subscriptions in push notification subscription route.
1 day ago
Alix JEUDI--LEMOINE c3e4851200 Added refresh token expiration time management in configuration.
1 day ago
Alix JEUDI--LEMOINE bf0f053992 Added create refresh token function
1 day ago
Alix JEUDI--LEMOINE 163cbfd0ab Moving logic for push notifications to the service
continuous-integration/drone/push Build is passing Details
1 day ago
Alix JEUDI--LEMOINE 9c6e1acdb1 Removed the double (useless) check user in the database for the push subscription route
1 day ago
Alix JEUDI--LEMOINE d347fa25a3 Refactored the push notification service: integrated db to manage user subscriptions (+delete invalid subscriptions).
1 day ago
Alix JEUDI--LEMOINE 375df5dc0c 🚧 Default icon for push notifications.
continuous-integration/drone/push Build is passing Details
2 days ago
Alix JEUDI--LEMOINE 3f0462d84c Updated test dependencies: added pywebpush library for push notifications.
continuous-integration/drone/push Build is passing Details
2 days ago
Alix JEUDI--LEMOINE 64987b4e99 Added VAPID private key in the CI/CD configuration for push notifications.
continuous-integration/drone/push Build is failing Details
2 days ago
Alix JEUDI--LEMOINE f160e94a7a 🚧 WIP: Added functionality to send push notifications when adding a friend.
continuous-integration/drone/push Build is failing Details
2 days ago
Alix JEUDI--LEMOINE baf90ddf79 Implementation of the push notifications service, including sending notifications to specific subscribers and multiple subscribers, as well as creating standardized notification payloads.
2 days ago
Alix JEUDI--LEMOINE 514eb41f6c Added PushSubscription class to manage push notification subscriptions.
2 days ago
Alix JEUDI--LEMOINE 6ef3f1c6f7 Added a router for push notifications, including user subscription functionality.
2 days ago
Alix JEUDI--LEMOINE 506e879334 Added VAPID keys for Push notifications in the configuration
2 days ago
Alix JEUDI--LEMOINE 0b07d049e5 Added pywebpush dependency
2 days ago
Alix JEUDI--LEMOINE 24f35c05f1 Add tests for user search and retrieval, including unauthorized access and handling of invalid IDs
continuous-integration/drone/push Build is passing Details
3 days ago
Alix JEUDI--LEMOINE c685a48f24 Add tests for pin sharing functionality, including sharing, viewing, and deleting shares
3 days ago
Alix JEUDI--LEMOINE 9be3eb528a Add tests for image access permissions and metadata retrieval
3 days ago
Alix JEUDI--LEMOINE fc4fa3d6cc Add comprehensive admin tests for user management and statistics endpoints
3 days ago
Alix JEUDI--LEMOINE e9235e17c3 🐛 Improve error handling in token validation
3 days ago
Alix JEUDI--LEMOINE 0b6973c939 🐛 Validate pin existence before adding image and improve error handling
3 days ago
Alexis Feron 1b2df687c5 🚧 Try omit stub from coverage
continuous-integration/drone/push Build is passing Details
3 days ago
Alexis Feron b7c4108bed Omit stub from coverage
continuous-integration/drone/push Build is failing Details
3 days ago
Alexis Feron baa9105e79 🐛 Fix pin delete 404 error
continuous-integration/drone/push Build is passing Details
1 week ago
Alexis Feron 7fa6f76078 🐛 Fix share delete 422 error
continuous-integration/drone/push Build is passing Details
1 week ago
Alexis Feron 92b1cba4ab 🍻 Try update share deletion
continuous-integration/drone/push Build is passing Details
1 week ago
Maxence JOUANNET 707fd14607 add poi pin
continuous-integration/drone/push Build is passing Details
1 week ago
Maxence JOUANNET ae2a476333 is_poi adding
continuous-integration/drone/push Build is passing Details
1 week ago
Alexis Feron 86051c7c64 Add pin share delete
continuous-integration/drone/push Build is passing Details
1 week ago
Alix JEUDI--LEMOINE deaf1d9838 🚑 CI/CD down (containers deleted by admin)
continuous-integration/drone/push Build is passing Details
2 weeks ago
Alix JEUDI--LEMOINE 4ec91bc1d8 🔧 Updated paths for JSON files in stub.py to correct relative references.
continuous-integration/drone/push Build is passing Details
2 weeks ago
Alix JEUDI--LEMOINE 9c41e925d2 Added routes for user management: list of users and deletion of a user by UID.
continuous-integration/drone/push Build is passing Details
2 weeks ago
Alix JEUDI--LEMOINE c1c65cdc87 🔒 Updated password for test user (to allow login from frontend)
2 weeks ago
Alix JEUDI--LEMOINE ef749702c0 Updated stub, added JSON files for images and pins, as well as a function to import real data (+photos) from these files.
2 weeks ago
Alix JEUDI--LEMOINE 37dc2913d8 Added validation for user_id in PinDTO (for POIs)
continuous-integration/drone/push Build is passing Details
2 weeks ago
Alix JEUDI--LEMOINE 455b8b1071 Add POI gestion for pins
continuous-integration/drone/push Build is passing Details
2 weeks ago
Alexis Feron d9d08ae4c0 ️ Add username to get pin shares
continuous-integration/drone/push Build is passing Details
3 weeks ago
Alexis Feron 402411f7b9 Add a get route for pin shares
continuous-integration/drone/push Build is passing Details
3 weeks ago
Alix JEUDI--LEMOINE 2b37533d9d 🐛 Fix bug top users were not correct
continuous-integration/drone/push Build is passing Details
3 weeks ago
Alix JEUDI--LEMOINE dc3e7544b4 New admin router load/get/patch config + stats for administration panel
continuous-integration/drone/push Build is passing Details
3 weeks ago
Alix JEUDI--LEMOINE 2bd8ffd591 Add get_admin_user guard for routes
3 weeks ago
Alix JEUDI--LEMOINE 597cef0c22 Add is_admin attribute on User model
3 weeks ago
Alix JEUDI--LEMOINE 5b4615da67 🔧 Refactor of config to add it in DB for admin part
3 weeks ago
Alix JEUDI--LEMOINE 868b09f7ef 🦺 Validate fields in pin to avoid errors
continuous-integration/drone/push Build is passing Details
3 weeks ago
Alix JEUDI--LEMOINE 8a161cb2c7 🐛 Fix stub
continuous-integration/drone/push Build is passing Details
3 weeks ago

@ -35,7 +35,7 @@ steps:
CODEFIRST_CLIENTDRONE_ENV_ME_CONFIG_BASICAUTH_PASSWORD:
from_secret: SECRET_MONGODB_PASSWORD
ADMINS: alixjeudi--lemoine,alexisferon,mathisframit,maxencejouannet
depends_on: [ deploy-database ]
depends_on: [deploy-database]
- name: code-analysis
image: hub.codefirst.iut.uca.fr/alix.jeudi--lemoine/codefirst-dronesonarplugin-python312:latest
@ -49,12 +49,12 @@ steps:
commands:
- apt-get update && apt-get install -y libmagic1
- pip install -r tests/requirements.txt
- pytest
- pytest --cov=app --cov-config=.coveragerc
- /opt/sonar-scanner/bin/sonar-scanner -Dsonar.login=$PLUGIN_SONAR_TOKEN -Dsonar.projectKey=SAE3A_MemoryMap-api -Dsonar.coverage.exclusions="tests/**" -Dsonar.python.version=3 -Dsonar.python.coverage.reportPaths="coverage.xml"
settings:
sonar_token:
from_secret: SECRET_SONAR_TOKEN
depends_on: [ deploy-database ]
depends_on: [deploy-database]
- name: docker-build-and-push
image: plugins/docker
@ -68,7 +68,7 @@ steps:
from_secret: SECRET_REGISTRY_USERNAME
password:
from_secret: SECRET_REGISTRY_PASSWORD
depends_on: [ code-analysis ]
depends_on: [code-analysis]
- name: deploy-api
image: hub.codefirst.iut.uca.fr/thomas.bellembois/codefirst-dockerproxy-clientdrone:latest
@ -90,5 +90,7 @@ steps:
from_secret: SECRET_JWT_ALGORITHM
CODEFIRST_CLIENTDRONE_ENV_JWT_ACCESS_TOKEN_EXPIRE_MINUTES:
from_secret: SECRET_JWT_ACCESS_TOKEN_EXPIRE_MINUTES
CODEFIRST_CLIENTDRONE_ENV_VAPID_PRIVATE_KEY:
from_secret: SECRET_VAPID_PRIVATE_KEY
ADMINS: alixjeudi--lemoine,alexisferon,mathisframit,maxencejouannet
depends_on: [ docker-build-and-push ]
depends_on: [docker-build-and-push]

@ -15,6 +15,7 @@
[![Vulnerabilities](https://codefirst.iut.uca.fr/sonar/api/project_badges/measure?project=SAE3A_MemoryMap-api&token=65450ce2ffdd247afe93544fa91b41a082ad5217&metric=vulnerabilities)](https://codefirst.iut.uca.fr/sonar/dashboard?id=SAE3A_MemoryMap-api)
> Made with FastAPI (Python)
## Documentation

@ -7,18 +7,44 @@ MONGODB_PASSWORD = os.getenv("MONGODB_PASSWORD", "secret")
MONGODB_DATABASE = os.getenv("MONGODB_DATABASE", "memorymap")
# Constants for JWT
SECRET_KEY = os.getenv("JWT_SECRET_KEY", "_2YfT44$xF.Tg_xI63UH3D7:N+>pZN2';j%>7H@?e0:Xor'pV[") # temporary of course :)
SECRET_KEY = os.getenv("JWT_SECRET_KEY", "_2YfT44$xF.Tg_xI63UH3D7:N+>pZN2';j%>7H@?e0:Xor'pV[") # only for local development of course :)
ALGORITHM = os.getenv("JWT_ALGORITHM", "HS256")
ACCESS_TOKEN_EXPIRE_MINUTES = int(os.getenv("JWT_ACCESS_TOKEN_EXPIRE_MINUTES", 30)) # TODO: check what to add here / maybe need to evaluate criticity of that?
ACCESS_TOKEN_EXPIRE_MINUTES = int(os.getenv("JWT_ACCESS_TOKEN_EXPIRE_MINUTES", 30))
REFRESH_TOKEN_EXPIRE_MINUTES = int(os.getenv("JWT_REFRESH_TOKEN_EXPIRE_MINUTES", 60 * 24 * 30)) # 30 days
# Constants for OAuth2
TOKEN_URL = "/api/v1/login" # Path to the auth
# Constants for images
UPLOAD_DIR = os.path.join(os.path.dirname(os.path.dirname(__file__)), "images")
MAX_IMAGE_SIZE = 8 * 1024 * 1024 # 8 Mo
ALLOWED_MIME_TYPES = [
# Constants for config
UPLOAD_DIR = os.getenv("UPLOAD_DIR", os.path.join(os.path.dirname(os.path.dirname(__file__)), "images"))
# Configuration par défaut du système
DEFAULT_CONFIG = {
"max_image_size": 8 * 1024 * 1024, # 8MB
"max_images_per_pin": 10,
"max_images_per_user": 100,
"allowed_image_types": [
"image/jpeg",
"image/png",
"image/heic"
]
"image/gif",
"image/webp"
],
"max_pins_per_user": 50,
"max_friends_per_user": 100
}
# Configuration actuelle (sera mise à jour au démarrage)
MAX_IMAGE_SIZE = DEFAULT_CONFIG["max_image_size"]
MAX_IMAGES_PER_PIN = DEFAULT_CONFIG["max_images_per_pin"]
MAX_IMAGES_PER_USER = DEFAULT_CONFIG["max_images_per_user"]
ALLOWED_MIME_TYPES = DEFAULT_CONFIG["allowed_image_types"]
MAX_PINS_PER_USER = DEFAULT_CONFIG["max_pins_per_user"]
MAX_FRIENDS_PER_USER = DEFAULT_CONFIG["max_friends_per_user"]
# Token privé pour les notifications Push
VAPID_PRIVATE_KEY = os.getenv("VAPID_PRIVATE_KEY")
# Claims VAPID pour les notifications Push
VAPID_CLAIMS = {
"sub": "mailto:admin@memorymap.fr"
}

@ -1,3 +1,4 @@
from .FriendAddDTO import FriendAddDTO
from .user import UserDTO, UserRegisterDTO
from .user import UserDTO, UserRegisterDTO, UserAdminDTO
from .pin import PinDTO, PinShareDTO
from .token import RefreshTokenDTO

@ -10,7 +10,8 @@ class PinDTO(BaseModel):
location: list
complete_address: str
files: List[str] = Field(default_factory=list)
user_id: str = None
is_poi: bool = False
user_id: Optional[str] = None
date: Optional[datetime] = None
@field_validator('files')
@ -23,5 +24,12 @@ class PinDTO(BaseModel):
raise ValueError(f"Invalid image ID format: {file_id}")
return files
@field_validator('user_id')
@classmethod
def validate_user_id(cls, v, info):
if not info.data.get('is_poi') and not v:
raise ValueError('user_id is required when is_poi is False ' + str(info.data))
return v
class PinShareDTO(BaseModel):
friend_id: str

@ -0,0 +1,4 @@
from pydantic import BaseModel
class RefreshTokenDTO(BaseModel):
refresh_token: str

@ -7,3 +7,8 @@ class UserDTO(BaseModel):
class UserRegisterDTO(BaseModel):
username: str
password: str
class UserAdminDTO(BaseModel):
uid: str
username: str
is_admin: bool

@ -7,7 +7,8 @@ from app.routes.friends import friends_router
from app.routes.users import users_router
from app.routes.pins import pins_router
from app.routes.images import images_router
from app.routes.admin import admin_router
from app.routes.push import push_router
# FastAPI app instance
app = FastAPI(
@ -37,3 +38,5 @@ app.include_router(friends_router)
app.include_router(users_router)
app.include_router(pins_router)
app.include_router(images_router)
app.include_router(admin_router)
app.include_router(push_router)

@ -4,3 +4,4 @@ from .token_data import TokenData
from .token import Token
from .user import User
from .HTTPError import HTTPError
from .push import PushSubscription

@ -0,0 +1,16 @@
from pydantic import BaseModel
from typing import List
from datetime import datetime
class SystemConfig(BaseModel):
max_image_size: int # en octets
max_images_per_pin: int
max_images_per_user: int
allowed_image_types: List[str] # types MIME
max_pins_per_user: int
max_friends_per_user: int
class DBConfig(BaseModel):
config: SystemConfig
updated_at: datetime
updated_by: str # ID de l'utilisateur qui a fait la dernière modification

@ -1,13 +1,28 @@
from typing import Optional, List
from pydantic import BaseModel
from pydantic import BaseModel, Field, field_validator
from datetime import datetime
class Pin(BaseModel):
id: Optional[str]
title: str
description: str
location: list
complete_address: str
title: str = Field(..., min_length=3)
description: str = Field(..., min_length=3)
location: list = Field(..., min_items=2)
complete_address: str = Field(..., min_length=3)
files: Optional[List[str]] = [] # Liste des IDs d'images
user_id: str
is_poi: bool = False
user_id: Optional[str] = None
date: Optional[datetime] = None
@field_validator('location')
@classmethod
def validate_location(cls, v):
if not v or len(v) == 0:
raise ValueError('La location ne peut pas être vide')
return v
@field_validator('user_id')
@classmethod
def validate_user_id(cls, v, info):
if not info.data.get('is_poi') and not v:
raise ValueError('user_id est requis lorsque is_poi est False')
return v

@ -0,0 +1,5 @@
from pydantic import BaseModel
class PushSubscription(BaseModel):
endpoint: str
keys: dict

@ -1,6 +1,8 @@
from pydantic import BaseModel
class Token(BaseModel):
refresh_token: str
access_token: str
token_type: str
user_id: str
is_admin: bool

@ -1,7 +1,10 @@
from pydantic import BaseModel, Field
from typing import Optional
class User(BaseModel):
uid: str = Field(..., alias="_id")
username: str
password: str
is_admin: bool = False
push_subscriptions: Optional[list[str]] = []

@ -0,0 +1,156 @@
from pywebpush import webpush, WebPushException
import json
from typing import List, Dict, Any
from urllib.parse import urlparse
from bson import ObjectId
import pymongo
import app.config as config
# Database setup
client = pymongo.MongoClient(config.MONGODB_URL, username=config.MONGODB_USERNAME, password=config.MONGODB_PASSWORD)
db = client[config.MONGODB_DATABASE]
users_collection = db["users"]
def get_audience(endpoint: str) -> str:
parsed = urlparse(endpoint)
return f"{parsed.scheme}://{parsed.netloc}"
class PushService:
def __init__(self):
self.vapid_private_key = config.VAPID_PRIVATE_KEY
self.vapid_claims = config.VAPID_CLAIMS
async def send_notification(self, user_id: ObjectId, payload: Dict[str, Any]) -> bool:
"""
Envoie une notification push à un abonné spécifique.
Args:
user_id: L'ID de l'utilisateur à qui envoyer la notification
payload: Le contenu de la notification
Returns:
bool: True si l'envoi a réussi, False sinon
"""
try:
# Récupérer les abonnements push de l'utilisateur
user = users_collection.find_one({"_id": user_id}, {"push_subscriptions": 1})
if not user:
return False
subscriptions = user.get("push_subscriptions", [])
if not subscriptions:
return False
# Pour chaque abonnement, envoyer la notification
for subscription in subscriptions:
await self.send_notification_to_subscription(subscription, payload, user_id)
except json.JSONDecodeError as e:
return False
except Exception as e:
return False
async def send_notification_to_subscription(self, subscription: str, payload: Dict[str, Any], user_id: ObjectId) -> bool:
try:
subscription_dict = json.loads(subscription)
if subscription_dict and "endpoint" in subscription_dict:
vapid_claims = self.vapid_claims
vapid_claims["aud"] = get_audience(subscription_dict["endpoint"])
webpush(
subscription_info=subscription_dict,
data=json.dumps(payload),
vapid_private_key=self.vapid_private_key,
vapid_claims=vapid_claims,
ttl=2419200 # Temps durant lequel la notification est conservée sur le serveur si l'appareil est hors ligne, ici le max (4 semaines)
)
return True
else:
await self.delete_subscription(subscription, user_id)
return False
except WebPushException as e:
if int(e.response.status_code) in [404, 410]:
# Suppression de l'abonnement de la base de données
deleted = await self.delete_subscription(subscription, user_id)
if not deleted:
return False
return True
return False
except Exception as e:
return False
async def send_notification_to_all(self, subscriptions: List[Dict[str, Any]], payload: Dict[str, Any]) -> Dict[str, int]:
"""
Envoie une notification à plusieurs abonnés.
Args:
subscriptions: Liste des informations de souscription
payload: Le contenu de la notification
Returns:
Dict contenant le nombre de succès et d'échecs
"""
results = {
"success": 0,
"failed": 0
}
for subscription in subscriptions:
success = await self.send_notification(subscription, payload)
if success:
results["success"] += 1
else:
results["failed"] += 1
return results
def create_notification_payload(self, title: str, body: str) -> Dict[str, Any]:
"""
Crée un payload de notification standardisé.
Args:
title: Titre de la notification
body: Corps du message
icon: URL de l'icône (optionnel)
Returns:
Dict contenant le payload formaté
"""
payload = {
"notification": {
"title": title,
"body": body,
"icon": "assets/icon-128x128.png",
"image": "assets/icon-128x128.png",
"data": {
"onActionClick": {
"default": {"operation": "openWindow", "url": "/map"}
}
}
},
}
return payload
async def delete_subscription(self, subscription: str, user_id: ObjectId) -> bool:
"""
Supprime une souscription push de la base de données.
Args:
user_id: L'ID de l'utilisateur
subscription: La souscription à supprimer
Returns:
bool: True si la suppression a réussi, False sinon
"""
try:
users_collection.update_one({"_id": user_id}, {"$pull": {"push_subscriptions": subscription}})
return True
except Exception as e:
return False
# Instance singleton du service
push_service = PushService()

@ -0,0 +1,237 @@
from fastapi import APIRouter, HTTPException, status, Depends
from bson import ObjectId
import pymongo
from datetime import datetime, timedelta
import os
import app.config as config
from app.models import User, HTTPError
from app.models.config import SystemConfig, DBConfig
from app.routes.auth import users_collection
from app.routes.utils import get_admin_user
from app.dto import UserAdminDTO
# Database setup
client = pymongo.MongoClient(config.MONGODB_URL, username=config.MONGODB_USERNAME, password=config.MONGODB_PASSWORD)
db = client[config.MONGODB_DATABASE]
pins_collection = db["pins"]
images_collection = db["images"]
friends_collection = db["friends"]
pin_permissions_collection = db["pin_permissions"]
config_collection = db["config"]
admin_router = APIRouter(
prefix="/admin",
tags=["Admin"]
)
def serialize_mongo_doc(doc):
"""Convertit un document MongoDB en dictionnaire sérialisable"""
if doc is None:
return None
result = {}
for key, value in doc.items():
if isinstance(value, ObjectId):
result[key] = str(value)
elif isinstance(value, dict):
result[key] = serialize_mongo_doc(value)
elif isinstance(value, list):
result[key] = [serialize_mongo_doc(item) if isinstance(item, dict) else str(item) if isinstance(item, ObjectId) else item for item in value]
else:
result[key] = value
return result
def load_config():
"""Charge la configuration depuis la base de données ou utilise les valeurs par défaut"""
db_config = config_collection.find_one({"_id": "system_config"})
if db_config:
# Mettre à jour les variables de configuration
config.MAX_IMAGE_SIZE = db_config["config"]["max_image_size"]
config.MAX_IMAGES_PER_PIN = db_config["config"]["max_images_per_pin"]
config.MAX_IMAGES_PER_USER = db_config["config"]["max_images_per_user"]
config.ALLOWED_MIME_TYPES = db_config["config"]["allowed_image_types"]
config.MAX_PINS_PER_USER = db_config["config"]["max_pins_per_user"]
config.MAX_FRIENDS_PER_USER = db_config["config"]["max_friends_per_user"]
else:
# Créer la configuration par défaut dans la base de données
default_config = DBConfig(
config=SystemConfig(**config.DEFAULT_CONFIG),
updated_at=datetime.now(),
updated_by="system"
)
config_collection.insert_one({
"_id": "system_config",
**default_config.model_dump()
})
# Charger la configuration au démarrage
load_config()
@admin_router.get(
path="/stats",
responses={401: {"model": HTTPError}, 403: {"model": HTTPError}}
)
async def get_stats(admin_user: User = Depends(get_admin_user)):
# Statistiques générales
total_users = users_collection.count_documents({})
total_pins = pins_collection.count_documents({})
total_images = images_collection.count_documents({})
total_friends = friends_collection.count_documents({"status": "accepted"})
# Statistiques des 30 derniers jours
thirty_days_ago = datetime.now() - timedelta(days=30)
new_users = users_collection.count_documents({
"created_at": {"$gte": thirty_days_ago}
})
new_pins = pins_collection.count_documents({
"created_at": {"$gte": thirty_days_ago}
})
new_images = images_collection.count_documents({
"metadata.created_at": {"$gte": thirty_days_ago.isoformat()}
})
# Top utilisateurs
top_users = list(users_collection.aggregate([
{"$lookup": {
"from": "pins",
"let": { "user_id": { "$toString": "$_id" } },
"pipeline": [
{ "$match": { "$expr": { "$eq": ["$user_id", "$$user_id"] } } }
],
"as": "pins"
}},
{"$project": {
"username": 1,
"pin_count": {"$size": "$pins"}
}},
{"$sort": {"pin_count": -1}},
{"$limit": 5}
]))
# Top pins (les plus partagés)
top_pins = list(pin_permissions_collection.aggregate([
{"$group": {
"_id": "$pin_id",
"share_count": {"$sum": 1}
}},
{"$sort": {"share_count": -1}},
{"$limit": 5},
{"$lookup": {
"from": "pins",
"localField": "_id",
"foreignField": "_id",
"as": "pin_info"
}},
{"$unwind": "$pin_info"},
{"$project": {
"pin_id": "$_id",
"share_count": 1,
"title": "$pin_info.title"
}}
]))
# Statistiques de stockage
images_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '../../images'))
total_storage = 0
if os.path.isdir(images_dir):
for filename in os.listdir(images_dir):
file_path = os.path.join(images_dir, filename)
if os.path.isfile(file_path):
total_storage += os.path.getsize(file_path)
return {
"general": {
"total_users": total_users,
"total_pins": total_pins,
"total_images": total_images,
"total_friends": total_friends,
"total_storage_bytes": total_storage
},
"last_30_days": {
"new_users": new_users,
"new_pins": new_pins,
"new_images": new_images
},
"top_users": [serialize_mongo_doc(user) for user in top_users],
"top_shared_pins": [serialize_mongo_doc(pin) for pin in top_pins]
}
@admin_router.get(
path="/config",
response_model=SystemConfig,
responses={401: {"model": HTTPError}, 403: {"model": HTTPError}}
)
async def get_config(admin_user: User = Depends(get_admin_user)):
db_config = config_collection.find_one({"_id": "system_config"})
if not db_config:
return SystemConfig(**config.DEFAULT_CONFIG)
return SystemConfig(**db_config["config"])
@admin_router.patch(
path="/config",
response_model=SystemConfig,
responses={401: {"model": HTTPError}, 403: {"model": HTTPError}}
)
async def update_config(
new_config: SystemConfig,
admin_user: User = Depends(get_admin_user)
):
# Mettre à jour la configuration en base de données
db_config = DBConfig(
config=new_config,
updated_at=datetime.now(),
updated_by=admin_user.uid
)
config_collection.update_one(
{"_id": "system_config"},
{"$set": db_config.model_dump()},
upsert=True
)
# Mettre à jour les variables de configuration
config.MAX_IMAGE_SIZE = new_config.max_image_size
config.MAX_IMAGES_PER_PIN = new_config.max_images_per_pin
config.MAX_IMAGES_PER_USER = new_config.max_images_per_user
config.ALLOWED_MIME_TYPES = new_config.allowed_image_types
config.MAX_PINS_PER_USER = new_config.max_pins_per_user
config.MAX_FRIENDS_PER_USER = new_config.max_friends_per_user
return new_config
@admin_router.get(
path="/users",
responses={401: {"model": HTTPError}, 403: {"model": HTTPError}},
response_model=list[UserAdminDTO]
)
async def list_users(admin_user: User = Depends(get_admin_user)):
"""Liste tous les utilisateurs (sans le mot de passe) - Route admin uniquement"""
users = users_collection.find({}, {"password": 0}) # Exclure le mot de passe
users_list = []
for user in users:
user["uid"] = str(user["_id"])
user = UserAdminDTO(**user)
users_list.append(user)
return users_list
@admin_router.delete(
path="/user/{uid}",
responses={401: {"model": HTTPError}, 403: {"model": HTTPError}, 404: {"model": HTTPError}, 400: {"model": HTTPError}}
)
async def delete_user(uid: str, admin_user: User = Depends(get_admin_user)):
try:
ObjectId(uid)
except:
raise HTTPException(status_code=400, detail="UID invalide")
user = users_collection.find_one({"_id": ObjectId(uid)})
if not user:
raise HTTPException(status_code=404, detail="Utilisateur non trouvé")
users_collection.delete_one({"_id": ObjectId(uid)})
return {"message": "Utilisateur supprimé avec succès"}

@ -6,8 +6,8 @@ import pymongo
import app.config as config
from app.models import User, Token, HTTPError
from app.dto import UserRegisterDTO
from app.routes.utils import get_current_user, create_access_token
from app.dto import UserRegisterDTO, RefreshTokenDTO
from app.routes.utils import get_current_user, create_access_token, create_refresh_token, get_current_user_from_refresh_token
from app.utils import get_password_hash, verify_password
# Database setup
@ -20,6 +20,26 @@ auth_router = APIRouter(
tags=["Auth"]
)
def create_tokens(user: User):
access_token_expires = timedelta(minutes=config.ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={
"sub": user["username"],
"is_admin": user["is_admin"]
},
expires_delta=access_token_expires
)
refresh_token_expires = timedelta(minutes=config.REFRESH_TOKEN_EXPIRE_MINUTES)
refresh_token = create_refresh_token(
data={
"sub": user["username"],
"is_admin": user["is_admin"]
},
expires_delta=refresh_token_expires
)
return access_token, refresh_token
@auth_router.post(
path="/register",
response_model=Token,
@ -34,12 +54,22 @@ async def register(user: UserRegisterDTO):
)
hashed_password = get_password_hash(user.password)
user_id = users_collection.insert_one({"username": user.username, "password": hashed_password})
user = {
"username": user.username,
"password": hashed_password,
"is_admin": False
}
user_id = users_collection.insert_one(user)
access_token_expires = timedelta(minutes=config.ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(data={"sub": user.username}, expires_delta=access_token_expires)
access_token, refresh_token = create_tokens(user)
return {"access_token": access_token, "token_type": "bearer", "user_id": str(user_id.inserted_id)}
return {
"access_token": access_token,
"refresh_token": refresh_token,
"token_type": "bearer",
"user_id": str(user_id.inserted_id),
"is_admin": False
}
@auth_router.post(
path="/login",
@ -55,15 +85,29 @@ async def login(form_data: OAuth2PasswordRequestForm = Depends()):
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=config.ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(data={"sub": form_data.username}, expires_delta=access_token_expires)
return {"access_token": access_token, "token_type": "bearer", "user_id": str(user["_id"])}
access_token, refresh_token = create_tokens(user)
return {
"access_token": access_token,
"refresh_token": refresh_token,
"token_type": "bearer",
"user_id": str(user["_id"]),
"is_admin": user.get("is_admin", False)
}
@auth_router.get(
path="/logout",
@auth_router.post(
path="/refresh-token",
response_model=Token,
responses={401: {"model": HTTPError}}
)
async def logout(current_user: User = Depends(get_current_user)):
return {"message": "Logged out"}
async def refresh(refresh_data: RefreshTokenDTO):
current_user = get_current_user_from_refresh_token(refresh_data.refresh_token)
new_access_token, new_refresh_token = create_tokens(current_user)
return {
"access_token": new_access_token,
"refresh_token": new_refresh_token,
"token_type": "bearer",
"user_id": str(current_user["_id"]),
"is_admin": current_user["is_admin"]
}

@ -5,9 +5,10 @@ from fastapi.params import Depends
import pymongo
from app.dto import FriendAddDTO
from app.models import HTTPError, User, Friend
from app.models import HTTPError, User, Friend, PushSubscription
from .utils import friend_not_found, get_current_user, objectid_misformatted
import app.config as config
from app.push_service import push_service
# Best workaround found for _id typed as ObjectId (creating Exception bcause JSON doesn't support custom types countrary to BSON, used by Mongo)
# also allows to create DTOs at the time, but not at it's best (project structure is chaotic FTM :s)
@ -60,8 +61,11 @@ async def add_friend(friend_to_add: FriendAddDTO, current_user: User = Depends(g
detail="Friend already exists"
)
friend_user_id = ObjectId(friend["friend_user_id"])
friend_user = users_collection.find_one({"_id": friend_user_id})
# Test if user exists
if not users_collection.find_one({"_id": ObjectId(friend["friend_user_id"])}):
if not friend_user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
@ -70,6 +74,14 @@ async def add_friend(friend_to_add: FriendAddDTO, current_user: User = Depends(g
friend["user_id"] = current_user.uid
friend["status"] = "pending"
friend_id = friends_collection.insert_one(friend).inserted_id
# Send notification to friend
payload = push_service.create_notification_payload(
title="Nouvelle demande d'ami",
body=f"{current_user.username} vous a envoyé une demande d'ami."
)
await push_service.send_notification(friend_user_id, payload);
return {"id": str(friend_id)}
@friends_router.delete(

@ -131,6 +131,10 @@ async def add_image(
else:
os.remove(temp_path)
if not pin_id == 'null':
if(pins_collection.find_one({"_id": ObjectId(pin_id)}) is None):
raise HTTPException(status_code=404, detail="Pin not found")
# Créer l'entrée en base de données
image_doc = {
"pin_id": ObjectId(pin_id) if pin_id != 'null' else None,
@ -148,8 +152,8 @@ async def add_image(
return {"id": str(image_id)}
except Exception as e:
# Nettoyer en cas d'erreur
except (OSError, IOError) as e:
# Nettoyer en cas d'erreur liée au système de fichiers
if os.path.exists(temp_path):
os.remove(temp_path)
raise HTTPException(status_code=500, detail=str(e))
@ -213,7 +217,7 @@ async def update_caption(
)
async def get_caption(id: str, current_user: User = Depends(get_current_user)):
image = check_image_permissions(id, current_user)
return ImageCaptionDTO(caption=image.get("caption", ""))
return ImageCaptionDTO(caption=image.get("caption", " "))
@images_router.get(
path="/{id}/metadata",

@ -64,8 +64,13 @@ async def update_pin(id: str, pin: PinDTO, current_user: User = Depends(get_curr
existing_pin = pins_collection.find_one({"_id": ObjectId(id)})
check_pin_is_null(existing_pin)
# Vérifier si le pin est un POI, si oui, il ne être modifié que si l'utilisateur est administrateur
if existing_pin.get("is_poi", False):
if not current_user.is_admin:
raise HTTPException(status_code=403, detail="You don't have permission to edit this pin")
else:
# Vérifier si l'utilisateur a la permission de modifier le pin
if existing_pin["user_id"] != current_user.uid:
if existing_pin.get("user_id", None) != current_user.uid:
permission = pin_permissions_collection.find_one({
"pin_id": ObjectId(id),
"user_id": current_user.uid,
@ -74,8 +79,8 @@ async def update_pin(id: str, pin: PinDTO, current_user: User = Depends(get_curr
if not permission:
raise HTTPException(status_code=403, detail="You don't have permission to edit this pin")
# Ajouter l'ID de l'utilisateur au PinDTO
pin.user_id = current_user.uid
# Garder le même user_id
pin.user_id = existing_pin["user_id"]
# Mettre à jour le pin
pins_collection.update_one({"_id": ObjectId(id)}, {"$set": pin.model_dump()})
@ -97,7 +102,9 @@ async def update_pin(id: str, pin: PinDTO, current_user: User = Depends(get_curr
responses={401: {"model": HTTPError}}
)
async def add_pin(pin: PinDTO, current_user: User = Depends(get_current_user)):
if not pin.is_poi:
pin.user_id = current_user.uid
pin_id = pins_collection.insert_one(pin.model_dump()).inserted_id
# Mettre à jour les images avec le pin_id
@ -122,7 +129,8 @@ async def list_pins(current_user: User = Depends(get_current_user)):
pins = pins_collection.find({
"$or": [
{"user_id": current_user.uid}, # Pins de l'utilisateur
{"_id": {"$in": shared_pin_ids}} # Pins partagés avec l'utilisateur
{"_id": {"$in": shared_pin_ids}}, # Pins partagés avec l'utilisateur
{"is_poi": True}
]
})
@ -190,6 +198,14 @@ async def delete_pin(id: str, current_user: User = Depends(get_current_user)):
pin = pins_collection.find_one({"_id": ObjectId(id)})
check_pin_is_null(pin)
# Vérifier si le pin est un POI, si oui, il ne peut être supprimé que si l'utilisateur est administrateur
if pin["is_poi"]:
if not current_user.is_admin:
raise HTTPException(status_code=403, detail="You don't have permission to delete this pin")
else:
pins_collection.delete_one({"_id": ObjectId(id)})
return {"message": "POI deleted successfully"}
# Si l'utilisateur est le propriétaire, supprimer le pin et toutes ses permissions
if pin["user_id"] == current_user.uid:
pins_collection.delete_one({"_id": ObjectId(id)})
@ -214,3 +230,76 @@ async def delete_pin(id: str, current_user: User = Depends(get_current_user)):
except bson.errors.InvalidId:
objectid_misformatted()
@pins_router.get(
path="/{id}/shares",
responses={401: {"model": HTTPError}, 404: {"model": HTTPError}, 422: {"model": HTTPError}, 403: {"model": HTTPError}}
)
async def get_pin_shares(id: str, current_user: User = Depends(get_current_user)):
try:
# Vérifier si le pin existe et appartient à l'utilisateur courant
pin = pins_collection.find_one({"_id": ObjectId(id)})
check_pin_is_null(pin)
if pin["user_id"] != current_user.uid:
raise HTTPException(status_code=403, detail="You can only view shares of your own pins")
# Récupérer toutes les permissions de partage pour ce pin
shares = pin_permissions_collection.find({"pin_id": ObjectId(id)})
# Transformer les résultats en liste de dictionnaires
shares_list = []
for share in shares:
# Récupérer les informations de l'utilisateur
user = db["users"].find_one({"_id": ObjectId(share["user_id"])})
shares_list.append({
"user_id": str(share["user_id"]),
"username": user["username"] if user else None,
"can_edit": share["can_edit"],
"can_delete": share["can_delete"]
})
return {"shares": shares_list}
except bson.errors.InvalidId:
objectid_misformatted()
@pins_router.delete(
path="/{pin_id}/share/{friend_id}",
responses={401: {"model": HTTPError}, 404: {"model": HTTPError}, 422: {"model": HTTPError}, 403: {"model": HTTPError}}
)
async def delete_pin_share(pin_id: str, friend_id: str, current_user: User = Depends(get_current_user)):
try:
# Vérifier si le pin existe
pin = pins_collection.find_one({"_id": ObjectId(pin_id)})
check_pin_is_null(pin)
# Vérifier si l'utilisateur est le propriétaire du pin
is_owner = pin["user_id"] == current_user.uid
# Vérifier si l'utilisateur est celui qui a le partage
is_shared_user = friend_id == current_user.uid
if not (is_owner or is_shared_user):
raise HTTPException(
status_code=403,
detail="You don't have permission to remove this share"
)
# Supprimer la permission de partage
result = pin_permissions_collection.delete_one({
"pin_id": ObjectId(pin_id),
"user_id": friend_id
})
if result.deleted_count == 0:
raise HTTPException(
status_code=404,
detail="Share not found"
)
return {"message": "Share removed successfully"}
except bson.errors.InvalidId:
objectid_misformatted()

@ -0,0 +1,50 @@
# Push notifications
from fastapi import APIRouter, Depends, HTTPException, status, Body
import pymongo
import app.config as config
from app.models import User, HTTPError, PushSubscription
from app.routes.utils import get_current_user
# Database setup
client = pymongo.MongoClient(config.MONGODB_URL, username=config.MONGODB_USERNAME, password=config.MONGODB_PASSWORD)
db = client[config.MONGODB_DATABASE]
users_collection = db["users"]
push_router = APIRouter(
tags=["Push"]
)
@push_router.post(
path="/push/subscribe",
responses={401: {"model": HTTPError}}
)
async def subscribe(subscription: PushSubscription, current_user: User = Depends(get_current_user)):
# Convert the subscription to a JSON string
subscription_str = subscription.model_dump_json()
# Check if the subscription is already in the database
if users_collection.find_one({"push_subscriptions": subscription_str}):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Subscription already exists"
)
# Check if already has a subscription
if current_user.push_subscriptions:
# Add another subscription
users_collection.update_one(
{"username": current_user.username},
{"$push": {"push_subscriptions": subscription_str}}
)
else:
# Add the subscription
users_collection.update_one(
{"username": current_user.username},
{"$set": {"push_subscriptions": [subscription_str]}}
)
return {
"message": "Push subscription successful"
}

@ -18,13 +18,13 @@ users_collection = db["users"]
oauth2_scheme = OAuth2PasswordBearer(tokenUrl=config.TOKEN_URL)
async def get_current_user(token: str = Depends(oauth2_scheme)) -> User:
credentials_exception = HTTPException(
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
)
async def get_current_user(token: str = Depends(oauth2_scheme)) -> User:
try:
payload = jwt.decode(token, OctKey.import_key(config.SECRET_KEY))
username: str = payload.claims["sub"]
@ -32,7 +32,7 @@ async def get_current_user(token: str = Depends(oauth2_scheme)) -> User:
if username is None or int(datetime.now().timestamp()) > expire_date:
raise credentials_exception
token_data = TokenData(username=username)
except JoseError:
except:
raise credentials_exception
user = users_collection.find_one({"username": token_data.username})
@ -41,10 +41,34 @@ async def get_current_user(token: str = Depends(oauth2_scheme)) -> User:
return user_serialize(user)
def get_current_user_from_refresh_token(refresh_token: str) -> User:
try:
payload = jwt.decode(refresh_token, OctKey.import_key(config.SECRET_KEY))
username: str = payload.claims["sub"]
expire_date = payload.claims["exp"]
if username is None or int(datetime.now().timestamp()) > expire_date:
raise credentials_exception
except:
raise credentials_exception
user = users_collection.find_one({"username": username})
if user is None:
raise credentials_exception
return user
async def get_admin_user(current_user: User = Depends(get_current_user)):
if not current_user.is_admin:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You don't have admin privileges"
)
return current_user
def create_access_token(data: dict, expires_delta: timedelta):
to_encode = data.copy()
expire = datetime.now() + expires_delta
expire = datetime.now().timestamp() + expires_delta.total_seconds()
to_encode.update({"exp": expire})
header = {"alg": config.ALGORITHM}
@ -52,6 +76,17 @@ def create_access_token(data: dict, expires_delta: timedelta):
return encoded_jwt
def create_refresh_token(data: dict, expires_delta: timedelta):
to_encode = data.copy()
expire = datetime.now().timestamp() + expires_delta.total_seconds()
to_encode.update({"exp": expire, "type": "refresh"})
header = {"alg": config.ALGORITHM}
encoded_jwt = jwt.encode(header, to_encode, OctKey.import_key(config.SECRET_KEY))
return encoded_jwt
# Exceptions
def friend_not_found():
raise HTTPException(

@ -10,7 +10,8 @@ def pin_serialize(pin: list) -> Pin:
"complete_address": pin["complete_address"],
"files": pin["files"],
"user_id": pin["user_id"],
"date": pin.get("date")
"date": pin.get("date"),
"is_poi": pin.get("is_poi", False)
})
def pins_serialize(pins: list) -> list:

@ -15,5 +15,6 @@ def user_serialize(user) -> User:
return User(
_id=str(user['_id']),
username=user['username'],
password=user['password']
password=user['password'],
is_admin=user['is_admin']
)

@ -1,12 +1,13 @@
import pymongo
from datetime import datetime
from config import MONGODB_URL, MONGODB_USERNAME, MONGODB_PASSWORD, MONGODB_DATABASE
from utils import get_password_hash
from PIL import Image
import pymongo
import io
import os
import uuid
import hashlib
import json
client = pymongo.MongoClient(MONGODB_URL, username=MONGODB_USERNAME, password=MONGODB_PASSWORD)
db = client[MONGODB_DATABASE]
@ -16,6 +17,12 @@ IMAGES_DIR = "images"
if not os.path.exists(IMAGES_DIR):
os.makedirs(IMAGES_DIR)
def convert_mongo_date(date_obj):
"""Convertit un objet date MongoDB en datetime Python"""
if isinstance(date_obj, dict) and '$date' in date_obj:
return datetime.fromisoformat(date_obj['$date'].replace('Z', '+00:00'))
return date_obj
def create_test_image(color='red', size=(100, 100)):
img = Image.new('RGB', size, color=color)
img_byte_arr = io.BytesIO()
@ -47,150 +54,168 @@ def process_image(file_path: str) -> tuple[str, str]:
return file_hash, 'jpg'
def populate_data():
users_collection = db["users"]
def create_test_data(user_id):
"""Crée des données de test pour l'utilisateur spécifié"""
pins_collection = db["pins"]
images_collection = db["images"]
# Créer les utilisateurs
user1_id = users_collection.insert_one({
"username": "string",
"password": get_password_hash("string")
}).inserted_id
user2_id = users_collection.insert_one({
"username": "test",
"password": get_password_hash("test")
}).inserted_id
# Créer d'abord les pins
pin_a = pins_collection.insert_one({
"title": "Tour Eiffel",
"description": "Description A",
"location": [48.858296, 2.294526],
# Créer des pins de test
test_pins = [
{
"title": "Test Pin 1",
"description": "Description du test 1",
"complete_address": "123 Test Street, Test City",
"location": [48.8566, 2.3522],
"files": [],
"user_id": str(user1_id)
})
pin_b = pins_collection.insert_one({
"title": "Mont St Michel",
"description": "Description B",
"location": [48.636111, -1.511389],
"user_id": str(user_id),
"is_poi": False
},
{
"title": "Test Pin 2",
"description": "Description du test 2",
"complete_address": "456 Test Avenue, Test Town",
"location": [45.7640, 4.8357],
"files": [],
"user_id": str(user1_id)
})
pin_x = pins_collection.insert_one({
"title": "Eiffel Tower",
"description": "Description X",
"location": [48.858296, 2.294526],
"user_id": str(user_id),
"is_poi": False
},
{
"title": "Le Mont-Saint-Michel",
"description": "Le Mont-Saint-Michel est une commune française située dans la Manche en Normandie. Elle tire son nom de l'îlot rocheux consacré à saint Michel où sélève aujourdhui labbaye du Mont-Saint-Michel.",
"complete_address": "Mont Saint-Michel, Terrasse de l'Abside, Le Mont-Saint-Michel, Avranches, Manche, Normandie, France métropolitaine, 50170, France",
"location": [48.6359541,-1.5114600],
"files": [],
"user_id": str(user2_id)
})
"user_id": str(user_id),
"is_poi": True
},
]
pin_y = pins_collection.insert_one({
"title": "Mont Saint Michel",
"description": "Description Y",
"location": [48.636111, -1.511389],
"files": [],
"user_id": str(user2_id)
})
# Insérer les pins et créer les images associées
for pin_data in test_pins:
pin_id = pins_collection.insert_one(pin_data).inserted_id
# Créer les images de test
image_a = create_test_image(color='red')
image_b = create_test_image(color='blue')
image_x = create_test_image(color='green')
image_y = create_test_image(color='yellow')
# Créer une image de test
image = create_test_image(color='blue')
temp_path = os.path.join(IMAGES_DIR, f"temp_{pin_id}.jpg")
# Sauvegarder temporairement les images
temp_paths = []
for img, name in [(image_a, 'a'), (image_b, 'b'), (image_x, 'x'), (image_y, 'y')]:
temp_path = os.path.join(IMAGES_DIR, f"temp_{name}.jpg")
with open(temp_path, 'wb') as f:
f.write(img.getvalue())
temp_paths.append(temp_path)
f.write(image.getvalue())
# Traiter les images et obtenir leurs hashes
image_hashes = []
for temp_path in temp_paths:
# Traiter l'image
file_hash, extension = process_image(temp_path)
final_path = os.path.join(IMAGES_DIR, f"{file_hash}.{extension}")
# Si l'image n'existe pas déjà physiquement, la déplacer
if not os.path.exists(final_path):
os.rename(temp_path, final_path)
else:
os.remove(temp_path)
image_hashes.append(file_hash)
# Insérer les métadonnées des images dans la base de données avec leur pin_id
image_a_id = images_collection.insert_one({
"pin_id": pin_a.inserted_id,
"image_hash": image_hashes[0],
# Créer l'entrée dans la collection images
image_id = images_collection.insert_one({
"pin_id": pin_id,
"image_hash": file_hash,
"metadata": {
"created_at": datetime.now().isoformat(),
"original_filename": "test_a.jpg",
"original_filename": f"test_{pin_id}.jpg",
"mime_type": "image/jpeg",
"size": len(image_a.getvalue())
"size": len(image.getvalue())
},
"caption": "Tour Eiffel"
"caption": None
}).inserted_id
image_b_id = images_collection.insert_one({
"pin_id": pin_b.inserted_id,
"image_hash": image_hashes[1],
"metadata": {
"created_at": datetime.now().isoformat(),
"original_filename": "test_b.jpg",
"mime_type": "image/jpeg",
"size": len(image_b.getvalue())
},
"caption": "Mont St Michel"
}).inserted_id
# Mettre à jour le pin avec l'ID de l'image
pins_collection.update_one(
{"_id": pin_id},
{"$set": {"files": [str(image_id)]}}
)
image_x_id = images_collection.insert_one({
"pin_id": pin_x.inserted_id,
"image_hash": image_hashes[2],
"metadata": {
"created_at": datetime.now().isoformat(),
"original_filename": "test_x.jpg",
"mime_type": "image/jpeg",
"size": len(image_x.getvalue())
},
"caption": "Eiffel Tower"
}).inserted_id
def import_real_data(user_id):
"""Importe les données réelles depuis les fichiers JSON"""
pins_collection = db["pins"]
images_collection = db["images"]
image_y_id = images_collection.insert_one({
"pin_id": pin_y.inserted_id,
"image_hash": image_hashes[3],
"metadata": {
"created_at": datetime.now().isoformat(),
"original_filename": "test_y.jpg",
"mime_type": "image/jpeg",
"size": len(image_y.getvalue())
},
"caption": "Mont Saint Michel"
}).inserted_id
# Charger les données JSON
with open('memorymap.pins.json', 'r', encoding='utf-8') as f:
pins_data = json.load(f)
# Mettre à jour les pins avec les IDs des images
pins_collection.update_one(
{"_id": pin_a.inserted_id},
{"$set": {"files": [str(image_a_id)]}}
)
pins_collection.update_one(
{"_id": pin_b.inserted_id},
{"$set": {"files": [str(image_b_id)]}}
)
pins_collection.update_one(
{"_id": pin_x.inserted_id},
{"$set": {"files": [str(image_x_id)]}}
)
with open('memorymap.images.json', 'r', encoding='utf-8') as f:
images_data = json.load(f)
# Créer un mapping des anciens IDs vers les nouveaux
pin_id_mapping = {}
image_id_mapping = {}
# Insérer les pins
for pin in pins_data:
old_pin_id = pin['_id']['$oid']
# Créer une copie du pin sans le champ _id
new_pin = {k: v for k, v in pin.items() if k != '_id'}
new_pin['user_id'] = str(user_id)
new_pin['files'] = [] # Réinitialiser les fichiers
# Convertir la date si elle existe
if 'date' in new_pin:
new_pin['date'] = convert_mongo_date(new_pin['date'])
new_pin_id = pins_collection.insert_one(new_pin).inserted_id
pin_id_mapping[old_pin_id] = new_pin_id
# Insérer les images
for image in images_data:
old_image_id = image['_id']['$oid']
old_pin_id = image['pin_id']['$oid']
# Vérifier si le pin existe
if old_pin_id not in pin_id_mapping:
print(f"Attention: Pin {old_pin_id} référencé par l'image {old_image_id} n'existe pas dans les données")
continue
# Vérifier si l'image physique existe
image_path = os.path.join(IMAGES_DIR, f"{image['image_hash']}.jpg")
if not os.path.exists(image_path):
print(f"Attention: Image manquante pour le hash {image['image_hash']}")
continue
# Créer la nouvelle entrée d'image sans le champ _id
new_image = {k: v for k, v in image.items() if k not in ['_id', 'pin_id']}
new_image['pin_id'] = pin_id_mapping[old_pin_id]
# Convertir les dates dans les métadonnées
if 'metadata' in new_image and 'created_at' in new_image['metadata']:
new_image['metadata']['created_at'] = convert_mongo_date(new_image['metadata']['created_at'])
new_image_id = images_collection.insert_one(new_image).inserted_id
image_id_mapping[old_image_id] = new_image_id
# Mettre à jour le pin avec le nouvel ID d'image
pins_collection.update_one(
{"_id": pin_y.inserted_id},
{"$set": {"files": [str(image_y_id)]}}
{"_id": pin_id_mapping[old_pin_id]},
{"$push": {"files": str(new_image_id)}}
)
def populate_data():
users_collection = db["users"]
# Créer les utilisateurs
user1_id = users_collection.insert_one({
"username": "Alix",
"password": get_password_hash("alixalix"),
"is_admin": True
}).inserted_id
user2_id = users_collection.insert_one({
"username": "test",
"password": get_password_hash("testtest"),
"is_admin": False
}).inserted_id
# Créer les données de test pour l'utilisateur "test"
create_test_data(user2_id)
# Importer les données réelles pour l'utilisateur "Alix"
import_real_data(user1_id)
# Créer une relation d'amitié
friends_collection = db["friends"]
friends_collection.insert_one({
"user_id": str(user1_id),
@ -200,4 +225,4 @@ def populate_data():
if __name__ == "__main__":
populate_data()
print("Data inserted.")
print("Données insérées avec succès.")

Binary file not shown.

After

Width:  |  Height:  |  Size: 263 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 127 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 162 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 175 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 381 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 361 B

Binary file not shown.

After

Width:  |  Height:  |  Size: 177 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 265 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 201 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 198 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 215 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 230 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 334 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 311 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 149 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 537 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 245 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 360 B

Binary file not shown.

After

Width:  |  Height:  |  Size: 361 B

Binary file not shown.

After

Width:  |  Height:  |  Size: 142 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 614 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 277 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 610 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 124 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 319 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 280 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 290 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 358 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 361 B

Binary file not shown.

After

Width:  |  Height:  |  Size: 79 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 214 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 84 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 144 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 770 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 182 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 325 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 213 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 216 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 256 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 465 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 176 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 213 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 179 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 363 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 120 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 546 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 143 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 252 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 418 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 373 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 212 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 230 KiB

@ -0,0 +1,704 @@
[{
"_id": {
"$oid": "6841f38f80cc1ed56dac3f5f"
},
"pin_id": {
"$oid": "6841f38f80cc1ed56dac3f60"
},
"image_hash": "01b636ab8a23d741fed4a15181b7beecad103ef8e89e52eed6c7b6e5901bb2be",
"metadata": {
"created_at": "2025-06-05T21:44:15.783516",
"original_filename": "IMG_4501.jpeg",
"mime_type": "image/jpeg",
"size": 358243
},
"caption": null
},
{
"_id": {
"$oid": "6841f48d80cc1ed56dac3f68"
},
"pin_id": {
"$oid": "6841f48d80cc1ed56dac3f6a"
},
"image_hash": "1671dccceecc787377bccc566ecaa51f46a2dbace03419dfebcb969d4f9b2927",
"metadata": {
"created_at": "2025-06-05T21:48:29.590597",
"original_filename": "IMG_1692.jpeg",
"mime_type": "image/jpeg",
"size": 240574
},
"caption": null
},
{
"_id": {
"$oid": "6841f48d80cc1ed56dac3f69"
},
"pin_id": {
"$oid": "6841f48d80cc1ed56dac3f6a"
},
"image_hash": "fc5925a334b0dccd763574369fa65767ba762bf9e2543a700bc2d4d350d051e6",
"metadata": {
"created_at": "2025-06-05T21:48:29.616656",
"original_filename": "IMG_1688.jpeg",
"mime_type": "image/jpeg",
"size": 400886
},
"caption": null
},
{
"_id": {
"$oid": "6841f50a80cc1ed56dac3f6b"
},
"pin_id": {
"$oid": "6841f50a80cc1ed56dac3f6d"
},
"image_hash": "41e5299a241b70a71509369876470404866d1ba0035ac049d24dab08ed358809",
"metadata": {
"created_at": "2025-06-05T21:50:34.571495",
"original_filename": "IMG_1656.jpeg",
"mime_type": "image/jpeg",
"size": 431025
},
"caption": null
},
{
"_id": {
"$oid": "6841f50a80cc1ed56dac3f6c"
},
"pin_id": {
"$oid": "6841f50a80cc1ed56dac3f6d"
},
"image_hash": "ddf6b734a5a1753c98f5b5b6626136836c6129b9e12871c8c9ead79b0cbd889d",
"metadata": {
"created_at": "2025-06-05T21:50:34.595828",
"original_filename": "IMG_1680.jpeg",
"mime_type": "image/jpeg",
"size": 362255
},
"caption": null
},
{
"_id": {
"$oid": "6841f57d80cc1ed56dac3f6e"
},
"pin_id": {
"$oid": "6841f57d80cc1ed56dac3f73"
},
"image_hash": "7e54719218c2c6e49080975528e45d7cc9523b4fcdd2233e2f1c9df296e96e37",
"metadata": {
"created_at": "2025-06-05T21:52:29.154601",
"original_filename": "IMG_1586.jpeg",
"mime_type": "image/jpeg",
"size": 338908
},
"caption": null
},
{
"_id": {
"$oid": "6841f57d80cc1ed56dac3f6f"
},
"pin_id": {
"$oid": "6841f57d80cc1ed56dac3f73"
},
"image_hash": "db7bcb326efa37ac58ea047273379c4e304d2efc18abbcb423e3aa8ab7f5d474",
"metadata": {
"created_at": "2025-06-05T21:52:29.205985",
"original_filename": "IMG_1587.jpeg",
"mime_type": "image/jpeg",
"size": 413786
},
"caption": null
},
{
"_id": {
"$oid": "6841f57d80cc1ed56dac3f70"
},
"pin_id": {
"$oid": "6841f57d80cc1ed56dac3f73"
},
"image_hash": "57880118b31a7ee51c8b533de4fd40ea0679fa592244c7023110804837a5e7df",
"metadata": {
"created_at": "2025-06-05T21:52:29.249415",
"original_filename": "IMG_1589.jpeg",
"mime_type": "image/jpeg",
"size": 346654
},
"caption": null
},
{
"_id": {
"$oid": "6841f57d80cc1ed56dac3f71"
},
"pin_id": {
"$oid": "6841f57d80cc1ed56dac3f73"
},
"image_hash": "4ccb2c2df9945d2cdf6b40347d0102049153f87131f22e5775946488c7b5edd2",
"metadata": {
"created_at": "2025-06-05T21:52:29.301144",
"original_filename": "IMG_1577.jpeg",
"mime_type": "image/jpeg",
"size": 767053
},
"caption": null
},
{
"_id": {
"$oid": "6841f57d80cc1ed56dac3f72"
},
"pin_id": {
"$oid": "6841f57d80cc1ed56dac3f73"
},
"image_hash": "00672bc3e6a2964446675878ae75ffb8c02b450af6a03388df43d717d6cfe670",
"metadata": {
"created_at": "2025-06-05T21:52:29.349741",
"original_filename": "IMG_1592.jpeg",
"mime_type": "image/jpeg",
"size": 603707
},
"caption": null
},
{
"_id": {
"$oid": "6841f65c80cc1ed56dac3f74"
},
"pin_id": {
"$oid": "6841f65c80cc1ed56dac3f75"
},
"image_hash": "927a702f4bac3e255f13c50c26af14c450622156c0946e2e357b663e7ed153e4",
"metadata": {
"created_at": "2025-06-05T21:56:12.699721",
"original_filename": "IMG_0435.jpeg",
"mime_type": "image/jpeg",
"size": 526161
},
"caption": null
},
{
"_id": {
"$oid": "6841f7c880cc1ed56dac3f76"
},
"pin_id": {
"$oid": "6841f7c980cc1ed56dac3f7a"
},
"image_hash": "50a2de8ae288097e6973a268710764cfb930f4d7445d3dfebbc8f7eaf1200a8c",
"metadata": {
"created_at": "2025-06-05T22:02:16.767671",
"original_filename": "IMG_3749.jpeg",
"mime_type": "image/jpeg",
"size": 1631876
},
"caption": null
},
{
"_id": {
"$oid": "6841f7c880cc1ed56dac3f77"
},
"pin_id": {
"$oid": "6841f7c980cc1ed56dac3f7a"
},
"image_hash": "f298db82a45027f82b9853b49ec07477043f17f0ab6e793057e18caf7b8a805d",
"metadata": {
"created_at": "2025-06-05T22:02:16.948409",
"original_filename": "IMG_3783.jpeg",
"mime_type": "image/jpeg",
"size": 2370062
},
"caption": null
},
{
"_id": {
"$oid": "6841f7c980cc1ed56dac3f78"
},
"pin_id": {
"$oid": "6841f7c980cc1ed56dac3f7a"
},
"image_hash": "8c1e4bcc888c1b381d2d5db20cb4172703238cc06554df7c08ffce0682b59673",
"metadata": {
"created_at": "2025-06-05T22:02:17.131779",
"original_filename": "IMG_3787.jpeg",
"mime_type": "image/jpeg",
"size": 1954597
},
"caption": null
},
{
"_id": {
"$oid": "6841f7c980cc1ed56dac3f79"
},
"pin_id": {
"$oid": "6841f7c980cc1ed56dac3f7a"
},
"image_hash": "e423aaba1f53f81e915c376b91e611bf652ad14387d12aae597b9223b5bcf212",
"metadata": {
"created_at": "2025-06-05T22:02:17.384804",
"original_filename": "IMG_3735.jpeg",
"mime_type": "image/jpeg",
"size": 3405094
},
"caption": null
},
{
"_id": {
"$oid": "6841f8ce80cc1ed56dac3f7b"
},
"pin_id": {
"$oid": "6841f8cf80cc1ed56dac3f81"
},
"image_hash": "4c073626050d8374800e5ad29e568841280d09423f75a4c2776eca3b1a5cc5b2",
"metadata": {
"created_at": "2025-06-05T22:06:38.835156",
"original_filename": "20180729_103510.jpeg",
"mime_type": "image/jpeg",
"size": 347099
},
"caption": null
},
{
"_id": {
"$oid": "6841f8ce80cc1ed56dac3f7c"
},
"pin_id": {
"$oid": "6841f8cf80cc1ed56dac3f81"
},
"image_hash": "ea7d806ffb3ba1309a8b9f1f3fda19f33059efb54bad8318adb1bede9aa93f66",
"metadata": {
"created_at": "2025-06-05T22:06:38.873264",
"original_filename": "20180729_104211.jpeg",
"mime_type": "image/jpeg",
"size": 309158
},
"caption": null
},
{
"_id": {
"$oid": "6841f8ce80cc1ed56dac3f7d"
},
"pin_id": {
"$oid": "6841f8cf80cc1ed56dac3f81"
},
"image_hash": "bf4d6f24522498eb0eadc325c17a07e8814aae82583a593f7441c97306be7620",
"metadata": {
"created_at": "2025-06-05T22:06:38.903831",
"original_filename": "20180729_103127.jpeg",
"mime_type": "image/jpeg",
"size": 327925
},
"caption": null
},
{
"_id": {
"$oid": "6841f8ce80cc1ed56dac3f7e"
},
"pin_id": {
"$oid": "6841f8cf80cc1ed56dac3f81"
},
"image_hash": "e4650203b426e789242b163c2ba8c71f4d5d88f11571985947cbb52f1f19d74d",
"metadata": {
"created_at": "2025-06-05T22:06:38.940619",
"original_filename": "20180729_110153.jpeg",
"mime_type": "image/jpeg",
"size": 253176
},
"caption": null
},
{
"_id": {
"$oid": "6841f8ce80cc1ed56dac3f7f"
},
"pin_id": {
"$oid": "6841f8cf80cc1ed56dac3f81"
},
"image_hash": "39b8d859747f7844348a24a90335fa7b49207fd7dd4a3e98d09b63564f773bd1",
"metadata": {
"created_at": "2025-06-05T22:06:38.964282",
"original_filename": "20180729_103239.jpeg",
"mime_type": "image/jpeg",
"size": 262082
},
"caption": null
},
{
"_id": {
"$oid": "6841f8cf80cc1ed56dac3f80"
},
"pin_id": {
"$oid": "6841f8cf80cc1ed56dac3f81"
},
"image_hash": "a8b9ea07664a6b1f6abd53e51e792a69a73cc6728eec687d3bc65a1b4b23447d",
"metadata": {
"created_at": "2025-06-05T22:06:39.002779",
"original_filename": "20180729_104641.jpeg",
"mime_type": "image/jpeg",
"size": 393910
},
"caption": null
},
{
"_id": {
"$oid": "6841f9ed80cc1ed56dac3f84"
},
"pin_id": {
"$oid": "6841f9ed80cc1ed56dac3f85"
},
"image_hash": "cd310fd644683dc2fc55f8a18ddf32d236868b434e2bb65380a944551b7eae47",
"metadata": {
"created_at": "2025-06-05T22:11:25.214256",
"original_filename": "IMG_2939.jpeg",
"mime_type": "image/jpeg",
"size": 520669
},
"caption": null
},
{
"_id": {
"$oid": "6841fb7280cc1ed56dac3f86"
},
"pin_id": {
"$oid": "6841fb7380cc1ed56dac3f89"
},
"image_hash": "807c68131f13f61d769dc00b5ee644d3a7748f380b3d729d22dcb45e47765e0e",
"metadata": {
"created_at": "2025-06-05T22:17:54.634226",
"original_filename": "20160505_162036.jpg",
"mime_type": "image/jpeg",
"size": 2873133
},
"caption": null
},
{
"_id": {
"$oid": "6841fb7280cc1ed56dac3f87"
},
"pin_id": {
"$oid": "6841fb7380cc1ed56dac3f89"
},
"image_hash": "ea29b612fdd7d0a7d2ca51ffc256c647417f5456280559f8d11d984a6bf2c1dd",
"metadata": {
"created_at": "2025-06-05T22:17:54.949474",
"original_filename": "20160505_162040.jpg",
"mime_type": "image/jpeg",
"size": 2439764
},
"caption": null
},
{
"_id": {
"$oid": "6841fb7380cc1ed56dac3f88"
},
"pin_id": {
"$oid": "6841fb7380cc1ed56dac3f89"
},
"image_hash": "a691d6845bf681d6d1134a0da670f77d8d17e99dbe6a0d80dc97e86010fcae7d",
"metadata": {
"created_at": "2025-06-05T22:17:55.268115",
"original_filename": "20160505_162018.jpg",
"mime_type": "image/jpeg",
"size": 3794766
},
"caption": null
},
{
"_id": {
"$oid": "6841fc4580cc1ed56dac3f8a"
},
"pin_id": {
"$oid": "6841fc4580cc1ed56dac3f8b"
},
"image_hash": "88cabe22af1679d98959ab7985b3fadee2ca11cb4bd37a6123a5f0c98792a728",
"metadata": {
"created_at": "2025-06-05T22:21:25.299915",
"original_filename": "20160504_112522.jpg",
"mime_type": "image/jpeg",
"size": 3322923
},
"caption": null
},
{
"_id": {
"$oid": "6841fc8a80cc1ed56dac3f8c"
},
"pin_id": {
"$oid": "6841fc8b80cc1ed56dac3f8d"
},
"image_hash": "d7d4a9b7b5d791fca1dcbcbfb562144c50117bd30067bd4f7526a3e76bad3e24",
"metadata": {
"created_at": "2025-06-05T22:22:34.918756",
"original_filename": "20160504_114309.jpg",
"mime_type": "image/jpeg",
"size": 1740532
},
"caption": null
},
{
"_id": {
"$oid": "6841fd8080cc1ed56dac3f8e"
},
"pin_id": {
"$oid": "6841fd8080cc1ed56dac3f92"
},
"image_hash": "a5f875e95b5ab035f4598ef7805316f32abb05a493cbc6036d7d6691865ad843",
"metadata": {
"created_at": "2025-06-05T22:26:40.098674",
"original_filename": "20170726_181645.jpg",
"mime_type": "image/jpeg",
"size": 1640943
},
"caption": null
},
{
"_id": {
"$oid": "6841fd8080cc1ed56dac3f8f"
},
"pin_id": {
"$oid": "6841fd8080cc1ed56dac3f92"
},
"image_hash": "f5375009d09889976a1897068c7e34423545cc5a2fa91ab72e73625104e4f952",
"metadata": {
"created_at": "2025-06-05T22:26:40.430777",
"original_filename": "20170725_205312.jpg",
"mime_type": "image/jpeg",
"size": 2411086
},
"caption": null
},
{
"_id": {
"$oid": "6841fd8080cc1ed56dac3f90"
},
"pin_id": {
"$oid": "6841fd8080cc1ed56dac3f92"
},
"image_hash": "a63f829f20701b1a553c78c1f9a1e05740b489340b8b348bbb173a14e5e428c0",
"metadata": {
"created_at": "2025-06-05T22:26:40.549402",
"original_filename": "20170725_205209.jpg",
"mime_type": "image/jpeg",
"size": 2451063
},
"caption": null
},
{
"_id": {
"$oid": "6841fd8080cc1ed56dac3f91"
},
"pin_id": {
"$oid": "6841fd8080cc1ed56dac3f92"
},
"image_hash": "5b7af96f1f84764cf1a0c8b304f97628341bad58ed8ec867c042fdda091a6128",
"metadata": {
"created_at": "2025-06-05T22:26:40.845842",
"original_filename": "20170726_181637.jpg",
"mime_type": "image/jpeg",
"size": 4073946
},
"caption": null
},
{
"_id": {
"$oid": "6841fdf680cc1ed56dac3f93"
},
"pin_id": {
"$oid": "6841fdf680cc1ed56dac3f94"
},
"image_hash": "a29957bc271ecd29185f6355c7f3c5b29b748904f828fd7ff9c7b4014d9e5bb7",
"metadata": {
"created_at": "2025-06-05T22:28:38.447975",
"original_filename": "IMG_4188.jpeg",
"mime_type": "image/jpeg",
"size": 2339918
},
"caption": null
},
{
"_id": {
"$oid": "6841fe3080cc1ed56dac3f95"
},
"pin_id": {
"$oid": "6841fe3180cc1ed56dac3f9a"
},
"image_hash": "c3672d29497d396ff0dc765ace48c692ab82b39fb524dfe2775ea20027a85f9d",
"metadata": {
"created_at": "2025-06-05T22:29:36.829260",
"original_filename": "IMG_4248.jpeg",
"mime_type": "image/jpeg",
"size": 325521
},
"caption": null
},
{
"_id": {
"$oid": "6841fe3080cc1ed56dac3f96"
},
"pin_id": {
"$oid": "6841fe3180cc1ed56dac3f9a"
},
"image_hash": "46f334e0033e95c35fdbf6374784813a323742bd878bdafec320b9a8ca6c6e50",
"metadata": {
"created_at": "2025-06-05T22:29:36.874064",
"original_filename": "IMG_4212.jpeg",
"mime_type": "image/jpeg",
"size": 516483
},
"caption": null
},
{
"_id": {
"$oid": "6841fe3080cc1ed56dac3f97"
},
"pin_id": {
"$oid": "6841fe3180cc1ed56dac3f9a"
},
"image_hash": "5e9a70097040e5405631046e9597245cb62e0f60be9016099e15414388f8ab4f",
"metadata": {
"created_at": "2025-06-05T22:29:36.930792",
"original_filename": "IMG_4211.jpeg",
"mime_type": "image/jpeg",
"size": 622771
},
"caption": null
},
{
"_id": {
"$oid": "6841fe3080cc1ed56dac3f98"
},
"pin_id": {
"$oid": "6841fe3180cc1ed56dac3f9a"
},
"image_hash": "4244aad23e2e06191494367b03b53b38d5372de96e52e1a0a0f7df56c5eb5141",
"metadata": {
"created_at": "2025-06-05T22:29:36.976214",
"original_filename": "IMG_4253.jpeg",
"mime_type": "image/jpeg",
"size": 522181
},
"caption": null
},
{
"_id": {
"$oid": "6841fe3180cc1ed56dac3f99"
},
"pin_id": {
"$oid": "6841fe3180cc1ed56dac3f9a"
},
"image_hash": "eeac89acf75d0bc20a901aeaf65487e4d0c9930bc2369ce6c9c661a93bf36110",
"metadata": {
"created_at": "2025-06-05T22:29:37.025798",
"original_filename": "IMG_4214.jpeg",
"mime_type": "image/jpeg",
"size": 636483
},
"caption": null
},
{
"_id": {
"$oid": "6841feaa80cc1ed56dac3f9b"
},
"pin_id": {
"$oid": "6841feaa80cc1ed56dac3f9c"
},
"image_hash": "8145e62d9b41bc37f6c6ef205f98a240d19be9a9b91475dd58577ee6d30a2202",
"metadata": {
"created_at": "2025-06-05T22:31:38.152809",
"original_filename": "IMG_4203.jpeg",
"mime_type": "image/jpeg",
"size": 495004
},
"caption": null
},
{
"_id": {
"$oid": "6841ff3c80cc1ed56dac3f9d"
},
"pin_id": {
"$oid": "6841ff3c80cc1ed56dac3f9e"
},
"image_hash": "9f2a06c6c3bf044511d91f9051259de2180777e83f9e493e474fb4d90476f76d",
"metadata": {
"created_at": "2025-06-05T22:34:04.288268",
"original_filename": "IMG_3424.jpeg",
"mime_type": "image/jpeg",
"size": 679889
},
"caption": null
},
{
"_id": {
"$oid": "6841ffba80cc1ed56dac3f9f"
},
"pin_id": {
"$oid": "6841ffbb80cc1ed56dac3fa4"
},
"image_hash": "1cb9109dca115180704859e78789486d2d62646131b4835f3576839ce524c529",
"metadata": {
"created_at": "2025-06-05T22:36:10.881837",
"original_filename": "IMG_2865.jpeg",
"mime_type": "image/jpeg",
"size": 351937
},
"caption": null
},
{
"_id": {
"$oid": "6841ffba80cc1ed56dac3fa0"
},
"pin_id": {
"$oid": "6841ffbb80cc1ed56dac3fa4"
},
"image_hash": "48409673794a95c35eee6871f058dabcee8ddceb4d90423e7700289ccc3d5451",
"metadata": {
"created_at": "2025-06-05T22:36:10.933911",
"original_filename": "IMG_2872.jpeg",
"mime_type": "image/jpeg",
"size": 425206
},
"caption": null
},
{
"_id": {
"$oid": "6841ffba80cc1ed56dac3fa1"
},
"pin_id": {
"$oid": "6841ffbb80cc1ed56dac3fa4"
},
"image_hash": "df1d10303b997a40bbff64eb1cb060abf0721dd6803ba6e3e824486c3463eeb8",
"metadata": {
"created_at": "2025-06-05T22:36:10.979929",
"original_filename": "IMG_2869.jpeg",
"mime_type": "image/jpeg",
"size": 373956
},
"caption": null
},
{
"_id": {
"$oid": "6841ffbb80cc1ed56dac3fa2"
},
"pin_id": {
"$oid": "6841ffbb80cc1ed56dac3fa4"
},
"image_hash": "9f6369cc31b62fc6845368c0cebfc900186ca9e371dc7e27cffbfec0b5d78dc5",
"metadata": {
"created_at": "2025-06-05T22:36:11.010414",
"original_filename": "IMG_2828.jpeg",
"mime_type": "image/jpeg",
"size": 427094
},
"caption": null
},
{
"_id": {
"$oid": "6841ffbb80cc1ed56dac3fa3"
},
"pin_id": {
"$oid": "6841ffbb80cc1ed56dac3fa4"
},
"image_hash": "b0012c31496b140cb8a088d5a112e2f86cb8b7e2d912f870ff113cc2a2aba155",
"metadata": {
"created_at": "2025-06-05T22:36:11.041962",
"original_filename": "IMG_2825.jpeg",
"mime_type": "image/jpeg",
"size": 487837
},
"caption": null
}]

@ -0,0 +1,367 @@
[{
"_id": {
"$oid": "6841f38f80cc1ed56dac3f60"
},
"title": "Le PC",
"description": "Wow c'est multicolore",
"location": [
45.76651388888889,
3.1059083333333333
],
"complete_address": "33, Boulevard Paul Pochet-Lagaye, La Chaux, Clermont-Ferrand, Puy-de-Dôme, Auvergne-Rhône-Alpes, France métropolitaine, 63000, France",
"files": [
"6841f38f80cc1ed56dac3f5f"
],
"is_poi": false,
"user_id": "6841f1f936abfa071cbff4cd",
"date": {
"$date": "2022-09-04T00:00:00.000Z"
}
},
{
"_id": {
"$oid": "6841f48d80cc1ed56dac3f6a"
},
"title": "Au bord de Porto-Vecchio",
"description": "Un petit panorama parce que l'endroit est magnifique",
"location": [
41.66112777777778,
9.198280555555556
],
"complete_address": "Réservoir de l'Ospedale, D 368, L'Ospedale, Porto-Vecchio, Sartène, Corse-du-Sud, Corse, France métropolitaine, 20137, France",
"files": [
"6841f48d80cc1ed56dac3f68",
"6841f48d80cc1ed56dac3f69"
],
"is_poi": false,
"user_id": "6841f1f936abfa071cbff4cd",
"date": {
"$date": "2024-04-12T00:00:00.000Z"
}
},
{
"_id": {
"$oid": "6841f50a80cc1ed56dac3f6d"
},
"title": "Île Lavezzi",
"description": "Carte postale en vrai : bienvenue aux îles Lavezzi 🌊☀️",
"location": [
41.33654722222222,
9.258111111111111
],
"complete_address": "Cimetière de Furcone, Retour Porto Vecchio, Bonifacio / Bunifaziu, Sartène, Corse-du-Sud, Corse, France métropolitaine, 20169, France",
"files": [
"6841f50a80cc1ed56dac3f6b",
"6841f50a80cc1ed56dac3f6c"
],
"is_poi": false,
"user_id": "6841f1f936abfa071cbff4cd",
"date": {
"$date": "2024-04-15T00:00:00.000Z"
}
},
{
"_id": {
"$oid": "6841f57d80cc1ed56dac3f73"
},
"title": "Campomoro",
"description": "Un peu de nourriture quand même",
"location": [
41.62923055555556,
8.816047222222222
],
"complete_address": "Chiosello, Campomoro, Belvédère-Campomoro, Sartène, Corse-du-Sud, Corse, France métropolitaine, 20110, France",
"files": [
"6841f57d80cc1ed56dac3f71",
"6841f57d80cc1ed56dac3f6f",
"6841f57d80cc1ed56dac3f70",
"6841f57d80cc1ed56dac3f72",
"6841f57d80cc1ed56dac3f6e"
],
"is_poi": false,
"user_id": "6841f1f936abfa071cbff4cd",
"date": {
"$date": "2024-04-17T00:00:00.000Z"
}
},
{
"_id": {
"$oid": "6841f65c80cc1ed56dac3f75"
},
"title": "CarLuks",
"description": "LiPad sur roue qui sort de la concession, une accélération et hop, en panne 🤭",
"location": [
48.962386111111115,
1.8019194444444444
],
"complete_address": "Rue des Maraîchers, Mézières-sur-Seine, Mantes-la-Jolie, Yvelines, Île-de-France, France métropolitaine, 78970, France",
"files": [
"6841f65c80cc1ed56dac3f74"
],
"is_poi": false,
"user_id": "6841f1f936abfa071cbff4cd",
"date": {
"$date": "2023-11-19T00:00:00.000Z"
}
},
{
"_id": {
"$oid": "6841f7c980cc1ed56dac3f7a"
},
"title": "El Mouradi Palace",
"description": "Les vacances bien reposantes",
"location": [
35.91128611111111,
10.581647222222221
],
"complete_address": "El Mouradi Palm Marina, Rue Henry Dunant, Zitouna 2, El Kantaoui, Délégation Hammam Sousse, Gouvernorat Sousse, 4089, Tunisie",
"files": [
"6841f7c880cc1ed56dac3f77",
"6841f7c980cc1ed56dac3f78",
"6841f7c880cc1ed56dac3f76",
"6841f7c980cc1ed56dac3f79"
],
"is_poi": false,
"user_id": "6841f1f936abfa071cbff4cd",
"date": {
"$date": "2022-08-09T00:00:00.000Z"
}
},
{
"_id": {
"$oid": "6841f8cf80cc1ed56dac3f81"
},
"title": "Louxor",
"description": "Ben dit donc, sacrés cailloux !!",
"location": [
25.718888888888888,
32.657222222222224
],
"complete_address": "Kiosque de Taharqa, طريق كوبرى ابو على, الكرنك القديم, Louxor, 81693, Égypte",
"files": [
"6841f8ce80cc1ed56dac3f7c",
"6841f8ce80cc1ed56dac3f7b",
"6841f8ce80cc1ed56dac3f7d",
"6841f8ce80cc1ed56dac3f7f",
"6841f8cf80cc1ed56dac3f80",
"6841f8ce80cc1ed56dac3f7e"
],
"is_poi": false,
"user_id": "6841f1f936abfa071cbff4cd",
"date": {
"$date": "2018-07-25T00:00:00.000Z"
}
},
{
"_id": {
"$oid": "6841f9ed80cc1ed56dac3f85"
},
"title": "Fontaine de Trevi",
"description": "Un jeton, un vœu… et une promesse de revenir. ✨",
"location": [
41.90067777777778,
12.483297222222223
],
"complete_address": "Hotel Fontana, 96, Piazza di Trevi, Trevi, Municipio Roma I, Rome, Roma Capitale, Latium, 00187, Italie",
"files": [
"6841f9ed80cc1ed56dac3f84"
],
"is_poi": false,
"user_id": "6841f1f936abfa071cbff4cd",
"date": {
"$date": "2020-08-09T00:00:00.000Z"
}
},
{
"_id": {
"$oid": "6841fb7380cc1ed56dac3f89"
},
"title": "Sanctuaire dAsclépios",
"description": "La mémoire sefface, pierre après pierre. Mais le vent, lui, se souvient.",
"location": [
"37.5960027",
"23.0794219"
],
"complete_address": "Théâtre d'Épidaure, Κορίνθου - Επιδαύρου, Asklipieio, Municipal Unit of Asklipieio, Épidaure, Argolide, Péloponnèse Région, Péloponnèse, Grèce occidentale et Îles Ioniennes, 210 52, Grèce",
"files": [
"6841fb7280cc1ed56dac3f86",
"6841fb7280cc1ed56dac3f87",
"6841fb7380cc1ed56dac3f88"
],
"is_poi": false,
"user_id": "6841f1f936abfa071cbff4cd",
"date": {
"$date": "2016-05-05T00:00:00.000Z"
}
},
{
"_id": {
"$oid": "6841fc4580cc1ed56dac3f8b"
},
"title": "Odéon d'Hérode Atticus",
"description": "Impossible de ne pas sarrêter devant ce chef-dœuvre.",
"location": [
"37.9708191",
"23.7245465"
],
"complete_address": "Odéon d'Hérode Atticus, Περίπατος Ακρόπολης (Νότια κλιτύς), Asterokopío, Anafiótika, Πετράλωνα, 3rd District of Athens, Athènes, Municipality of Athens, Regional Unit of Central Athens, Attique, 117 42, Grèce",
"files": [
"6841fc4580cc1ed56dac3f8a"
],
"is_poi": false,
"user_id": "6841f1f936abfa071cbff4cd",
"date": {
"$date": "2016-05-04T00:00:00.000Z"
}
},
{
"_id": {
"$oid": "6841fc8b80cc1ed56dac3f8d"
},
"title": "Le Parthenon",
"description": "(enfin, ce quil en reste)",
"location": [
"37.9715033",
"23.7266177"
],
"complete_address": "Parthenon, Περίπατος Ακρόπολης (Νότια κλιτύς), Asterokopío, Anafiótika, Πετράλωνα, 3η Κοινότητα Αθηνών, Athènes, Δήμος Αθηναίων, Περιφερειακή Ενότητα Κεντρικού Τομέα Αθηνών, Attique, 117 42, Grèce",
"files": [
"6841fc8a80cc1ed56dac3f8c"
],
"is_poi": false,
"user_id": "6841f1f936abfa071cbff4cd",
"date": {
"$date": "2016-05-04T00:00:00.000Z"
}
},
{
"_id": {
"$oid": "6841fd8080cc1ed56dac3f92"
},
"title": "Antalya",
"description": "Cet hôtel était vraiment dingue 😜 ",
"location": [
"36.9279654",
"30.7276865"
],
"complete_address": "Antalya, Région méditerranéenne, Turquie",
"files": [
"6841fd8080cc1ed56dac3f91",
"6841fd8080cc1ed56dac3f8e",
"6841fd8080cc1ed56dac3f8f",
"6841fd8080cc1ed56dac3f90"
],
"is_poi": false,
"user_id": "6841f1f936abfa071cbff4cd",
"date": {
"$date": "2017-07-25T00:00:00.000Z"
}
},
{
"_id": {
"$oid": "6841fdf680cc1ed56dac3f94"
},
"title": "Les Grands Buffets",
"description": "Cest vraiment à faire une fois dans sa vie ",
"location": [
43.17416111111111,
2.9943416666666667
],
"complete_address": "Espace de Liberté du Grand Narbonne, 11, Giratoire de la Liberté, Pastouret, Narbonne, Aude, Occitanie, France métropolitaine, 11100, France",
"files": [
"6841fdf680cc1ed56dac3f93"
],
"is_poi": false,
"user_id": "6841f1f936abfa071cbff4cd",
"date": {
"$date": "2025-04-30T00:00:00.000Z"
}
},
{
"_id": {
"$oid": "6841fe3180cc1ed56dac3f9a"
},
"title": "Minerve",
"description": "Un des plus beaux villages de France, très très sympathique !",
"location": [
43.353405555555554,
2.7475972222222222
],
"complete_address": "Rue de Caire, Minerve, Béziers, Hérault, Occitanie, France métropolitaine, 34210, France",
"files": [
"6841fe3080cc1ed56dac3f96",
"6841fe3080cc1ed56dac3f97",
"6841fe3180cc1ed56dac3f99",
"6841fe3080cc1ed56dac3f95",
"6841fe3080cc1ed56dac3f98"
],
"is_poi": false,
"user_id": "6841f1f936abfa071cbff4cd",
"date": {
"$date": "2025-05-02T00:00:00.000Z"
}
},
{
"_id": {
"$oid": "6841feaa80cc1ed56dac3f9c"
},
"title": "Le Gouffre de lœil doux",
"description": "Un peu long avec la poussette, sous la pluie et avec linvasion de moustique mais ça valait le coup 😍",
"location": [
43.19188611111111,
3.1860138888888887
],
"complete_address": "Gouffre de l'Œil Doux, Résidence Plein Soleil, Saint-Pierre-la-Mer, Fleury, Narbonne, Aude, Occitanie, France métropolitaine, 11560, France",
"files": [
"6841feaa80cc1ed56dac3f9b"
],
"is_poi": false,
"user_id": "6841f1f936abfa071cbff4cd",
"date": {
"$date": "2025-05-01T00:00:00.000Z"
}
},
{
"_id": {
"$oid": "6841ff3c80cc1ed56dac3f9e"
},
"title": "Mère et fille",
"description": "La descendance est assurée !",
"location": [
"45.6140030",
"5.1580574"
],
"complete_address": "Villefontaine, La Tour-du-Pin, Isère, Auvergne-Rhône-Alpes, France métropolitaine, 38090, France",
"files": [
"6841ff3c80cc1ed56dac3f9d"
],
"is_poi": false,
"user_id": "6841f1f936abfa071cbff4cd",
"date": {
"$date": "2025-03-01T00:00:00.000Z"
}
},
{
"_id": {
"$oid": "6841ffbb80cc1ed56dac3fa4"
},
"title": "Venise",
"description": "Une ville posée sur leau... et dans ma mémoire 💦📸",
"location": [
"45.4371908",
"12.3345898"
],
"complete_address": "Venise, Vénétie, 30121-30176, Italie",
"files": [
"6841ffba80cc1ed56dac3f9f",
"6841ffba80cc1ed56dac3fa0",
"6841ffba80cc1ed56dac3fa1",
"6841ffbb80cc1ed56dac3fa2",
"6841ffbb80cc1ed56dac3fa3"
],
"is_poi": false,
"user_id": "6841f1f936abfa071cbff4cd",
"date": {
"$date": "2020-08-01T00:00:00.000Z"
}
}]

@ -6,3 +6,4 @@ joserfc==1.0.1
python-multipart==0.0.9
pillow==11.2.1
python-magic==0.4.27
pywebpush==2.0.3

Binary file not shown.

After

Width:  |  Height:  |  Size: 9.4 MiB

@ -9,3 +9,5 @@ python-magic==0.4.27
pytest==8.3.4
pytest-cov==6.0.0
pytest-order==1.3.0
pywebpush==2.0.3
pytest-asyncio==1.0.0

@ -0,0 +1,129 @@
from test_main import *
import pytest
from datetime import datetime
from bson import ObjectId
from app.routes.auth import users_collection
@pytest.mark.order(3)
def test_set_user_admin():
# Mettre à jour l'utilisateur testuser2 pour le rendre admin
users_collection.update_one(
{"username": "testuser2"},
{"$set": {"is_admin": True}}
)
# Vérifier que la mise à jour a bien été effectuée
user = users_collection.find_one({"username": "testuser2"})
assert user is not None
assert user["is_admin"] is True
def test_get_stats_unauthorized():
response = client.get("/admin/stats")
assert response.status_code == 401
def test_get_stats(token_second):
response = client.get("/admin/stats", headers={"Authorization": f"Bearer {token_second}"})
assert response.status_code == 200
data = response.json()
# Vérifier la structure des statistiques
assert "general" in data
assert "last_30_days" in data
assert "top_users" in data
assert "top_shared_pins" in data
# Vérifier les champs généraux
assert "total_users" in data["general"]
assert "total_pins" in data["general"]
assert "total_images" in data["general"]
assert "total_friends" in data["general"]
assert "total_storage_bytes" in data["general"]
# Vérifier les statistiques des 30 derniers jours
assert "new_users" in data["last_30_days"]
assert "new_pins" in data["last_30_days"]
assert "new_images" in data["last_30_days"]
def test_get_config_unauthorized():
response = client.get("/admin/config")
assert response.status_code == 401
def test_get_config(token_second):
response = client.get("/admin/config", headers={"Authorization": f"Bearer {token_second}"})
assert response.status_code == 200
data = response.json()
# Vérifier les champs de configuration
assert "max_image_size" in data
assert "max_images_per_pin" in data
assert "max_images_per_user" in data
assert "allowed_image_types" in data
assert "max_pins_per_user" in data
assert "max_friends_per_user" in data
def test_update_config_unauthorized():
new_config = {
"max_image_size": 5000000,
"max_images_per_pin": 5,
"max_images_per_user": 100,
"allowed_image_types": ["image/jpeg", "image/png"],
"max_pins_per_user": 50,
"max_friends_per_user": 200
}
response = client.patch("/admin/config", json=new_config)
assert response.status_code == 401
def test_update_config(token_second):
new_config = {
"max_image_size": 5000000,
"max_images_per_pin": 5,
"max_images_per_user": 100,
"allowed_image_types": ["image/jpeg", "image/png"],
"max_pins_per_user": 50,
"max_friends_per_user": 200
}
response = client.patch("/admin/config", json=new_config, headers={"Authorization": f"Bearer {token_second}"})
assert response.status_code == 200
data = response.json()
assert data == new_config
def test_list_users_unauthorized():
response = client.get("/admin/users")
assert response.status_code == 401
def test_list_users(token_second):
response = client.get("/admin/users", headers={"Authorization": f"Bearer {token_second}"})
assert response.status_code == 200
data = response.json()
assert isinstance(data, list)
# Vérifier que la liste contient au moins l'utilisateur testuser2
assert any(user["username"] == "testuser2" for user in data)
def test_delete_user_unauthorized():
response = client.delete("/admin/user/123")
assert response.status_code == 401
def test_delete_user_invalid_uid(token_second):
response = client.delete("/admin/user/invalid_uid", headers={"Authorization": f"Bearer {token_second}"})
assert response.status_code == 400
def test_delete_user_not_found(token_second):
# Utiliser un ID qui n'existe probablement pas
non_existent_id = "507f1f77bcf86cd799439011"
response = client.delete(f"/admin/user/{non_existent_id}", headers={"Authorization": f"Bearer {token_second}"})
assert response.status_code == 404
def test_delete_user(token_second):
# D'abord, créer un utilisateur à supprimer
response = client.post("/register", json={"username": "user_to_delete", "password": "password123"})
assert response.status_code == 200
user_id = response.json()["user_id"]
# Ensuite, supprimer cet utilisateur
response = client.delete(f"/admin/user/{user_id}", headers={"Authorization": f"Bearer {token_second}"})
assert response.status_code == 200
assert response.json()["message"] == "Utilisateur supprimé avec succès"
# Vérifier que l'utilisateur n'existe plus
response = client.get(f"/user/{user_id}", headers={"Authorization": f"Bearer {token_second}"})
assert response.status_code == 404

@ -1,9 +1,12 @@
from test_main import *
from PIL import Image
import io
import os
from bson import ObjectId
from app.config import UPLOAD_DIR
def create_test_image():
img = Image.new('RGB', (100, 100), color='red')
def create_test_image(size=(100, 100)):
img = Image.new('RGB', size, color='red')
img_byte_arr = io.BytesIO()
img.save(img_byte_arr, format='JPEG')
img_byte_arr.seek(0)
@ -54,6 +57,24 @@ def test_get_image(token):
assert response.status_code == 200
assert response.headers["content-type"] == "image/jpeg"
def test_get_image_without_permission(token):
image_id = add_test_image(token)
response = client.get(
f"/image/{image_id}",
headers={"Authorization": f"Bearer {token}"}
)
assert response.status_code == 403
def test_get_image_with_wrong_pin_id(token):
image_id = add_test_image(token)
response = client.get(
f"/image/{image_id}",
headers={"Authorization": f"Bearer {token}"}
)
assert response.status_code == 403
def test_get_image_wrong_format(token):
response = client.get(
"/image/randomIdThatDoesntExists",
@ -95,8 +116,45 @@ def test_update_caption(token):
)
assert response.status_code == 200
def test_get_caption_unauthorized(token, token_second):
pin_id = create_test_pin(token)
image_id = add_test_image(token, pin_id)
response = client.get(
f"/image/{image_id}/caption",
headers={"Authorization": f"Bearer {token_second}"}
)
assert response.status_code == 403
def test_get_metadata(token):
pin_id = create_test_pin(token)
image_id = add_test_image(token, pin_id)
response = client.get(
f"/image/{image_id}/metadata",
headers={"Authorization": f"Bearer {token}"}
)
assert response.status_code == 200
data = response.json()
assert "metadata" in data
assert "pin_id" in data
assert data["pin_id"] == pin_id
assert data["metadata"]["original_filename"] == "test.jpg"
assert data["metadata"]["mime_type"] == "image/jpeg"
def test_get_metadata_unauthorized(token, token_second):
pin_id = create_test_pin(token)
image_id = add_test_image(token, pin_id)
response = client.get(
f"/image/{image_id}/metadata",
headers={"Authorization": f"Bearer {token_second}"}
)
assert response.status_code == 403
def test_image_permissions(token, token_second):
image_id = add_test_image(token)
pin_id = create_test_pin(token)
image_id = add_test_image(token, pin_id)
response = client.get(
f"/image/{image_id}",
@ -116,26 +174,68 @@ def test_invalid_image_type(token):
)
assert response.status_code == 415
# K.O
#
# def test_image_too_large(token):
# # Créer une grande image
# img = Image.fromarray(np.random.randint(0, 256, (8000, 8000, 3), dtype=np.uint8), 'RGB')
# buf = io.BytesIO()
# img.save(buf, format='JPEG', quality=100)
# buf.seek(0)
# buf.name = "large.jpg"
# # Vérifier la taille de l'image
# image_size = len(buf.getvalue())
# print(f"Image size: {image_size} bytes ({image_size / (1024*1024):.2f} MB)")
# response = client.post(
# "/image/pin/null/add",
# files={"image": ("large.jpg", buf, "image/jpeg")},
# data={"exif_date": "2024-03-20T12:00:00", "caption": "Test caption"},
# headers={"Authorization": f"Bearer {token}"}
# )
# print(f"Response status: {response.status_code}")
# print(f"Response body: {response.text}")
# assert response.status_code == 413
def test_image_without_exif_date(token):
# Créer d'abord un pin pour l'image
pin_id = create_test_pin(token)
test_image = create_test_image()
response = client.post(
f"/image/pin/{pin_id}/add",
files={"image": ("test.jpg", test_image, "image/jpeg")},
data={"caption": "Test caption"},
headers={"Authorization": f"Bearer {token}"}
)
assert response.status_code == 200
image_id = response.json()["id"]
# Vérifier que la date a été générée automatiquement
metadata_response = client.get(
f"/image/{image_id}/metadata",
headers={"Authorization": f"Bearer {token}"}
)
assert metadata_response.status_code == 200
data = metadata_response.json()
assert "created_at" in data["metadata"]
assert data["metadata"]["created_at"] is not None
def test_image_with_invalid_pin_id(token):
test_image = create_test_image()
response = client.post(
"/image/pin/123456789987654321abcdef/add",
files={"image": ("test.jpg", test_image, "image/jpeg")},
data={"exif_date": "2024-03-20T12:00:00", "caption": "Test caption"},
headers={"Authorization": f"Bearer {token}"}
)
assert response.status_code == 404 # Le pin n'existe pas
def test_image_too_large(token):
with open("tests/image-too-large.jpg", "rb") as f:
buf = io.BytesIO(f.read())
buf.name = "large.jpg"
response = client.post(
"/image/pin/null/add",
files={"image": ("large.jpg", buf, "image/jpeg")},
data={"exif_date": "2024-03-20T12:00:00", "caption": "Test caption"},
headers={"Authorization": f"Bearer {token}"}
)
assert response.status_code == 413
def test_get_image_file_not_found(token):
pin_id = create_test_pin(token)
image_id = add_test_image(token, pin_id)
image_doc = db["images"].find_one({"_id": ObjectId(image_id)})
image_hash = image_doc["image_hash"]
image_path = os.path.join(UPLOAD_DIR, f"{image_hash}.jpg")
# On supprime le fichier image physiquement
if os.path.exists(image_path):
os.remove(image_path)
# On tente de récupérer l'image
response = client.get(f"/image/{image_id}", headers={"Authorization": f"Bearer {token}"})
assert response.status_code == 404
assert response.json()["detail"] == "Image file not found"

@ -363,3 +363,152 @@ def test_share_pin_not_friend(token, user_id, token_second, user_id_second):
)
assert response.status_code == 403
assert "You can only share pins with your friends" in response.json()["detail"]
def test_get_pin_shares(token, user_id, token_second, user_id_second):
# Créer un pin
image_id = add_test_image(token)
pin_id = create_test_pin(token, user_id, image_id)
# S'assurer que les utilisateurs sont amis
ensure_friendship(user_id, user_id_second)
# Partager le pin
client.post(
f"/pin/{pin_id}/share",
json={"friend_id": user_id_second},
headers={"Authorization": f"Bearer {token}"}
)
# Vérifier les partages du pin
response = client.get(
f"/pin/{pin_id}/shares",
headers={"Authorization": f"Bearer {token}"}
)
assert response.status_code == 200
data = response.json()
assert "shares" in data
assert len(data["shares"]) == 1
assert data["shares"][0]["user_id"] == user_id_second
assert data["shares"][0]["can_edit"] is True
assert data["shares"][0]["can_delete"] is False
def test_get_pin_shares_not_owner(token, user_id, token_second, user_id_second):
# Créer un pin avec le deuxième utilisateur
image_id = add_test_image(token_second)
pin_id = create_test_pin(token_second, user_id_second, image_id)
# Essayer de voir les partages (ne devrait pas fonctionner car on n'est pas le propriétaire)
response = client.get(
f"/pin/{pin_id}/shares",
headers={"Authorization": f"Bearer {token}"}
)
assert response.status_code == 403
assert "You can only view shares of your own pins" in response.json()["detail"]
def test_delete_pin_share(token, user_id, token_second, user_id_second):
# Créer un pin
image_id = add_test_image(token)
pin_id = create_test_pin(token, user_id, image_id)
# S'assurer que les utilisateurs sont amis
ensure_friendship(user_id, user_id_second)
# Partager le pin
client.post(
f"/pin/{pin_id}/share",
json={"friend_id": user_id_second},
headers={"Authorization": f"Bearer {token}"}
)
# Supprimer le partage
response = client.delete(
f"/pin/{pin_id}/share/{user_id_second}",
headers={"Authorization": f"Bearer {token}"}
)
assert response.status_code == 200
assert "Share removed successfully" in response.json()["message"]
# Vérifier que l'autre utilisateur n'a plus accès au pin
response = client.get(
f"/pin/{pin_id}",
headers={"Authorization": f"Bearer {token_second}"}
)
assert response.status_code == 403
def test_delete_pin_share_shareduser(token, user_id, token_second, user_id_second):
# Créer un pin avec le deuxième utilisateur
image_id = add_test_image(token_second)
pin_id = create_test_pin(token_second, user_id_second, image_id)
# S'assurer que les utilisateurs sont amis
ensure_friendship(user_id, user_id_second)
# Partager le pin avec le premier utilisateur
client.post(
f"/pin/{pin_id}/share",
json={"friend_id": user_id},
headers={"Authorization": f"Bearer {token_second}"}
)
# Essayer de supprimer le partage (devrait fonctionner car il essaie de se supprimer lui même)
response = client.delete(
f"/pin/{pin_id}/share/{user_id}",
headers={"Authorization": f"Bearer {token}"}
)
assert response.status_code == 200
assert "Share removed successfully" in response.json()["message"]
def test_delete_pin_share_not_found(token, user_id):
# Créer un pin
image_id = add_test_image(token)
pin_id = create_test_pin(token, user_id, image_id)
# Essayer de supprimer un partage inexistant
response = client.delete(
f"/pin/{pin_id}/share/507f1f77bcf86cd799439011",
headers={"Authorization": f"Bearer {token}"}
)
assert response.status_code == 404
assert "Share not found" in response.json()["detail"]
def test_delete_pin_share_invalid_id(token, user_id):
# Créer un pin
image_id = add_test_image(token)
pin_id = create_test_pin(token, user_id, image_id)
# Essayer de supprimer un partage avec un ID invalide
response = client.delete(
f"/pin/{pin_id}/share/invalid_id",
headers={"Authorization": f"Bearer {token}"}
)
assert response.status_code == 404
def test_delete_pin_share_self(token, user_id, token_second, user_id_second):
# Créer un pin
image_id = add_test_image(token)
pin_id = create_test_pin(token, user_id, image_id)
# S'assurer que les utilisateurs sont amis
ensure_friendship(user_id, user_id_second)
# Partager le pin
client.post(
f"/pin/{pin_id}/share",
json={"friend_id": user_id_second},
headers={"Authorization": f"Bearer {token}"}
)
# L'utilisateur qui a le partage supprime son propre accès
response = client.delete(
f"/pin/{pin_id}/share/{user_id_second}",
headers={"Authorization": f"Bearer {token_second}"}
)
assert response.status_code == 200
assert "Share removed successfully" in response.json()["message"]
# Vérifier que l'utilisateur n'a plus accès au pin
response = client.get(
f"/pin/{pin_id}",
headers={"Authorization": f"Bearer {token_second}"}
)
assert response.status_code == 403

@ -0,0 +1,30 @@
import pytest
from fastapi.testclient import TestClient
from fastapi import FastAPI
from test_main import *
# Exemple de fausse souscription push
FAKE_SUB = {
"endpoint": "https://test",
"keys": {"p256dh": "key", "auth": "auth"}
}
def test_push_subscribe_first(token):
response = client.post("/push/subscribe", json=FAKE_SUB, headers={"Authorization": f"Bearer {token}"})
assert response.status_code == 200
assert response.json()["message"] == "Push subscription successful"
def test_push_subscribe_duplicate(token):
response = client.post("/push/subscribe", json=FAKE_SUB, headers={"Authorization": f"Bearer {token}"})
assert response.status_code == 400
assert "already exists" in response.json()["detail"]
def test_push_subscribe_second(token):
# On s'abonne avec un nouvel endpoint
sub = {
"endpoint": "https://test2",
"keys": {"p256dh": "key2", "auth": "auth2"}
}
response = client.post("/push/subscribe", json=sub, headers={"Authorization": f"Bearer {token}"})
assert response.status_code == 200
assert response.json()["message"] == "Push subscription successful"

@ -0,0 +1,125 @@
import sys
from pathlib import Path
sys.path.append(str(Path(__file__).absolute().parent.parent))
import pytest
from unittest.mock import patch, MagicMock, AsyncMock
from bson import ObjectId
from app.push_service import PushService
# Fixture pour obtenir une instance du service à tester
@pytest.fixture
def push_service():
return PushService()
# Teste l'envoi de notification quand l'utilisateur existe et a des abonnements
@patch('app.push_service.users_collection')
@patch('app.push_service.PushService.send_notification_to_subscription', new_callable=AsyncMock)
@pytest.mark.asyncio
async def test_send_notification_success(mock_send, mock_users, push_service):
user_id = ObjectId()
# Simule un utilisateur avec une souscription push
mock_users.find_one.return_value = {"push_subscriptions": ["{\"endpoint\": \"https://test\"}"]}
mock_send.return_value = True
payload = {"msg": "test"}
result = await push_service.send_notification(user_id, payload)
assert result is None or result is True
mock_send.assert_awaited()
# Teste le cas où l'utilisateur n'existe pas
@patch('app.push_service.users_collection')
@pytest.mark.asyncio
async def test_send_notification_no_user(mock_users, push_service):
user_id = ObjectId()
mock_users.find_one.return_value = None # Aucun utilisateur trouvé
payload = {"msg": "test"}
result = await push_service.send_notification(user_id, payload)
assert result is False
# Teste le cas où l'utilisateur n'a pas de souscriptions push
@patch('app.push_service.users_collection')
@pytest.mark.asyncio
async def test_send_notification_no_subs(mock_users, push_service):
user_id = ObjectId()
mock_users.find_one.return_value = {"push_subscriptions": []} # Pas de souscriptions
payload = {"msg": "test"}
result = await push_service.send_notification(user_id, payload)
assert result is False
# Teste l'envoi d'une notification à une souscription valide
@patch('app.push_service.webpush')
@patch('app.push_service.get_audience', return_value='https://test')
@pytest.mark.asyncio
async def test_send_notification_to_subscription_success(mock_aud, mock_webpush, push_service):
subscription = '{"endpoint": "https://test"}'
payload = {"msg": "test"}
user_id = ObjectId()
result = await push_service.send_notification_to_subscription(subscription, payload, user_id)
assert result is True
mock_webpush.assert_called_once()
# Teste le cas où webpush lève une exception (échec d'envoi)
@patch('app.push_service.webpush', side_effect=Exception('fail'))
@pytest.mark.asyncio
async def test_send_notification_to_subscription_exception(mock_webpush, push_service):
subscription = '{"endpoint": "https://test"}'
payload = {"msg": "test"}
user_id = ObjectId()
result = await push_service.send_notification_to_subscription(subscription, payload, user_id)
assert result is False
# Teste l'envoi à plusieurs souscriptions (succès et échec)
@patch('app.push_service.PushService.send_notification', new_callable=AsyncMock)
@pytest.mark.asyncio
async def test_send_notification_to_all(mock_send, push_service):
mock_send.side_effect = [True, False] # Un succès, un échec
subscriptions = ["sub1", "sub2"]
payload = {"msg": "test"}
result = await push_service.send_notification_to_all(subscriptions, payload)
assert result["success"] == 1
assert result["failed"] == 1
# Teste la création du payload de notification
def test_create_notification_payload(push_service):
title = "Titre"
body = "Corps"
payload = push_service.create_notification_payload(title, body)
assert payload["notification"]["title"] == title
assert payload["notification"]["body"] == body
assert "icon" in payload["notification"]
# Teste la suppression d'une souscription (succès)
@patch('app.push_service.users_collection')
@pytest.mark.asyncio
async def test_delete_subscription_success(mock_users, push_service):
mock_users.update_one.return_value = MagicMock() # Simule la suppression
user_id = ObjectId()
subscription = "sub"
result = await push_service.delete_subscription(subscription, user_id)
assert result is True
# Teste la suppression d'une souscription quand une exception est levée (échec)
@patch('app.push_service.users_collection.update_one', side_effect=Exception('fail'))
@pytest.mark.asyncio
async def test_delete_subscription_exception(mock_update_one, push_service):
user_id = ObjectId()
subscription = "sub"
result = await push_service.delete_subscription(subscription, user_id)
assert result is False

@ -42,3 +42,49 @@ def test_search_users(token):
data = response.json()
assert isinstance(data, list)
assert data[0]["username"] == "testuser"
def test_search_users_unauthorized():
response = client.get("/users?name=testuser", headers={"Authorization": "Bearer invalid_token"})
assert response.status_code == 401
def test_search_users_false_regex(token):
response = client.get("/users?name=*", headers={"Authorization": f"Bearer {token}"})
assert response.status_code == 422
def test_get_user_by_id(token):
# D'abord, on récupère l'ID d'un utilisateur existant
search_response = client.get("/users?name=testuser", headers={"Authorization": f"Bearer {token}"})
user_id = search_response.json()[0]["uid"]
# Ensuite, on teste la récupération de cet utilisateur
response = client.get(f"/user/{user_id}", headers={"Authorization": f"Bearer {token}"})
assert response.status_code == 200
data = response.json()
assert data["username"] == "testuser"
def test_get_user_invalid_id(token):
response = client.get("/user/invalid_id", headers={"Authorization": f"Bearer {token}"})
assert response.status_code == 422
def test_get_user_not_found(token):
# On utilise un ID qui n'existe probablement pas
non_existent_id = "507f1f77bcf86cd799439011"
response = client.get(f"/user/{non_existent_id}", headers={"Authorization": f"Bearer {token}"})
assert response.status_code == 404
def test_refresh_token_success():
login_response = client.post("/login", data={"username": "testuser", "password": "testpassword"})
refresh_token = login_response.json()["refresh_token"]
response = client.post("/refresh-token", json={"refresh_token": refresh_token})
assert response.status_code == 200
data = response.json()
assert "access_token" in data
assert "refresh_token" in data
assert data["token_type"] == "bearer"
assert data["user_id"] is not None
assert data["is_admin"] is False
def test_refresh_token_invalid():
response = client.post("/refresh-token", json={"refresh_token": "invalidtoken"})
assert response.status_code == 401
Loading…
Cancel
Save