""" mongo_helper.py --------------- Operaciones MongoDB para la colección 'imagenes' y extensión de 'comparaciones'. Compatible con la estructura existente de FLUJOS_DATOS. Uso: mongo = MongoHelper() mongo.upsert_imagenes(lista_docs) mongo.insert_comparaciones(lista_comparaciones) docs = mongo.get_collection_sample("noticias", limit=100) """ import os from pymongo import MongoClient, UpdateOne from pymongo.errors import ConnectionFailure MONGO_URL = os.getenv("MONGO_URL", "mongodb://localhost:27017") DB_NAME = os.getenv("DB_NAME", "FLUJOS_DATOS") class MongoHelper: def __init__(self, mongo_url: str = MONGO_URL, db_name: str = DB_NAME): self.mongo_url = mongo_url self.db_name = db_name self._client = None self._db = None # ── Conexión ─────────────────────────────────────────────────────────────── def connect(self): if self._client is None: self._client = MongoClient(self.mongo_url, serverSelectionTimeoutMS=5000) self._client.admin.command("ping") self._db = self._client[self.db_name] print(f"[MongoDB] Conectado a {self.mongo_url} / {self.db_name}") return self._db def disconnect(self): if self._client: self._client.close() self._client = None self._db = None def is_available(self) -> bool: try: self.connect() return True except ConnectionFailure: return False # ── Colección IMAGENES ───────────────────────────────────────────────────── def upsert_imagenes(self, docs: list[dict]) -> dict: """ Inserta o actualiza documentos en la colección 'imagenes'. Usa 'archivo' como clave única (upsert por nombre de archivo). Returns: {'inserted': N, 'updated': N} """ db = self.connect() collection = db["imagenes"] collection.create_index("archivo", unique=True) ops = [ UpdateOne( {"archivo": doc["archivo"]}, {"$set": doc}, upsert=True ) for doc in docs if "error" not in doc ] if not ops: return {"inserted": 0, "updated": 0} result = collection.bulk_write(ops) stats = { "inserted": result.upserted_count, "updated": result.modified_count, } print(f"[MongoDB] imagenes → {stats}") return stats def get_imagenes(self, tema: str = None, limit: int = 500) -> list[dict]: """Recupera documentos de la colección 'imagenes'.""" db = self.connect() query = {"tema": {"$regex": tema, "$options": "i"}} if tema else {} return list(db["imagenes"].find(query, {"_id": 0}).limit(limit)) # ── Colección COMPARACIONES ──────────────────────────────────────────────── def insert_comparaciones(self, comparaciones: list[dict], replace_existing: bool = False) -> int: """ Inserta comparaciones imagen-texto en la colección 'comparaciones'. Evita duplicados por (noticia1, noticia2). Returns: número de documentos insertados """ db = self.connect() collection = db["comparaciones"] ops = [] for comp in comparaciones: filter_q = {"noticia1": comp["noticia1"], "noticia2": comp["noticia2"]} update_q = {"$set": comp} if replace_existing else {"$setOnInsert": comp} ops.append(UpdateOne(filter_q, update_q, upsert=True)) if not ops: return 0 result = collection.bulk_write(ops) inserted = result.upserted_count print(f"[MongoDB] comparaciones → {inserted} nuevas, {result.modified_count} actualizadas") return inserted # ── Leer colecciones existentes (para comparar) ──────────────────────────── def get_collection_sample( self, collection_name: str, tema: str = None, limit: int = 200, fields: list[str] = None, ) -> list[dict]: """ Lee una muestra de documentos de una colección existente. Compatible con noticias, wikipedia, torrents. """ db = self.connect() query = {} if tema: query["$or"] = [ {"tema": {"$regex": tema, "$options": "i"}}, {"subtema": {"$regex": tema, "$options": "i"}}, {"texto": {"$regex": tema, "$options": "i"}}, ] projection = {"_id": 0} if fields: for f in fields: projection[f] = 1 docs = list(db[collection_name].find(query, projection).limit(limit)) for doc in docs: if "source_type" not in doc: doc["source_type"] = collection_name return docs def get_all_text_docs(self, tema: str = None, limit_per_collection: int = 200) -> list[dict]: """ Recupera documentos de noticias + wikipedia + torrents combinados. Útil para comparar imágenes contra todo el corpus. """ all_docs = [] for col in ("noticias", "wikipedia", "torrents"): try: docs = self.get_collection_sample(col, tema=tema, limit=limit_per_collection) all_docs.extend(docs) print(f"[MongoDB] {col}: {len(docs)} docs cargados") except Exception as e: print(f"[MongoDB] WARNING: no se pudo leer '{col}': {e}") return all_docs # ── Info de la BD ────────────────────────────────────────────────────────── def collection_stats(self) -> dict: db = self.connect() stats = {} for col_name in db.list_collection_names(): stats[col_name] = db[col_name].count_documents({}) return stats