Incluye: backend Node.js/Express, visualización 3D (Three.js/3d-force-graph), scrapers Wikipedia/noticias/imágenes, analizador Qwen3-VL, pipeline maestro con systemd timer, fixes de seguridad (NoSQL injection, XSS, ReDoS, port binding) y documentación técnica completa en docs/. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
408 lines
15 KiB
Python
408 lines
15 KiB
Python
"""
|
|
pipeline_maestro.py
|
|
-------------------
|
|
Orquesta las tres fases del pipeline FLUJOS en orden estricto:
|
|
|
|
Fase 1 — SCRAPING Wikipedia + Noticias + Imágenes Wikipedia
|
|
Fase 2 — ANÁLISIS Tokenización texto (BERT) + Análisis imágenes (Qwen3-VL)
|
|
Fase 3 — COMPARACIÓN Solo documentos nuevos vs corpus existente
|
|
|
|
Garantías:
|
|
- Lockfile: solo una instancia a la vez
|
|
- Estado en MongoDB (`pipeline_log`): cada fase registra inicio/fin/stats
|
|
- Dedup: índices únicos en todas las colecciones (setup automático)
|
|
- Resume: si el proceso muere a mitad, la siguiente ejecución retoma
|
|
desde la última fase que no estaba "completado"
|
|
|
|
Ejecutar manualmente:
|
|
python pipeline_maestro.py
|
|
python pipeline_maestro.py --solo-fase scraping
|
|
python pipeline_maestro.py --solo-fase analisis
|
|
python pipeline_maestro.py --solo-fase comparacion
|
|
python pipeline_maestro.py --forzar # ignora si una fase ya corrió hoy
|
|
"""
|
|
|
|
import argparse
|
|
import fcntl
|
|
import logging
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
from datetime import datetime, timedelta
|
|
from pathlib import Path
|
|
|
|
from pymongo import MongoClient, ASCENDING
|
|
from pymongo.errors import DuplicateKeyError, ConnectionFailure
|
|
|
|
# ── Rutas ──────────────────────────────────────────────────────────────────────
|
|
|
|
BASE_DIR = Path(__file__).parent
|
|
VENV_PYTHON = BASE_DIR / "myenv/bin/python3"
|
|
LOCK_FILE = Path("/tmp/flujos_pipeline.lock")
|
|
LOG_FILE = BASE_DIR / "pipeline_maestro.log"
|
|
|
|
MONGO_URL = os.getenv("MONGO_URL", "mongodb://localhost:27017")
|
|
DB_NAME = os.getenv("DB_NAME", "FLUJOS_DATOS")
|
|
|
|
# Intervalo mínimo entre ejecuciones de cada fase (horas)
|
|
# Evita re-ejecutar si el systemd timer se dispara accidentalmente dos veces
|
|
MIN_HORAS_ENTRE_FASES = {
|
|
"scraping": 20,
|
|
"analisis": 20,
|
|
"comparacion": 20,
|
|
}
|
|
|
|
# ── Logging ────────────────────────────────────────────────────────────────────
|
|
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format="%(asctime)s [%(levelname)s] %(message)s",
|
|
handlers=[
|
|
logging.FileHandler(LOG_FILE),
|
|
logging.StreamHandler(sys.stdout),
|
|
],
|
|
)
|
|
log = logging.getLogger("pipeline_maestro")
|
|
|
|
|
|
# ── MongoDB ────────────────────────────────────────────────────────────────────
|
|
|
|
def get_db():
|
|
client = MongoClient(MONGO_URL, serverSelectionTimeoutMS=5000)
|
|
client.admin.command("ping")
|
|
return client[DB_NAME]
|
|
|
|
|
|
def setup_indices(db):
|
|
"""Crea índices únicos para deduplicación en todas las colecciones."""
|
|
indices = {
|
|
"wikipedia": [("archivo", ASCENDING)],
|
|
"noticias": [("archivo", ASCENDING)],
|
|
"imagenes": [("archivo", ASCENDING)],
|
|
"imagenes_wiki":[("archivo", ASCENDING)],
|
|
# comparaciones no tiene índice único — usa upsert por (noticia1, noticia2)
|
|
}
|
|
for col, keys in indices.items():
|
|
try:
|
|
if len(keys) == 1:
|
|
db[col].create_index(keys, unique=True, background=True)
|
|
else:
|
|
db[col].create_index(keys, unique=True, background=True)
|
|
log.info(f" índice OK: {col} → {[k for k,_ in keys]}")
|
|
except Exception as e:
|
|
log.warning(f" índice {col}: {e}")
|
|
|
|
|
|
# ── Estado de fases ────────────────────────────────────────────────────────────
|
|
|
|
def fase_estado(db, fase: str) -> dict | None:
|
|
"""Devuelve el último registro de la fase, o None si nunca corrió."""
|
|
return db["pipeline_log"].find_one({"fase": fase}, sort=[("inicio", -1)])
|
|
|
|
|
|
def fase_necesita_correr(db, fase: str, forzar: bool = False) -> bool:
|
|
if forzar:
|
|
return True
|
|
ultimo = fase_estado(db, fase)
|
|
if not ultimo:
|
|
return True
|
|
if ultimo.get("estado") != "completado":
|
|
log.info(f" [{fase}] última ejecución no completó — reintentando")
|
|
return True
|
|
ultima_fin = ultimo.get("fin")
|
|
if not ultima_fin:
|
|
return True
|
|
horas = MIN_HORAS_ENTRE_FASES.get(fase, 20)
|
|
if datetime.utcnow() - ultima_fin < timedelta(hours=horas):
|
|
log.info(f" [{fase}] completada hace menos de {horas}h — saltando")
|
|
return False
|
|
return True
|
|
|
|
|
|
def log_fase_inicio(db, fase: str) -> str:
|
|
doc = {
|
|
"fase": fase,
|
|
"estado": "en_progreso",
|
|
"inicio": datetime.utcnow(),
|
|
"fin": None,
|
|
"stats": {},
|
|
}
|
|
result = db["pipeline_log"].insert_one(doc)
|
|
return str(result.inserted_id)
|
|
|
|
|
|
def log_fase_fin(db, log_id: str, estado: str, stats: dict):
|
|
from bson import ObjectId
|
|
db["pipeline_log"].update_one(
|
|
{"_id": ObjectId(log_id)},
|
|
{"$set": {"estado": estado, "fin": datetime.utcnow(), "stats": stats}},
|
|
)
|
|
|
|
|
|
def ultima_ejecucion_completada(db, fase: str) -> datetime | None:
|
|
"""Devuelve la fecha de fin de la última ejecución completada de la fase."""
|
|
doc = db["pipeline_log"].find_one(
|
|
{"fase": fase, "estado": "completado"},
|
|
sort=[("fin", -1)]
|
|
)
|
|
return doc["fin"] if doc else None
|
|
|
|
|
|
# ── Lockfile ───────────────────────────────────────────────────────────────────
|
|
|
|
class PipelineLock:
|
|
def __init__(self):
|
|
self._f = None
|
|
|
|
def __enter__(self):
|
|
self._f = open(LOCK_FILE, "w")
|
|
try:
|
|
fcntl.flock(self._f, fcntl.LOCK_EX | fcntl.LOCK_NB)
|
|
except BlockingIOError:
|
|
log.error("Ya hay una instancia del pipeline corriendo. Saliendo.")
|
|
sys.exit(1)
|
|
self._f.write(str(os.getpid()))
|
|
self._f.flush()
|
|
return self
|
|
|
|
def __exit__(self, *args):
|
|
fcntl.flock(self._f, fcntl.LOCK_UN)
|
|
self._f.close()
|
|
LOCK_FILE.unlink(missing_ok=True)
|
|
|
|
|
|
# ── Ejecutor de scripts ────────────────────────────────────────────────────────
|
|
|
|
def run_script(script_path: Path, args: list[str] = None, cwd: Path = None) -> bool:
|
|
"""Ejecuta un script Python con el venv. Devuelve True si éxito."""
|
|
cmd = [str(VENV_PYTHON), str(script_path)] + (args or [])
|
|
cwd = cwd or script_path.parent
|
|
log.info(f" → {' '.join(cmd)}")
|
|
result = subprocess.run(cmd, cwd=str(cwd))
|
|
if result.returncode != 0:
|
|
log.error(f" Script falló con código {result.returncode}: {script_path.name}")
|
|
return False
|
|
return True
|
|
|
|
|
|
# ── FASE 1: SCRAPING ───────────────────────────────────────────────────────────
|
|
|
|
def fase_scraping(db, forzar: bool = False):
|
|
log.info("\n" + "="*60)
|
|
log.info(" FASE 1 — SCRAPING")
|
|
log.info("="*60)
|
|
|
|
if not fase_necesita_correr(db, "scraping", forzar):
|
|
return
|
|
|
|
log_id = log_fase_inicio(db, "scraping")
|
|
stats = {}
|
|
ok = True
|
|
|
|
try:
|
|
# 1a. Wikipedia — scraper por temas
|
|
log.info("\n[1a] Wikipedia scraper")
|
|
antes_wiki = db["wikipedia"].count_documents({})
|
|
ok_wiki = run_script(
|
|
BASE_DIR / "WIKIPEDIA/main.py",
|
|
cwd=BASE_DIR / "WIKIPEDIA"
|
|
)
|
|
despues_wiki = db["wikipedia"].count_documents({})
|
|
stats["wikipedia_nuevos"] = despues_wiki - antes_wiki
|
|
log.info(f" Wikipedia: +{stats['wikipedia_nuevos']} docs nuevos ({despues_wiki} total)")
|
|
if not ok_wiki:
|
|
ok = False
|
|
|
|
# 1b. Noticias — scraper web
|
|
log.info("\n[1b] Noticias scraper")
|
|
antes_not = db["noticias"].count_documents({})
|
|
ok_not = run_script(
|
|
BASE_DIR / "NOTICIAS/main_noticias.py",
|
|
cwd=BASE_DIR / "NOTICIAS"
|
|
)
|
|
despues_not = db["noticias"].count_documents({})
|
|
stats["noticias_nuevos"] = despues_not - antes_not
|
|
log.info(f" Noticias: +{stats['noticias_nuevos']} docs nuevos ({despues_not} total)")
|
|
if not ok_not:
|
|
ok = False
|
|
|
|
# 1c. Imágenes Wikipedia — scraper de imágenes por temas de FLUJOS
|
|
log.info("\n[1c] Wikipedia image scraper")
|
|
imagenes_dir = BASE_DIR / "IMAGENES"
|
|
antes_img = db["imagenes_wiki"].count_documents({})
|
|
ok_img = run_script(
|
|
imagenes_dir / "wikipedia_image_scraper.py",
|
|
args=["--flujos", "--max", "20", "--mongo"],
|
|
cwd=imagenes_dir,
|
|
)
|
|
despues_img = db["imagenes_wiki"].count_documents({})
|
|
stats["imagenes_wiki_nuevas"] = despues_img - antes_img
|
|
log.info(f" Imágenes wiki: +{stats['imagenes_wiki_nuevas']} nuevas ({despues_img} total)")
|
|
if not ok_img:
|
|
ok = False
|
|
|
|
except Exception as e:
|
|
log.error(f" ERROR en fase scraping: {e}")
|
|
ok = False
|
|
|
|
estado = "completado" if ok else "error"
|
|
log_fase_fin(db, log_id, estado, stats)
|
|
log.info(f"\n Fase scraping: {estado} | {stats}")
|
|
return ok
|
|
|
|
|
|
# ── FASE 2: ANÁLISIS ───────────────────────────────────────────────────────────
|
|
|
|
def fase_analisis(db, forzar: bool = False):
|
|
log.info("\n" + "="*60)
|
|
log.info(" FASE 2 — ANÁLISIS Y TOKENIZACIÓN")
|
|
log.info("="*60)
|
|
|
|
if not fase_necesita_correr(db, "analisis", forzar):
|
|
return
|
|
|
|
log_id = log_fase_inicio(db, "analisis")
|
|
stats = {}
|
|
ok = True
|
|
|
|
try:
|
|
# 2a. Análisis de imágenes con Qwen3-VL (resume automático)
|
|
log.info("\n[2a] Análisis imágenes Qwen3-VL (resume activado)")
|
|
imagenes_dir = BASE_DIR / "IMAGENES"
|
|
antes_img = db["imagenes"].count_documents({})
|
|
ok_img = run_script(
|
|
imagenes_dir / "pipeline_imagenes.py",
|
|
args=["--analizar",
|
|
"--carpeta", str(imagenes_dir / "output/wiki_images"),
|
|
"--mongo"],
|
|
cwd=imagenes_dir,
|
|
)
|
|
despues_img = db["imagenes"].count_documents({})
|
|
stats["imagenes_analizadas"] = despues_img - antes_img
|
|
log.info(f" Imágenes analizadas: +{stats['imagenes_analizadas']}")
|
|
if not ok_img:
|
|
ok = False
|
|
|
|
# 2b. Tokenización texto — pipeline_mongolo (noticias + wikipedia nuevos)
|
|
# Solo corre si hay docs nuevos sin tokenizar
|
|
log.info("\n[2b] Tokenización texto (pipeline_mongolo)")
|
|
comp_dir = BASE_DIR / "COMPARACIONES"
|
|
ok_tok = run_script(
|
|
comp_dir / "pipeline_mongolo.py",
|
|
cwd=comp_dir,
|
|
)
|
|
if not ok_tok:
|
|
ok = False
|
|
stats["tokenizacion"] = "ok" if ok_tok else "error"
|
|
|
|
except Exception as e:
|
|
log.error(f" ERROR en fase análisis: {e}")
|
|
ok = False
|
|
|
|
estado = "completado" if ok else "error"
|
|
log_fase_fin(db, log_id, estado, stats)
|
|
log.info(f"\n Fase análisis: {estado} | {stats}")
|
|
return ok
|
|
|
|
|
|
# ── FASE 3: COMPARACIÓN ────────────────────────────────────────────────────────
|
|
|
|
def fase_comparacion(db, forzar: bool = False):
|
|
log.info("\n" + "="*60)
|
|
log.info(" FASE 3 — COMPARACIÓN")
|
|
log.info("="*60)
|
|
|
|
if not fase_necesita_correr(db, "comparacion", forzar):
|
|
return
|
|
|
|
log_id = log_fase_inicio(db, "comparacion")
|
|
stats = {}
|
|
ok = True
|
|
|
|
try:
|
|
comp_dir = BASE_DIR / "COMPARACIONES"
|
|
antes = db["comparaciones"].count_documents({})
|
|
|
|
# Pasar la fecha de la última comparación para modo incremental
|
|
ultima = ultima_ejecucion_completada(db, "comparacion")
|
|
args = []
|
|
if ultima:
|
|
args = ["--desde", ultima.strftime("%Y-%m-%d")]
|
|
log.info(f" Modo incremental: solo docs desde {ultima.strftime('%Y-%m-%d')}")
|
|
else:
|
|
log.info(" Primera ejecución: comparando todo el corpus")
|
|
|
|
ok_comp = run_script(
|
|
comp_dir / "pipeline_completo.py",
|
|
args=args,
|
|
cwd=comp_dir,
|
|
)
|
|
despues = db["comparaciones"].count_documents({})
|
|
stats["comparaciones_nuevas"] = despues - antes
|
|
stats["total_comparaciones"] = despues
|
|
log.info(f" Comparaciones: +{stats['comparaciones_nuevas']} nuevas ({despues} total)")
|
|
if not ok_comp:
|
|
ok = False
|
|
|
|
except Exception as e:
|
|
log.error(f" ERROR en fase comparación: {e}")
|
|
ok = False
|
|
|
|
estado = "completado" if ok else "error"
|
|
log_fase_fin(db, log_id, estado, stats)
|
|
log.info(f"\n Fase comparación: {estado} | {stats}")
|
|
return ok
|
|
|
|
|
|
# ── Main ───────────────────────────────────────────────────────────────────────
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(description="Pipeline maestro FLUJOS")
|
|
parser.add_argument("--solo-fase", choices=["scraping", "analisis", "comparacion"],
|
|
help="Ejecutar solo una fase específica")
|
|
parser.add_argument("--forzar", action="store_true",
|
|
help="Ignorar cooldown entre ejecuciones")
|
|
args = parser.parse_args()
|
|
|
|
inicio = datetime.utcnow()
|
|
log.info(f"\n{'='*60}")
|
|
log.info(f" FLUJOS PIPELINE MAESTRO — {inicio.strftime('%Y-%m-%d %H:%M UTC')}")
|
|
log.info(f"{'='*60}")
|
|
|
|
try:
|
|
db = get_db()
|
|
log.info(" MongoDB: conectado")
|
|
except ConnectionFailure as e:
|
|
log.error(f" MongoDB no disponible: {e}")
|
|
sys.exit(1)
|
|
|
|
# Setup índices dedup (idempotente)
|
|
log.info("\n Configurando índices de deduplicación...")
|
|
setup_indices(db)
|
|
|
|
with PipelineLock():
|
|
if args.solo_fase == "scraping" or not args.solo_fase:
|
|
fase_scraping(db, args.forzar)
|
|
|
|
if args.solo_fase == "analisis" or not args.solo_fase:
|
|
fase_analisis(db, args.forzar)
|
|
|
|
if args.solo_fase == "comparacion" or not args.solo_fase:
|
|
fase_comparacion(db, args.forzar)
|
|
|
|
duracion = datetime.utcnow() - inicio
|
|
log.info(f"\n Pipeline completado en {duracion}")
|
|
|
|
# Mostrar resumen del estado actual de la BD
|
|
log.info("\n Estado MongoDB:")
|
|
for col in ["wikipedia", "noticias", "imagenes", "imagenes_wiki", "comparaciones"]:
|
|
try:
|
|
n = db[col].count_documents({})
|
|
log.info(f" {col:20s}: {n:,}")
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|