rss/related_worker.py

206 lines
6.6 KiB
Python

# related_worker.py
import os
import time
import math
import logging
from typing import List, Tuple
import psycopg2
import psycopg2.extras
logging.basicConfig(
level=logging.INFO,
format='[related] %(asctime)s %(levelname)s: %(message)s'
)
DB = dict(
host=os.environ.get("DB_HOST", "localhost"),
port=int(os.environ.get("DB_PORT", 5432)),
dbname=os.environ.get("DB_NAME", "rss"),
user=os.environ.get("DB_USER", "rss"),
password=os.environ.get("DB_PASS", "x"),
)
# Config
TOPK = int(os.environ.get("RELATED_TOPK", 10)) # vecinos por traducción
BATCH_IDS = int(os.environ.get("RELATED_BATCH_IDS", 200)) # cuántas traducciones objetivo por pasada
BATCH_SIM = int(os.environ.get("RELATED_BATCH_SIM", 2000)) # tamaño de bloque al comparar contra el resto
SLEEP_IDLE = float(os.environ.get("RELATED_SLEEP", 10)) # pausa cuando no hay trabajo
MIN_SCORE = float(os.environ.get("RELATED_MIN_SCORE", 0.0)) # descarta relaciones por debajo de este coseno
WINDOW_HOURS = int(os.environ.get("RELATED_WINDOW_H", 0)) # 0 = sin filtro temporal; >0 = últimas X horas
def get_conn():
return psycopg2.connect(**DB)
def _fetch_all_embeddings(cur):
"""
Devuelve:
ids: List[int] con traduccion_id
vecs: List[List[float]] con el embedding (puede venir como list de DOUBLE PRECISION[])
norms: List[float] con la norma L2 de cada vector (precalculada para acelerar el coseno)
Si WINDOW_HOURS > 0, limitamos a noticias recientes.
"""
if WINDOW_HOURS > 0:
cur.execute("""
SELECT e.traduccion_id, e.vec
FROM embeddings e
JOIN traducciones t ON t.id = e.traduccion_id
JOIN noticias n ON n.id = t.noticia_id
WHERE n.fecha >= NOW() - INTERVAL %s
""", (f"{WINDOW_HOURS} hours",))
else:
cur.execute("SELECT traduccion_id, vec FROM embeddings")
rows = cur.fetchall()
if not rows:
return [], [], []
ids = []
vecs = []
norms = []
for tr_id, v in rows:
# v llega como lista de floats (DOUBLE PRECISION[]); protegemos None
if v is None:
v = []
# calcular norma
nrm = math.sqrt(sum(((x or 0.0) * (x or 0.0)) for x in v)) or 1e-8
ids.append(tr_id)
vecs.append(v)
norms.append(nrm)
return ids, vecs, norms
def _fetch_pending_ids(cur, limit) -> List[int]:
"""
Traducciones con embedding pero sin relaciones generadas aún.
Si quieres regenerar periódicamente, puedes cambiar la condición
para tener en cuenta antigüedad o un flag de 'stale'.
"""
cur.execute("""
SELECT e.traduccion_id
FROM embeddings e
LEFT JOIN related_noticias r ON r.traduccion_id = e.traduccion_id
GROUP BY e.traduccion_id
HAVING COUNT(r.related_traduccion_id) = 0
ORDER BY e.traduccion_id DESC
LIMIT %s;
""", (limit,))
return [r[0] for r in cur.fetchall()]
def _cosine_with_norms(a, b, na, nb):
# producto punto
num = 0.0
# zip se corta por el más corto; si longitudes difieren, usamos la intersección
for x, y in zip(a, b):
xv = x or 0.0
yv = y or 0.0
num += xv * yv
denom = na * nb
if denom <= 0.0:
return 0.0
return num / denom
def _topk_for_one(idx: int,
ids_all: List[int],
vecs_all: List[List[float]],
norms_all: List[float],
pool_indices: List[int],
K: int) -> List[Tuple[int, float]]:
"""
Devuelve los K mejores (related_id, score) para ids_all[idx] restringido al conjunto pool_indices.
"""
me_vec = vecs_all[idx]
me_norm = norms_all[idx]
out: List[Tuple[int, float]] = []
for j in pool_indices:
if j == idx:
continue
s = _cosine_with_norms(me_vec, vecs_all[j], me_norm, norms_all[j])
out.append((ids_all[j], s))
# top-K ordenado por score desc
out.sort(key=lambda t: t[1], reverse=True)
if MIN_SCORE > 0.0:
out = [p for p in out if p[1] >= MIN_SCORE]
return out[:K]
def _insert_related(cur, tr_id: int, pairs: List[Tuple[int, float]]):
if not pairs:
return
psycopg2.extras.execute_values(
cur,
"""
INSERT INTO related_noticias (traduccion_id, related_traduccion_id, score)
VALUES %s
ON CONFLICT (traduccion_id, related_traduccion_id)
DO UPDATE SET score = EXCLUDED.score
""",
[(tr_id, rid, float(score)) for (rid, score) in pairs]
)
def build_for_ids(conn, target_ids: List[int]) -> int:
"""
Para las traducciones de target_ids:
- carga TODOS los embeddings (opcionalmente filtrados por ventana temporal),
- para cada target calcula sus TOPK vecinos por coseno, por bloques,
- upsert en related_noticias.
"""
with conn.cursor() as cur:
ids_all, vecs_all, norms_all = _fetch_all_embeddings(cur)
if not ids_all:
return 0
# mapa traduccion_id -> índice en arrays
pos = {tid: i for i, tid in enumerate(ids_all)}
n = len(ids_all)
processed = 0
with conn.cursor() as cur:
for tr_id in target_ids:
if tr_id not in pos:
continue
i = pos[tr_id]
# barrido por bloques para no disparar memoria
top: List[Tuple[int, float]] = []
for start in range(0, n, BATCH_SIM):
block = list(range(start, min(start + BATCH_SIM, n)))
candidates = _topk_for_one(i, ids_all, vecs_all, norms_all, block, TOPK)
# merge de top-K global
top += candidates
top.sort(key=lambda t: t[1], reverse=True)
if len(top) > TOPK:
top = top[:TOPK]
_insert_related(cur, tr_id, top)
processed += 1
conn.commit()
return processed
def main():
logging.info(
"Iniciando related_worker (TOPK=%s, BATCH_IDS=%s, BATCH_SIM=%s, MIN_SCORE=%.3f, WINDOW_H=%s)",
TOPK, BATCH_IDS, BATCH_SIM, MIN_SCORE, WINDOW_HOURS
)
while True:
try:
with get_conn() as conn, conn.cursor() as cur:
todo = _fetch_pending_ids(cur, BATCH_IDS)
if not todo:
time.sleep(SLEEP_IDLE)
continue
with get_conn() as conn:
done = build_for_ids(conn, todo)
logging.info("Relacionadas generadas/actualizadas para %d traducciones.", done)
except Exception:
logging.exception("Error en related_worker")
time.sleep(SLEEP_IDLE)
if __name__ == "__main__":
main()