""" pipeline_maestro.py ------------------- Orquesta las tres fases del pipeline FLUJOS en orden estricto: Fase 1 — SCRAPING Wikipedia + Noticias + Imágenes Wikipedia Fase 2 — ANÁLISIS Tokenización texto (BERT) + Análisis imágenes (Qwen3-VL) Fase 3 — COMPARACIÓN Solo documentos nuevos vs corpus existente Garantías: - Lockfile: solo una instancia a la vez - Estado en MongoDB (`pipeline_log`): cada fase registra inicio/fin/stats - Dedup: índices únicos en todas las colecciones (setup automático) - Resume: si el proceso muere a mitad, la siguiente ejecución retoma desde la última fase que no estaba "completado" Ejecutar manualmente: python pipeline_maestro.py python pipeline_maestro.py --solo-fase scraping python pipeline_maestro.py --solo-fase analisis python pipeline_maestro.py --solo-fase comparacion python pipeline_maestro.py --forzar # ignora si una fase ya corrió hoy """ import argparse import fcntl import logging import os import subprocess import sys import time from datetime import datetime, timedelta from pathlib import Path from pymongo import MongoClient, ASCENDING from pymongo.errors import DuplicateKeyError, ConnectionFailure # ── Rutas ────────────────────────────────────────────────────────────────────── BASE_DIR = Path(__file__).parent VENV_PYTHON = BASE_DIR / "myenv/bin/python3" LOCK_FILE = Path("/tmp/flujos_pipeline.lock") LOG_FILE = BASE_DIR / "pipeline_maestro.log" MONGO_URL = os.getenv("MONGO_URL", "mongodb://localhost:27017") DB_NAME = os.getenv("DB_NAME", "FLUJOS_DATOS") # Intervalo mínimo entre ejecuciones de cada fase (horas) # Evita re-ejecutar si el systemd timer se dispara accidentalmente dos veces MIN_HORAS_ENTRE_FASES = { "scraping": 20, "analisis": 20, "comparacion": 20, } # ── Logging ──────────────────────────────────────────────────────────────────── logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", handlers=[ logging.FileHandler(LOG_FILE), logging.StreamHandler(sys.stdout), ], ) log = logging.getLogger("pipeline_maestro") # ── MongoDB ──────────────────────────────────────────────────────────────────── def get_db(): client = MongoClient(MONGO_URL, serverSelectionTimeoutMS=5000) client.admin.command("ping") return client[DB_NAME] def setup_indices(db): """Crea índices únicos para deduplicación en todas las colecciones.""" indices = { "wikipedia": [("archivo", ASCENDING)], "noticias": [("archivo", ASCENDING)], "imagenes": [("archivo", ASCENDING)], "imagenes_wiki":[("archivo", ASCENDING)], # comparaciones no tiene índice único — usa upsert por (noticia1, noticia2) } for col, keys in indices.items(): try: if len(keys) == 1: db[col].create_index(keys, unique=True, background=True) else: db[col].create_index(keys, unique=True, background=True) log.info(f" índice OK: {col} → {[k for k,_ in keys]}") except Exception as e: log.warning(f" índice {col}: {e}") # ── Estado de fases ──────────────────────────────────────────────────────────── def fase_estado(db, fase: str) -> dict | None: """Devuelve el último registro de la fase, o None si nunca corrió.""" return db["pipeline_log"].find_one({"fase": fase}, sort=[("inicio", -1)]) def fase_necesita_correr(db, fase: str, forzar: bool = False) -> bool: if forzar: return True ultimo = fase_estado(db, fase) if not ultimo: return True if ultimo.get("estado") != "completado": log.info(f" [{fase}] última ejecución no completó — reintentando") return True ultima_fin = ultimo.get("fin") if not ultima_fin: return True horas = MIN_HORAS_ENTRE_FASES.get(fase, 20) if datetime.utcnow() - ultima_fin < timedelta(hours=horas): log.info(f" [{fase}] completada hace menos de {horas}h — saltando") return False return True def log_fase_inicio(db, fase: str) -> str: doc = { "fase": fase, "estado": "en_progreso", "inicio": datetime.utcnow(), "fin": None, "stats": {}, } result = db["pipeline_log"].insert_one(doc) return str(result.inserted_id) def log_fase_fin(db, log_id: str, estado: str, stats: dict): from bson import ObjectId db["pipeline_log"].update_one( {"_id": ObjectId(log_id)}, {"$set": {"estado": estado, "fin": datetime.utcnow(), "stats": stats}}, ) def ultima_ejecucion_completada(db, fase: str) -> datetime | None: """Devuelve la fecha de fin de la última ejecución completada de la fase.""" doc = db["pipeline_log"].find_one( {"fase": fase, "estado": "completado"}, sort=[("fin", -1)] ) return doc["fin"] if doc else None # ── Lockfile ─────────────────────────────────────────────────────────────────── class PipelineLock: def __init__(self): self._f = None def __enter__(self): self._f = open(LOCK_FILE, "w") try: fcntl.flock(self._f, fcntl.LOCK_EX | fcntl.LOCK_NB) except BlockingIOError: log.error("Ya hay una instancia del pipeline corriendo. Saliendo.") sys.exit(1) self._f.write(str(os.getpid())) self._f.flush() return self def __exit__(self, *args): fcntl.flock(self._f, fcntl.LOCK_UN) self._f.close() LOCK_FILE.unlink(missing_ok=True) # ── Ejecutor de scripts ──────────────────────────────────────────────────────── def run_script(script_path: Path, args: list[str] = None, cwd: Path = None) -> bool: """Ejecuta un script Python con el venv. Devuelve True si éxito.""" cmd = [str(VENV_PYTHON), str(script_path)] + (args or []) cwd = cwd or script_path.parent log.info(f" → {' '.join(cmd)}") result = subprocess.run(cmd, cwd=str(cwd)) if result.returncode != 0: log.error(f" Script falló con código {result.returncode}: {script_path.name}") return False return True # ── FASE 1: SCRAPING ─────────────────────────────────────────────────────────── def fase_scraping(db, forzar: bool = False): log.info("\n" + "="*60) log.info(" FASE 1 — SCRAPING") log.info("="*60) if not fase_necesita_correr(db, "scraping", forzar): return log_id = log_fase_inicio(db, "scraping") stats = {} ok = True try: # 1a. Wikipedia — scraper por temas log.info("\n[1a] Wikipedia scraper") antes_wiki = db["wikipedia"].count_documents({}) ok_wiki = run_script( BASE_DIR / "WIKIPEDIA/main.py", cwd=BASE_DIR / "WIKIPEDIA" ) despues_wiki = db["wikipedia"].count_documents({}) stats["wikipedia_nuevos"] = despues_wiki - antes_wiki log.info(f" Wikipedia: +{stats['wikipedia_nuevos']} docs nuevos ({despues_wiki} total)") if not ok_wiki: ok = False # 1b. Noticias — scraper web log.info("\n[1b] Noticias scraper") antes_not = db["noticias"].count_documents({}) ok_not = run_script( BASE_DIR / "NOTICIAS/main_noticias.py", cwd=BASE_DIR / "NOTICIAS" ) despues_not = db["noticias"].count_documents({}) stats["noticias_nuevos"] = despues_not - antes_not log.info(f" Noticias: +{stats['noticias_nuevos']} docs nuevos ({despues_not} total)") if not ok_not: ok = False # 1c. Imágenes Wikipedia — scraper de imágenes por temas de FLUJOS log.info("\n[1c] Wikipedia image scraper") imagenes_dir = BASE_DIR / "IMAGENES" antes_img = db["imagenes_wiki"].count_documents({}) ok_img = run_script( imagenes_dir / "wikipedia_image_scraper.py", args=["--flujos", "--max", "20", "--mongo"], cwd=imagenes_dir, ) despues_img = db["imagenes_wiki"].count_documents({}) stats["imagenes_wiki_nuevas"] = despues_img - antes_img log.info(f" Imágenes wiki: +{stats['imagenes_wiki_nuevas']} nuevas ({despues_img} total)") if not ok_img: ok = False except Exception as e: log.error(f" ERROR en fase scraping: {e}") ok = False estado = "completado" if ok else "error" log_fase_fin(db, log_id, estado, stats) log.info(f"\n Fase scraping: {estado} | {stats}") return ok # ── FASE 2: ANÁLISIS ─────────────────────────────────────────────────────────── def fase_analisis(db, forzar: bool = False): log.info("\n" + "="*60) log.info(" FASE 2 — ANÁLISIS Y TOKENIZACIÓN") log.info("="*60) if not fase_necesita_correr(db, "analisis", forzar): return log_id = log_fase_inicio(db, "analisis") stats = {} ok = True try: # 2a. Análisis de imágenes con Qwen3-VL (resume automático) log.info("\n[2a] Análisis imágenes Qwen3-VL (resume activado)") imagenes_dir = BASE_DIR / "IMAGENES" antes_img = db["imagenes"].count_documents({}) ok_img = run_script( imagenes_dir / "pipeline_imagenes.py", args=["--analizar", "--carpeta", str(imagenes_dir / "output/wiki_images"), "--mongo"], cwd=imagenes_dir, ) despues_img = db["imagenes"].count_documents({}) stats["imagenes_analizadas"] = despues_img - antes_img log.info(f" Imágenes analizadas: +{stats['imagenes_analizadas']}") if not ok_img: ok = False # 2b. Tokenización texto — pipeline_mongolo (noticias + wikipedia nuevos) # Solo corre si hay docs nuevos sin tokenizar log.info("\n[2b] Tokenización texto (pipeline_mongolo)") comp_dir = BASE_DIR / "COMPARACIONES" ok_tok = run_script( comp_dir / "pipeline_mongolo.py", cwd=comp_dir, ) if not ok_tok: ok = False stats["tokenizacion"] = "ok" if ok_tok else "error" except Exception as e: log.error(f" ERROR en fase análisis: {e}") ok = False estado = "completado" if ok else "error" log_fase_fin(db, log_id, estado, stats) log.info(f"\n Fase análisis: {estado} | {stats}") return ok # ── FASE 3: COMPARACIÓN ──────────────────────────────────────────────────────── def fase_comparacion(db, forzar: bool = False): log.info("\n" + "="*60) log.info(" FASE 3 — COMPARACIÓN") log.info("="*60) if not fase_necesita_correr(db, "comparacion", forzar): return log_id = log_fase_inicio(db, "comparacion") stats = {} ok = True try: comp_dir = BASE_DIR / "COMPARACIONES" antes = db["comparaciones"].count_documents({}) # Pasar la fecha de la última comparación para modo incremental ultima = ultima_ejecucion_completada(db, "comparacion") args = [] if ultima: args = ["--desde", ultima.strftime("%Y-%m-%d")] log.info(f" Modo incremental: solo docs desde {ultima.strftime('%Y-%m-%d')}") else: log.info(" Primera ejecución: comparando todo el corpus") ok_comp = run_script( comp_dir / "pipeline_completo.py", args=args, cwd=comp_dir, ) despues = db["comparaciones"].count_documents({}) stats["comparaciones_nuevas"] = despues - antes stats["total_comparaciones"] = despues log.info(f" Comparaciones: +{stats['comparaciones_nuevas']} nuevas ({despues} total)") if not ok_comp: ok = False except Exception as e: log.error(f" ERROR en fase comparación: {e}") ok = False estado = "completado" if ok else "error" log_fase_fin(db, log_id, estado, stats) log.info(f"\n Fase comparación: {estado} | {stats}") return ok # ── Main ─────────────────────────────────────────────────────────────────────── def main(): parser = argparse.ArgumentParser(description="Pipeline maestro FLUJOS") parser.add_argument("--solo-fase", choices=["scraping", "analisis", "comparacion"], help="Ejecutar solo una fase específica") parser.add_argument("--forzar", action="store_true", help="Ignorar cooldown entre ejecuciones") args = parser.parse_args() inicio = datetime.utcnow() log.info(f"\n{'='*60}") log.info(f" FLUJOS PIPELINE MAESTRO — {inicio.strftime('%Y-%m-%d %H:%M UTC')}") log.info(f"{'='*60}") try: db = get_db() log.info(" MongoDB: conectado") except ConnectionFailure as e: log.error(f" MongoDB no disponible: {e}") sys.exit(1) # Setup índices dedup (idempotente) log.info("\n Configurando índices de deduplicación...") setup_indices(db) with PipelineLock(): if args.solo_fase == "scraping" or not args.solo_fase: fase_scraping(db, args.forzar) if args.solo_fase == "analisis" or not args.solo_fase: fase_analisis(db, args.forzar) if args.solo_fase == "comparacion" or not args.solo_fase: fase_comparacion(db, args.forzar) duracion = datetime.utcnow() - inicio log.info(f"\n Pipeline completado en {duracion}") # Mostrar resumen del estado actual de la BD log.info("\n Estado MongoDB:") for col in ["wikipedia", "noticias", "imagenes", "imagenes_wiki", "comparaciones"]: try: n = db[col].count_documents({}) log.info(f" {col:20s}: {n:,}") except Exception: pass if __name__ == "__main__": main()