""" image_comparator.py ------------------- Compara keywords de imágenes con documentos de texto (noticias, wikipedia, torrents) usando similitud TF-IDF coseno. Produce documentos para la colección 'comparaciones' de MongoDB, con la misma estructura que los comparaciones texto-texto ya existentes: { noticia1, noticia2, porcentaje_similitud } Ampliado con campos opcionales: source1_type, source2_type (para saber qué se comparó). Uso: comp = ImageComparator() resultados = comp.compare_image_vs_collection(imagen_doc, lista_docs_texto) top = comp.top_n(resultados, n=10) """ from sklearn.feature_extraction.text import TfidfVectorizer from sklearn.metrics.pairwise import cosine_similarity import numpy as np # ── Clase principal ───────────────────────────────────────────────────────────── class ImageComparator: def __init__(self, threshold: float = 5.0): """ Args: threshold: porcentaje mínimo de similitud para incluir en resultados (0-100) """ self.threshold = threshold self.vectorizer = TfidfVectorizer( analyzer="word", ngram_range=(1, 2), min_df=1, strip_accents="unicode", lowercase=True, ) # ── Conversión de documentos a texto ────────────────────────────────────── @staticmethod def doc_to_text(doc: dict) -> str: """ Concatena los campos relevantes de un documento en un string para TF-IDF. Compatible con estructura de noticias/wikipedia/torrents/imagenes. """ parts = [] # keywords de imágenes (lista) — los más informativos, se repiten para darles peso if doc.get("keywords"): kws = doc["keywords"] if isinstance(doc["keywords"], list) else [] parts.extend(kws * 3) # peso extra a keywords # campos de texto estándar for field in ("tema", "subtema", "texto"): val = doc.get(field) if val and isinstance(val, str): parts.append(val) # entidades if doc.get("entidades"): parts.extend(doc["entidades"]) return " ".join(parts) # ── Comparación imagen vs lista de documentos ───────────────────────────── def compare_image_vs_collection( self, image_doc: dict, text_docs: list[dict], ) -> list[dict]: """ Compara una imagen contra una lista de documentos de texto. Returns: Lista de dicts ordenados por porcentaje_similitud desc, filtrados por threshold. """ if not text_docs: return [] all_docs = [image_doc] + text_docs texts = [self.doc_to_text(d) for d in all_docs] try: matrix = self.vectorizer.fit_transform(texts) except ValueError: return [] # Similitud de imagen (índice 0) contra todos los demás sims = cosine_similarity(matrix[0:1], matrix[1:]).flatten() comparaciones = [] for doc, sim in zip(text_docs, sims): pct = round(float(sim) * 100, 2) if pct < self.threshold: continue comparaciones.append({ # Campos compatibles con colección 'comparaciones' existente "noticia1": image_doc.get("archivo", "imagen"), "noticia2": doc.get("archivo", str(doc.get("_id", ""))), "porcentaje_similitud": pct, # Campos extendidos (opcionales — no rompen queries existentes) "source1_type": "imagen", "source2_type": doc.get("source_type", "texto"), "tema_imagen": image_doc.get("tema", ""), "tema_doc": doc.get("tema", ""), }) comparaciones.sort(key=lambda x: x["porcentaje_similitud"], reverse=True) return comparaciones # ── Comparación muchas imágenes vs colección ─────────────────────────────── def compare_batch( self, image_docs: list[dict], text_docs: list[dict], ) -> list[dict]: """ Compara múltiples imágenes contra una colección de documentos. Returns: Todos los pares con similitud >= threshold, sin duplicados. """ all_comparaciones = [] seen = set() for img_doc in image_docs: results = self.compare_image_vs_collection(img_doc, text_docs) for r in results: key = (r["noticia1"], r["noticia2"]) if key not in seen: seen.add(key) all_comparaciones.append(r) all_comparaciones.sort(key=lambda x: x["porcentaje_similitud"], reverse=True) return all_comparaciones # ── Helpers ──────────────────────────────────────────────────────────────── @staticmethod def top_n(comparaciones: list[dict], n: int = 20) -> list[dict]: return sorted(comparaciones, key=lambda x: x["porcentaje_similitud"], reverse=True)[:n] @staticmethod def stats(comparaciones: list[dict]) -> dict: if not comparaciones: return {"total": 0} sims = [c["porcentaje_similitud"] for c in comparaciones] return { "total": len(sims), "media": round(np.mean(sims), 2), "max": round(max(sims), 2), "min": round(min(sims), 2), "sobre_50": sum(1 for s in sims if s >= 50), "sobre_70": sum(1 for s in sims if s >= 70), }