"""Event-clustering worker.

Polls the ``traducciones`` table for completed translations that have no
``evento_id`` yet, loads their embedding vectors, and greedily assigns each
one to the nearest existing event centroid (cosine distance).  When no
centroid is within ``EVENT_DIST_THRESHOLD`` a new event is created.  Event
centroids are maintained as running means stored as JSONB.
"""

import logging
import os
import time
from typing import Any, Dict, List, Optional

import numpy as np
import psycopg2
import psycopg2.extras
from psycopg2.extras import Json

logging.basicConfig(
    level=logging.INFO,
    format='[eventos] %(asctime)s %(levelname)s: %(message)s'
)
log = logging.getLogger(__name__)

# Database connection parameters, all overridable via environment variables.
DB = dict(
    host=os.environ.get("DB_HOST", "localhost"),
    port=int(os.environ.get("DB_PORT", 5432)),
    dbname=os.environ.get("DB_NAME", "rss"),
    user=os.environ.get("DB_USER", "rss"),
    password=os.environ.get("DB_PASS", "x"),
)

# Target languages whose translations participate in clustering
# (comma-separated in EVENT_LANGS, e.g. "es,en").
EVENT_LANGS = [
    s.strip().lower()
    for s in os.environ.get("EVENT_LANGS", "es").split(",")
    if s.strip()
]
# Max translation ids fetched per polling cycle.
EVENT_BATCH_IDS = int(os.environ.get("EVENT_BATCH_IDS", "200"))
# Seconds to wait when there is nothing to do.
EVENT_SLEEP_IDLE = float(os.environ.get("EVENT_SLEEP_IDLE", "5.0"))
# Max cosine distance for a translation to join an existing event.
EVENT_DIST_THRESHOLD = float(os.environ.get("EVENT_DIST_THRESHOLD", "0.25"))


def get_conn():
    """Open a new PostgreSQL connection using the DB environment config."""
    return psycopg2.connect(**DB)


def ensure_schema(conn) -> None:
    """Create the ``eventos`` table, linking column, indexes and the
    ``actualizado_en`` auto-update trigger if they do not exist yet.

    Idempotent; commits on success.
    """
    with conn.cursor() as cur:
        cur.execute(
            """
            CREATE TABLE IF NOT EXISTS eventos (
                id SERIAL PRIMARY KEY,
                creado_en TIMESTAMP NOT NULL DEFAULT NOW(),
                actualizado_en TIMESTAMP NOT NULL DEFAULT NOW(),
                centroid JSONB NOT NULL,
                total_traducciones INTEGER NOT NULL DEFAULT 1
            );
            """
        )
        cur.execute(
            """
            ALTER TABLE traducciones
            ADD COLUMN IF NOT EXISTS evento_id INTEGER REFERENCES eventos(id);
            """
        )
        cur.execute(
            """
            CREATE INDEX IF NOT EXISTS idx_traducciones_evento
            ON traducciones(evento_id);
            """
        )
        cur.execute(
            """
            CREATE INDEX IF NOT EXISTS idx_traducciones_evento_fecha
            ON traducciones(evento_id, noticia_id);
            """
        )
        cur.execute(
            """
            CREATE OR REPLACE FUNCTION actualizar_evento_modificado()
            RETURNS TRIGGER AS $$
            BEGIN
                NEW.actualizado_en = NOW();
                RETURN NEW;
            END;
            $$ LANGUAGE plpgsql;
            """
        )
        # Trigger re-created unconditionally so FUNCTION changes take effect.
        cur.execute("DROP TRIGGER IF EXISTS trg_evento_modificado ON eventos;")
        cur.execute(
            """
            CREATE TRIGGER trg_evento_modificado
            BEFORE UPDATE ON eventos
            FOR EACH ROW
            EXECUTE FUNCTION actualizar_evento_modificado();
            """
        )
    conn.commit()


