FLUJOS/FLUJOS_DATOS/pipeline_maestro.py
CAPITANSITO 83f67b76b4 código completo FLUJOS — snapshot limpio sin datos scrapeados
Incluye: backend Node.js/Express, visualización 3D (Three.js/3d-force-graph),
scrapers Wikipedia/noticias/imágenes, analizador Qwen3-VL, pipeline maestro
con systemd timer, fixes de seguridad (NoSQL injection, XSS, ReDoS, port
binding) y documentación técnica completa en docs/.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-21 23:45:29 +02:00

408 lines
15 KiB
Python

"""
pipeline_maestro.py
-------------------
Orquesta las tres fases del pipeline FLUJOS en orden estricto:
Fase 1 — SCRAPING Wikipedia + Noticias + Imágenes Wikipedia
Fase 2 — ANÁLISIS Tokenización texto (BERT) + Análisis imágenes (Qwen3-VL)
Fase 3 — COMPARACIÓN Solo documentos nuevos vs corpus existente
Garantías:
- Lockfile: solo una instancia a la vez
- Estado en MongoDB (`pipeline_log`): cada fase registra inicio/fin/stats
- Dedup: índices únicos en todas las colecciones (setup automático)
- Resume: si el proceso muere a mitad, la siguiente ejecución retoma
desde la última fase que no estaba "completado"
Ejecutar manualmente:
python pipeline_maestro.py
python pipeline_maestro.py --solo-fase scraping
python pipeline_maestro.py --solo-fase analisis
python pipeline_maestro.py --solo-fase comparacion
python pipeline_maestro.py --forzar # ignora si una fase ya corrió hoy
"""
import argparse
import fcntl
import logging
import os
import subprocess
import sys
import time
from datetime import datetime, timedelta
from pathlib import Path
from pymongo import MongoClient, ASCENDING
from pymongo.errors import DuplicateKeyError, ConnectionFailure
# ── Rutas ──────────────────────────────────────────────────────────────────────
BASE_DIR = Path(__file__).parent
VENV_PYTHON = BASE_DIR / "myenv/bin/python3"
LOCK_FILE = Path("/tmp/flujos_pipeline.lock")
LOG_FILE = BASE_DIR / "pipeline_maestro.log"
MONGO_URL = os.getenv("MONGO_URL", "mongodb://localhost:27017")
DB_NAME = os.getenv("DB_NAME", "FLUJOS_DATOS")
# Intervalo mínimo entre ejecuciones de cada fase (horas)
# Evita re-ejecutar si el systemd timer se dispara accidentalmente dos veces
MIN_HORAS_ENTRE_FASES = {
"scraping": 20,
"analisis": 20,
"comparacion": 20,
}
# ── Logging ────────────────────────────────────────────────────────────────────
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[
logging.FileHandler(LOG_FILE),
logging.StreamHandler(sys.stdout),
],
)
log = logging.getLogger("pipeline_maestro")
# ── MongoDB ────────────────────────────────────────────────────────────────────
def get_db():
client = MongoClient(MONGO_URL, serverSelectionTimeoutMS=5000)
client.admin.command("ping")
return client[DB_NAME]
def setup_indices(db):
"""Crea índices únicos para deduplicación en todas las colecciones."""
indices = {
"wikipedia": [("archivo", ASCENDING)],
"noticias": [("archivo", ASCENDING)],
"imagenes": [("archivo", ASCENDING)],
"imagenes_wiki":[("archivo", ASCENDING)],
# comparaciones no tiene índice único — usa upsert por (noticia1, noticia2)
}
for col, keys in indices.items():
try:
if len(keys) == 1:
db[col].create_index(keys, unique=True, background=True)
else:
db[col].create_index(keys, unique=True, background=True)
log.info(f" índice OK: {col}{[k for k,_ in keys]}")
except Exception as e:
log.warning(f" índice {col}: {e}")
# ── Estado de fases ────────────────────────────────────────────────────────────
def fase_estado(db, fase: str) -> dict | None:
"""Devuelve el último registro de la fase, o None si nunca corrió."""
return db["pipeline_log"].find_one({"fase": fase}, sort=[("inicio", -1)])
def fase_necesita_correr(db, fase: str, forzar: bool = False) -> bool:
if forzar:
return True
ultimo = fase_estado(db, fase)
if not ultimo:
return True
if ultimo.get("estado") != "completado":
log.info(f" [{fase}] última ejecución no completó — reintentando")
return True
ultima_fin = ultimo.get("fin")
if not ultima_fin:
return True
horas = MIN_HORAS_ENTRE_FASES.get(fase, 20)
if datetime.utcnow() - ultima_fin < timedelta(hours=horas):
log.info(f" [{fase}] completada hace menos de {horas}h — saltando")
return False
return True
def log_fase_inicio(db, fase: str) -> str:
doc = {
"fase": fase,
"estado": "en_progreso",
"inicio": datetime.utcnow(),
"fin": None,
"stats": {},
}
result = db["pipeline_log"].insert_one(doc)
return str(result.inserted_id)
def log_fase_fin(db, log_id: str, estado: str, stats: dict):
from bson import ObjectId
db["pipeline_log"].update_one(
{"_id": ObjectId(log_id)},
{"$set": {"estado": estado, "fin": datetime.utcnow(), "stats": stats}},
)
def ultima_ejecucion_completada(db, fase: str) -> datetime | None:
"""Devuelve la fecha de fin de la última ejecución completada de la fase."""
doc = db["pipeline_log"].find_one(
{"fase": fase, "estado": "completado"},
sort=[("fin", -1)]
)
return doc["fin"] if doc else None
# ── Lockfile ───────────────────────────────────────────────────────────────────
class PipelineLock:
def __init__(self):
self._f = None
def __enter__(self):
self._f = open(LOCK_FILE, "w")
try:
fcntl.flock(self._f, fcntl.LOCK_EX | fcntl.LOCK_NB)
except BlockingIOError:
log.error("Ya hay una instancia del pipeline corriendo. Saliendo.")
sys.exit(1)
self._f.write(str(os.getpid()))
self._f.flush()
return self
def __exit__(self, *args):
fcntl.flock(self._f, fcntl.LOCK_UN)
self._f.close()
LOCK_FILE.unlink(missing_ok=True)
# ── Ejecutor de scripts ────────────────────────────────────────────────────────
def run_script(script_path: Path, args: list[str] = None, cwd: Path = None) -> bool:
"""Ejecuta un script Python con el venv. Devuelve True si éxito."""
cmd = [str(VENV_PYTHON), str(script_path)] + (args or [])
cwd = cwd or script_path.parent
log.info(f"{' '.join(cmd)}")
result = subprocess.run(cmd, cwd=str(cwd))
if result.returncode != 0:
log.error(f" Script falló con código {result.returncode}: {script_path.name}")
return False
return True
# ── FASE 1: SCRAPING ───────────────────────────────────────────────────────────
def fase_scraping(db, forzar: bool = False):
log.info("\n" + "="*60)
log.info(" FASE 1 — SCRAPING")
log.info("="*60)
if not fase_necesita_correr(db, "scraping", forzar):
return
log_id = log_fase_inicio(db, "scraping")
stats = {}
ok = True
try:
# 1a. Wikipedia — scraper por temas
log.info("\n[1a] Wikipedia scraper")
antes_wiki = db["wikipedia"].count_documents({})
ok_wiki = run_script(
BASE_DIR / "WIKIPEDIA/main.py",
cwd=BASE_DIR / "WIKIPEDIA"
)
despues_wiki = db["wikipedia"].count_documents({})
stats["wikipedia_nuevos"] = despues_wiki - antes_wiki
log.info(f" Wikipedia: +{stats['wikipedia_nuevos']} docs nuevos ({despues_wiki} total)")
if not ok_wiki:
ok = False
# 1b. Noticias — scraper web
log.info("\n[1b] Noticias scraper")
antes_not = db["noticias"].count_documents({})
ok_not = run_script(
BASE_DIR / "NOTICIAS/main_noticias.py",
cwd=BASE_DIR / "NOTICIAS"
)
despues_not = db["noticias"].count_documents({})
stats["noticias_nuevos"] = despues_not - antes_not
log.info(f" Noticias: +{stats['noticias_nuevos']} docs nuevos ({despues_not} total)")
if not ok_not:
ok = False
# 1c. Imágenes Wikipedia — scraper de imágenes por temas de FLUJOS
log.info("\n[1c] Wikipedia image scraper")
imagenes_dir = BASE_DIR / "IMAGENES"
antes_img = db["imagenes_wiki"].count_documents({})
ok_img = run_script(
imagenes_dir / "wikipedia_image_scraper.py",
args=["--flujos", "--max", "20", "--mongo"],
cwd=imagenes_dir,
)
despues_img = db["imagenes_wiki"].count_documents({})
stats["imagenes_wiki_nuevas"] = despues_img - antes_img
log.info(f" Imágenes wiki: +{stats['imagenes_wiki_nuevas']} nuevas ({despues_img} total)")
if not ok_img:
ok = False
except Exception as e:
log.error(f" ERROR en fase scraping: {e}")
ok = False
estado = "completado" if ok else "error"
log_fase_fin(db, log_id, estado, stats)
log.info(f"\n Fase scraping: {estado} | {stats}")
return ok
# ── FASE 2: ANÁLISIS ───────────────────────────────────────────────────────────
def fase_analisis(db, forzar: bool = False):
log.info("\n" + "="*60)
log.info(" FASE 2 — ANÁLISIS Y TOKENIZACIÓN")
log.info("="*60)
if not fase_necesita_correr(db, "analisis", forzar):
return
log_id = log_fase_inicio(db, "analisis")
stats = {}
ok = True
try:
# 2a. Análisis de imágenes con Qwen3-VL (resume automático)
log.info("\n[2a] Análisis imágenes Qwen3-VL (resume activado)")
imagenes_dir = BASE_DIR / "IMAGENES"
antes_img = db["imagenes"].count_documents({})
ok_img = run_script(
imagenes_dir / "pipeline_imagenes.py",
args=["--analizar",
"--carpeta", str(imagenes_dir / "output/wiki_images"),
"--mongo"],
cwd=imagenes_dir,
)
despues_img = db["imagenes"].count_documents({})
stats["imagenes_analizadas"] = despues_img - antes_img
log.info(f" Imágenes analizadas: +{stats['imagenes_analizadas']}")
if not ok_img:
ok = False
# 2b. Tokenización texto — pipeline_mongolo (noticias + wikipedia nuevos)
# Solo corre si hay docs nuevos sin tokenizar
log.info("\n[2b] Tokenización texto (pipeline_mongolo)")
comp_dir = BASE_DIR / "COMPARACIONES"
ok_tok = run_script(
comp_dir / "pipeline_mongolo.py",
cwd=comp_dir,
)
if not ok_tok:
ok = False
stats["tokenizacion"] = "ok" if ok_tok else "error"
except Exception as e:
log.error(f" ERROR en fase análisis: {e}")
ok = False
estado = "completado" if ok else "error"
log_fase_fin(db, log_id, estado, stats)
log.info(f"\n Fase análisis: {estado} | {stats}")
return ok
# ── FASE 3: COMPARACIÓN ────────────────────────────────────────────────────────
def fase_comparacion(db, forzar: bool = False):
log.info("\n" + "="*60)
log.info(" FASE 3 — COMPARACIÓN")
log.info("="*60)
if not fase_necesita_correr(db, "comparacion", forzar):
return
log_id = log_fase_inicio(db, "comparacion")
stats = {}
ok = True
try:
comp_dir = BASE_DIR / "COMPARACIONES"
antes = db["comparaciones"].count_documents({})
# Pasar la fecha de la última comparación para modo incremental
ultima = ultima_ejecucion_completada(db, "comparacion")
args = []
if ultima:
args = ["--desde", ultima.strftime("%Y-%m-%d")]
log.info(f" Modo incremental: solo docs desde {ultima.strftime('%Y-%m-%d')}")
else:
log.info(" Primera ejecución: comparando todo el corpus")
ok_comp = run_script(
comp_dir / "pipeline_completo.py",
args=args,
cwd=comp_dir,
)
despues = db["comparaciones"].count_documents({})
stats["comparaciones_nuevas"] = despues - antes
stats["total_comparaciones"] = despues
log.info(f" Comparaciones: +{stats['comparaciones_nuevas']} nuevas ({despues} total)")
if not ok_comp:
ok = False
except Exception as e:
log.error(f" ERROR en fase comparación: {e}")
ok = False
estado = "completado" if ok else "error"
log_fase_fin(db, log_id, estado, stats)
log.info(f"\n Fase comparación: {estado} | {stats}")
return ok
# ── Main ───────────────────────────────────────────────────────────────────────
def main():
parser = argparse.ArgumentParser(description="Pipeline maestro FLUJOS")
parser.add_argument("--solo-fase", choices=["scraping", "analisis", "comparacion"],
help="Ejecutar solo una fase específica")
parser.add_argument("--forzar", action="store_true",
help="Ignorar cooldown entre ejecuciones")
args = parser.parse_args()
inicio = datetime.utcnow()
log.info(f"\n{'='*60}")
log.info(f" FLUJOS PIPELINE MAESTRO — {inicio.strftime('%Y-%m-%d %H:%M UTC')}")
log.info(f"{'='*60}")
try:
db = get_db()
log.info(" MongoDB: conectado")
except ConnectionFailure as e:
log.error(f" MongoDB no disponible: {e}")
sys.exit(1)
# Setup índices dedup (idempotente)
log.info("\n Configurando índices de deduplicación...")
setup_indices(db)
with PipelineLock():
if args.solo_fase == "scraping" or not args.solo_fase:
fase_scraping(db, args.forzar)
if args.solo_fase == "analisis" or not args.solo_fase:
fase_analisis(db, args.forzar)
if args.solo_fase == "comparacion" or not args.solo_fase:
fase_comparacion(db, args.forzar)
duracion = datetime.utcnow() - inicio
log.info(f"\n Pipeline completado en {duracion}")
# Mostrar resumen del estado actual de la BD
log.info("\n Estado MongoDB:")
for col in ["wikipedia", "noticias", "imagenes", "imagenes_wiki", "comparaciones"]:
try:
n = db[col].count_documents({})
log.info(f" {col:20s}: {n:,}")
except Exception:
pass
if __name__ == "__main__":
main()