You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
api/app/stub.py

220 lines
7.5 KiB

from datetime import datetime
from config import MONGODB_URL, MONGODB_USERNAME, MONGODB_PASSWORD, MONGODB_DATABASE
from utils import get_password_hash
from PIL import Image
import pymongo
import io
import os
import hashlib
import json
client = pymongo.MongoClient(MONGODB_URL, username=MONGODB_USERNAME, password=MONGODB_PASSWORD)
db = client[MONGODB_DATABASE]
# Créer le dossier pour stocker les images s'il n'existe pas
IMAGES_DIR = "../images"
if not os.path.exists(IMAGES_DIR):
os.makedirs(IMAGES_DIR)
def convert_mongo_date(date_obj):
"""Convertit un objet date MongoDB en datetime Python"""
if isinstance(date_obj, dict) and '$date' in date_obj:
return datetime.fromisoformat(date_obj['$date'].replace('Z', '+00:00'))
return date_obj
def create_test_image(color='red', size=(100, 100)):
img = Image.new('RGB', size, color=color)
img_byte_arr = io.BytesIO()
img.save(img_byte_arr, format='JPEG')
img_byte_arr.seek(0)
return img_byte_arr
def process_image(file_path: str) -> tuple[str, str]:
"""Traite et optimise l'image"""
with Image.open(file_path) as img:
# Convertir en RGB si nécessaire
if img.mode in ('RGBA', 'LA') or (img.mode == 'P' and 'transparency' in img.info):
bg = Image.new('RGB', img.size, (255, 255, 255))
bg.paste(img, mask=img.split()[-1])
img = bg
elif img.mode != 'RGB':
img = img.convert('RGB')
# Redimensionner si nécessaire pour le Full HD (1920x1080)
if img.size[0] > 1920 or img.size[1] > 1080:
img.thumbnail((1920, 1080), Image.Resampling.LANCZOS)
# Sauvegarder avec compression
img.save(file_path, 'JPEG', quality=85, optimize=True)
# Calculer le hash du fichier optimisé
with open(file_path, 'rb') as f:
file_hash = hashlib.sha256(f.read()).hexdigest()
return file_hash, 'jpg'
def create_test_data(user_id):
"""Crée des données de test pour l'utilisateur spécifié"""
pins_collection = db["pins"]
images_collection = db["images"]
# Créer des pins de test
test_pins = [
{
"title": "Test Pin 1",
"description": "Description du test 1",
"complete_address": "123 Test Street, Test City",
"location": [48.8566, 2.3522],
"files": [],
"user_id": str(user_id),
"is_poi": False
},
{
"title": "Test Pin 2",
"description": "Description du test 2",
"complete_address": "456 Test Avenue, Test Town",
"location": [45.7640, 4.8357],
"files": [],
"user_id": str(user_id),
"is_poi": False
}
]
# Insérer les pins et créer les images associées
for pin_data in test_pins:
pin_id = pins_collection.insert_one(pin_data).inserted_id
# Créer une image de test
image = create_test_image(color='blue')
temp_path = os.path.join(IMAGES_DIR, f"temp_{pin_id}.jpg")
with open(temp_path, 'wb') as f:
f.write(image.getvalue())
# Traiter l'image
file_hash, extension = process_image(temp_path)
final_path = os.path.join(IMAGES_DIR, f"{file_hash}.{extension}")
if not os.path.exists(final_path):
os.rename(temp_path, final_path)
else:
os.remove(temp_path)
# Créer l'entrée dans la collection images
image_id = images_collection.insert_one({
"pin_id": pin_id,
"image_hash": file_hash,
"metadata": {
"created_at": datetime.now().isoformat(),
"original_filename": f"test_{pin_id}.jpg",
"mime_type": "image/jpeg",
"size": len(image.getvalue())
},
"caption": None
}).inserted_id
# Mettre à jour le pin avec l'ID de l'image
pins_collection.update_one(
{"_id": pin_id},
{"$set": {"files": [str(image_id)]}}
)
def import_real_data(user_id):
"""Importe les données réelles depuis les fichiers JSON"""
pins_collection = db["pins"]
images_collection = db["images"]
# Charger les données JSON
with open('../memorymap.pins.json', 'r', encoding='utf-8') as f:
pins_data = json.load(f)
with open('../memorymap.images.json', 'r', encoding='utf-8') as f:
images_data = json.load(f)
# Créer un mapping des anciens IDs vers les nouveaux
pin_id_mapping = {}
image_id_mapping = {}
# Insérer les pins
for pin in pins_data:
old_pin_id = pin['_id']['$oid']
# Créer une copie du pin sans le champ _id
new_pin = {k: v for k, v in pin.items() if k != '_id'}
new_pin['user_id'] = str(user_id)
new_pin['files'] = [] # Réinitialiser les fichiers
# Convertir la date si elle existe
if 'date' in new_pin:
new_pin['date'] = convert_mongo_date(new_pin['date'])
new_pin_id = pins_collection.insert_one(new_pin).inserted_id
pin_id_mapping[old_pin_id] = new_pin_id
# Insérer les images
for image in images_data:
old_image_id = image['_id']['$oid']
old_pin_id = image['pin_id']['$oid']
# Vérifier si le pin existe
if old_pin_id not in pin_id_mapping:
print(f"Attention: Pin {old_pin_id} référencé par l'image {old_image_id} n'existe pas dans les données")
continue
# Vérifier si l'image physique existe
image_path = os.path.join(IMAGES_DIR, f"{image['image_hash']}.jpg")
if not os.path.exists(image_path):
print(f"Attention: Image manquante pour le hash {image['image_hash']}")
continue
# Créer la nouvelle entrée d'image sans le champ _id
new_image = {k: v for k, v in image.items() if k not in ['_id', 'pin_id']}
new_image['pin_id'] = pin_id_mapping[old_pin_id]
# Convertir les dates dans les métadonnées
if 'metadata' in new_image and 'created_at' in new_image['metadata']:
new_image['metadata']['created_at'] = convert_mongo_date(new_image['metadata']['created_at'])
new_image_id = images_collection.insert_one(new_image).inserted_id
image_id_mapping[old_image_id] = new_image_id
# Mettre à jour le pin avec le nouvel ID d'image
pins_collection.update_one(
{"_id": pin_id_mapping[old_pin_id]},
{"$push": {"files": str(new_image_id)}}
)
def populate_data():
users_collection = db["users"]
# Créer les utilisateurs
user1_id = users_collection.insert_one({
"username": "string",
"password": get_password_hash("string"),
"is_admin": True
}).inserted_id
user2_id = users_collection.insert_one({
"username": "test",
"password": get_password_hash("testtest"),
"is_admin": False
}).inserted_id
# Créer les données de test pour l'utilisateur "test"
create_test_data(user2_id)
# Importer les données réelles pour l'utilisateur "string"
import_real_data(user1_id)
# Créer une relation d'amitié
friends_collection = db["friends"]
friends_collection.insert_one({
"user_id": str(user1_id),
"friend_user_id": str(user2_id),
"status": "accepted"
})
if __name__ == "__main__":
populate_data()
print("Données insérées avec succès.")