rss/cluster_worker.py
2025-11-24 23:06:26 +01:00

445 lines
13 KiB
Python

import os
import time
import logging
from typing import List, Dict, Any, Optional
import numpy as np
import psycopg2
import psycopg2.extras
from psycopg2.extras import Json
logging.basicConfig(
level=logging.INFO,
format='[eventos] %(asctime)s %(levelname)s: %(message)s'
)
log = logging.getLogger(__name__)
DB = dict(
host=os.environ.get("DB_HOST", "localhost"),
port=int(os.environ.get("DB_PORT", 5432)),
dbname=os.environ.get("DB_NAME", "rss"),
user=os.environ.get("DB_USER", "rss"),
password=os.environ.get("DB_PASS", "x"),
)
EVENT_LANGS = [
s.strip().lower()
for s in os.environ.get("EVENT_LANGS", "es").split(",")
if s.strip()
]
EVENT_BATCH_IDS = int(os.environ.get("EVENT_BATCH_IDS", "200"))
EVENT_SLEEP_IDLE = float(os.environ.get("EVENT_SLEEP_IDLE", "5.0"))
EVENT_DIST_THRESHOLD = float(os.environ.get("EVENT_DIST_THRESHOLD", "0.25"))
EMB_MODEL = os.environ.get(
"EMB_MODEL",
"sentence-transformers/paraphrase-multilingual-mpnet-base-v2",
)
def get_conn():
return psycopg2.connect(**DB)
def ensure_schema(conn):
"""
Asumimos que las tablas y columnas (eventos, traducciones.evento_id,
eventos_noticias, función/trigger) ya existen por los scripts init-db.
Aquí solo nos aseguramos de que existan ciertos índices clave
(idempotente).
"""
with conn.cursor() as cur:
cur.execute(
"""
CREATE INDEX IF NOT EXISTS idx_traducciones_evento
ON traducciones(evento_id);
"""
)
cur.execute(
"""
CREATE INDEX IF NOT EXISTS idx_traducciones_evento_fecha
ON traducciones(evento_id, noticia_id);
"""
)
conn.commit()
def fetch_pending_traducciones(conn) -> List[int]:
"""
Traducciones con status 'done', sin evento asignado
y que ya tienen embedding en traduccion_embeddings para EMB_MODEL.
"""
with conn.cursor() as cur:
cur.execute(
"""
SELECT t.id
FROM traducciones t
JOIN traduccion_embeddings e
ON e.traduccion_id = t.id
AND e.model = %s
WHERE t.status = 'done'
AND t.evento_id IS NULL
AND t.lang_to = ANY(%s)
ORDER BY t.id DESC
LIMIT %s;
""",
(EMB_MODEL, EVENT_LANGS, EVENT_BATCH_IDS),
)
rows = cur.fetchall()
return [r[0] for r in rows]
def fetch_embeddings_for(conn, tr_ids: List[int]) -> Dict[int, np.ndarray]:
"""
Devuelve un diccionario {traduccion_id: vector_numpy}
leyendo de traduccion_embeddings.embedding para el EMB_MODEL.
"""
if not tr_ids:
return {}
with conn.cursor() as cur:
cur.execute(
"""
SELECT traduccion_id, embedding
FROM traduccion_embeddings
WHERE traduccion_id = ANY(%s)
AND model = %s;
""",
(tr_ids, EMB_MODEL),
)
rows = cur.fetchall()
out: Dict[int, np.ndarray] = {}
for tr_id, emb in rows:
if not emb:
continue
arr = np.array([float(x or 0.0) for x in emb], dtype="float32")
if arr.size == 0:
continue
out[int(tr_id)] = arr
return out
def fetch_centroids(conn) -> List[Dict[str, Any]]:
"""
Carga todos los centroides actuales desde eventos.
Solo usamos campos de clustering: id, centroid, total_traducciones.
"""
with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
cur.execute(
"""
SELECT id, centroid, total_traducciones
FROM eventos
ORDER BY id;
"""
)
rows = cur.fetchall()
centroids: List[Dict[str, Any]] = []
for r in rows:
cid = int(r["id"])
raw = r["centroid"]
cnt = int(r["total_traducciones"] or 1)
if not isinstance(raw, (list, tuple)):
# centroid se almacena como JSONB array → en Python suele llegar como list
continue
arr = np.array([float(x or 0.0) for x in raw], dtype="float32")
if arr.size == 0:
continue
centroids.append({"id": cid, "vec": arr, "n": cnt})
return centroids
def cosine_distance(a: np.ndarray, b: np.ndarray) -> float:
num = float(np.dot(a, b))
da = float(np.linalg.norm(a))
db = float(np.linalg.norm(b))
denom = da * db
if denom <= 0.0:
return 1.0
cos = num / denom
if cos > 1.0:
cos = 1.0
if cos < -1.0:
cos = -1.0
return 1.0 - cos
def fetch_traduccion_info(conn, tr_id: int) -> Optional[Dict[str, Any]]:
"""
Devuelve info básica para un tr_id:
- noticia_id
- fecha de la noticia
- un título “representativo” para el evento (traducido u original).
"""
with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
cur.execute(
"""
SELECT
t.id AS traduccion_id,
t.noticia_id AS noticia_id,
n.fecha AS fecha,
COALESCE(NULLIF(t.titulo_trad, ''), n.titulo) AS titulo_evento
FROM traducciones t
JOIN noticias n ON n.id = t.noticia_id
WHERE t.id = %s;
""",
(tr_id,),
)
row = cur.fetchone()
if not row:
return None
return {
"traduccion_id": int(row["traduccion_id"]),
"noticia_id": row["noticia_id"],
"fecha": row["fecha"],
"titulo_evento": row["titulo_evento"],
}
def _insert_evento_noticia(cur, evento_id: int, info: Dict[str, Any]) -> None:
"""
Inserta relación en eventos_noticias (idempotente).
"""
if not info or not info.get("noticia_id"):
return
cur.execute(
"""
INSERT INTO eventos_noticias (evento_id, noticia_id, traduccion_id)
VALUES (%s, %s, %s)
ON CONFLICT (evento_id, traduccion_id) DO NOTHING;
""",
(evento_id, info["noticia_id"], info["traduccion_id"]),
)
def assign_to_event(
conn,
tr_id: int,
vec: np.ndarray,
centroids: List[Dict[str, Any]],
) -> None:
"""
Asigna una traducción a un evento existente (si distancia <= umbral)
o crea un evento nuevo con este vector como centroide.
Además:
- Actualiza fecha_inicio, fecha_fin, n_noticias del evento.
- Rellena eventos_noticias (evento_id, noticia_id, traduccion_id).
"""
if vec is None or vec.size == 0:
return
info = fetch_traduccion_info(conn, tr_id)
# Si no hay centroides todavía → primer evento
if not centroids:
centroid_list = [float(x) for x in vec.tolist()]
with conn.cursor() as cur:
if info and info.get("fecha"):
cur.execute(
"""
INSERT INTO eventos (centroid, total_traducciones,
fecha_inicio, fecha_fin, n_noticias, titulo)
VALUES (%s, %s, %s, %s, %s, %s)
RETURNING id;
""",
(
Json(centroid_list),
1,
info["fecha"],
info["fecha"],
1,
info.get("titulo_evento"),
),
)
else:
# Fallback mínimo si no hay info de noticia
cur.execute(
"""
INSERT INTO eventos (centroid, total_traducciones)
VALUES (%s, %s)
RETURNING id;
""",
(Json(centroid_list), 1),
)
new_id = cur.fetchone()[0]
# Vincular traducción al evento
cur.execute(
"UPDATE traducciones SET evento_id = %s WHERE id = %s;",
(new_id, tr_id),
)
# Rellenar tabla de relación
_insert_evento_noticia(cur, new_id, info or {})
centroids.append({"id": new_id, "vec": vec.copy(), "n": 1})
return
# Buscar el centroide más cercano
best_idx: Optional[int] = None
best_dist: float = 1.0
for i, c in enumerate(centroids):
d = cosine_distance(vec, c["vec"])
if d < best_dist:
best_dist = d
best_idx = i
with conn.cursor() as cur:
# Asignar a evento existente si está por debajo del umbral
if best_idx is not None and best_dist <= EVENT_DIST_THRESHOLD:
c = centroids[best_idx]
n_old = c["n"]
new_n = n_old + 1
new_vec = (c["vec"] * n_old + vec) / float(new_n)
c["vec"] = new_vec
c["n"] = new_n
centroid_list = [float(x) for x in new_vec.tolist()]
if info and info.get("fecha"):
cur.execute(
"""
UPDATE eventos
SET centroid = %s,
total_traducciones = total_traducciones + 1,
fecha_inicio = COALESCE(LEAST(fecha_inicio, %s), %s),
fecha_fin = COALESCE(GREATEST(fecha_fin, %s), %s),
n_noticias = n_noticias + 1
WHERE id = %s;
""",
(
Json(centroid_list),
info["fecha"],
info["fecha"],
info["fecha"],
info["fecha"],
c["id"],
),
)
else:
# Sin info de fecha: solo actualizamos centroid/contador
cur.execute(
"""
UPDATE eventos
SET centroid = %s,
total_traducciones = total_traducciones + 1
WHERE id = %s;
""",
(Json(centroid_list), c["id"]),
)
# Vincular traducción y relación
cur.execute(
"UPDATE traducciones SET evento_id = %s WHERE id = %s;",
(c["id"], tr_id),
)
_insert_evento_noticia(cur, c["id"], info or {})
return
# Si no hay evento adecuado → crear uno nuevo
centroid_list = [float(x) for x in vec.tolist()]
if info and info.get("fecha"):
cur.execute(
"""
INSERT INTO eventos (centroid, total_traducciones,
fecha_inicio, fecha_fin, n_noticias, titulo)
VALUES (%s, %s, %s, %s, %s, %s)
RETURNING id;
""",
(
Json(centroid_list),
1,
info["fecha"],
info["fecha"],
1,
info.get("titulo_evento"),
),
)
else:
cur.execute(
"""
INSERT INTO eventos (centroid, total_traducciones)
VALUES (%s, %s)
RETURNING id;
""",
(Json(centroid_list), 1),
)
new_id = cur.fetchone()[0]
cur.execute(
"UPDATE traducciones SET evento_id = %s WHERE id = %s;",
(new_id, tr_id),
)
_insert_evento_noticia(cur, new_id, info or {})
centroids.append({"id": new_id, "vec": vec.copy(), "n": 1})
def main():
log.info(
"Iniciando cluster_worker eventos "
"(EVENT_LANGS=%s, BATCH_IDS=%s, DIST_THRESHOLD=%.3f, SLEEP=%.1fs, EMB_MODEL=%s)",
",".join(EVENT_LANGS),
EVENT_BATCH_IDS,
EVENT_DIST_THRESHOLD,
EVENT_SLEEP_IDLE,
EMB_MODEL,
)
while True:
try:
with get_conn() as conn:
ensure_schema(conn)
pending_ids = fetch_pending_traducciones(conn)
if not pending_ids:
time.sleep(EVENT_SLEEP_IDLE)
continue
log.info(
"Traducciones pendientes de asignar evento: %d",
len(pending_ids),
)
emb_by_tr = fetch_embeddings_for(conn, pending_ids)
if not emb_by_tr:
log.warning(
"No se encontraron embeddings para las traducciones pendientes."
)
time.sleep(EVENT_SLEEP_IDLE)
continue
centroids = fetch_centroids(conn)
log.info("Centroides cargados: %d", len(centroids))
processed = 0
for tr_id in pending_ids:
vec = emb_by_tr.get(tr_id)
if vec is None:
continue
assign_to_event(conn, tr_id, vec, centroids)
processed += 1
conn.commit()
log.info(
"Asignación de eventos completada. Traducciones procesadas: %d",
processed,
)
except Exception as e:
log.exception("Error en cluster_worker: %s", e)
time.sleep(EVENT_SLEEP_IDLE)
if __name__ == "__main__":
main()