From e3a99d96041917cf4f66e41ed483b84965422be5 Mon Sep 17 00:00:00 2001 From: jlimolina Date: Mon, 24 Nov 2025 23:06:26 +0100 Subject: [PATCH] retoques --- .gitignore | 21 ++- cluster_worker.py | 272 +++++++++++++++++++--------- docker-compose.yml | 67 +++++-- estructura.sql | 364 -------------------------------------- init-db/08-embeddings.sql | 2 +- init-db/09-eventos.sql | 8 +- scheduler.py | 2 +- translation_worker.py | 236 ++++++++++++++++++++++-- 8 files changed, 489 insertions(+), 483 deletions(-) delete mode 100644 estructura.sql diff --git a/.gitignore b/.gitignore index f60e61a..27c6195 100755 --- a/.gitignore +++ b/.gitignore @@ -1,24 +1,33 @@ -# Virtual environment +# --- Virtual environments --- venv/ .venv/ env/ .env/ +.env -# Byte-code files +# --- Python bytecode --- *.pyc __pycache__/ -# IDE specific files +# --- IDE project folders --- .vscode/ .idea/ -# Operating System files +# --- OS-generated files --- .DS_Store Thumbs.db -# Database files (if SQLite) +# --- SQLite / misc DB files --- *.sqlite3 *.db -# Logs +# --- Postgres Docker data directory --- +pgdata/ + +# --- HuggingFace models cache --- +hf_cache/ + +# --- Logs --- *.log +logs/ + diff --git a/cluster_worker.py b/cluster_worker.py index 6facadf..c808d82 100644 --- a/cluster_worker.py +++ b/cluster_worker.py @@ -6,6 +6,7 @@ from typing import List, Dict, Any, Optional import numpy as np import psycopg2 import psycopg2.extras +from psycopg2.extras import Json logging.basicConfig( level=logging.INFO, @@ -43,29 +44,12 @@ def get_conn(): def ensure_schema(conn): """ - Asegura que la tabla de eventos y las columnas necesarias existen. - Aquí se asume el esquema original de eventos con centroid JSONB. + Asumimos que las tablas y columnas (eventos, traducciones.evento_id, + eventos_noticias, función/trigger) ya existen por los scripts init-db. + Aquí solo nos aseguramos de que existan ciertos índices clave + (idempotente). """ with conn.cursor() as cur: - cur.execute( - """ - CREATE TABLE IF NOT EXISTS eventos ( - id SERIAL PRIMARY KEY, - creado_en TIMESTAMP NOT NULL DEFAULT NOW(), - actualizado_en TIMESTAMP NOT NULL DEFAULT NOW(), - centroid JSONB NOT NULL, - total_traducciones INTEGER NOT NULL DEFAULT 1 - ); - """ - ) - - cur.execute( - """ - ALTER TABLE traducciones - ADD COLUMN IF NOT EXISTS evento_id INTEGER REFERENCES eventos(id); - """ - ) - cur.execute( """ CREATE INDEX IF NOT EXISTS idx_traducciones_evento @@ -78,27 +62,6 @@ def ensure_schema(conn): ON traducciones(evento_id, noticia_id); """ ) - - cur.execute( - """ - CREATE OR REPLACE FUNCTION actualizar_evento_modificado() - RETURNS TRIGGER AS $$ - BEGIN - NEW.actualizado_en = NOW(); - RETURN NEW; - END; - $$ LANGUAGE plpgsql; - """ - ) - cur.execute("DROP TRIGGER IF EXISTS trg_evento_modificado ON eventos;") - cur.execute( - """ - CREATE TRIGGER trg_evento_modificado - BEFORE UPDATE ON eventos - FOR EACH ROW - EXECUTE FUNCTION actualizar_evento_modificado(); - """ - ) conn.commit() @@ -161,6 +124,7 @@ def fetch_embeddings_for(conn, tr_ids: List[int]) -> Dict[int, np.ndarray]: def fetch_centroids(conn) -> List[Dict[str, Any]]: """ Carga todos los centroides actuales desde eventos. + Solo usamos campos de clustering: id, centroid, total_traducciones. """ with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur: cur.execute( @@ -178,6 +142,7 @@ def fetch_centroids(conn) -> List[Dict[str, Any]]: raw = r["centroid"] cnt = int(r["total_traducciones"] or 1) if not isinstance(raw, (list, tuple)): + # centroid se almacena como JSONB array → en Python suele llegar como list continue arr = np.array([float(x or 0.0) for x in raw], dtype="float32") if arr.size == 0: @@ -201,6 +166,54 @@ def cosine_distance(a: np.ndarray, b: np.ndarray) -> float: return 1.0 - cos +def fetch_traduccion_info(conn, tr_id: int) -> Optional[Dict[str, Any]]: + """ + Devuelve info básica para un tr_id: + - noticia_id + - fecha de la noticia + - un título “representativo” para el evento (traducido u original). + """ + with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur: + cur.execute( + """ + SELECT + t.id AS traduccion_id, + t.noticia_id AS noticia_id, + n.fecha AS fecha, + COALESCE(NULLIF(t.titulo_trad, ''), n.titulo) AS titulo_evento + FROM traducciones t + JOIN noticias n ON n.id = t.noticia_id + WHERE t.id = %s; + """, + (tr_id,), + ) + row = cur.fetchone() + if not row: + return None + return { + "traduccion_id": int(row["traduccion_id"]), + "noticia_id": row["noticia_id"], + "fecha": row["fecha"], + "titulo_evento": row["titulo_evento"], + } + + +def _insert_evento_noticia(cur, evento_id: int, info: Dict[str, Any]) -> None: + """ + Inserta relación en eventos_noticias (idempotente). + """ + if not info or not info.get("noticia_id"): + return + cur.execute( + """ + INSERT INTO eventos_noticias (evento_id, noticia_id, traduccion_id) + VALUES (%s, %s, %s) + ON CONFLICT (evento_id, traduccion_id) DO NOTHING; + """, + (evento_id, info["noticia_id"], info["traduccion_id"]), + ) + + def assign_to_event( conn, tr_id: int, @@ -210,31 +223,63 @@ def assign_to_event( """ Asigna una traducción a un evento existente (si distancia <= umbral) o crea un evento nuevo con este vector como centroide. - """ - from psycopg2.extras import Json + Además: + - Actualiza fecha_inicio, fecha_fin, n_noticias del evento. + - Rellena eventos_noticias (evento_id, noticia_id, traduccion_id). + """ if vec is None or vec.size == 0: return + info = fetch_traduccion_info(conn, tr_id) + + # Si no hay centroides todavía → primer evento if not centroids: centroid_list = [float(x) for x in vec.tolist()] with conn.cursor() as cur: - cur.execute( - """ - INSERT INTO eventos (centroid, total_traducciones) - VALUES (%s, %s) - RETURNING id; - """, - (Json(centroid_list), 1), - ) + if info and info.get("fecha"): + cur.execute( + """ + INSERT INTO eventos (centroid, total_traducciones, + fecha_inicio, fecha_fin, n_noticias, titulo) + VALUES (%s, %s, %s, %s, %s, %s) + RETURNING id; + """, + ( + Json(centroid_list), + 1, + info["fecha"], + info["fecha"], + 1, + info.get("titulo_evento"), + ), + ) + else: + # Fallback mínimo si no hay info de noticia + cur.execute( + """ + INSERT INTO eventos (centroid, total_traducciones) + VALUES (%s, %s) + RETURNING id; + """, + (Json(centroid_list), 1), + ) + new_id = cur.fetchone()[0] + + # Vincular traducción al evento cur.execute( "UPDATE traducciones SET evento_id = %s WHERE id = %s;", (new_id, tr_id), ) + + # Rellenar tabla de relación + _insert_evento_noticia(cur, new_id, info or {}) + centroids.append({"id": new_id, "vec": vec.copy(), "n": 1}) return + # Buscar el centroide más cercano best_idx: Optional[int] = None best_dist: float = 1.0 @@ -244,47 +289,98 @@ def assign_to_event( best_dist = d best_idx = i - if best_idx is not None and best_dist <= EVENT_DIST_THRESHOLD: - c = centroids[best_idx] - n_old = c["n"] - new_n = n_old + 1 - new_vec = (c["vec"] * n_old + vec) / float(new_n) + with conn.cursor() as cur: + # Asignar a evento existente si está por debajo del umbral + if best_idx is not None and best_dist <= EVENT_DIST_THRESHOLD: + c = centroids[best_idx] + n_old = c["n"] + new_n = n_old + 1 + new_vec = (c["vec"] * n_old + vec) / float(new_n) - c["vec"] = new_vec - c["n"] = new_n + c["vec"] = new_vec + c["n"] = new_n - centroid_list = [float(x) for x in new_vec.tolist()] - with conn.cursor() as cur: - cur.execute( - """ - UPDATE eventos - SET centroid = %s, - total_traducciones = total_traducciones + 1 - WHERE id = %s; - """, - (Json(centroid_list), c["id"]), - ) + centroid_list = [float(x) for x in new_vec.tolist()] + + if info and info.get("fecha"): + cur.execute( + """ + UPDATE eventos + SET centroid = %s, + total_traducciones = total_traducciones + 1, + fecha_inicio = COALESCE(LEAST(fecha_inicio, %s), %s), + fecha_fin = COALESCE(GREATEST(fecha_fin, %s), %s), + n_noticias = n_noticias + 1 + WHERE id = %s; + """, + ( + Json(centroid_list), + info["fecha"], + info["fecha"], + info["fecha"], + info["fecha"], + c["id"], + ), + ) + else: + # Sin info de fecha: solo actualizamos centroid/contador + cur.execute( + """ + UPDATE eventos + SET centroid = %s, + total_traducciones = total_traducciones + 1 + WHERE id = %s; + """, + (Json(centroid_list), c["id"]), + ) + + # Vincular traducción y relación cur.execute( "UPDATE traducciones SET evento_id = %s WHERE id = %s;", (c["id"], tr_id), ) - return + _insert_evento_noticia(cur, c["id"], info or {}) + + return + + # Si no hay evento adecuado → crear uno nuevo + centroid_list = [float(x) for x in vec.tolist()] + + if info and info.get("fecha"): + cur.execute( + """ + INSERT INTO eventos (centroid, total_traducciones, + fecha_inicio, fecha_fin, n_noticias, titulo) + VALUES (%s, %s, %s, %s, %s, %s) + RETURNING id; + """, + ( + Json(centroid_list), + 1, + info["fecha"], + info["fecha"], + 1, + info.get("titulo_evento"), + ), + ) + else: + cur.execute( + """ + INSERT INTO eventos (centroid, total_traducciones) + VALUES (%s, %s) + RETURNING id; + """, + (Json(centroid_list), 1), + ) - centroid_list = [float(x) for x in vec.tolist()] - with conn.cursor() as cur: - cur.execute( - """ - INSERT INTO eventos (centroid, total_traducciones) - VALUES (%s, %s) - RETURNING id; - """, - (Json(centroid_list), 1), - ) new_id = cur.fetchone()[0] + cur.execute( "UPDATE traducciones SET evento_id = %s WHERE id = %s;", (new_id, tr_id), ) + _insert_evento_noticia(cur, new_id, info or {}) + centroids.append({"id": new_id, "vec": vec.copy(), "n": 1}) @@ -309,11 +405,16 @@ def main(): time.sleep(EVENT_SLEEP_IDLE) continue - log.info("Traducciones pendientes de asignar evento: %d", len(pending_ids)) + log.info( + "Traducciones pendientes de asignar evento: %d", + len(pending_ids), + ) emb_by_tr = fetch_embeddings_for(conn, pending_ids) if not emb_by_tr: - log.warning("No se encontraron embeddings para las traducciones pendientes.") + log.warning( + "No se encontraron embeddings para las traducciones pendientes." + ) time.sleep(EVENT_SLEEP_IDLE) continue @@ -329,7 +430,10 @@ def main(): processed += 1 conn.commit() - log.info("Asignación de eventos completada. Traducciones procesadas: %d", processed) + log.info( + "Asignación de eventos completada. Traducciones procesadas: %d", + processed, + ) except Exception as e: log.exception("Error en cluster_worker: %s", e) diff --git a/docker-compose.yml b/docker-compose.yml index ed5d8f0..33d8f22 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -13,7 +13,9 @@ services: PGDATA: /var/lib/postgresql/data/18/main command: ["postgres", "-c", "max_connections=400"] volumes: - - /datos/rss/postgres/18:/var/lib/postgresql/data + # Datos de Postgres dentro del proyecto + - ./pgdata:/var/lib/postgresql/data + # Scripts de inicialización - ./init-db:/docker-entrypoint-initdb.d:ro restart: always healthcheck: @@ -61,18 +63,19 @@ services: - DB_USER=${DB_USER} - DB_PASS=${DB_PASS} - SECRET_KEY=${SECRET_KEY} - - RSS_MAX_WORKERS=8 + - RSS_MAX_WORKERS=16 depends_on: db: condition: service_healthy restart: always - translator: + # --- Worker de traducción en GPU: encola + traduce --- + translator_gpu: build: context: . args: TORCH_CUDA: cu121 - container_name: rss_translator + container_name: rss_translator_gpu command: bash -lc "python translation_worker.py" environment: - DB_HOST=db @@ -81,19 +84,19 @@ services: - DB_USER=${DB_USER} - DB_PASS=${DB_PASS} - TARGET_LANGS=es - - TRANSLATOR_BATCH=32 - - ENQUEUE=200 + - TRANSLATOR_BATCH=16 + - ENQUEUE=200 # ESTE encola traducciones nuevas - TRANSLATOR_SLEEP_IDLE=5 - MAX_SRC_TOKENS=680 - MAX_NEW_TOKENS=400 - - NUM_BEAMS_TITLE=2 + - NUM_BEAMS_TITLE=1 - NUM_BEAMS_BODY=1 - - UNIVERSAL_MODEL=facebook/nllb-200-1.3B + - UNIVERSAL_MODEL=facebook/nllb-200-distilled-600M - CHUNK_BY_SENTENCES=True - CHUNK_MAX_TOKENS=400 - CHUNK_OVERLAP_SENTS=1 - CLEAN_ARTICLE=1 - - DEVICE=cuda + - DEVICE=cuda # GPU - PYTHONUNBUFFERED=1 - HF_HOME=/root/.cache/huggingface - TOKENIZERS_PARALLELISM=false @@ -101,13 +104,52 @@ services: - NVIDIA_VISIBLE_DEVICES=all - NVIDIA_DRIVER_CAPABILITIES=compute,utility volumes: - - /datos/rss/hf_cache:/root/.cache/huggingface + # Cache de modelos HF dentro del proyecto + - ./hf_cache:/root/.cache/huggingface depends_on: db: condition: service_healthy restart: always gpus: all + # --- Worker de traducción en CPU: SOLO procesa pendientes --- + translator_cpu: + build: + context: . + args: + TORCH_CUDA: cu121 + container_name: rss_translator_cpu + command: bash -lc "python translation_worker.py" + environment: + - DB_HOST=db + - DB_PORT=5432 + - DB_NAME=${DB_NAME} + - DB_USER=${DB_USER} + - DB_PASS=${DB_PASS} + - TARGET_LANGS=es + - TRANSLATOR_BATCH=8 # batch más pequeño para CPU + - ENQUEUE=0 # NO encola nuevas traducciones + - TRANSLATOR_SLEEP_IDLE=5 + - MAX_SRC_TOKENS=680 + - MAX_NEW_TOKENS=400 + - NUM_BEAMS_TITLE=1 + - NUM_BEAMS_BODY=1 + - UNIVERSAL_MODEL=facebook/nllb-200-distilled-600M + - CHUNK_BY_SENTENCES=True + - CHUNK_MAX_TOKENS=400 + - CHUNK_OVERLAP_SENTS=1 + - CLEAN_ARTICLE=1 + - DEVICE=cpu # Fuerza CPU + - PYTHONUNBUFFERED=1 + - HF_HOME=/root/.cache/huggingface + - TOKENIZERS_PARALLELISM=false + volumes: + - ./hf_cache:/root/.cache/huggingface + depends_on: + db: + condition: service_healthy + restart: always + ner: build: context: . @@ -141,7 +183,7 @@ services: - DB_NAME=${DB_NAME} - DB_USER=${DB_USER} - DB_PASS=${DB_PASS} - - EMB_MODEL=sentence-transformers/paraphrase-multilingual-mpnet-base-v2 + - EMB_MODEL=sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2 - EMB_BATCH=256 - EMB_SLEEP_IDLE=5 - EMB_LANGS=es @@ -151,7 +193,8 @@ services: - HF_HOME=/root/.cache/huggingface - TOKENIZERS_PARALLELISM=false volumes: - - /datos/rss/hf_cache:/root/.cache/huggingface + # Reutiliza el mismo cache HF + - ./hf_cache:/root/.cache/huggingface depends_on: db: condition: service_healthy diff --git a/estructura.sql b/estructura.sql deleted file mode 100644 index 51a1a5f..0000000 --- a/estructura.sql +++ /dev/null @@ -1,364 +0,0 @@ --- --- PostgreSQL database dump --- - --- Dumped from database version 16.9 (Ubuntu 16.9-0ubuntu0.24.04.1) --- Dumped by pg_dump version 16.9 (Ubuntu 16.9-0ubuntu0.24.04.1) - -SET statement_timeout = 0; -SET lock_timeout = 0; -SET idle_in_transaction_session_timeout = 0; -SET client_encoding = 'UTF8'; -SET standard_conforming_strings = on; -SELECT pg_catalog.set_config('search_path', '', false); -SET check_function_bodies = false; -SET xmloption = content; -SET client_min_messages = warning; -SET row_security = off; - --- --- Name: noticias_tsv_trigger(); Type: FUNCTION; Schema: public; Owner: rss --- - -CREATE FUNCTION public.noticias_tsv_trigger() RETURNS trigger - LANGUAGE plpgsql - AS $$ begin new.tsv := setweight(to_tsvector('spanish', coalesce(new.titulo,'')), 'A') || setweight(to_tsvector('spanish', coalesce(new.resumen,'')), 'B'); return new; end $$; - - -ALTER FUNCTION public.noticias_tsv_trigger() OWNER TO rss; - -SET default_tablespace = ''; - -SET default_table_access_method = heap; - --- --- Name: categorias; Type: TABLE; Schema: public; Owner: rss --- - -CREATE TABLE public.categorias ( - id integer NOT NULL, - nombre character varying(100) NOT NULL -); - - -ALTER TABLE public.categorias OWNER TO rss; - --- --- Name: categorias_id_seq; Type: SEQUENCE; Schema: public; Owner: rss --- - -CREATE SEQUENCE public.categorias_id_seq - AS integer - START WITH 1 - INCREMENT BY 1 - NO MINVALUE - NO MAXVALUE - CACHE 1; - - -ALTER SEQUENCE public.categorias_id_seq OWNER TO rss; - --- --- Name: categorias_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: rss --- - -ALTER SEQUENCE public.categorias_id_seq OWNED BY public.categorias.id; - - --- --- Name: continentes; Type: TABLE; Schema: public; Owner: rss --- - -CREATE TABLE public.continentes ( - id integer NOT NULL, - nombre character varying(50) NOT NULL -); - - -ALTER TABLE public.continentes OWNER TO rss; - --- --- Name: continentes_id_seq; Type: SEQUENCE; Schema: public; Owner: rss --- - -CREATE SEQUENCE public.continentes_id_seq - AS integer - START WITH 1 - INCREMENT BY 1 - NO MINVALUE - NO MAXVALUE - CACHE 1; - - -ALTER SEQUENCE public.continentes_id_seq OWNER TO rss; - --- --- Name: continentes_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: rss --- - -ALTER SEQUENCE public.continentes_id_seq OWNED BY public.continentes.id; - - --- --- Name: feeds; Type: TABLE; Schema: public; Owner: rss --- - -CREATE TABLE public.feeds ( - id integer NOT NULL, - nombre character varying(255), - descripcion text, - url text NOT NULL, - categoria_id integer, - pais_id integer, - idioma character(2), - activo boolean DEFAULT true, - fallos integer DEFAULT 0, - last_etag character varying(255), - last_modified character varying(255) -); - - -ALTER TABLE public.feeds OWNER TO rss; - --- --- Name: feeds_id_seq; Type: SEQUENCE; Schema: public; Owner: rss --- - -CREATE SEQUENCE public.feeds_id_seq - AS integer - START WITH 1 - INCREMENT BY 1 - NO MINVALUE - NO MAXVALUE - CACHE 1; - - -ALTER SEQUENCE public.feeds_id_seq OWNER TO rss; - --- --- Name: feeds_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: rss --- - -ALTER SEQUENCE public.feeds_id_seq OWNED BY public.feeds.id; - - --- --- Name: noticias; Type: TABLE; Schema: public; Owner: rss --- - -CREATE TABLE public.noticias ( - id character varying(32) NOT NULL, - titulo text, - resumen text, - url text NOT NULL, - fecha timestamp without time zone, - imagen_url text, - categoria_id integer, - pais_id integer, - tsv tsvector -); - - -ALTER TABLE public.noticias OWNER TO rss; - --- --- Name: paises; Type: TABLE; Schema: public; Owner: rss --- - -CREATE TABLE public.paises ( - id integer NOT NULL, - nombre character varying(100) NOT NULL, - continente_id integer -); - - -ALTER TABLE public.paises OWNER TO rss; - --- --- Name: paises_id_seq; Type: SEQUENCE; Schema: public; Owner: rss --- - -CREATE SEQUENCE public.paises_id_seq - AS integer - START WITH 1 - INCREMENT BY 1 - NO MINVALUE - NO MAXVALUE - CACHE 1; - - -ALTER SEQUENCE public.paises_id_seq OWNER TO rss; - --- --- Name: paises_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: rss --- - -ALTER SEQUENCE public.paises_id_seq OWNED BY public.paises.id; - - --- --- Name: categorias id; Type: DEFAULT; Schema: public; Owner: rss --- - -ALTER TABLE ONLY public.categorias ALTER COLUMN id SET DEFAULT nextval('public.categorias_id_seq'::regclass); - - --- --- Name: continentes id; Type: DEFAULT; Schema: public; Owner: rss --- - -ALTER TABLE ONLY public.continentes ALTER COLUMN id SET DEFAULT nextval('public.continentes_id_seq'::regclass); - - --- --- Name: feeds id; Type: DEFAULT; Schema: public; Owner: rss --- - -ALTER TABLE ONLY public.feeds ALTER COLUMN id SET DEFAULT nextval('public.feeds_id_seq'::regclass); - - --- --- Name: paises id; Type: DEFAULT; Schema: public; Owner: rss --- - -ALTER TABLE ONLY public.paises ALTER COLUMN id SET DEFAULT nextval('public.paises_id_seq'::regclass); - - --- --- Name: categorias categorias_nombre_key; Type: CONSTRAINT; Schema: public; Owner: rss --- - -ALTER TABLE ONLY public.categorias - ADD CONSTRAINT categorias_nombre_key UNIQUE (nombre); - - --- --- Name: categorias categorias_pkey; Type: CONSTRAINT; Schema: public; Owner: rss --- - -ALTER TABLE ONLY public.categorias - ADD CONSTRAINT categorias_pkey PRIMARY KEY (id); - - --- --- Name: continentes continentes_nombre_key; Type: CONSTRAINT; Schema: public; Owner: rss --- - -ALTER TABLE ONLY public.continentes - ADD CONSTRAINT continentes_nombre_key UNIQUE (nombre); - - --- --- Name: continentes continentes_pkey; Type: CONSTRAINT; Schema: public; Owner: rss --- - -ALTER TABLE ONLY public.continentes - ADD CONSTRAINT continentes_pkey PRIMARY KEY (id); - - --- --- Name: feeds feeds_pkey; Type: CONSTRAINT; Schema: public; Owner: rss --- - -ALTER TABLE ONLY public.feeds - ADD CONSTRAINT feeds_pkey PRIMARY KEY (id); - - --- --- Name: feeds feeds_url_key; Type: CONSTRAINT; Schema: public; Owner: rss --- - -ALTER TABLE ONLY public.feeds - ADD CONSTRAINT feeds_url_key UNIQUE (url); - - --- --- Name: noticias noticias_pkey; Type: CONSTRAINT; Schema: public; Owner: rss --- - -ALTER TABLE ONLY public.noticias - ADD CONSTRAINT noticias_pkey PRIMARY KEY (id); - - --- --- Name: noticias noticias_url_key; Type: CONSTRAINT; Schema: public; Owner: rss --- - -ALTER TABLE ONLY public.noticias - ADD CONSTRAINT noticias_url_key UNIQUE (url); - - --- --- Name: paises paises_nombre_key; Type: CONSTRAINT; Schema: public; Owner: rss --- - -ALTER TABLE ONLY public.paises - ADD CONSTRAINT paises_nombre_key UNIQUE (nombre); - - --- --- Name: paises paises_pkey; Type: CONSTRAINT; Schema: public; Owner: rss --- - -ALTER TABLE ONLY public.paises - ADD CONSTRAINT paises_pkey PRIMARY KEY (id); - - --- --- Name: noticias_tsv_idx; Type: INDEX; Schema: public; Owner: rss --- - -CREATE INDEX noticias_tsv_idx ON public.noticias USING gin (tsv); - - --- --- Name: noticias tsvectorupdate; Type: TRIGGER; Schema: public; Owner: rss --- - -CREATE TRIGGER tsvectorupdate BEFORE INSERT ON public.noticias FOR EACH ROW EXECUTE FUNCTION public.noticias_tsv_trigger(); - - --- --- Name: feeds feeds_categoria_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: rss --- - -ALTER TABLE ONLY public.feeds - ADD CONSTRAINT feeds_categoria_id_fkey FOREIGN KEY (categoria_id) REFERENCES public.categorias(id) ON DELETE SET NULL; - - --- --- Name: feeds feeds_pais_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: rss --- - -ALTER TABLE ONLY public.feeds - ADD CONSTRAINT feeds_pais_id_fkey FOREIGN KEY (pais_id) REFERENCES public.paises(id) ON DELETE SET NULL; - - --- --- Name: noticias noticias_categoria_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: rss --- - -ALTER TABLE ONLY public.noticias - ADD CONSTRAINT noticias_categoria_id_fkey FOREIGN KEY (categoria_id) REFERENCES public.categorias(id) ON DELETE SET NULL; - - --- --- Name: noticias noticias_pais_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: rss --- - -ALTER TABLE ONLY public.noticias - ADD CONSTRAINT noticias_pais_id_fkey FOREIGN KEY (pais_id) REFERENCES public.paises(id) ON DELETE SET NULL; - - --- --- Name: paises paises_continente_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: rss --- - -ALTER TABLE ONLY public.paises - ADD CONSTRAINT paises_continente_id_fkey FOREIGN KEY (continente_id) REFERENCES public.continentes(id) ON DELETE SET NULL; - - --- --- PostgreSQL database dump complete --- - diff --git a/init-db/08-embeddings.sql b/init-db/08-embeddings.sql index 5b9a821..88e03cf 100644 --- a/init-db/08-embeddings.sql +++ b/init-db/08-embeddings.sql @@ -29,7 +29,7 @@ SELECT te.dim, te.embedding AS vec FROM traduccion_embeddings te -WHERE te.model = 'sentence-transformers/paraphrase-multilingual-mpnet-base-v2'; +WHERE te.model = 'sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2'; CREATE TABLE IF NOT EXISTS related_noticias ( traduccion_id INT NOT NULL REFERENCES traducciones(id) ON DELETE CASCADE, diff --git a/init-db/09-eventos.sql b/init-db/09-eventos.sql index 39c6443..660910b 100644 --- a/init-db/09-eventos.sql +++ b/init-db/09-eventos.sql @@ -31,11 +31,13 @@ ALTER TABLE traducciones -- --------------------------------------------- -- 3. TABLA RELACIÓN EVENTO <-> NOTICIA <-> TRADUCCIÓN +-- (tipos alineados con noticias.id (VARCHAR(32)) +-- y traducciones.id (INTEGER)) -- --------------------------------------------- CREATE TABLE IF NOT EXISTS eventos_noticias ( - evento_id BIGINT NOT NULL REFERENCES eventos(id) ON DELETE CASCADE, - noticia_id CHAR(32) NOT NULL REFERENCES noticias(id) ON DELETE CASCADE, - traduccion_id BIGINT NOT NULL REFERENCES traducciones(id) ON DELETE CASCADE, + evento_id BIGINT NOT NULL REFERENCES eventos(id) ON DELETE CASCADE, + noticia_id VARCHAR(32) NOT NULL REFERENCES noticias(id) ON DELETE CASCADE, + traduccion_id INTEGER NOT NULL REFERENCES traducciones(id) ON DELETE CASCADE, PRIMARY KEY (evento_id, traduccion_id) ); diff --git a/scheduler.py b/scheduler.py index 255173f..a47a80b 100644 --- a/scheduler.py +++ b/scheduler.py @@ -24,7 +24,7 @@ if __name__ == '__main__': scheduler.add_job( fetch_and_store_all, "interval", - minutes=10, + minutes=3, id="rss_job", next_run_time=datetime.utcnow() + timedelta(seconds=10) ) diff --git a/translation_worker.py b/translation_worker.py index fb16ab7..1725527 100644 --- a/translation_worker.py +++ b/translation_worker.py @@ -360,7 +360,7 @@ def _token_chunks(tokenizer, text: str, max_tokens: int) -> List[str]: return [text] chunks = [] for i in range(0, len(ids), max_tokens): - sub = ids[i : i + max_tokens] + sub = ids[i: i + max_tokens] piece = tokenizer.decode(sub, skip_special_tokens=True, clean_up_tokenization_spaces=True) if piece.strip(): chunks.append(piece.strip()) @@ -413,6 +413,90 @@ def _forced_bos_id(tokenizer: AutoTokenizer, model: AutoModelForSeq2SeqLM, tgt_c return getattr(tokenizer, "eos_token_id", None) or getattr(tokenizer, "bos_token_id", None) or 0 +@torch.inference_mode() +def _translate_texts_simple( + src_lang: str, + tgt_lang: str, + texts: List[str], + num_beams: int = 1, + _tries: int = 0, +) -> List[str]: + if not texts: + return [] + + cleaned = [(t or "").strip() for t in texts] + if all(not t for t in cleaned): + return ["" for _ in cleaned] + + tok, mdl, device = get_universal_components() + + src_code = map_to_nllb(src_lang) or "eng_Latn" + tgt_code = map_to_nllb(tgt_lang) or "spa_Latn" + + try: + tok.src_lang = src_code + except Exception: + pass + + forced_bos = _forced_bos_id(tok, mdl, tgt_code) + safe_len = _safe_src_len(tok) + + try: + autocast_ctx = ( + torch.amp.autocast("cuda", dtype=torch.float16) + if device.type == "cuda" + else contextlib.nullcontext() + ) + + enc = tok( + cleaned, + return_tensors="pt", + padding=True, + truncation=True, + max_length=safe_len, + ) + enc = {k: v.to(device) for k, v in enc.items()} + + gen_kwargs = dict( + forced_bos_token_id=forced_bos, + max_new_tokens=MAX_NEW_TOKENS, + num_beams=max(1, int(num_beams)), + do_sample=False, + use_cache=False, + ) + if int(num_beams) > 1: + gen_kwargs["early_stopping"] = True + + with autocast_ctx: + generated = mdl.generate(**enc, **gen_kwargs) + + outs = tok.batch_decode(generated, skip_special_tokens=True) + outs = [o.strip() for o in outs] + + del enc, generated + if device.type == "cuda": + _free_cuda() + + return outs + + except Exception as e: + if device.type == "cuda" and _is_cuda_mem_error(e) and _tries < 2: + LOG.warning("CUDA OOM/allocator (batch): intento de recuperación %d. Detalle: %s", _tries + 1, e) + global _MODEL, _DEVICE, _CUDA_DISABLED + _CUDA_DISABLED = True + try: + if _MODEL is not None: + del _MODEL + except Exception: + pass + _free_cuda() + _MODEL = None + _DEVICE = None + time.sleep(1.0) + return _translate_texts_simple(src_lang, tgt_lang, texts, num_beams=num_beams, _tries=_tries + 1) + raise + + @torch.inference_mode() def translate_text(src_lang: str, tgt_lang: str, text: str, num_beams: int = 1, _tries: int = 0) -> str: if not text or not text.strip(): @@ -495,7 +579,7 @@ def _pack_sentences_to_token_chunks( ids = tokenizer(s, add_special_tokens=False).input_ids step = max_tokens for i in range(0, len(ids), step): - sub = tokenizer.decode(ids[i : i + step], skip_special_tokens=True) + sub = tokenizer.decode(ids[i: i + step], skip_special_tokens=True) if cur: chunks.append(cur) cur = [] @@ -536,6 +620,75 @@ def _smart_concatenate(parts: List[str], tail_window: int = 120) -> str: return out +def translate_articles_full_batch( + src_lang: str, + tgt_lang: str, + texts: List[str], + num_beams: int, +) -> List[str]: + if not texts: + return [] + + if not CHUNK_BY_SENTENCES: + return _translate_texts_simple(src_lang, tgt_lang, texts, num_beams=num_beams) + + tok, _, _ = get_universal_components() + safe_len = _safe_src_len(tok) + max_chunk_tokens = min(CHUNK_MAX_TOKENS, safe_len) + + all_chunk_texts: List[str] = [] + per_article_chunk_ids: List[List[int]] = [] + + for text in texts: + text = (text or "").strip() + if not text: + per_article_chunk_ids.append([]) + continue + + sents = split_into_sentences(text) + if not sents: + per_article_chunk_ids.append([]) + continue + + chunks_sents = _pack_sentences_to_token_chunks( + tok, + sents, + max_tokens=max_chunk_tokens, + overlap_sents=CHUNK_OVERLAP_SENTS, + ) + + ids_for_this_article: List[int] = [] + for group in chunks_sents: + chunk_text = " ".join(group).strip() + if not chunk_text: + continue + idx = len(all_chunk_texts) + all_chunk_texts.append(chunk_text) + ids_for_this_article.append(idx) + + per_article_chunk_ids.append(ids_for_this_article) + + if not all_chunk_texts: + return ["" for _ in texts] + + translated_chunks = _translate_texts_simple( + src_lang, + tgt_lang, + all_chunk_texts, + num_beams=num_beams, + ) + + outs: List[str] = [] + for chunk_ids in per_article_chunk_ids: + if not chunk_ids: + outs.append("") + continue + parts = [translated_chunks[i] for i in chunk_ids] + outs.append(_smart_concatenate([p for p in parts if p])) + + return outs + + def translate_article_full( src_lang: str, tgt_lang: str, @@ -570,9 +723,15 @@ def translate_article_full( def process_batch(conn, rows): + batch_size = len(rows) + LOG.info("Iniciando traducción de batch con %d filas…", batch_size) + t0 = time.time() + done_rows = [] error_rows = [] + enriched_rows = [] + for r in rows: tr_id = r["tr_id"] lang_to = normalize_lang(r["lang_to"], "es") or "es" @@ -581,23 +740,54 @@ def process_batch(conn, rows): title = (r["titulo"] or "").strip() body = (r["resumen"] or "").strip() - if (map_to_nllb(lang_from) or "eng_Latn") == (map_to_nllb(lang_to) or "spa_Latn"): + src_code = map_to_nllb(lang_from) or "eng_Latn" + tgt_code = map_to_nllb(lang_to) or "spa_Latn" + + if src_code == tgt_code: done_rows.append((title, body, lang_from, tr_id)) continue + enriched_rows.append( + { + "tr_id": tr_id, + "lang_from": lang_from, + "lang_to": lang_to, + "title": title, + "body": body, + } + ) + + from collections import defaultdict + + groups = defaultdict(list) + for er in enriched_rows: + key = (er["lang_from"], er["lang_to"]) + groups[key].append(er) + + for (lang_from, lang_to), items in groups.items(): + titles = [it["title"] for it in items] + bodies = [it["body"] for it in items] + try: - title_tr = translate_text(lang_from, lang_to, title, num_beams=NUM_BEAMS_TITLE) if title else "" - body_tr = translate_article_full(lang_from, lang_to, body, num_beams=NUM_BEAMS_BODY) if body else "" + titles_tr = _translate_texts_simple(lang_from, lang_to, titles, num_beams=NUM_BEAMS_TITLE) + bodies_tr = translate_articles_full_batch(lang_from, lang_to, bodies, num_beams=NUM_BEAMS_BODY) - if _norm(title_tr) == _norm(title): - title_tr = "" - if _norm(body_tr) == _norm(body): - body_tr = "" + for it, t_tr, b_tr in zip(items, titles_tr, bodies_tr): + title_orig = it["title"] + body_orig = it["body"] + + if _norm(t_tr) == _norm(title_orig): + t_tr = "" + if _norm(b_tr) == _norm(body_orig): + b_tr = "" + + done_rows.append((t_tr, b_tr, lang_from, it["tr_id"])) - done_rows.append((title_tr, body_tr, lang_from, tr_id)) except Exception as e: - LOG.exception("Error traduciendo fila") - error_rows.append((str(e)[:1500], tr_id)) + LOG.exception("Error traduciendo lote %s -> %s", lang_from, lang_to) + err_msg = str(e)[:1500] + for it in items: + error_rows.append((err_msg, it["tr_id"])) with conn.cursor() as cur: if done_rows: @@ -630,6 +820,28 @@ def process_batch(conn, rows): ) conn.commit() + dt = time.time() - t0 + try: + _, _, device = get_universal_components() + dev_label = device.type.upper() if device is not None else "UNK" + except Exception: + dev_label = "UNK" + + if batch_size > 0: + LOG.info( + "[%s] Batch de %d filas traducido en %.2f s (%.2f s/noticia)", + dev_label, + batch_size, + dt, + dt / batch_size, + ) + else: + LOG.info( + "[%s] Batch vacío, nada que traducir (%.2f s)", + dev_label, + dt, + ) + def main(): LOG.info(