def fetch_pending_traducciones(conn) -> List[int]:
    """Return up to EVENT_BATCH_IDS ids of finished translations that have an
    embedding, match EVENT_LANGS, and are not yet assigned to an event.

    Newest first (ORDER BY t.id DESC).
    """
    with conn.cursor() as cur:
        cur.execute(
            """
            SELECT t.id
            FROM traducciones t
            JOIN embeddings e ON e.traduccion_id = t.id
            WHERE t.status = 'done'
              AND t.evento_id IS NULL
              AND t.lang_to = ANY(%s)
            ORDER BY t.id DESC
            LIMIT %s;
            """,
            (EVENT_LANGS, EVENT_BATCH_IDS),
        )
        rows = cur.fetchall()
    return [r[0] for r in rows]


def fetch_embeddings_for(conn, tr_ids: List[int]) -> Dict[int, np.ndarray]:
    """Load embedding vectors for the given translation ids.

    Returns a mapping traduccion_id -> float32 vector.  Rows with empty or
    NULL vectors are skipped; NULL components are coerced to 0.0.
    """
    if not tr_ids:
        return {}
    with conn.cursor() as cur:
        cur.execute(
            """
            SELECT traduccion_id, vec
            FROM embeddings
            WHERE traduccion_id = ANY(%s);
            """,
            (tr_ids,),
        )
        rows = cur.fetchall()
    out: Dict[int, np.ndarray] = {}
    for tr_id, vec in rows:
        if not vec:
            continue
        arr = np.array([float(x or 0.0) for x in vec], dtype="float32")
        if arr.size == 0:
            continue
        out[int(tr_id)] = arr
    return out


def fetch_centroids(conn) -> List[Dict[str, Any]]:
    """Load all event centroids as dicts ``{"id", "vec", "n"}``.

    Events whose stored centroid is not a JSON array, or is empty, are
    skipped.  ``n`` falls back to 1 when total_traducciones is NULL/0.
    """
    with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
        cur.execute(
            """
            SELECT id, centroid, total_traducciones
            FROM eventos
            ORDER BY id;
            """
        )
        rows = cur.fetchall()
    centroids: List[Dict[str, Any]] = []
    for r in rows:
        cid = int(r["id"])
        raw = r["centroid"]
        cnt = int(r["total_traducciones"] or 1)
        if not isinstance(raw, (list, tuple)):
            continue
        arr = np.array([float(x or 0.0) for x in raw], dtype="float32")
        if arr.size == 0:
            continue
        centroids.append({"id": cid, "vec": arr, "n": cnt})
    return centroids


def cosine_distance(a: np.ndarray, b: np.ndarray) -> float:
    """Return 1 - cos(a, b), clamped to [0, 2].

    Degenerate (zero-norm) inputs yield the maximum useful distance 1.0 so
    they never win a nearest-centroid comparison against a real match.
    """
    num = float(np.dot(a, b))
    da = float(np.linalg.norm(a))
    db = float(np.linalg.norm(b))
    denom = da * db
    if denom <= 0.0:
        return 1.0
    cos = num / denom
    # Guard against floating-point drift outside [-1, 1].
    if cos > 1.0:
        cos = 1.0
    if cos < -1.0:
        cos = -1.0
    return 1.0 - cos


def _link_traduccion(cur, evento_id: int, tr_id: int) -> None:
    """Point a translation row at its assigned event."""
    cur.execute(
        "UPDATE traducciones SET evento_id = %s WHERE id = %s;",
        (evento_id, tr_id),
    )


def _create_event(
    conn,
    tr_id: int,
    vec: np.ndarray,
    centroids: List[Dict[str, Any]],
) -> None:
    """Insert a brand-new event seeded by *vec*, link *tr_id* to it, and
    append the new centroid to the in-memory list so later assignments in
    the same cycle can match against it.
    """
    centroid_list = [float(x) for x in vec.tolist()]
    with conn.cursor() as cur:
        cur.execute(
            """
            INSERT INTO eventos (centroid, total_traducciones)
            VALUES (%s, %s)
            RETURNING id;
            """,
            (Json(centroid_list), 1),
        )
        new_id = cur.fetchone()[0]
        _link_traduccion(cur, new_id, tr_id)
    centroids.append({"id": new_id, "vec": vec.copy(), "n": 1})


