import os import time import logging from typing import List, Dict, Any, Optional import numpy as np import psycopg2 import psycopg2.extras from psycopg2.extras import Json logging.basicConfig( level=logging.INFO, format='[eventos] %(asctime)s %(levelname)s: %(message)s' ) log = logging.getLogger(__name__) DB = dict( host=os.environ.get("DB_HOST", "localhost"), port=int(os.environ.get("DB_PORT", 5432)), dbname=os.environ.get("DB_NAME", "rss"), user=os.environ.get("DB_USER", "rss"), password=os.environ.get("DB_PASS", "x"), ) EVENT_LANGS = [ s.strip().lower() for s in os.environ.get("EVENT_LANGS", "es").split(",") if s.strip() ] EVENT_BATCH_IDS = int(os.environ.get("EVENT_BATCH_IDS", "200")) EVENT_SLEEP_IDLE = float(os.environ.get("EVENT_SLEEP_IDLE", "5.0")) EVENT_DIST_THRESHOLD = float(os.environ.get("EVENT_DIST_THRESHOLD", "0.25")) EMB_MODEL = os.environ.get( "EMB_MODEL", "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2", ) def get_conn(): return psycopg2.connect(**DB) def ensure_schema(conn): with conn.cursor() as cur: cur.execute( """ CREATE INDEX IF NOT EXISTS idx_traducciones_evento ON traducciones(evento_id); """ ) cur.execute( """ CREATE INDEX IF NOT EXISTS idx_traducciones_evento_fecha ON traducciones(evento_id, noticia_id); """ ) conn.commit() def fetch_pending_traducciones(conn) -> List[int]: with conn.cursor() as cur: cur.execute( """ SELECT t.id FROM traducciones t JOIN traduccion_embeddings e ON e.traduccion_id = t.id AND e.model = %s WHERE t.status = 'done' AND t.evento_id IS NULL AND t.lang_to = ANY(%s) ORDER BY t.id DESC LIMIT %s; """, (EMB_MODEL, EVENT_LANGS, EVENT_BATCH_IDS), ) rows = cur.fetchall() return [r[0] for r in rows] def fetch_embeddings_for(conn, tr_ids: List[int]) -> Dict[int, np.ndarray]: if not tr_ids: return {} with conn.cursor() as cur: cur.execute( """ SELECT traduccion_id, embedding FROM traduccion_embeddings WHERE traduccion_id = ANY(%s) AND model = %s; """, (tr_ids, EMB_MODEL), ) rows = cur.fetchall() out: Dict[int, np.ndarray] = {} for tr_id, emb in rows: if not emb: continue arr = np.array([float(x or 0.0) for x in emb], dtype="float32") if arr.size == 0: continue out[int(tr_id)] = arr return out def fetch_centroids(conn) -> List[Dict[str, Any]]: with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur: cur.execute( """ SELECT id, centroid, total_traducciones FROM eventos ORDER BY id; """ ) rows = cur.fetchall() centroids: List[Dict[str, Any]] = [] for r in rows: cid = int(r["id"]) raw = r["centroid"] cnt = int(r["total_traducciones"] or 1) if not isinstance(raw, (list, tuple)): continue arr = np.array([float(x or 0.0) for x in raw], dtype="float32") if arr.size == 0: continue centroids.append({"id": cid, "vec": arr, "n": cnt}) return centroids def cosine_distance(a: np.ndarray, b: np.ndarray) -> float: num = float(np.dot(a, b)) da = float(np.linalg.norm(a)) db = float(np.linalg.norm(b)) denom = da * db if denom <= 0.0: return 1.0 cos = num / denom if cos > 1.0: cos = 1.0 if cos < -1.0: cos = -1.0 return 1.0 - cos def fetch_traduccion_info(conn, tr_id: int) -> Optional[Dict[str, Any]]: with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur: cur.execute( """ SELECT t.id AS traduccion_id, t.noticia_id AS noticia_id, n.fecha AS fecha, COALESCE(NULLIF(t.titulo_trad, ''), n.titulo) AS titulo_evento FROM traducciones t JOIN noticias n ON n.id = t.noticia_id WHERE t.id = %s; """, (tr_id,), ) row = cur.fetchone() if not row: return None return { "traduccion_id": int(row["traduccion_id"]), "noticia_id": row["noticia_id"], "fecha": row["fecha"], "titulo_evento": row["titulo_evento"], } def _insert_evento_noticia(cur, evento_id: int, info: Dict[str, Any]) -> None: if not info or not info.get("noticia_id"): return cur.execute( """ INSERT INTO eventos_noticias (evento_id, noticia_id, traduccion_id) VALUES (%s, %s, %s) ON CONFLICT (evento_id, noticia_id) DO NOTHING; """, (evento_id, info["noticia_id"], info["traduccion_id"]), ) def assign_to_event( conn, tr_id: int, vec: np.ndarray, centroids: List[Dict[str, Any]], ) -> None: if vec is None or vec.size == 0: return info = fetch_traduccion_info(conn, tr_id) if not centroids: centroid_list = [float(x) for x in vec.tolist()] with conn.cursor() as cur: if info and info.get("fecha"): cur.execute( """ INSERT INTO eventos (centroid, total_traducciones, fecha_inicio, fecha_fin, n_noticias, titulo) VALUES (%s, %s, %s, %s, %s, %s) RETURNING id; """, ( Json(centroid_list), 1, info["fecha"], info["fecha"], 1, info.get("titulo_evento"), ), ) else: cur.execute( """ INSERT INTO eventos (centroid, total_traducciones) VALUES (%s, %s) RETURNING id; """, (Json(centroid_list), 1), ) new_id = cur.fetchone()[0] cur.execute( "UPDATE traducciones SET evento_id = %s WHERE id = %s;", (new_id, tr_id), ) _insert_evento_noticia(cur, new_id, info or {}) centroids.append({"id": new_id, "vec": vec.copy(), "n": 1}) return best_idx: Optional[int] = None best_dist: float = 1.0 for i, c in enumerate(centroids): d = cosine_distance(vec, c["vec"]) if d < best_dist: best_dist = d best_idx = i with conn.cursor() as cur: if best_idx is not None and best_dist <= EVENT_DIST_THRESHOLD: c = centroids[best_idx] n_old = c["n"] new_n = n_old + 1 new_vec = (c["vec"] * n_old + vec) / float(new_n) c["vec"] = new_vec c["n"] = new_n centroid_list = [float(x) for x in new_vec.tolist()] if info and info.get("fecha"): cur.execute( """ UPDATE eventos SET centroid = %s, total_traducciones = total_traducciones + 1, fecha_inicio = COALESCE(LEAST(fecha_inicio, %s), %s), fecha_fin = COALESCE(GREATEST(fecha_fin, %s), %s), n_noticias = n_noticias + 1 WHERE id = %s; """, ( Json(centroid_list), info["fecha"], info["fecha"], info["fecha"], info["fecha"], c["id"], ), ) else: cur.execute( """ UPDATE eventos SET centroid = %s, total_traducciones = total_traducciones + 1 WHERE id = %s; """, (Json(centroid_list), c["id"]), ) cur.execute( "UPDATE traducciones SET evento_id = %s WHERE id = %s;", (c["id"], tr_id), ) _insert_evento_noticia(cur, c["id"], info or {}) return centroid_list = [float(x) for x in vec.tolist()] if info and info.get("fecha"): cur.execute( """ INSERT INTO eventos (centroid, total_traducciones, fecha_inicio, fecha_fin, n_noticias, titulo) VALUES (%s, %s, %s, %s, %s, %s) RETURNING id; """, ( Json(centroid_list), 1, info["fecha"], info["fecha"], 1, info.get("titulo_evento"), ), ) else: cur.execute( """ INSERT INTO eventos (centroid, total_traducciones) VALUES (%s, %s) RETURNING id; """, (Json(centroid_list), 1), ) new_id = cur.fetchone()[0] cur.execute( "UPDATE traducciones SET evento_id = %s WHERE id = %s;", (new_id, tr_id), ) _insert_evento_noticia(cur, new_id, info or {}) centroids.append({"id": new_id, "vec": vec.copy(), "n": 1}) def main(): log.info( "Iniciando cluster_worker eventos " "(EVENT_LANGS=%s, BATCH_IDS=%s, DIST_THRESHOLD=%.3f, SLEEP=%.1fs, EMB_MODEL=%s)", ",".join(EVENT_LANGS), EVENT_BATCH_IDS, EVENT_DIST_THRESHOLD, EVENT_SLEEP_IDLE, EMB_MODEL, ) while True: try: with get_conn() as conn: ensure_schema(conn) pending_ids = fetch_pending_traducciones(conn) if not pending_ids: time.sleep(EVENT_SLEEP_IDLE) continue log.info( "Traducciones pendientes de asignar evento: %d", len(pending_ids), ) emb_by_tr = fetch_embeddings_for(conn, pending_ids) if not emb_by_tr: log.warning( "No se encontraron embeddings para las traducciones pendientes." ) time.sleep(EVENT_SLEEP_IDLE) continue centroids = fetch_centroids(conn) log.info("Centroides cargados: %d", len(centroids)) processed = 0 for tr_id in pending_ids: vec = emb_by_tr.get(tr_id) if vec is None: continue assign_to_event(conn, tr_id, vec, centroids) processed += 1 conn.commit() log.info( "Asignación de eventos completada. Traducciones procesadas: %d", processed, ) except Exception as e: log.exception("Error en cluster_worker: %s", e) time.sleep(EVENT_SLEEP_IDLE) if __name__ == "__main__": main()