# Pipeline Maestro — Contexto técnico FLUJOS **Fecha:** 2026-04-21 **Archivo:** `FLUJOS_DATOS/pipeline_maestro.py` **Scheduler:** systemd timer (semanal, domingos 3:00 AM) --- ## Visión general El pipeline maestro orquesta las tres fases del sistema en orden estricto: ``` FASE 1 — SCRAPING ├── Wikipedia (main.py) → colección wikipedia ├── Noticias (main_noticias.py) → colección noticias └── Imágenes Wikipedia (wikipedia_image_scraper.py) → colección imagenes_wiki FASE 2 — ANÁLISIS ├── Qwen3-VL imágenes (pipeline_imagenes.py --analizar) → colección imagenes └── Tokenización texto (pipeline_mongolo.py) → colección noticias/wikipedia (actualiza) FASE 3 — COMPARACIÓN └── Similitud entre documentos (pipeline_completo.py) → colección comparaciones ``` --- ## Ejecución ```bash # Ejecución completa (las 3 fases) /var/www/theflows.net/flujos/FLUJOS_DATOS/myenv/bin/python3 pipeline_maestro.py # Solo una fase python pipeline_maestro.py --solo-fase scraping python pipeline_maestro.py --solo-fase analisis python pipeline_maestro.py --solo-fase comparacion # Forzar re-ejecución ignorando cooldown de 20h python pipeline_maestro.py --forzar python pipeline_maestro.py --solo-fase scraping --forzar ``` --- ## Control de estado — MongoDB `pipeline_log` Cada fase registra inicio, fin y stats en la colección `pipeline_log`: ```json { "fase": "scraping", "estado": "completado" | "en_progreso" | "error", "inicio": ISODate("2026-04-20T03:00:00Z"), "fin": ISODate("2026-04-20T04:15:00Z"), "stats": { "wikipedia_nuevos": 45, "noticias_nuevos": 312, "imagenes_wiki_nuevas": 60 } } ``` ### Lógica de "¿necesita correr?" ```python def fase_necesita_correr(db, fase, forzar=False): if forzar: return True ultimo = db['pipeline_log'].find_one({'fase': fase}, sort=[('inicio', -1)]) if not ultimo: return True # nunca corrió if ultimo['estado'] != 'completado': return True # última ejecución falló if utcnow() - ultimo['fin'] < timedelta(hours=20): return False # cooldown activo return True ``` --- ## Lockfile — prevención de instancias simultáneas ```python LOCK_FILE = Path("/tmp/flujos_pipeline.lock") class PipelineLock: def __enter__(self): self._f = open(LOCK_FILE, "w") fcntl.flock(self._f, fcntl.LOCK_EX | fcntl.LOCK_NB) # non-blocking # Si ya hay lock → BlockingIOError → log.error + sys.exit(1) ``` Si el pipeline muere abruptamente, el lock se libera automáticamente cuando el proceso termina (el SO libera todos los file locks del proceso muerto). --- ## Índices de deduplicación (idempotente) ```python def setup_indices(db): indices = { "wikipedia": [("archivo", ASCENDING)], "noticias": [("archivo", ASCENDING)], "imagenes": [("archivo", ASCENDING)], "imagenes_wiki": [("archivo", ASCENDING)], # comparaciones: SIN índice único (52M docs con posibles duplicados) } for col, keys in indices.items(): db[col].create_index(keys, unique=True, background=True) ``` Se ejecuta al inicio de cada pipeline. `create_index` es idempotente — no falla si el índice ya existe. --- ## Fase 3 — Modo incremental La comparación usa el flag `--desde` para solo comparar documentos nuevos: ```python ultima = ultima_ejecucion_completada(db, "comparacion") if ultima: args = ["--desde", ultima.strftime("%Y-%m-%d")] # pipeline_completo.py solo procesa docs con fecha >= ultima ejecución ``` Esto evita re-comparar los 52M pares existentes en cada ejecución. --- ## Systemd — configuración **Archivos instalados en `/etc/systemd/system/`:** ### flujos-pipeline.service ```ini [Unit] Description=FLUJOS Pipeline Maestro After=network.target mongod.service Requires=mongod.service [Service] Type=oneshot User=capitansito WorkingDirectory=/var/www/theflows.net/flujos/FLUJOS_DATOS Environment=MONGO_URL=mongodb://localhost:27017 Environment=DB_NAME=FLUJOS_DATOS ExecStart=/var/www/theflows.net/flujos/FLUJOS_DATOS/myenv/bin/python3 \ /var/www/theflows.net/flujos/FLUJOS_DATOS/pipeline_maestro.py TimeoutStartSec=43200 Nice=15 CPUQuota=400% StandardOutput=journal StandardError=journal [Install] WantedBy=multi-user.target ``` `Nice=15` + `CPUQuota=400%` limita el pipeline a ~4 cores para no saturar el servidor (Qwen en CPU puede usar 1000%+ por instancia sin esto). ### flujos-pipeline.timer ```ini [Unit] Description=Ejecutar FLUJOS Pipeline cada 2 dias a las 03:00 Requires=flujos-pipeline.service [Timer] OnBootSec=2min OnUnitActiveSec=2d Persistent=true Unit=flujos-pipeline.service [Install] WantedBy=timers.target ``` Cambiado de semanal (`OnCalendar=Sun`) a cada 2 días (`OnUnitActiveSec=2d`) el 2026-04-22. `Persistent=true` hace que si el servidor estaba apagado cuando tocaba, el timer se dispara 2 minutos después del siguiente arranque. ### Comandos de gestión ```bash # Ver estado del timer systemctl status flujos-pipeline.timer # Ver logs del último pipeline journalctl -u flujos-pipeline.service -n 100 # Lanzar manualmente systemctl start flujos-pipeline.service # Habilitar/deshabilitar timer systemctl enable flujos-pipeline.timer systemctl disable flujos-pipeline.timer # Ver cuándo es la próxima ejecución systemctl list-timers flujos-pipeline.timer ``` --- ## Cooldown y tiempos esperados de ejecución | Fase | Cooldown mínimo | Tiempo estimado | |---|---|---| | scraping | 20h | 2–6h (depende de medios accesibles) | | analisis | 20h | 1–4h (Qwen en CPU es lento: ~3 min/imagen) | | comparacion | 20h | Variable (incremental: 30 min / full: varios días) | --- ## Resumen de estadísticas al final Al completar, el pipeline imprime en el log: ``` Estado MongoDB: wikipedia : 5,234 noticias : 21,456 imagenes : 150 imagenes_wiki : 500 comparaciones : 52,100,000 ``` --- ## Log del pipeline ``` FLUJOS_DATOS/pipeline_maestro.log → log principal (en .gitignore) ``` También visible en journald: ```bash journalctl -u flujos-pipeline.service --since "2026-04-20" ``` --- ## Errores conocidos y fixes aplicados (2026-04-22) | Script | Error | Fix | |---|---|---| | `WIKIPEDIA/main.py` | `NameError: buscar_articulos` no definida | Añadido `from wikipedia_utils import buscar_articulos, obtener_contenido_wikipedia` | | `WIKIPEDIA/main.py` | `ModuleNotFoundError: wikipedia` | `pip install wikipedia` en myenv | | `WIKIPEDIA/main.py` | `ModuleNotFoundError: wikipediaapi` | `pip install wikipedia-api` en myenv | | `NOTICIAS/main_noticias.py` | `ModuleNotFoundError: deep_translator` | `pip install deep-translator` en myenv | Todos los módulos se instalan dentro del entorno virtual: ```bash /var/www/theflows.net/flujos/FLUJOS_DATOS/myenv/bin/python3 -m pip install ``` No usar `pip` directamente (el ejecutable `pip` del venv puede estar roto; usar `python3 -m pip`). --- ## Dependencias Python ``` pymongo bson # ObjectId (incluido con pymongo) fcntl # stdlib Python argparse # stdlib Python subprocess ``` El pipeline maestro solo llama a los sub-scripts vía `subprocess.run()`, no importa sus módulos directamente. --- ## Flujo completo diagram ``` pipeline_maestro.py │ ├── setup_indices(db) # crea índices únicos │ ├── [FASE 1] fase_scraping() │ ├── subprocess: WIKIPEDIA/main.py │ ├── subprocess: NOTICIAS/main_noticias.py │ └── subprocess: IMAGENES/wikipedia_image_scraper.py --flujos --max 20 --mongo │ ├── [FASE 2] fase_analisis() │ ├── subprocess: IMAGENES/pipeline_imagenes.py --analizar --carpeta ... --mongo │ └── subprocess: COMPARACIONES/pipeline_mongolo.py │ └── [FASE 3] fase_comparacion() └── subprocess: COMPARACIONES/pipeline_completo.py [--desde YYYY-MM-DD] ```