def assign_to_event(
    conn,
    tr_id: int,
    vec: np.ndarray,
    centroids: List[Dict[str, Any]],
) -> None:
    """Assign one translation embedding to the nearest event, or create one.

    If the closest centroid is within EVENT_DIST_THRESHOLD the event's
    running-mean centroid and count are updated (in the DB and in the
    in-memory ``centroids`` list); otherwise a new event is created.
    Mutates ``centroids`` in place.  Does not commit.
    """
    if vec is None or vec.size == 0:
        return

    if not centroids:
        _create_event(conn, tr_id, vec, centroids)
        return

    # Linear nearest-centroid scan.
    best_idx: Optional[int] = None
    best_dist: float = 1.0
    for i, c in enumerate(centroids):
        d = cosine_distance(vec, c["vec"])
        if d < best_dist:
            best_dist = d
            best_idx = i

    if best_idx is not None and best_dist <= EVENT_DIST_THRESHOLD:
        c = centroids[best_idx]
        n_old = c["n"]
        new_n = n_old + 1
        # Incremental mean: new = (old * n + vec) / (n + 1).
        new_vec = (c["vec"] * n_old + vec) / float(new_n)
        c["vec"] = new_vec
        c["n"] = new_n
        centroid_list = [float(x) for x in new_vec.tolist()]
        with conn.cursor() as cur:
            cur.execute(
                """
                UPDATE eventos
                SET centroid = %s,
                    total_traducciones = total_traducciones + 1
                WHERE id = %s;
                """,
                (Json(centroid_list), c["id"]),
            )
            _link_traduccion(cur, c["id"], tr_id)
        return

    # No centroid close enough: start a fresh event.
    _create_event(conn, tr_id, vec, centroids)


def _run_cycle(conn) -> bool:
    """Run one poll/assign pass over a live connection.

    Returns True when at least the batch was fetched and processed, False
    when there was nothing to do (caller should sleep).
    """
    ensure_schema(conn)

    pending_ids = fetch_pending_traducciones(conn)
    if not pending_ids:
        return False

    log.info("Traducciones pendientes de asignar evento: %d", len(pending_ids))

    emb_by_tr = fetch_embeddings_for(conn, pending_ids)
    if not emb_by_tr:
        log.warning("No se encontraron embeddings para las traducciones pendientes.")
        return False

    centroids = fetch_centroids(conn)
    log.info("Centroides cargados: %d", len(centroids))

    processed = 0
    for tr_id in pending_ids:
        vec = emb_by_tr.get(tr_id)
        if vec is None:
            continue
        assign_to_event(conn, tr_id, vec, centroids)
        processed += 1

    conn.commit()
    log.info("Asignación de eventos completada. Traducciones procesadas: %d", processed)
    return True


def main() -> None:
    """Run the worker loop forever: poll, assign, sleep when idle.

    Each iteration opens a fresh connection and ALWAYS closes it (psycopg2's
    connection context manager only ends the transaction; it does not close
    the connection, which previously leaked one connection per cycle).
    Sleeping happens after the connection is released so idle waits do not
    pin a server connection.
    """
    log.info(
        "Iniciando cluster_worker eventos "
        "(EVENT_LANGS=%s, BATCH_IDS=%s, DIST_THRESHOLD=%.3f, SLEEP=%.1fs)",
        ",".join(EVENT_LANGS),
        EVENT_BATCH_IDS,
        EVENT_DIST_THRESHOLD,
        EVENT_SLEEP_IDLE,
    )
    while True:
        try:
            conn = get_conn()
            try:
                # "with conn" commits on success / rolls back on error.
                with conn:
                    did_work = _run_cycle(conn)
            finally:
                conn.close()
            if not did_work:
                time.sleep(EVENT_SLEEP_IDLE)
        except Exception as e:
            log.exception("Error en cluster_worker: %s", e)
            time.sleep(EVENT_SLEEP_IDLE)


if __name__ == "__main__":
    main()