rss/cluster_worker.py
import contextlib
import logging
import os
import time
from typing import List, Dict, Any, Optional
import numpy as np
import psycopg2
import psycopg2.extras
from psycopg2.extras import Json
logging.basicConfig(
level=logging.INFO,
format='[eventos] %(asctime)s %(levelname)s: %(message)s'
)
log = logging.getLogger(__name__)
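# Database connection settings, all overridable via environment variables.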
DB = dict(
host=os.environ.get("DB_HOST", "localhost"),
port=int(os.environ.get("DB_PORT", 5432)),
dbname=os.environ.get("DB_NAME", "rss"),
user=os.environ.get("DB_USER", "rss"),
password=os.environ.get("DB_PASS", "x"),
)
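# Clustering tunables: which target languages to cluster, how many pending
# translations to pull per pass, how long to sleep when idle, and the maximum
# cosine distance at which a translation still joins an existing evento.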
EVENT_LANGS = [
s.strip().lower()
for s in os.environ.get("EVENT_LANGS", "es").split(",")
if s.strip()
]
EVENT_BATCH_IDS = int(os.environ.get("EVENT_BATCH_IDS", "200"))
EVENT_SLEEP_IDLE = float(os.environ.get("EVENT_SLEEP_IDLE", "5.0"))
EVENT_DIST_THRESHOLD = float(os.environ.get("EVENT_DIST_THRESHOLD", "0.25"))
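# Must match the model name under which the rows in traduccion_embeddings
# were stored, since every lookup filters on it.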
EMB_MODEL = os.environ.get(
"EMB_MODEL",
"sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2",
)
def get_conn():
return psycopg2.connect(**DB)
def ensure_schema(conn):
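    """Create supporting indexes on traducciones if they do not already exist."""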
with conn.cursor() as cur:
cur.execute(
"""
CREATE INDEX IF NOT EXISTS idx_traducciones_evento
ON traducciones(evento_id);
"""
)
cur.execute(
"""
            CREATE INDEX IF NOT EXISTS idx_traducciones_evento_noticia
            ON traducciones(evento_id, noticia_id);
"""
)
conn.commit()
def fetch_pending_traducciones(conn) -> List[int]:
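    """Return ids of finished translations that have an embedding but no evento yet."""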
with conn.cursor() as cur:
cur.execute(
"""
SELECT t.id
FROM traducciones t
JOIN traduccion_embeddings e
ON e.traduccion_id = t.id
AND e.model = %s
WHERE t.status = 'done'
AND t.evento_id IS NULL
AND t.lang_to = ANY(%s)
ORDER BY t.id DESC
LIMIT %s;
""",
(EMB_MODEL, EVENT_LANGS, EVENT_BATCH_IDS),
)
rows = cur.fetchall()
return [r[0] for r in rows]
def fetch_embeddings_for(conn, tr_ids: List[int]) -> Dict[int, np.ndarray]:
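    """Load the stored embeddings for the given traduccion ids, keyed by id."""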
if not tr_ids:
return {}
with conn.cursor() as cur:
cur.execute(
"""
SELECT traduccion_id, embedding
FROM traduccion_embeddings
WHERE traduccion_id = ANY(%s)
AND model = %s;
""",
(tr_ids, EMB_MODEL),
)
rows = cur.fetchall()
out: Dict[int, np.ndarray] = {}
for tr_id, emb in rows:
if not emb:
continue
arr = np.array([float(x or 0.0) for x in emb], dtype="float32")
if arr.size == 0:
continue
out[int(tr_id)] = arr
return out
def fetch_centroids(conn) -> List[Dict[str, Any]]:
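    """Load all evento centroids into memory for nearest-centroid search."""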
with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
cur.execute(
"""
SELECT id, centroid, total_traducciones
FROM eventos
ORDER BY id;
"""
)
rows = cur.fetchall()
centroids: List[Dict[str, Any]] = []
for r in rows:
cid = int(r["id"])
raw = r["centroid"]
cnt = int(r["total_traducciones"] or 1)
if not isinstance(raw, (list, tuple)):
continue
arr = np.array([float(x or 0.0) for x in raw], dtype="float32")
if arr.size == 0:
continue
centroids.append({"id": cid, "vec": arr, "n": cnt})
return centroids
def cosine_distance(a: np.ndarray, b: np.ndarray) -> float:
    """Cosine distance; zero-norm vectors are treated as unrelated (distance 1.0)."""
    denom = float(np.linalg.norm(a)) * float(np.linalg.norm(b))
    if denom <= 0.0:
        return 1.0
    cos = float(np.dot(a, b)) / denom
    # Clamp to [-1, 1] to absorb floating-point drift before converting.
    return 1.0 - max(-1.0, min(1.0, cos))
def fetch_traduccion_info(conn, tr_id: int) -> Optional[Dict[str, Any]]:
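    """Fetch the article metadata (fecha, fallback title) used to date and title eventos."""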
with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
cur.execute(
"""
SELECT
t.id AS traduccion_id,
t.noticia_id AS noticia_id,
n.fecha AS fecha,
COALESCE(NULLIF(t.titulo_trad, ''), n.titulo) AS titulo_evento
FROM traducciones t
JOIN noticias n ON n.id = t.noticia_id
WHERE t.id = %s;
""",
(tr_id,),
)
row = cur.fetchone()
if not row:
return None
return {
"traduccion_id": int(row["traduccion_id"]),
"noticia_id": row["noticia_id"],
"fecha": row["fecha"],
"titulo_evento": row["titulo_evento"],
}
def _insert_evento_noticia(cur, evento_id: int, info: Dict[str, Any]) -> None:
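    """Link an evento to the traduccion's source article, ignoring duplicate pairs."""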
if not info or not info.get("noticia_id"):
return
cur.execute(
"""
INSERT INTO eventos_noticias (evento_id, noticia_id, traduccion_id)
VALUES (%s, %s, %s)
ON CONFLICT (evento_id, noticia_id) DO NOTHING;
""",
(evento_id, info["noticia_id"], info["traduccion_id"]),
)
def _create_evento(cur, vec: np.ndarray, info: Optional[Dict[str, Any]]) -> int:
    """Insert a new evento seeded with this vector as its centroid; return its id."""
    centroid_list = [float(x) for x in vec.tolist()]
    if info and info.get("fecha"):
        cur.execute(
            """
            INSERT INTO eventos (centroid, total_traducciones,
                fecha_inicio, fecha_fin, n_noticias, titulo)
            VALUES (%s, %s, %s, %s, %s, %s)
            RETURNING id;
            """,
            (
                Json(centroid_list),
                1,
                info["fecha"],
                info["fecha"],
                1,
                info.get("titulo_evento"),
            ),
        )
    else:
        cur.execute(
            """
            INSERT INTO eventos (centroid, total_traducciones)
            VALUES (%s, %s)
            RETURNING id;
            """,
            (Json(centroid_list), 1),
        )
    return int(cur.fetchone()[0])
def assign_to_event(
    conn,
    tr_id: int,
    vec: np.ndarray,
    centroids: List[Dict[str, Any]],
) -> None:
    """Assign one translation to the nearest evento or open a new one; mutates centroids in place."""
    if vec is None or vec.size == 0:
        return
    info = fetch_traduccion_info(conn, tr_id)
    # Nearest-centroid search over the in-memory working set.
    best_idx: Optional[int] = None
    best_dist: float = 1.0
    for i, c in enumerate(centroids):
        d = cosine_distance(vec, c["vec"])
        if d < best_dist:
            best_dist = d
            best_idx = i
    with conn.cursor() as cur:
        if best_idx is not None and best_dist <= EVENT_DIST_THRESHOLD:
            # Close enough: fold the vector into the running mean centroid.
            c = centroids[best_idx]
            n_old = c["n"]
            new_n = n_old + 1
            new_vec = (c["vec"] * n_old + vec) / float(new_n)
            c["vec"] = new_vec
            c["n"] = new_n
            centroid_list = [float(x) for x in new_vec.tolist()]
            if info and info.get("fecha"):
                cur.execute(
                    """
                    UPDATE eventos
                    SET centroid = %s,
                        total_traducciones = total_traducciones + 1,
                        fecha_inicio = COALESCE(LEAST(fecha_inicio, %s), %s),
                        fecha_fin = COALESCE(GREATEST(fecha_fin, %s), %s),
                        n_noticias = n_noticias + 1
                    WHERE id = %s;
                    """,
                    (
                        Json(centroid_list),
                        info["fecha"],
                        info["fecha"],
                        info["fecha"],
                        info["fecha"],
                        c["id"],
                    ),
                )
            else:
                cur.execute(
                    """
                    UPDATE eventos
                    SET centroid = %s,
                        total_traducciones = total_traducciones + 1
                    WHERE id = %s;
                    """,
                    (Json(centroid_list), c["id"]),
                )
            evento_id = c["id"]
        else:
            # No centroids yet, or the best match is too far away: open a new evento.
            evento_id = _create_evento(cur, vec, info)
            centroids.append({"id": evento_id, "vec": vec.copy(), "n": 1})
        cur.execute(
            "UPDATE traducciones SET evento_id = %s WHERE id = %s;",
            (evento_id, tr_id),
        )
        _insert_evento_noticia(cur, evento_id, info or {})
def main():
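    """Poll for unassigned translations and cluster them into eventos until interrupted."""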
    log.info(
        "Starting cluster_worker for eventos "
        "(EVENT_LANGS=%s, BATCH_IDS=%s, DIST_THRESHOLD=%.3f, SLEEP=%.1fs, EMB_MODEL=%s)",
        ",".join(EVENT_LANGS),
        EVENT_BATCH_IDS,
        EVENT_DIST_THRESHOLD,
        EVENT_SLEEP_IDLE,
        EMB_MODEL,
    )
while True:
try:
            # contextlib.closing() releases the connection every pass; a bare
            # psycopg2 connection used as a context manager only ends the
            # transaction and would leak one connection per loop iteration.
            with contextlib.closing(get_conn()) as conn:
ensure_schema(conn)
pending_ids = fetch_pending_traducciones(conn)
if not pending_ids:
time.sleep(EVENT_SLEEP_IDLE)
continue
                log.info(
                    "Translations pending event assignment: %d",
                    len(pending_ids),
                )
emb_by_tr = fetch_embeddings_for(conn, pending_ids)
if not emb_by_tr:
                    log.warning(
                        "No embeddings found for the pending translations."
                    )
time.sleep(EVENT_SLEEP_IDLE)
continue
centroids = fetch_centroids(conn)
log.info("Centroides cargados: %d", len(centroids))
processed = 0
for tr_id in pending_ids:
vec = emb_by_tr.get(tr_id)
if vec is None:
continue
assign_to_event(conn, tr_id, vec, centroids)
processed += 1
conn.commit()
                log.info(
                    "Event assignment pass complete. Translations processed: %d",
                    processed,
                )
except Exception as e:
            log.exception("Error in cluster_worker: %s", e)
time.sleep(EVENT_SLEEP_IDLE)
if __name__ == "__main__":
main()