diff --git a/.env b/.env index 4a66dec..4d55eff 100644 --- a/.env +++ b/.env @@ -1,58 +1,34 @@ -# ========================= -# Base de datos -# ========================= DB_NAME=rss DB_USER=rss DB_PASS=lalalilo -# DB_HOST y DB_PORT los inyecta docker-compose (DB_HOST=db). -# Si ejecutas la app fuera de Docker, puedes descomentar: -# DB_HOST=localhost -# DB_PORT=5432 +DB_HOST=localhost +DB_PORT=5432 -# ========================= -# Flask / Web -# ========================= -# ¡Pon aquí una clave larga y aleatoria! SECRET_KEY=CAMBIA_ESTA_CLAVE_POR_ALGO_LARGO_Y_ALEATORIO -# Idioma por defecto de la web y traducción activada por defecto DEFAULT_LANG=es DEFAULT_TRANSLATION_LANG=es WEB_TRANSLATED_DEFAULT=1 - -# Paginación por defecto (app.py limita entre 10 y 100) NEWS_PER_PAGE=20 -# ========================= -# Ingesta / Scheduler -# ========================= RSS_MAX_WORKERS=20 RSS_FEED_TIMEOUT=30 RSS_MAX_FAILURES=5 -# ========================= -# Worker de traducción (NLLB 1.3B) -# ========================= TARGET_LANGS=es TRANSLATOR_BATCH=4 ENQUEUE=200 TRANSLATOR_SLEEP_IDLE=5 -# Límites de tokens (equilibrio calidad/VRAM para 12 GB) MAX_SRC_TOKENS=512 MAX_NEW_TOKENS=256 -# Beams (más calidad en títulos) NUM_BEAMS_TITLE=3 NUM_BEAMS_BODY=2 -# Modelo y dispositivo UNIVERSAL_MODEL=facebook/nllb-200-1.3B DEVICE=cuda -# ========================= -# Runtime (estabilidad/VRAM) -# ========================= PYTORCH_CUDA_ALLOC_CONF=expandable_segments:True,max_split_size_mb:64,garbage_collection_threshold:0.9 TOKENIZERS_PARALLELISM=false PYTHONUNBUFFERED=1 diff --git a/Dockerfile b/Dockerfile index 1747b9f..9c9c134 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,59 +1,38 @@ -# Dockerfile -# ----------- - -# Imagen base Python FROM python:3.11-slim -# Construcción para CUDA 12.1 por defecto (usa --build-arg TORCH_CUDA=cpu para CPU) ARG TORCH_CUDA=cu121 WORKDIR /app -# Paquetes del sistema necesarios -# - libpq-dev y gcc: para compilar dependencias que hablen con PostgreSQL (psycopg2) -# - git: algunos modelos/liberías pueden tirar de git RUN apt-get update && apt-get install -y --no-install-recommends \ libpq-dev \ gcc \ git \ && rm -rf /var/lib/apt/lists/* -# Ajustes de pip / runtime ENV PYTHONUNBUFFERED=1 \ PIP_DISABLE_PIP_VERSION_CHECK=1 \ TOKENIZERS_PARALLELISM=false \ HF_HUB_DISABLE_SYMLINKS_WARNING=1 -# Dependencias Python COPY requirements.txt ./ RUN python -m pip install --no-cache-dir --upgrade pip setuptools wheel -# Instala PyTorch: -# - Con CUDA 12.1 si TORCH_CUDA=cu121 (requiere runtime nvidia al ejecutar) -# - Con ruedas CPU si TORCH_CUDA=cpu RUN if [ "$TORCH_CUDA" = "cu121" ]; then \ pip install --no-cache-dir --index-url https://download.pytorch.org/whl/cu121 \ torch==2.4.1 torchvision==0.19.1 torchaudio==2.4.1 ; \ else \ - pip install --no-cache-dir \ + pip install --no-cache-dir --index-url https://download.pytorch.org/whl/cpu \ torch==2.4.1 torchvision==0.19.1 torchaudio==2.4.1 ; \ fi -# Instala el resto de dependencias de tu app RUN pip install --no-cache-dir -r requirements.txt -# Descarga el modelo de spaCy (español) para NER -# Si el entorno de build no tiene red, no rompas la build: intenta en runtime. RUN python -m spacy download es_core_news_md || true -# Copia el código de la app COPY . . -# Descarga de recursos NLTK que usa newspaper3k (no crítico en build) RUN python download_models.py || true -# Puerto que usa gunicorn en el servicio web EXPOSE 8000 -# El CMD/entrypoint se define en docker-compose.yml (web, scheduler, workers) - diff --git a/app.py b/app.py index b740494..e4412ab 100644 --- a/app.py +++ b/app.py @@ -224,7 +224,6 @@ def _process_feed(feed_row): except psycopg2.Error as e: app.logger.warning(f"[ingesta] Error insertando noticia de {feed_url}: {e}") - # Si ha ido bien, reseteamos fallos with get_conn() as conn, conn.cursor() as cur: cur.execute( "UPDATE feeds SET fallos = 0 WHERE id = %s;", @@ -236,7 +235,6 @@ def _process_feed(feed_row): except Exception as e: app.logger.exception(f"[ingesta] Error procesando feed {feed_id} ({feed_url}): {e}") try: - # Incrementamos fallos y marcamos inactivo si supera RSS_MAX_FAILURES with get_conn() as conn, conn.cursor() as cur: cur.execute( """ @@ -818,11 +816,6 @@ def restore_feeds(): return redirect(url_for("restore_feeds")) def parse_int_field(row, key): - """ - Intenta convertir row[key] a int. - - Si está vacío -> None - - Si no es convertible (p.ej. 'categoria_id') -> None y log de aviso - """ val = row.get(key) if val is None or str(val).strip() == "": return None @@ -842,6 +835,18 @@ def restore_feeds(): categoria_id = parse_int_field(row, "categoria_id") pais_id = parse_int_field(row, "pais_id") + raw_fallos = (row.get("fallos") or "").strip() + if raw_fallos == "": + fallos = 0 + else: + try: + fallos = int(raw_fallos) + except (ValueError, TypeError): + app.logger.warning( + f"[restore_feeds] Valor no numérico '{raw_fallos}' en columna fallos, se usará 0." + ) + fallos = 0 + cur.execute( """ INSERT INTO feeds (nombre, descripcion, url, categoria_id, pais_id, idioma, activo, fallos) @@ -863,7 +868,7 @@ def restore_feeds(): pais_id, (row.get("idioma") or "").strip().lower()[:2] or None, row.get("activo") in ("1", "True", "true", "t", "on"), - int(row.get("fallos") or 0), + fallos, ), ) conn.commit() diff --git a/docker-compose.yml b/docker-compose.yml index 1fb5e5e..7e77869 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -80,32 +80,24 @@ services: - DB_NAME=${DB_NAME} - DB_USER=${DB_USER} - DB_PASS=${DB_PASS} - - TARGET_LANGS=es - - TRANSLATOR_BATCH=8 + - TRANSLATOR_BATCH=32 - ENQUEUE=200 - TRANSLATOR_SLEEP_IDLE=5 - - MAX_SRC_TOKENS=680 - MAX_NEW_TOKENS=400 - - NUM_BEAMS_TITLE=2 - NUM_BEAMS_BODY=1 - - UNIVERSAL_MODEL=facebook/nllb-200-1.3B - - CHUNK_BY_SENTENCES=True - - CHUNK_MAX_TOKENS=700 + - CHUNK_MAX_TOKENS=400 - CHUNK_OVERLAP_SENTS=1 - CLEAN_ARTICLE=1 - - DEVICE=cuda - - PYTHONUNBUFFERED=1 - HF_HOME=/root/.cache/huggingface - TOKENIZERS_PARALLELISM=false - PYTORCH_CUDA_ALLOC_CONF=max_split_size_mb:64,garbage_collection_threshold:0.9 - - NVIDIA_VISIBLE_DEVICES=all - NVIDIA_DRIVER_CAPABILITIES=compute,utility volumes: @@ -149,11 +141,12 @@ services: - DB_NAME=${DB_NAME} - DB_USER=${DB_USER} - DB_PASS=${DB_PASS} - - EMB_MODEL=sentence-transformers/all-MiniLM-L6-v2 - - EMB_BATCH=64 - - EMB_SLEEP=5 - + - EMB_BATCH=256 + - EMB_SLEEP_IDLE=5 + - EMB_LANGS=es + - EMB_LIMIT=5000 + - DEVICE=cuda - PYTHONUNBUFFERED=1 - HF_HOME=/root/.cache/huggingface - TOKENIZERS_PARALLELISM=false @@ -163,7 +156,7 @@ services: db: condition: service_healthy restart: always - # gpus: all + gpus: all related: build: @@ -178,7 +171,6 @@ services: - DB_NAME=${DB_NAME} - DB_USER=${DB_USER} - DB_PASS=${DB_PASS} - - RELATED_TOPK=10 - RELATED_BATCH_IDS=200 - RELATED_BATCH_SIM=2000 diff --git a/embeddings_worker.py b/embeddings_worker.py index d4f337e..4643f53 100644 --- a/embeddings_worker.py +++ b/embeddings_worker.py @@ -1,17 +1,18 @@ # embeddings_worker.py # Worker de embeddings para TRADUCCIONES: # - Lee traducciones con status='done' y sin embedding para un modelo concreto -# - Calcula embedding (Sentence-Transformers) sobre título_trad + resumen_trad +# - Calcula embedding (Sentence-Transformers) sobre titulo_trad + resumen_trad # - Guarda en traduccion_embeddings (traduccion_id, model, dim, embedding) import os import time import logging -from typing import List, Tuple +from typing import List import numpy as np import psycopg2 import psycopg2.extras from sentence_transformers import SentenceTransformer +import torch logging.basicConfig(level=logging.INFO, format='[%(asctime)s] %(levelname)s: %(message)s') log = logging.getLogger(__name__) @@ -26,25 +27,33 @@ DB = dict( ) # ---------- Parámetros de worker ---------- -# Modelo por defecto: MiniLM pequeño y rápido -EMB_MODEL = os.environ.get("EMB_MODEL", "sentence-transformers/all-MiniLM-L6-v2") -EMB_BATCH = int(os.environ.get("EMB_BATCH", "128")) -SLEEP_IDLE = float(os.environ.get("EMB_SLEEP_IDLE", "5.0")) +# Modelo por defecto: multilingüe, bueno para muchas lenguas +EMB_MODEL = os.environ.get( + "EMB_MODEL", + "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2", +) +EMB_BATCH = int(os.environ.get("EMB_BATCH", "128")) +SLEEP_IDLE = float(os.environ.get("EMB_SLEEP_IDLE", "5.0")) # Filtrado por idiomas destino (coma-separado). Por defecto sólo 'es' -EMB_LANGS = [s.strip() for s in os.environ.get("EMB_LANGS", "es").split(",") if s.strip()] -DEVICE = os.environ.get("DEVICE", "auto").lower() # 'auto' | 'cpu' | 'cuda' +EMB_LANGS = [s.strip() for s in os.environ.get("EMB_LANGS", "es").split(",") if s.strip()] + +# DEVICE_ENV: 'auto' | 'cpu' | 'cuda' +DEVICE_ENV = os.environ.get("DEVICE", "auto").lower() # Límite por iteración (para no tragar toda la tabla de golpe) -EMB_LIMIT = int(os.environ.get("EMB_LIMIT", "1000")) +EMB_LIMIT = int(os.environ.get("EMB_LIMIT", "1000")) + # ---------- Utilidades ---------- def get_conn(): return psycopg2.connect(**DB) + def ensure_schema(conn): """Crea la tabla de embeddings para traducciones si no existe.""" with conn.cursor() as cur: - cur.execute(""" + cur.execute( + """ CREATE TABLE IF NOT EXISTS traduccion_embeddings ( id SERIAL PRIMARY KEY, traduccion_id INT NOT NULL REFERENCES traducciones(id) ON DELETE CASCADE, @@ -54,19 +63,21 @@ def ensure_schema(conn): created_at TIMESTAMP DEFAULT NOW(), UNIQUE (traduccion_id, model) ); - """) + """ + ) cur.execute("CREATE INDEX IF NOT EXISTS idx_tr_emb_model ON traduccion_embeddings(model);") cur.execute("CREATE INDEX IF NOT EXISTS idx_tr_emb_trid ON traduccion_embeddings(traduccion_id);") conn.commit() + def fetch_batch_pending(conn) -> List[psycopg2.extras.DictRow]: """ Devuelve un lote de traducciones 'done' del/los idioma(s) objetivo que no tienen embedding aún para el EMB_MODEL indicado. """ with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur: - # Usamos ANY(%s) para filtrar por múltiples idiomas destino - cur.execute(f""" + cur.execute( + """ SELECT t.id AS traduccion_id, t.lang_to AS lang_to, COALESCE(NULLIF(t.titulo_trad, ''), '') AS titulo_trad, @@ -81,10 +92,13 @@ def fetch_batch_pending(conn) -> List[psycopg2.extras.DictRow]: AND e.traduccion_id IS NULL ORDER BY t.id LIMIT %s - """, (EMB_MODEL, EMB_LANGS, EMB_LIMIT)) + """, + (EMB_MODEL, EMB_LANGS, EMB_LIMIT), + ) rows = cur.fetchall() return rows + def texts_from_rows(rows: List[psycopg2.extras.DictRow]) -> List[str]: """ Compone el texto a vectorizar por cada traducción: @@ -93,13 +107,14 @@ def texts_from_rows(rows: List[psycopg2.extras.DictRow]) -> List[str]: texts = [] for r in rows: title = (r["titulo_trad"] or "").strip() - body = (r["resumen_trad"] or "").strip() + body = (r["resumen_trad"] or "").strip() if title and body: texts.append(f"{title}\n{body}") else: texts.append(title or body or "") return texts + def upsert_embeddings(conn, rows, embs: np.ndarray, model_name: str): """ Inserta/actualiza embeddings por traducción. @@ -109,25 +124,39 @@ def upsert_embeddings(conn, rows, embs: np.ndarray, model_name: str): dim = int(embs.shape[1]) with conn.cursor() as cur: for r, e in zip(rows, embs): - cur.execute(""" + cur.execute( + """ INSERT INTO traduccion_embeddings (traduccion_id, model, dim, embedding) VALUES (%s, %s, %s, %s) ON CONFLICT (traduccion_id, model) DO UPDATE SET embedding = EXCLUDED.embedding, dim = EXCLUDED.dim, created_at = NOW() - """, (int(r["traduccion_id"]), model_name, dim, list(map(float, e)))) + """, + (int(r["traduccion_id"]), model_name, dim, list(map(float, e))), + ) conn.commit() + # ---------- Main loop ---------- def main(): log.info("Arrancando embeddings_worker para TRADUCCIONES") - log.info("Modelo: %s | Batch: %s | Idiomas: %s | Device: %s", - EMB_MODEL, EMB_BATCH, ",".join(EMB_LANGS), DEVICE) + log.info( + "Modelo: %s | Batch: %s | Idiomas: %s | DEVICE env: %s", + EMB_MODEL, + EMB_BATCH, + ",".join(EMB_LANGS), + DEVICE_ENV, + ) - # Carga modelo - # DEVICE='auto' -> deja que S-B decida (usa CUDA si está disponible) - model = SentenceTransformer(EMB_MODEL, device=None if DEVICE == "auto" else DEVICE) + if DEVICE_ENV == "auto": + device = "cuda" if torch.cuda.is_available() else "cpu" + else: + device = DEVICE_ENV + + log.info("Usando dispositivo: %s", device) + + model = SentenceTransformer(EMB_MODEL, device=device) while True: try: @@ -140,13 +169,12 @@ def main(): continue texts = texts_from_rows(rows) - # Normalizamos embeddings (unit-length) para facilitar similitudes posteriores embs = model.encode( texts, batch_size=EMB_BATCH, convert_to_numpy=True, show_progress_bar=False, - normalize_embeddings=True + normalize_embeddings=True, ) upsert_embeddings(conn, rows, embs, EMB_MODEL) @@ -156,6 +184,7 @@ def main(): log.exception("Error en embeddings_worker: %s", e) time.sleep(SLEEP_IDLE) + if __name__ == "__main__": main() diff --git a/init-db/08-embeddings.sql b/init-db/08-embeddings.sql index 2e80632..6966670 100644 --- a/init-db/08-embeddings.sql +++ b/init-db/08-embeddings.sql @@ -1,11 +1,3 @@ --- init-db/08-embeddings.sql --- ============================================================ --- Esquema para embeddings y relaciones semánticas entre noticias --- Compatible con embeddings_worker.py (usa traduccion_embeddings) --- y mantiene una vista "embeddings" para compatibilidad previa. --- ============================================================ - --- Tabla principal de embeddings por traducción (con modelo) CREATE TABLE IF NOT EXISTS traduccion_embeddings ( id SERIAL PRIMARY KEY, traduccion_id INT NOT NULL REFERENCES traducciones(id) ON DELETE CASCADE, @@ -16,18 +8,11 @@ CREATE TABLE IF NOT EXISTS traduccion_embeddings ( UNIQUE (traduccion_id, model) ); --- Índices recomendados CREATE INDEX IF NOT EXISTS idx_tr_emb_traduccion_id ON traduccion_embeddings(traduccion_id); CREATE INDEX IF NOT EXISTS idx_tr_emb_model ON traduccion_embeddings(model); --- ----------------------------------------------------------------- --- Vista de compatibilidad "embeddings" --- (emula tu antigua tabla con columnas: traduccion_id, dim, vec) --- Ajusta el valor del WHERE model = '...' si usas otro modelo. --- ----------------------------------------------------------------- DO $$ BEGIN - -- Si ya existe una tabla llamada embeddings, la renombramos a embeddings_legacy para evitar conflicto IF EXISTS ( SELECT 1 FROM information_schema.tables WHERE table_schema = 'public' AND table_name = 'embeddings' @@ -35,11 +20,9 @@ BEGIN EXECUTE 'ALTER TABLE embeddings RENAME TO embeddings_legacy'; END IF; EXCEPTION WHEN others THEN - -- No bloqueamos la migración por esto NULL; END$$; --- Crea/actualiza la vista CREATE OR REPLACE VIEW embeddings AS SELECT te.traduccion_id, @@ -48,20 +31,6 @@ SELECT FROM traduccion_embeddings te WHERE te.model = 'sentence-transformers/all-MiniLM-L6-v2'; --- Nota: --- Si quieres que la vista siempre coja el embedding más reciente de CUALQUIER modelo: --- REEMPLAZA el WHERE anterior por: --- WHERE te.id IN ( --- SELECT DISTINCT ON (traduccion_id) id --- FROM traduccion_embeddings --- ORDER BY traduccion_id, created_at DESC --- ); - --- ----------------------------------------------------------------- --- Relaciones semánticas entre traducciones (opcional) --- Esta tabla no la usa el worker directamente, pero permite cachear --- "noticias relacionadas" precalculadas por otro proceso/batch. --- ----------------------------------------------------------------- CREATE TABLE IF NOT EXISTS related_noticias ( traduccion_id INT NOT NULL REFERENCES traducciones(id) ON DELETE CASCADE, related_traduccion_id INT NOT NULL REFERENCES traducciones(id) ON DELETE CASCADE, @@ -71,11 +40,6 @@ CREATE TABLE IF NOT EXISTS related_noticias ( CHECK (traduccion_id <> related_traduccion_id) ); --- Índices para acelerar consultas en ambos sentidos CREATE INDEX IF NOT EXISTS idx_related_by_tr ON related_noticias (traduccion_id); CREATE INDEX IF NOT EXISTS idx_related_by_relatedtr ON related_noticias (related_traduccion_id); --- Sugerencias: --- - Si pretendes recalcular periódicamente, podrías limpiar por ventana temporal: --- DELETE FROM related_noticias WHERE created_at < NOW() - INTERVAL '7 days'; - diff --git a/related_worker.py b/related_worker.py index e745c6a..6f8f17d 100644 --- a/related_worker.py +++ b/related_worker.py @@ -1,4 +1,3 @@ -# related_worker.py import os import time import math @@ -21,33 +20,30 @@ DB = dict( password=os.environ.get("DB_PASS", "x"), ) -# Config -TOPK = int(os.environ.get("RELATED_TOPK", 10)) # vecinos por traducción -BATCH_IDS = int(os.environ.get("RELATED_BATCH_IDS", 200)) # cuántas traducciones objetivo por pasada -BATCH_SIM = int(os.environ.get("RELATED_BATCH_SIM", 2000)) # tamaño de bloque al comparar contra el resto -SLEEP_IDLE = float(os.environ.get("RELATED_SLEEP", 10)) # pausa cuando no hay trabajo -MIN_SCORE = float(os.environ.get("RELATED_MIN_SCORE", 0.0)) # descarta relaciones por debajo de este coseno -WINDOW_HOURS = int(os.environ.get("RELATED_WINDOW_H", 0)) # 0 = sin filtro temporal; >0 = últimas X horas +TOPK = int(os.environ.get("RELATED_TOPK", 10)) +BATCH_IDS = int(os.environ.get("RELATED_BATCH_IDS", 200)) +BATCH_SIM = int(os.environ.get("RELATED_BATCH_SIM", 2000)) +SLEEP_IDLE = float(os.environ.get("RELATED_SLEEP", 10)) +MIN_SCORE = float(os.environ.get("RELATED_MIN_SCORE", 0.0)) +WINDOW_HOURS = int(os.environ.get("RELATED_WINDOW_H", 0)) + def get_conn(): return psycopg2.connect(**DB) + def _fetch_all_embeddings(cur): - """ - Devuelve: - ids: List[int] con traduccion_id - vecs: List[List[float]] con el embedding (puede venir como list de DOUBLE PRECISION[]) - norms: List[float] con la norma L2 de cada vector (precalculada para acelerar el coseno) - Si WINDOW_HOURS > 0, limitamos a noticias recientes. - """ if WINDOW_HOURS > 0: - cur.execute(""" + cur.execute( + """ SELECT e.traduccion_id, e.vec FROM embeddings e JOIN traducciones t ON t.id = e.traduccion_id JOIN noticias n ON n.id = t.noticia_id WHERE n.fecha >= NOW() - INTERVAL %s - """, (f"{WINDOW_HOURS} hours",)) + """, + (f"{WINDOW_HOURS} hours",), + ) else: cur.execute("SELECT traduccion_id, vec FROM embeddings") @@ -59,23 +55,18 @@ def _fetch_all_embeddings(cur): vecs = [] norms = [] for tr_id, v in rows: - # v llega como lista de floats (DOUBLE PRECISION[]); protegemos None if v is None: v = [] - # calcular norma nrm = math.sqrt(sum(((x or 0.0) * (x or 0.0)) for x in v)) or 1e-8 ids.append(tr_id) vecs.append(v) norms.append(nrm) return ids, vecs, norms + def _fetch_pending_ids(cur, limit) -> List[int]: - """ - Traducciones con embedding pero sin relaciones generadas aún. - Si quieres regenerar periódicamente, puedes cambiar la condición - para tener en cuenta antigüedad o un flag de 'stale'. - """ - cur.execute(""" + cur.execute( + """ SELECT e.traduccion_id FROM embeddings e LEFT JOIN related_noticias r ON r.traduccion_id = e.traduccion_id @@ -83,13 +74,14 @@ def _fetch_pending_ids(cur, limit) -> List[int]: HAVING COUNT(r.related_traduccion_id) = 0 ORDER BY e.traduccion_id DESC LIMIT %s; - """, (limit,)) + """, + (limit,), + ) return [r[0] for r in cur.fetchall()] + def _cosine_with_norms(a, b, na, nb): - # producto punto num = 0.0 - # zip se corta por el más corto; si longitudes difieren, usamos la intersección for x, y in zip(a, b): xv = x or 0.0 yv = y or 0.0 @@ -99,15 +91,15 @@ def _cosine_with_norms(a, b, na, nb): return 0.0 return num / denom -def _topk_for_one(idx: int, - ids_all: List[int], - vecs_all: List[List[float]], - norms_all: List[float], - pool_indices: List[int], - K: int) -> List[Tuple[int, float]]: - """ - Devuelve los K mejores (related_id, score) para ids_all[idx] restringido al conjunto pool_indices. - """ + +def _topk_for_one( + idx: int, + ids_all: List[int], + vecs_all: List[List[float]], + norms_all: List[float], + pool_indices: List[int], + K: int, +) -> List[Tuple[int, float]]: me_vec = vecs_all[idx] me_norm = norms_all[idx] @@ -118,12 +110,12 @@ def _topk_for_one(idx: int, s = _cosine_with_norms(me_vec, vecs_all[j], me_norm, norms_all[j]) out.append((ids_all[j], s)) - # top-K ordenado por score desc out.sort(key=lambda t: t[1], reverse=True) if MIN_SCORE > 0.0: out = [p for p in out if p[1] >= MIN_SCORE] return out[:K] + def _insert_related(cur, tr_id: int, pairs: List[Tuple[int, float]]): if not pairs: return @@ -135,22 +127,16 @@ def _insert_related(cur, tr_id: int, pairs: List[Tuple[int, float]]): ON CONFLICT (traduccion_id, related_traduccion_id) DO UPDATE SET score = EXCLUDED.score """, - [(tr_id, rid, float(score)) for (rid, score) in pairs] + [(tr_id, rid, float(score)) for (rid, score) in pairs], ) + def build_for_ids(conn, target_ids: List[int]) -> int: - """ - Para las traducciones de target_ids: - - carga TODOS los embeddings (opcionalmente filtrados por ventana temporal), - - para cada target calcula sus TOPK vecinos por coseno, por bloques, - - upsert en related_noticias. - """ with conn.cursor() as cur: ids_all, vecs_all, norms_all = _fetch_all_embeddings(cur) if not ids_all: return 0 - # mapa traduccion_id -> índice en arrays pos = {tid: i for i, tid in enumerate(ids_all)} n = len(ids_all) processed = 0 @@ -161,13 +147,10 @@ def build_for_ids(conn, target_ids: List[int]) -> int: continue i = pos[tr_id] - # barrido por bloques para no disparar memoria top: List[Tuple[int, float]] = [] for start in range(0, n, BATCH_SIM): block = list(range(start, min(start + BATCH_SIM, n))) candidates = _topk_for_one(i, ids_all, vecs_all, norms_all, block, TOPK) - - # merge de top-K global top += candidates top.sort(key=lambda t: t[1], reverse=True) if len(top) > TOPK: @@ -179,10 +162,15 @@ def build_for_ids(conn, target_ids: List[int]) -> int: conn.commit() return processed + def main(): logging.info( "Iniciando related_worker (TOPK=%s, BATCH_IDS=%s, BATCH_SIM=%s, MIN_SCORE=%.3f, WINDOW_H=%s)", - TOPK, BATCH_IDS, BATCH_SIM, MIN_SCORE, WINDOW_HOURS + TOPK, + BATCH_IDS, + BATCH_SIM, + MIN_SCORE, + WINDOW_HOURS, ) while True: try: @@ -201,6 +189,7 @@ def main(): logging.exception("Error en related_worker") time.sleep(SLEEP_IDLE) + if __name__ == "__main__": main() diff --git a/templates/_noticias_list.html b/templates/_noticias_list.html index 3a8fd4b..6a90885 100644 --- a/templates/_noticias_list.html +++ b/templates/_noticias_list.html @@ -1,9 +1,14 @@