FLUJOS/FLUJOS_DATOS/IMAGENES/pipeline_imagenes.py
CAPITANSITO 83f67b76b4 código completo FLUJOS — snapshot limpio sin datos scrapeados
Incluye: backend Node.js/Express, visualización 3D (Three.js/3d-force-graph),
scrapers Wikipedia/noticias/imágenes, analizador Qwen3-VL, pipeline maestro
con systemd timer, fixes de seguridad (NoSQL injection, XSS, ReDoS, port
binding) y documentación técnica completa en docs/.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-21 23:45:29 +02:00

134 lines
4.9 KiB
Python

"""
pipeline_imagenes.py
--------------------
Pipeline end-to-end:
1. Scraping de imágenes Wikipedia por temas de FLUJOS
2. Análisis con Qwen3-VL-8B (keywords + metadata)
3. Comparación con corpus texto (noticias/wikipedia/torrents)
4. Guardado en MongoDB
Ejecutar:
python pipeline_imagenes.py --scrape --analizar --mongo
python pipeline_imagenes.py --scrape --tema "cambio climático" --max 10
python pipeline_imagenes.py --analizar --carpeta ./output/wiki_images/
python pipeline_imagenes.py --solo-json # sin MongoDB
"""
import argparse
import json
import sys
from datetime import datetime
from pathlib import Path
from image_analyzer import ImageAnalyzer
from image_comparator import ImageComparator
from mongo_helper import MongoHelper
from wikipedia_image_scraper import WikipediaImageScraper, TEMAS_FLUJOS
OUTPUT_DIR = Path(__file__).parent / "output"
IMAGES_DIR = OUTPUT_DIR / "wiki_images"
def fase_scraping(temas: list[str], max_per_tema: int, lang: str, usar_mongo: bool) -> list[dict]:
print("\n" + "="*60)
print(" FASE 1 — Scraping de imágenes Wikipedia")
print("="*60)
scraper = WikipediaImageScraper(output_dir=IMAGES_DIR, lang=lang)
if len(temas) == 1:
metadata = scraper.scrape_tema(temas[0], max_images=max_per_tema)
else:
metadata = scraper.scrape_multitema(temas, max_per_tema=max_per_tema)
if metadata:
json_path = scraper.save_metadata(metadata)
print(f"\n Total imágenes descargadas: {len(metadata)}")
if usar_mongo:
scraper.save_to_mongo(metadata)
return metadata
def fase_analisis(carpeta: str, usar_mongo: bool, threshold: float) -> tuple[list, list]:
print("\n" + "="*60)
print(" FASE 2 — Análisis con Qwen3-VL-8B")
print("="*60)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
OUTPUT_DIR.mkdir(exist_ok=True)
analyzer = ImageAnalyzer()
image_docs = analyzer.analyze_folder(carpeta)
if not image_docs:
print(" No se analizaron imágenes.")
return [], []
json_imagenes = OUTPUT_DIR / f"imagenes_{timestamp}.json"
ImageAnalyzer.save_json(image_docs, str(json_imagenes))
# Cargar corpus texto para comparar
print("\n Cargando corpus de texto para comparar...")
mongo = MongoHelper()
if usar_mongo and mongo.is_available():
text_docs = mongo.get_all_text_docs(limit_per_collection=300)
else:
print(" MongoDB no disponible — comparación omitida")
text_docs = []
comparaciones = []
if text_docs:
comparador = ImageComparator(threshold=threshold)
valid = [d for d in image_docs if "error" not in d]
comparaciones = comparador.compare_batch(valid, text_docs)
stats = comparador.stats(comparaciones)
print(f" Similitud media: {stats.get('media', 0)}% | max: {stats.get('max', 0)}%")
json_comp = OUTPUT_DIR / f"comparaciones_{timestamp}.json"
with open(json_comp, "w", encoding="utf-8") as f:
json.dump(comparaciones, f, ensure_ascii=False, indent=2)
print(f" Guardado: {json_comp}")
if usar_mongo and mongo.is_available():
mongo.upsert_imagenes([d for d in image_docs if "error" not in d])
mongo.insert_comparaciones(comparaciones)
mongo.disconnect()
return image_docs, comparaciones
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Pipeline imágenes FLUJOS")
parser.add_argument("--scrape", action="store_true", help="Ejecutar fase de scraping")
parser.add_argument("--analizar", action="store_true", help="Ejecutar fase de análisis VLM")
parser.add_argument("--tema", default=None, help="Tema único (ej: 'cambio climático')")
parser.add_argument("--max", type=int, default=20, help="Máx imágenes por tema (default: 20)")
parser.add_argument("--lang", default="es", help="Idioma Wikipedia: es|en")
parser.add_argument("--carpeta", default=str(IMAGES_DIR), help="Carpeta para analizar")
parser.add_argument("--umbral", type=float, default=5.0, help="Umbral similitud (default: 5.0)")
parser.add_argument("--mongo", action="store_true", help="Guardar en MongoDB")
parser.add_argument("--solo-json", action="store_true", help="Solo JSON local, sin MongoDB")
args = parser.parse_args()
usar_mongo = args.mongo and not args.solo_json
if not args.scrape and not args.analizar:
parser.print_help()
sys.exit(0)
temas = [args.tema] if args.tema else TEMAS_FLUJOS
if args.scrape:
fase_scraping(temas, args.max, args.lang, usar_mongo)
if args.analizar:
carpeta = args.carpeta
if args.scrape and args.tema:
tema_slug = args.tema.lower().replace(" ", "_").replace("/", "-")[:40]
carpeta = str(IMAGES_DIR / tema_slug)
fase_analisis(carpeta, usar_mongo, args.umbral)
print("\n Pipeline completado.")