- docs/ → INFO/DOCS/CONTEXT/ (documentación técnica en markdown) - FLUJOS/DOCS/ + FLUJOS_DATOS/DOCS/ → INFO/DOCS/ (txts de arquitectura) - POCS/ → INFO/POCS/ (pruebas de concepto) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
6.8 KiB
Pipeline Maestro — Contexto técnico FLUJOS
Fecha: 2026-04-21
Archivo: FLUJOS_DATOS/pipeline_maestro.py
Scheduler: systemd timer (semanal, domingos 3:00 AM)
Visión general
El pipeline maestro orquesta las tres fases del sistema en orden estricto:
FASE 1 — SCRAPING
├── Wikipedia (main.py) → colección wikipedia
├── Noticias (main_noticias.py) → colección noticias
└── Imágenes Wikipedia (wikipedia_image_scraper.py) → colección imagenes_wiki
FASE 2 — ANÁLISIS
├── Qwen3-VL imágenes (pipeline_imagenes.py --analizar) → colección imagenes
└── Tokenización texto (pipeline_mongolo.py) → colección noticias/wikipedia (actualiza)
FASE 3 — COMPARACIÓN
└── Similitud entre documentos (pipeline_completo.py) → colección comparaciones
Ejecución
# Ejecución completa (las 3 fases)
/var/www/theflows.net/flujos/FLUJOS_DATOS/myenv/bin/python3 pipeline_maestro.py
# Solo una fase
python pipeline_maestro.py --solo-fase scraping
python pipeline_maestro.py --solo-fase analisis
python pipeline_maestro.py --solo-fase comparacion
# Forzar re-ejecución ignorando cooldown de 20h
python pipeline_maestro.py --forzar
python pipeline_maestro.py --solo-fase scraping --forzar
Control de estado — MongoDB pipeline_log
Cada fase registra inicio, fin y stats en la colección pipeline_log:
{
"fase": "scraping",
"estado": "completado" | "en_progreso" | "error",
"inicio": ISODate("2026-04-20T03:00:00Z"),
"fin": ISODate("2026-04-20T04:15:00Z"),
"stats": {
"wikipedia_nuevos": 45,
"noticias_nuevos": 312,
"imagenes_wiki_nuevas": 60
}
}
Lógica de "¿necesita correr?"
def fase_necesita_correr(db, fase, forzar=False):
if forzar: return True
ultimo = db['pipeline_log'].find_one({'fase': fase}, sort=[('inicio', -1)])
if not ultimo: return True # nunca corrió
if ultimo['estado'] != 'completado': return True # última ejecución falló
if utcnow() - ultimo['fin'] < timedelta(hours=20):
return False # cooldown activo
return True
Lockfile — prevención de instancias simultáneas
LOCK_FILE = Path("/tmp/flujos_pipeline.lock")
class PipelineLock:
def __enter__(self):
self._f = open(LOCK_FILE, "w")
fcntl.flock(self._f, fcntl.LOCK_EX | fcntl.LOCK_NB) # non-blocking
# Si ya hay lock → BlockingIOError → log.error + sys.exit(1)
Si el pipeline muere abruptamente, el lock se libera automáticamente cuando el proceso termina (el SO libera todos los file locks del proceso muerto).
Índices de deduplicación (idempotente)
def setup_indices(db):
indices = {
"wikipedia": [("archivo", ASCENDING)],
"noticias": [("archivo", ASCENDING)],
"imagenes": [("archivo", ASCENDING)],
"imagenes_wiki": [("archivo", ASCENDING)],
# comparaciones: SIN índice único (52M docs con posibles duplicados)
}
for col, keys in indices.items():
db[col].create_index(keys, unique=True, background=True)
Se ejecuta al inicio de cada pipeline. create_index es idempotente — no falla si el índice ya existe.
Fase 3 — Modo incremental
La comparación usa el flag --desde para solo comparar documentos nuevos:
ultima = ultima_ejecucion_completada(db, "comparacion")
if ultima:
args = ["--desde", ultima.strftime("%Y-%m-%d")]
# pipeline_completo.py solo procesa docs con fecha >= ultima ejecución
Esto evita re-comparar los 52M pares existentes en cada ejecución.
Systemd — configuración
Archivos instalados en /etc/systemd/system/:
flujos-pipeline.service
[Unit]
Description=FLUJOS Pipeline Maestro
After=network.target mongod.service
Requires=mongod.service
[Service]
Type=oneshot
User=capitansito
WorkingDirectory=/var/www/theflows.net/flujos/FLUJOS_DATOS
Environment=MONGO_URL=mongodb://localhost:27017
Environment=DB_NAME=FLUJOS_DATOS
ExecStart=/var/www/theflows.net/flujos/FLUJOS_DATOS/myenv/bin/python3 \
/var/www/theflows.net/flujos/FLUJOS_DATOS/pipeline_maestro.py
TimeoutStartSec=43200
StandardOutput=journal
StandardError=journal
[Install]
WantedBy=multi-user.target
flujos-pipeline.timer
[Unit]
Description=Timer semanal para FLUJOS Pipeline Maestro
[Timer]
OnCalendar=Sun *-*-* 03:00:00
Persistent=true
OnBootSec=2min
[Install]
WantedBy=timers.target
Persistent=true hace que si el servidor estaba apagado el domingo a las 3:00, el timer se dispara 2 minutos después del siguiente arranque (OnBootSec=2min).
Comandos de gestión
# Ver estado del timer
systemctl status flujos-pipeline.timer
# Ver logs del último pipeline
journalctl -u flujos-pipeline.service -n 100
# Lanzar manualmente
systemctl start flujos-pipeline.service
# Habilitar/deshabilitar timer
systemctl enable flujos-pipeline.timer
systemctl disable flujos-pipeline.timer
# Ver cuándo es la próxima ejecución
systemctl list-timers flujos-pipeline.timer
Cooldown y tiempos esperados de ejecución
| Fase | Cooldown mínimo | Tiempo estimado |
|---|---|---|
| scraping | 20h | 2–6h (depende de medios accesibles) |
| analisis | 20h | 1–4h (Qwen en CPU es lento: ~3 min/imagen) |
| comparacion | 20h | Variable (incremental: 30 min / full: varios días) |
Resumen de estadísticas al final
Al completar, el pipeline imprime en el log:
Estado MongoDB:
wikipedia : 5,234
noticias : 21,456
imagenes : 150
imagenes_wiki : 500
comparaciones : 52,100,000
Log del pipeline
FLUJOS_DATOS/pipeline_maestro.log → log principal (en .gitignore)
También visible en journald:
journalctl -u flujos-pipeline.service --since "2026-04-20"
Dependencias Python
pymongo
bson # ObjectId (incluido con pymongo)
fcntl # stdlib Python
argparse # stdlib Python
subprocess
El pipeline maestro solo llama a los sub-scripts vía subprocess.run(), no importa sus módulos directamente.
Flujo completo diagram
pipeline_maestro.py
│
├── setup_indices(db) # crea índices únicos
│
├── [FASE 1] fase_scraping()
│ ├── subprocess: WIKIPEDIA/main.py
│ ├── subprocess: NOTICIAS/main_noticias.py
│ └── subprocess: IMAGENES/wikipedia_image_scraper.py --flujos --max 20 --mongo
│
├── [FASE 2] fase_analisis()
│ ├── subprocess: IMAGENES/pipeline_imagenes.py --analizar --carpeta ... --mongo
│ └── subprocess: COMPARACIONES/pipeline_mongolo.py
│
└── [FASE 3] fase_comparacion()
└── subprocess: COMPARACIONES/pipeline_completo.py [--desde YYYY-MM-DD]