FLUJOS/BACK_BACK/IMAGENES/mongo_helper.py
SITO b992e25f8f feat: pipeline de imágenes + demos de visualización 3D
- BACK_BACK/IMAGENES/: nueva pipeline para análisis de imágenes con VLM
  - image_analyzer.py: imagen → keywords via ollama
  - image_comparator.py: TF-IDF similitud keywords vs corpus
  - mongo_helper.py: CRUD colecciones imagenes y comparaciones
  - pipeline_pruebas.py: script end-to-end con CLI
  - requirements_imagenes.txt

- BACK_BACK/FLUJOS_APP_PRUEBAS.js: servidor Express ligero (port 3001)
  sin MongoDB para testear los demos de visualización

- VISUALIZACION/public/demos/: tres demos de 3d-force-graph
  - demo_text_nodes.html: nodos como texto (three-spritetext)
  - demo_img_nodes.html: nodos como imágenes (THREE.Sprite)
  - demo_mixed_nodes.html: cards HTML en 3D (CSS2DRenderer)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-28 22:28:34 +01:00

172 lines
6.3 KiB
Python

"""
mongo_helper.py
---------------
Operaciones MongoDB para la colección 'imagenes' y extensión de 'comparaciones'.
Compatible con la estructura existente de FLUJOS_DATOS.
Uso:
mongo = MongoHelper()
mongo.upsert_imagenes(lista_docs)
mongo.insert_comparaciones(lista_comparaciones)
docs = mongo.get_collection_sample("noticias", limit=100)
"""
import os
from pymongo import MongoClient, UpdateOne
from pymongo.errors import ConnectionFailure
MONGO_URL = os.getenv("MONGO_URL", "mongodb://localhost:27017")
DB_NAME = os.getenv("DB_NAME", "FLUJOS_DATOS")
class MongoHelper:
def __init__(self, mongo_url: str = MONGO_URL, db_name: str = DB_NAME):
self.mongo_url = mongo_url
self.db_name = db_name
self._client = None
self._db = None
# ── Conexión ───────────────────────────────────────────────────────────────
def connect(self):
if self._client is None:
self._client = MongoClient(self.mongo_url, serverSelectionTimeoutMS=5000)
self._client.admin.command("ping")
self._db = self._client[self.db_name]
print(f"[MongoDB] Conectado a {self.mongo_url} / {self.db_name}")
return self._db
def disconnect(self):
if self._client:
self._client.close()
self._client = None
self._db = None
def is_available(self) -> bool:
try:
self.connect()
return True
except ConnectionFailure:
return False
# ── Colección IMAGENES ─────────────────────────────────────────────────────
def upsert_imagenes(self, docs: list[dict]) -> dict:
"""
Inserta o actualiza documentos en la colección 'imagenes'.
Usa 'archivo' como clave única (upsert por nombre de archivo).
Returns: {'inserted': N, 'updated': N}
"""
db = self.connect()
collection = db["imagenes"]
collection.create_index("archivo", unique=True)
ops = [
UpdateOne(
{"archivo": doc["archivo"]},
{"$set": doc},
upsert=True
)
for doc in docs if "error" not in doc
]
if not ops:
return {"inserted": 0, "updated": 0}
result = collection.bulk_write(ops)
stats = {
"inserted": result.upserted_count,
"updated": result.modified_count,
}
print(f"[MongoDB] imagenes → {stats}")
return stats
def get_imagenes(self, tema: str = None, limit: int = 500) -> list[dict]:
"""Recupera documentos de la colección 'imagenes'."""
db = self.connect()
query = {"tema": {"$regex": tema, "$options": "i"}} if tema else {}
return list(db["imagenes"].find(query, {"_id": 0}).limit(limit))
# ── Colección COMPARACIONES ────────────────────────────────────────────────
def insert_comparaciones(self, comparaciones: list[dict], replace_existing: bool = False) -> int:
"""
Inserta comparaciones imagen-texto en la colección 'comparaciones'.
Evita duplicados por (noticia1, noticia2).
Returns: número de documentos insertados
"""
db = self.connect()
collection = db["comparaciones"]
ops = []
for comp in comparaciones:
filter_q = {"noticia1": comp["noticia1"], "noticia2": comp["noticia2"]}
update_q = {"$set": comp} if replace_existing else {"$setOnInsert": comp}
ops.append(UpdateOne(filter_q, update_q, upsert=True))
if not ops:
return 0
result = collection.bulk_write(ops)
inserted = result.upserted_count
print(f"[MongoDB] comparaciones → {inserted} nuevas, {result.modified_count} actualizadas")
return inserted
# ── Leer colecciones existentes (para comparar) ────────────────────────────
def get_collection_sample(
self,
collection_name: str,
tema: str = None,
limit: int = 200,
fields: list[str] = None,
) -> list[dict]:
"""
Lee una muestra de documentos de una colección existente.
Compatible con noticias, wikipedia, torrents.
"""
db = self.connect()
query = {}
if tema:
query["$or"] = [
{"tema": {"$regex": tema, "$options": "i"}},
{"subtema": {"$regex": tema, "$options": "i"}},
{"texto": {"$regex": tema, "$options": "i"}},
]
projection = {"_id": 0}
if fields:
for f in fields:
projection[f] = 1
docs = list(db[collection_name].find(query, projection).limit(limit))
for doc in docs:
if "source_type" not in doc:
doc["source_type"] = collection_name
return docs
def get_all_text_docs(self, tema: str = None, limit_per_collection: int = 200) -> list[dict]:
"""
Recupera documentos de noticias + wikipedia + torrents combinados.
Útil para comparar imágenes contra todo el corpus.
"""
all_docs = []
for col in ("noticias", "wikipedia", "torrents"):
try:
docs = self.get_collection_sample(col, tema=tema, limit=limit_per_collection)
all_docs.extend(docs)
print(f"[MongoDB] {col}: {len(docs)} docs cargados")
except Exception as e:
print(f"[MongoDB] WARNING: no se pudo leer '{col}': {e}")
return all_docs
# ── Info de la BD ──────────────────────────────────────────────────────────
def collection_stats(self) -> dict:
db = self.connect()
stats = {}
for col_name in db.list_collection_names():
stats[col_name] = db[col_name].count_documents({})
return stats