# rss/cluster_worker.py
import os
import time
import logging
from typing import List, Dict, Any, Optional
import numpy as np
import psycopg2
import psycopg2.extras
from psycopg2.extras import Json
logging.basicConfig(
level=logging.INFO,
format='[eventos] %(asctime)s %(levelname)s: %(message)s'
)
log = logging.getLogger(__name__)
DB = dict(
host=os.environ.get("DB_HOST", "localhost"),
port=int(os.environ.get("DB_PORT", 5432)),
dbname=os.environ.get("DB_NAME", "rss"),
user=os.environ.get("DB_USER", "rss"),
password=os.environ.get("DB_PASS", "x"),
)
EVENT_LANGS = [
s.strip().lower()
for s in os.environ.get("EVENT_LANGS", "es").split(",")
if s.strip()
]
EVENT_BATCH_IDS = int(os.environ.get("EVENT_BATCH_IDS", "200"))
EVENT_SLEEP_IDLE = float(os.environ.get("EVENT_SLEEP_IDLE", "5.0"))
EVENT_DIST_THRESHOLD = float(os.environ.get("EVENT_DIST_THRESHOLD", "0.25"))
EMB_MODEL = os.environ.get(
"EMB_MODEL",
"sentence-transformers/paraphrase-multilingual-mpnet-base-v2",
)
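# Example invocation (illustrative only; the values shown here are assumptions,
# not defaults defined anywhere else in this repo):
#   DB_HOST=db DB_NAME=rss EVENT_LANGS="es,en" EVENT_DIST_THRESHOLD=0.20 \
#   python rss/cluster_worker.py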
def get_conn():
return psycopg2.connect(**DB)
def ensure_schema(conn):
"""
Asegura que la tabla de eventos y las columnas necesarias existen.
Aquí se asume el esquema original de eventos con centroid JSONB.
"""
with conn.cursor() as cur:
cur.execute(
"""
CREATE TABLE IF NOT EXISTS eventos (
id SERIAL PRIMARY KEY,
creado_en TIMESTAMP NOT NULL DEFAULT NOW(),
actualizado_en TIMESTAMP NOT NULL DEFAULT NOW(),
centroid JSONB NOT NULL,
total_traducciones INTEGER NOT NULL DEFAULT 1
);
"""
)
cur.execute(
"""
ALTER TABLE traducciones
ADD COLUMN IF NOT EXISTS evento_id INTEGER REFERENCES eventos(id);
"""
)
cur.execute(
"""
CREATE INDEX IF NOT EXISTS idx_traducciones_evento
ON traducciones(evento_id);
"""
)
cur.execute(
"""
CREATE INDEX IF NOT EXISTS idx_traducciones_evento_fecha
ON traducciones(evento_id, noticia_id);
"""
)
cur.execute(
"""
CREATE OR REPLACE FUNCTION actualizar_evento_modificado()
RETURNS TRIGGER AS $$
BEGIN
NEW.actualizado_en = NOW();
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
"""
)
cur.execute("DROP TRIGGER IF EXISTS trg_evento_modificado ON eventos;")
cur.execute(
"""
CREATE TRIGGER trg_evento_modificado
BEFORE UPDATE ON eventos
FOR EACH ROW
EXECUTE FUNCTION actualizar_evento_modificado();
"""
)
conn.commit()
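# Note: the trigger created above means any UPDATE on eventos (e.g. the
# centroid rewrite in assign_to_event) automatically refreshes
# actualizado_en, so callers never need to set that column themselves.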
def fetch_pending_traducciones(conn) -> List[int]:
"""
Traducciones con status 'done', sin evento asignado
y que ya tienen embedding en traduccion_embeddings para EMB_MODEL.
"""
with conn.cursor() as cur:
cur.execute(
"""
SELECT t.id
FROM traducciones t
JOIN traduccion_embeddings e
ON e.traduccion_id = t.id
AND e.model = %s
WHERE t.status = 'done'
AND t.evento_id IS NULL
AND t.lang_to = ANY(%s)
ORDER BY t.id DESC
LIMIT %s;
""",
(EMB_MODEL, EVENT_LANGS, EVENT_BATCH_IDS),
)
rows = cur.fetchall()
return [r[0] for r in rows]
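# Newest translations are handled first (ORDER BY t.id DESC); any older
# backlog drains in subsequent batches of at most EVENT_BATCH_IDS rows.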
def fetch_embeddings_for(conn, tr_ids: List[int]) -> Dict[int, np.ndarray]:
"""
Devuelve un diccionario {traduccion_id: vector_numpy}
leyendo de traduccion_embeddings.embedding para el EMB_MODEL.
"""
if not tr_ids:
return {}
with conn.cursor() as cur:
cur.execute(
"""
SELECT traduccion_id, embedding
FROM traduccion_embeddings
WHERE traduccion_id = ANY(%s)
AND model = %s;
""",
(tr_ids, EMB_MODEL),
)
rows = cur.fetchall()
out: Dict[int, np.ndarray] = {}
for tr_id, emb in rows:
if not emb:
continue
arr = np.array([float(x or 0.0) for x in emb], dtype="float32")
if arr.size == 0:
continue
out[int(tr_id)] = arr
return out
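# This assumes traduccion_embeddings.embedding is stored as an array/JSON
# column that psycopg2 returns as a Python sequence of numbers; NULL elements
# are coerced to 0.0 above.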
def fetch_centroids(conn) -> List[Dict[str, Any]]:
"""
Carga todos los centroides actuales desde eventos.
"""
with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
cur.execute(
"""
SELECT id, centroid, total_traducciones
FROM eventos
ORDER BY id;
"""
)
rows = cur.fetchall()
centroids: List[Dict[str, Any]] = []
for r in rows:
cid = int(r["id"])
raw = r["centroid"]
cnt = int(r["total_traducciones"] or 1)
if not isinstance(raw, (list, tuple)):
continue
arr = np.array([float(x or 0.0) for x in raw], dtype="float32")
if arr.size == 0:
continue
centroids.append({"id": cid, "vec": arr, "n": cnt})
return centroids
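# All centroids are loaded into memory on every cycle, so memory use and
# matching cost grow linearly with the number of events; fine for modest
# event counts, worth revisiting if the eventos table grows large.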
def cosine_distance(a: np.ndarray, b: np.ndarray) -> float:
num = float(np.dot(a, b))
da = float(np.linalg.norm(a))
db = float(np.linalg.norm(b))
denom = da * db
if denom <= 0.0:
return 1.0
cos = num / denom
if cos > 1.0:
cos = 1.0
if cos < -1.0:
cos = -1.0
return 1.0 - cos
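# Quick sanity check (doctest-style, not executed by the worker):
#   cosine_distance(np.array([1.0, 0.0]), np.array([2.0, 0.0]))   -> 0.0
#   cosine_distance(np.array([1.0, 0.0]), np.array([0.0, 3.0]))   -> 1.0
#   cosine_distance(np.array([1.0, 0.0]), np.array([-1.0, 0.0]))  -> 2.0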
def assign_to_event(
conn,
tr_id: int,
vec: np.ndarray,
centroids: List[Dict[str, Any]],
) -> None:
"""
Asigna una traducción a un evento existente (si distancia <= umbral)
o crea un evento nuevo con este vector como centroide.
"""
from psycopg2.extras import Json
if vec is None or vec.size == 0:
return
best_idx: Optional[int] = None
    best_dist: float = float("inf")  # cosine distance lies in [0.0, 2.0]
for i, c in enumerate(centroids):
d = cosine_distance(vec, c["vec"])
if d < best_dist:
best_dist = d
best_idx = i
if best_idx is not None and best_dist <= EVENT_DIST_THRESHOLD:
c = centroids[best_idx]
n_old = c["n"]
new_n = n_old + 1
new_vec = (c["vec"] * n_old + vec) / float(new_n)
c["vec"] = new_vec
c["n"] = new_n
centroid_list = [float(x) for x in new_vec.tolist()]
with conn.cursor() as cur:
cur.execute(
"""
UPDATE eventos
SET centroid = %s,
total_traducciones = total_traducciones + 1
WHERE id = %s;
""",
(Json(centroid_list), c["id"]),
)
cur.execute(
"UPDATE traducciones SET evento_id = %s WHERE id = %s;",
(c["id"], tr_id),
)
return
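    # No centroid within EVENT_DIST_THRESHOLD (this also covers the case of
    # no existing events at all): create a new event seeded with this vector.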
centroid_list = [float(x) for x in vec.tolist()]
with conn.cursor() as cur:
cur.execute(
"""
INSERT INTO eventos (centroid, total_traducciones)
VALUES (%s, %s)
RETURNING id;
""",
(Json(centroid_list), 1),
)
new_id = cur.fetchone()[0]
cur.execute(
"UPDATE traducciones SET evento_id = %s WHERE id = %s;",
(new_id, tr_id),
)
centroids.append({"id": new_id, "vec": vec.copy(), "n": 1})
def main():
log.info(
"Iniciando cluster_worker eventos "
"(EVENT_LANGS=%s, BATCH_IDS=%s, DIST_THRESHOLD=%.3f, SLEEP=%.1fs, EMB_MODEL=%s)",
",".join(EVENT_LANGS),
EVENT_BATCH_IDS,
EVENT_DIST_THRESHOLD,
EVENT_SLEEP_IDLE,
EMB_MODEL,
)
    while True:
        conn = None
        try:
            conn = get_conn()
            ensure_schema(conn)
            pending_ids = fetch_pending_traducciones(conn)
            if not pending_ids:
                time.sleep(EVENT_SLEEP_IDLE)
                continue
            log.info("Translations pending event assignment: %d", len(pending_ids))
            emb_by_tr = fetch_embeddings_for(conn, pending_ids)
            if not emb_by_tr:
                log.warning("No embeddings found for the pending translations.")
                time.sleep(EVENT_SLEEP_IDLE)
                continue
            centroids = fetch_centroids(conn)
            log.info("Centroids loaded: %d", len(centroids))
            processed = 0
            for tr_id in pending_ids:
                vec = emb_by_tr.get(tr_id)
                if vec is None:
                    continue
                assign_to_event(conn, tr_id, vec, centroids)
                processed += 1
            conn.commit()
            log.info("Event assignment finished. Translations processed: %d", processed)
        except Exception as e:
            log.exception("Error in cluster_worker: %s", e)
            time.sleep(EVENT_SLEEP_IDLE)
        finally:
            # psycopg2's "with connection" form only manages the transaction;
            # it never closes the connection, which leaked one connection per
            # iteration in the original loop. Close explicitly instead.
            if conn is not None:
                conn.close()
if __name__ == "__main__":
main()