FLUJOS/docs/PIPELINE_MAESTRO.md
CAPITANSITO 83f67b76b4 código completo FLUJOS — snapshot limpio sin datos scrapeados
Incluye: backend Node.js/Express, visualización 3D (Three.js/3d-force-graph),
scrapers Wikipedia/noticias/imágenes, analizador Qwen3-VL, pipeline maestro
con systemd timer, fixes de seguridad (NoSQL injection, XSS, ReDoS, port
binding) y documentación técnica completa en docs/.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-21 23:45:29 +02:00

266 lines
6.8 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Pipeline Maestro — Contexto técnico FLUJOS
**Fecha:** 2026-04-21
**Archivo:** `FLUJOS_DATOS/pipeline_maestro.py`
**Scheduler:** systemd timer (semanal, domingos 3:00 AM)
---
## Visión general
El pipeline maestro orquesta las tres fases del sistema en orden estricto:
```
FASE 1 — SCRAPING
├── Wikipedia (main.py) → colección wikipedia
├── Noticias (main_noticias.py) → colección noticias
└── Imágenes Wikipedia (wikipedia_image_scraper.py) → colección imagenes_wiki
FASE 2 — ANÁLISIS
├── Qwen3-VL imágenes (pipeline_imagenes.py --analizar) → colección imagenes
└── Tokenización texto (pipeline_mongolo.py) → colección noticias/wikipedia (actualiza)
FASE 3 — COMPARACIÓN
└── Similitud entre documentos (pipeline_completo.py) → colección comparaciones
```
---
## Ejecución
```bash
# Ejecución completa (las 3 fases)
/var/www/theflows.net/flujos/FLUJOS_DATOS/myenv/bin/python3 pipeline_maestro.py
# Solo una fase
python pipeline_maestro.py --solo-fase scraping
python pipeline_maestro.py --solo-fase analisis
python pipeline_maestro.py --solo-fase comparacion
# Forzar re-ejecución ignorando cooldown de 20h
python pipeline_maestro.py --forzar
python pipeline_maestro.py --solo-fase scraping --forzar
```
---
## Control de estado — MongoDB `pipeline_log`
Cada fase registra inicio, fin y stats en la colección `pipeline_log`:
```json
{
"fase": "scraping",
"estado": "completado" | "en_progreso" | "error",
"inicio": ISODate("2026-04-20T03:00:00Z"),
"fin": ISODate("2026-04-20T04:15:00Z"),
"stats": {
"wikipedia_nuevos": 45,
"noticias_nuevos": 312,
"imagenes_wiki_nuevas": 60
}
}
```
### Lógica de "¿necesita correr?"
```python
def fase_necesita_correr(db, fase, forzar=False):
if forzar: return True
ultimo = db['pipeline_log'].find_one({'fase': fase}, sort=[('inicio', -1)])
if not ultimo: return True # nunca corrió
if ultimo['estado'] != 'completado': return True # última ejecución falló
if utcnow() - ultimo['fin'] < timedelta(hours=20):
return False # cooldown activo
return True
```
---
## Lockfile — prevención de instancias simultáneas
```python
LOCK_FILE = Path("/tmp/flujos_pipeline.lock")
class PipelineLock:
def __enter__(self):
self._f = open(LOCK_FILE, "w")
fcntl.flock(self._f, fcntl.LOCK_EX | fcntl.LOCK_NB) # non-blocking
# Si ya hay lock → BlockingIOError → log.error + sys.exit(1)
```
Si el pipeline muere abruptamente, el lock se libera automáticamente cuando el proceso termina (el SO libera todos los file locks del proceso muerto).
---
## Índices de deduplicación (idempotente)
```python
def setup_indices(db):
indices = {
"wikipedia": [("archivo", ASCENDING)],
"noticias": [("archivo", ASCENDING)],
"imagenes": [("archivo", ASCENDING)],
"imagenes_wiki": [("archivo", ASCENDING)],
# comparaciones: SIN índice único (52M docs con posibles duplicados)
}
for col, keys in indices.items():
db[col].create_index(keys, unique=True, background=True)
```
Se ejecuta al inicio de cada pipeline. `create_index` es idempotente — no falla si el índice ya existe.
---
## Fase 3 — Modo incremental
La comparación usa el flag `--desde` para solo comparar documentos nuevos:
```python
ultima = ultima_ejecucion_completada(db, "comparacion")
if ultima:
args = ["--desde", ultima.strftime("%Y-%m-%d")]
# pipeline_completo.py solo procesa docs con fecha >= ultima ejecución
```
Esto evita re-comparar los 52M pares existentes en cada ejecución.
---
## Systemd — configuración
**Archivos instalados en `/etc/systemd/system/`:**
### flujos-pipeline.service
```ini
[Unit]
Description=FLUJOS Pipeline Maestro
After=network.target mongod.service
Requires=mongod.service
[Service]
Type=oneshot
User=capitansito
WorkingDirectory=/var/www/theflows.net/flujos/FLUJOS_DATOS
Environment=MONGO_URL=mongodb://localhost:27017
Environment=DB_NAME=FLUJOS_DATOS
ExecStart=/var/www/theflows.net/flujos/FLUJOS_DATOS/myenv/bin/python3 \
/var/www/theflows.net/flujos/FLUJOS_DATOS/pipeline_maestro.py
TimeoutStartSec=43200
StandardOutput=journal
StandardError=journal
[Install]
WantedBy=multi-user.target
```
### flujos-pipeline.timer
```ini
[Unit]
Description=Timer semanal para FLUJOS Pipeline Maestro
[Timer]
OnCalendar=Sun *-*-* 03:00:00
Persistent=true
OnBootSec=2min
[Install]
WantedBy=timers.target
```
`Persistent=true` hace que si el servidor estaba apagado el domingo a las 3:00, el timer se dispara 2 minutos después del siguiente arranque (`OnBootSec=2min`).
### Comandos de gestión
```bash
# Ver estado del timer
systemctl status flujos-pipeline.timer
# Ver logs del último pipeline
journalctl -u flujos-pipeline.service -n 100
# Lanzar manualmente
systemctl start flujos-pipeline.service
# Habilitar/deshabilitar timer
systemctl enable flujos-pipeline.timer
systemctl disable flujos-pipeline.timer
# Ver cuándo es la próxima ejecución
systemctl list-timers flujos-pipeline.timer
```
---
## Cooldown y tiempos esperados de ejecución
| Fase | Cooldown mínimo | Tiempo estimado |
|---|---|---|
| scraping | 20h | 26h (depende de medios accesibles) |
| analisis | 20h | 14h (Qwen en CPU es lento: ~3 min/imagen) |
| comparacion | 20h | Variable (incremental: 30 min / full: varios días) |
---
## Resumen de estadísticas al final
Al completar, el pipeline imprime en el log:
```
Estado MongoDB:
wikipedia : 5,234
noticias : 21,456
imagenes : 150
imagenes_wiki : 500
comparaciones : 52,100,000
```
---
## Log del pipeline
```
FLUJOS_DATOS/pipeline_maestro.log → log principal (en .gitignore)
```
También visible en journald:
```bash
journalctl -u flujos-pipeline.service --since "2026-04-20"
```
---
## Dependencias Python
```
pymongo
bson # ObjectId (incluido con pymongo)
fcntl # stdlib Python
argparse # stdlib Python
subprocess
```
El pipeline maestro solo llama a los sub-scripts vía `subprocess.run()`, no importa sus módulos directamente.
---
## Flujo completo diagram
```
pipeline_maestro.py
├── setup_indices(db) # crea índices únicos
├── [FASE 1] fase_scraping()
│ ├── subprocess: WIKIPEDIA/main.py
│ ├── subprocess: NOTICIAS/main_noticias.py
│ └── subprocess: IMAGENES/wikipedia_image_scraper.py --flujos --max 20 --mongo
├── [FASE 2] fase_analisis()
│ ├── subprocess: IMAGENES/pipeline_imagenes.py --analizar --carpeta ... --mongo
│ └── subprocess: COMPARACIONES/pipeline_mongolo.py
└── [FASE 3] fase_comparacion()
└── subprocess: COMPARACIONES/pipeline_completo.py [--desde YYYY-MM-DD]
```