import os
import time
import logging
from typing import List, Tuple

import numpy as np
import psycopg2
import psycopg2.extras

logging.basicConfig(
    level=logging.INFO,
    format='[related] %(asctime)s %(levelname)s: %(message)s'
)

DB = dict(
    host=os.environ.get("DB_HOST", "localhost"),
    port=int(os.environ.get("DB_PORT", 5432)),
    dbname=os.environ.get("DB_NAME", "rss"),
    user=os.environ.get("DB_USER", "rss"),
    password=os.environ.get("DB_PASS", "x"),
)

TOPK = int(os.environ.get("RELATED_TOPK", 10))
BATCH_IDS = int(os.environ.get("RELATED_BATCH_IDS", 200))
SLEEP_IDLE = float(os.environ.get("RELATED_SLEEP", 10))
MIN_SCORE = float(os.environ.get("RELATED_MIN_SCORE", 0.0))
WINDOW_HOURS = int(os.environ.get("RELATED_WINDOW_H", 0))


def get_conn():
    return psycopg2.connect(**DB)


# ---------------------------------------------------------
# Load embeddings ONLY for Spanish translations (lang_to='es')
# ---------------------------------------------------------
def _fetch_all_embeddings(cur):
    base_sql = """
        SELECT e.traduccion_id, e.vec
        FROM embeddings e
        JOIN traducciones t ON t.id = e.traduccion_id
        JOIN noticias n ON n.id = t.noticia_id
        WHERE t.lang_to = 'es'
    """
    params = []
    if WINDOW_HOURS > 0:
        # psycopg2 renders the parameter as a quoted literal,
        # so this becomes e.g. NOW() - INTERVAL '72 hours'.
        base_sql += " AND n.fecha >= NOW() - INTERVAL %s"
        params.append(f"{WINDOW_HOURS} hours")

    cur.execute(base_sql, params)
    rows = cur.fetchall()
    if not rows:
        return [], None

    ids = []
    vecs = []
    for tid, v in rows:
        if v is None:
            continue
        ids.append(tid)
        vecs.append(v)

    if not ids:
        return [], None

    # Convert to a NumPy matrix
    mat = np.array(vecs, dtype=np.float32)

    # L2-normalize each row (guard against division by zero)
    norms = np.linalg.norm(mat, axis=1, keepdims=True)
    norms[norms == 0] = 1e-8
    mat = mat / norms
    return ids, mat


# ---------------------------------------------------------
# Fetch pending IDs (translations with no related rows yet)
# ---------------------------------------------------------
def _fetch_pending_ids(cur, limit) -> List[int]:
    cur.execute(
        """
        SELECT e.traduccion_id
        FROM embeddings e
        JOIN traducciones t ON t.id = e.traduccion_id
        LEFT JOIN related_noticias r ON r.traduccion_id = e.traduccion_id
        WHERE t.lang_to = 'es'
        GROUP BY e.traduccion_id
        HAVING COUNT(r.related_traduccion_id) = 0
        ORDER BY e.traduccion_id DESC
        LIMIT %s;
        """,
        (limit,),
    )
    return [r[0] for r in cur.fetchall()]


# ---------------------------------------------------------
# Top-K neighbours using NumPy
# ---------------------------------------------------------
def _topk_numpy(
    idx: int, ids_all: List[int], mat: np.ndarray, K: int
) -> List[Tuple[int, float]]:
    # Vector of the query article
    q = mat[idx]  # (dim,)

    # Cosine similarities: rows are unit-normalized, so a plain
    # matrix-vector dot product is the cosine.
    sims = np.dot(mat, q)

    # Exclude the self-match
    sims[idx] = -np.inf

    # Apply the minimum-score filter
    if MIN_SCORE > 0:
        sims = np.where(sims >= MIN_SCORE, sims, -np.inf)

    # Take the top-k indices (argpartition avoids a full O(n log n) sort)
    if K >= len(sims):
        top_idx = np.argsort(-sims)
    else:
        part = np.argpartition(-sims, K)[:K]
        top_idx = part[np.argsort(-sims[part])]

    # Drop sentinel entries (the self-match and below-threshold scores),
    # so they never reach the INSERT.
    return [
        (ids_all[j], float(sims[j]))
        for j in top_idx[:K]
        if np.isfinite(sims[j])
    ]


# ---------------------------------------------------------
# Insert into the related_noticias table
# ---------------------------------------------------------
def _insert_related(cur, tr_id: int, pairs: List[Tuple[int, float]]):
    if not pairs:
        return
    psycopg2.extras.execute_values(
        cur,
        """
        INSERT INTO related_noticias (traduccion_id, related_traduccion_id, score)
        VALUES %s
        ON CONFLICT (traduccion_id, related_traduccion_id)
        DO UPDATE SET score = EXCLUDED.score
        """,
        [(tr_id, rid, score) for (rid, score) in pairs],
    )
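
# ---------------------------------------------------------
# Reference DDL (a sketch, not authoritative). Column names come from
# the INSERT above; the types are assumptions. The ON CONFLICT clause
# does require a unique constraint on
# (traduccion_id, related_traduccion_id), which the primary key below
# would provide.
#
#   CREATE TABLE IF NOT EXISTS related_noticias (
#       traduccion_id         BIGINT NOT NULL,
#       related_traduccion_id BIGINT NOT NULL,
#       score                 REAL,
#       PRIMARY KEY (traduccion_id, related_traduccion_id)
#   );
# ---------------------------------------------------------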
# ---------------------------------------------------------
# Process target IDs
# ---------------------------------------------------------
def build_for_ids(conn, target_ids: List[int]) -> int:
    with conn.cursor() as cur:
        ids_all, mat = _fetch_all_embeddings(cur)

    if not ids_all or mat is None:
        return 0

    # Map ID → matrix row index
    pos = {tid: i for i, tid in enumerate(ids_all)}

    processed = 0
    with conn.cursor() as cur:
        for tr_id in target_ids:
            if tr_id not in pos:
                continue
            idx = pos[tr_id]
            pairs = _topk_numpy(idx, ids_all, mat, TOPK)
            _insert_related(cur, tr_id, pairs)
            processed += 1
    conn.commit()
    return processed


# ---------------------------------------------------------
# MAIN
# ---------------------------------------------------------
def main():
    logging.info(
        "Starting related_worker (TOPK=%s, BATCH_IDS=%s, MIN_SCORE=%.3f, WINDOW_H=%s)",
        TOPK, BATCH_IDS, MIN_SCORE, WINDOW_HOURS,
    )
    while True:
        try:
            # One connection per iteration, closed explicitly: psycopg2's
            # `with conn:` only scopes a transaction and leaves the
            # connection open, which would leak connections in a loop.
            conn = get_conn()
            try:
                with conn.cursor() as cur:
                    todo = _fetch_pending_ids(cur, BATCH_IDS)
                if not todo:
                    time.sleep(SLEEP_IDLE)
                    continue
                done = build_for_ids(conn, todo)
                logging.info(
                    "Related rows generated/updated for %d translations.", done
                )
            finally:
                conn.close()
        except Exception:
            logging.exception("Error in related_worker")
            time.sleep(SLEEP_IDLE)


if __name__ == "__main__":
    main()
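
# ---------------------------------------------------------
# Example invocation (illustrative; the script filename and the values
# are assumptions, only the variable names are taken from the
# os.environ lookups above):
#
#   DB_HOST=db DB_NAME=rss DB_USER=rss DB_PASS=secret \
#   RELATED_TOPK=15 RELATED_WINDOW_H=72 RELATED_MIN_SCORE=0.35 \
#       python related_worker.py
# ---------------------------------------------------------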