rss/embeddings_worker.py
2025-11-19 21:29:15 +01:00

190 lines
6.1 KiB
Python

# embeddings_worker.py
# Worker de embeddings para TRADUCCIONES:
# - Lee traducciones con status='done' y sin embedding para un modelo concreto
# - Calcula embedding (Sentence-Transformers) sobre titulo_trad + resumen_trad
# - Guarda en traduccion_embeddings (traduccion_id, model, dim, embedding)
import os
import time
import logging
from typing import List
import numpy as np
import psycopg2
import psycopg2.extras
from sentence_transformers import SentenceTransformer
import torch
logging.basicConfig(level=logging.INFO, format='[%(asctime)s] %(levelname)s: %(message)s')
log = logging.getLogger(__name__)
# ---------- Configuración DB ----------
DB = dict(
host=os.environ.get("DB_HOST", "localhost"),
port=int(os.environ.get("DB_PORT", 5432)),
dbname=os.environ.get("DB_NAME", "rss"),
user=os.environ.get("DB_USER", "rss"),
password=os.environ.get("DB_PASS", "x"),
)
# ---------- Parámetros de worker ----------
# Modelo por defecto: multilingüe, bueno para muchas lenguas
EMB_MODEL = os.environ.get(
"EMB_MODEL",
"sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2",
)
EMB_BATCH = int(os.environ.get("EMB_BATCH", "128"))
SLEEP_IDLE = float(os.environ.get("EMB_SLEEP_IDLE", "5.0"))
# Filtrado por idiomas destino (coma-separado). Por defecto sólo 'es'
EMB_LANGS = [s.strip() for s in os.environ.get("EMB_LANGS", "es").split(",") if s.strip()]
# DEVICE_ENV: 'auto' | 'cpu' | 'cuda'
DEVICE_ENV = os.environ.get("DEVICE", "auto").lower()
# Límite por iteración (para no tragar toda la tabla de golpe)
EMB_LIMIT = int(os.environ.get("EMB_LIMIT", "1000"))
# ---------- Utilidades ----------
def get_conn():
return psycopg2.connect(**DB)
def ensure_schema(conn):
"""Crea la tabla de embeddings para traducciones si no existe."""
with conn.cursor() as cur:
cur.execute(
"""
CREATE TABLE IF NOT EXISTS traduccion_embeddings (
id SERIAL PRIMARY KEY,
traduccion_id INT NOT NULL REFERENCES traducciones(id) ON DELETE CASCADE,
model TEXT NOT NULL,
dim INT NOT NULL,
embedding DOUBLE PRECISION[] NOT NULL,
created_at TIMESTAMP DEFAULT NOW(),
UNIQUE (traduccion_id, model)
);
"""
)
cur.execute("CREATE INDEX IF NOT EXISTS idx_tr_emb_model ON traduccion_embeddings(model);")
cur.execute("CREATE INDEX IF NOT EXISTS idx_tr_emb_trid ON traduccion_embeddings(traduccion_id);")
conn.commit()
def fetch_batch_pending(conn) -> List[psycopg2.extras.DictRow]:
"""
Devuelve un lote de traducciones 'done' del/los idioma(s) objetivo
que no tienen embedding aún para el EMB_MODEL indicado.
"""
with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
cur.execute(
"""
SELECT t.id AS traduccion_id,
t.lang_to AS lang_to,
COALESCE(NULLIF(t.titulo_trad, ''), '') AS titulo_trad,
COALESCE(NULLIF(t.resumen_trad,''), '') AS resumen_trad,
n.id AS noticia_id
FROM traducciones t
JOIN noticias n ON n.id = t.noticia_id
LEFT JOIN traduccion_embeddings e
ON e.traduccion_id = t.id AND e.model = %s
WHERE t.status = 'done'
AND t.lang_to = ANY(%s)
AND e.traduccion_id IS NULL
ORDER BY t.id
LIMIT %s
""",
(EMB_MODEL, EMB_LANGS, EMB_LIMIT),
)
rows = cur.fetchall()
return rows
def texts_from_rows(rows: List[psycopg2.extras.DictRow]) -> List[str]:
"""
Compone el texto a vectorizar por cada traducción:
'titulo_trad' + '\n' + 'resumen_trad'. Si alguno falta, usa lo disponible.
"""
texts = []
for r in rows:
title = (r["titulo_trad"] or "").strip()
body = (r["resumen_trad"] or "").strip()
if title and body:
texts.append(f"{title}\n{body}")
else:
texts.append(title or body or "")
return texts
def upsert_embeddings(conn, rows, embs: np.ndarray, model_name: str):
"""
Inserta/actualiza embeddings por traducción.
"""
if embs.size == 0 or not rows:
return
dim = int(embs.shape[1])
with conn.cursor() as cur:
for r, e in zip(rows, embs):
cur.execute(
"""
INSERT INTO traduccion_embeddings (traduccion_id, model, dim, embedding)
VALUES (%s, %s, %s, %s)
ON CONFLICT (traduccion_id, model) DO UPDATE
SET embedding = EXCLUDED.embedding,
dim = EXCLUDED.dim,
created_at = NOW()
""",
(int(r["traduccion_id"]), model_name, dim, list(map(float, e))),
)
conn.commit()
# ---------- Main loop ----------
def main():
log.info("Arrancando embeddings_worker para TRADUCCIONES")
log.info(
"Modelo: %s | Batch: %s | Idiomas: %s | DEVICE env: %s",
EMB_MODEL,
EMB_BATCH,
",".join(EMB_LANGS),
DEVICE_ENV,
)
if DEVICE_ENV == "auto":
device = "cuda" if torch.cuda.is_available() else "cpu"
else:
device = DEVICE_ENV
log.info("Usando dispositivo: %s", device)
model = SentenceTransformer(EMB_MODEL, device=device)
while True:
try:
with get_conn() as conn:
ensure_schema(conn)
rows = fetch_batch_pending(conn)
if not rows:
time.sleep(SLEEP_IDLE)
continue
texts = texts_from_rows(rows)
embs = model.encode(
texts,
batch_size=EMB_BATCH,
convert_to_numpy=True,
show_progress_bar=False,
normalize_embeddings=True,
)
upsert_embeddings(conn, rows, embs, EMB_MODEL)
log.info("Embeddings upserted: %d", len(rows))
except Exception as e:
log.exception("Error en embeddings_worker: %s", e)
time.sleep(SLEEP_IDLE)
if __name__ == "__main__":
main()