diff --git a/app.py b/app.py index b158540..925e50f 100644 --- a/app.py +++ b/app.py @@ -1310,7 +1310,6 @@ def eventos_pais(): if pais_id: with conn.cursor(cursor_factory=extras.DictCursor) as cur: - # 1) Eventos que tienen al menos una traducción cuya noticia es de ese país cur.execute( """ SELECT @@ -1333,7 +1332,6 @@ def eventos_pais(): ) eventos = cur.fetchall() - # 2) Total de eventos distintos para ese país cur.execute( """ SELECT COUNT(DISTINCT e.id) @@ -1346,7 +1344,6 @@ def eventos_pais(): ) total_eventos = cur.fetchone()[0] if cur.rowcount else 0 - # 3) Cargar noticias asociadas a esos eventos (desde traducciones + noticias) if eventos: evento_ids = [e["id"] for e in eventos] @@ -1381,10 +1378,8 @@ def eventos_pais(): for r in rows: noticias_por_evento.setdefault(r["evento_id"], []).append(r) - # Nombre del país (todos los eventos en esta vista son del mismo país filtrado) pais_nombre = eventos[0]["pais_nombre"] else: - # Si no hay eventos, al menos sacamos el nombre del país desde la lista for p in paises: if p["id"] == int(pais_id): pais_nombre = p["nombre"] diff --git a/cluster_worker.py b/cluster_worker.py index c808d82..ce1b0fe 100644 --- a/cluster_worker.py +++ b/cluster_worker.py @@ -34,7 +34,7 @@ EVENT_DIST_THRESHOLD = float(os.environ.get("EVENT_DIST_THRESHOLD", "0.25")) EMB_MODEL = os.environ.get( "EMB_MODEL", - "sentence-transformers/paraphrase-multilingual-mpnet-base-v2", + "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2", ) @@ -43,12 +43,6 @@ def get_conn(): def ensure_schema(conn): - """ - Asumimos que las tablas y columnas (eventos, traducciones.evento_id, - eventos_noticias, función/trigger) ya existen por los scripts init-db. - Aquí solo nos aseguramos de que existan ciertos índices clave - (idempotente). - """ with conn.cursor() as cur: cur.execute( """ @@ -66,10 +60,6 @@ def ensure_schema(conn): def fetch_pending_traducciones(conn) -> List[int]: - """ - Traducciones con status 'done', sin evento asignado - y que ya tienen embedding en traduccion_embeddings para EMB_MODEL. - """ with conn.cursor() as cur: cur.execute( """ @@ -91,10 +81,6 @@ def fetch_pending_traducciones(conn) -> List[int]: def fetch_embeddings_for(conn, tr_ids: List[int]) -> Dict[int, np.ndarray]: - """ - Devuelve un diccionario {traduccion_id: vector_numpy} - leyendo de traduccion_embeddings.embedding para el EMB_MODEL. - """ if not tr_ids: return {} @@ -122,10 +108,6 @@ def fetch_embeddings_for(conn, tr_ids: List[int]) -> Dict[int, np.ndarray]: def fetch_centroids(conn) -> List[Dict[str, Any]]: - """ - Carga todos los centroides actuales desde eventos. - Solo usamos campos de clustering: id, centroid, total_traducciones. - """ with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur: cur.execute( """ @@ -142,7 +124,6 @@ def fetch_centroids(conn) -> List[Dict[str, Any]]: raw = r["centroid"] cnt = int(r["total_traducciones"] or 1) if not isinstance(raw, (list, tuple)): - # centroid se almacena como JSONB array → en Python suele llegar como list continue arr = np.array([float(x or 0.0) for x in raw], dtype="float32") if arr.size == 0: @@ -167,12 +148,6 @@ def cosine_distance(a: np.ndarray, b: np.ndarray) -> float: def fetch_traduccion_info(conn, tr_id: int) -> Optional[Dict[str, Any]]: - """ - Devuelve info básica para un tr_id: - - noticia_id - - fecha de la noticia - - un título “representativo” para el evento (traducido u original). - """ with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur: cur.execute( """ @@ -199,16 +174,13 @@ def fetch_traduccion_info(conn, tr_id: int) -> Optional[Dict[str, Any]]: def _insert_evento_noticia(cur, evento_id: int, info: Dict[str, Any]) -> None: - """ - Inserta relación en eventos_noticias (idempotente). - """ if not info or not info.get("noticia_id"): return cur.execute( """ INSERT INTO eventos_noticias (evento_id, noticia_id, traduccion_id) VALUES (%s, %s, %s) - ON CONFLICT (evento_id, traduccion_id) DO NOTHING; + ON CONFLICT (evento_id, noticia_id) DO NOTHING; """, (evento_id, info["noticia_id"], info["traduccion_id"]), ) @@ -220,20 +192,11 @@ def assign_to_event( vec: np.ndarray, centroids: List[Dict[str, Any]], ) -> None: - """ - Asigna una traducción a un evento existente (si distancia <= umbral) - o crea un evento nuevo con este vector como centroide. - - Además: - - Actualiza fecha_inicio, fecha_fin, n_noticias del evento. - - Rellena eventos_noticias (evento_id, noticia_id, traduccion_id). - """ if vec is None or vec.size == 0: return info = fetch_traduccion_info(conn, tr_id) - # Si no hay centroides todavía → primer evento if not centroids: centroid_list = [float(x) for x in vec.tolist()] with conn.cursor() as cur: @@ -255,7 +218,6 @@ def assign_to_event( ), ) else: - # Fallback mínimo si no hay info de noticia cur.execute( """ INSERT INTO eventos (centroid, total_traducciones) @@ -267,19 +229,16 @@ def assign_to_event( new_id = cur.fetchone()[0] - # Vincular traducción al evento cur.execute( "UPDATE traducciones SET evento_id = %s WHERE id = %s;", (new_id, tr_id), ) - # Rellenar tabla de relación _insert_evento_noticia(cur, new_id, info or {}) centroids.append({"id": new_id, "vec": vec.copy(), "n": 1}) return - # Buscar el centroide más cercano best_idx: Optional[int] = None best_dist: float = 1.0 @@ -290,7 +249,6 @@ def assign_to_event( best_idx = i with conn.cursor() as cur: - # Asignar a evento existente si está por debajo del umbral if best_idx is not None and best_dist <= EVENT_DIST_THRESHOLD: c = centroids[best_idx] n_old = c["n"] @@ -323,7 +281,6 @@ def assign_to_event( ), ) else: - # Sin info de fecha: solo actualizamos centroid/contador cur.execute( """ UPDATE eventos @@ -334,7 +291,6 @@ def assign_to_event( (Json(centroid_list), c["id"]), ) - # Vincular traducción y relación cur.execute( "UPDATE traducciones SET evento_id = %s WHERE id = %s;", (c["id"], tr_id), @@ -343,7 +299,6 @@ def assign_to_event( return - # Si no hay evento adecuado → crear uno nuevo centroid_list = [float(x) for x in vec.tolist()] if info and info.get("fecha"): diff --git a/docker-compose.yml b/docker-compose.yml index 33d8f22..d61cf79 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -238,6 +238,8 @@ services: - DB_NAME=${DB_NAME} - DB_USER=${DB_USER} - DB_PASS=${DB_PASS} + - EVENT_DIST_THRESHOLD=0.35 + depends_on: db: condition: service_healthy diff --git a/embeddings_worker.py b/embeddings_worker.py index 105050e..3d2571b 100644 --- a/embeddings_worker.py +++ b/embeddings_worker.py @@ -23,7 +23,7 @@ DB = dict( EMB_MODEL = os.environ.get( "EMB_MODEL", - "sentence-transformers/paraphrase-multilingual-mpnet-base-v2", + "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2", ) EMB_BATCH = int(os.environ.get("EMB_BATCH", "128")) SLEEP_IDLE = float(os.environ.get("EMB_SLEEP_IDLE", "5.0")) diff --git a/init-db/09-eventos.sql b/init-db/09-eventos.sql index 660910b..b70aa00 100644 --- a/init-db/09-eventos.sql +++ b/init-db/09-eventos.sql @@ -1,51 +1,27 @@ BEGIN; --- ============================================= --- Sistema de eventos (clustering incremental) --- ============================================= - --- --------------------------------------------- --- 1. TABLA DE EVENTOS (CLUSTERS) --- --------------------------------------------- CREATE TABLE IF NOT EXISTS eventos ( id BIGSERIAL PRIMARY KEY, creado_en TIMESTAMPTZ NOT NULL DEFAULT NOW(), actualizado_en TIMESTAMPTZ NOT NULL DEFAULT NOW(), - - -- Datos "semánticos" del evento (para la web) titulo TEXT, fecha_inicio TIMESTAMPTZ, fecha_fin TIMESTAMPTZ, n_noticias INTEGER NOT NULL DEFAULT 0, - - -- Datos de clustering centroid JSONB NOT NULL, total_traducciones INTEGER NOT NULL DEFAULT 1 ); --- --------------------------------------------- --- 2. COLUMNA evento_id EN TRADUCCIONES --- --------------------------------------------- ALTER TABLE traducciones ADD COLUMN IF NOT EXISTS evento_id BIGINT REFERENCES eventos(id); --- --------------------------------------------- --- 3. TABLA RELACIÓN EVENTO <-> NOTICIA <-> TRADUCCIÓN --- (tipos alineados con noticias.id (VARCHAR(32)) --- y traducciones.id (INTEGER)) --- --------------------------------------------- CREATE TABLE IF NOT EXISTS eventos_noticias ( evento_id BIGINT NOT NULL REFERENCES eventos(id) ON DELETE CASCADE, noticia_id VARCHAR(32) NOT NULL REFERENCES noticias(id) ON DELETE CASCADE, traduccion_id INTEGER NOT NULL REFERENCES traducciones(id) ON DELETE CASCADE, - PRIMARY KEY (evento_id, traduccion_id) + PRIMARY KEY (evento_id, noticia_id) ); --- --------------------------------------------- --- 4. ÍNDICES ÚTILES --- --------------------------------------------- - --- Consultar traducciones por evento CREATE INDEX IF NOT EXISTS idx_traducciones_evento ON traducciones(evento_id); @@ -55,11 +31,9 @@ CREATE INDEX IF NOT EXISTS idx_traducciones_evento_fecha CREATE INDEX IF NOT EXISTS idx_trad_id ON traducciones(id); --- Ordenar eventos por fecha de inicio CREATE INDEX IF NOT EXISTS idx_eventos_fecha_inicio ON eventos (fecha_inicio DESC NULLS LAST); --- Relación evento <-> noticia / traducción CREATE INDEX IF NOT EXISTS idx_eventos_noticias_evento ON eventos_noticias (evento_id); @@ -69,9 +43,6 @@ CREATE INDEX IF NOT EXISTS idx_eventos_noticias_noticia CREATE INDEX IF NOT EXISTS idx_eventos_noticias_traduccion ON eventos_noticias (traduccion_id); --- --------------------------------------------- --- 5. TRIGGER PARA actualizar "actualizado_en" --- --------------------------------------------- CREATE OR REPLACE FUNCTION actualizar_evento_modificado() RETURNS TRIGGER AS $$ BEGIN diff --git a/templates/base.html b/templates/base.html index c01c8a0..1864556 100644 --- a/templates/base.html +++ b/templates/base.html @@ -94,26 +94,6 @@ {% block content %}{% endblock %} - - diff --git a/templates/eventos_pais.html b/templates/eventos_pais.html index a46ad7c..00b9e31 100644 --- a/templates/eventos_pais.html +++ b/templates/eventos_pais.html @@ -58,6 +58,18 @@ {% for e in eventos %} {% set lista = noticias_por_evento.get(e.id) or [] %} {% set primera = lista[0] if lista else None %} + {% set titulo_evento = e.titulo %} + {% if not titulo_evento %} + {% if primera %} + {% if primera.titulo_trad %} + {% set titulo_evento = primera.titulo_trad %} + {% else %} + {% set titulo_evento = primera.titulo_orig %} + {% endif %} + {% else %} + {% set titulo_evento = 'Evento' %} + {% endif %} + {% endif %}
  • {% if primera and primera.imagen_url %} @@ -70,7 +82,7 @@

    - {{ e.titulo or (primera.titulo_trad or primera.titulo_orig if primera else 'Evento') }} + {{ titulo_evento }} {% if e.n_noticias %} {{ e.n_noticias }} noticias @@ -88,7 +100,7 @@ {% endif %} {% endif %} {% if e.fecha_fin and e.fecha_fin != e.fecha_inicio %} - – + – {% if e.fecha_fin is string %} {{ e.fecha_fin }} {% else %} diff --git a/translation_worker.py b/translation_worker.py index 1725527..39a33fe 100644 --- a/translation_worker.py +++ b/translation_worker.py @@ -105,6 +105,8 @@ CHUNK_BY_SENTENCES = _env_bool("CHUNK_BY_SENTENCES", default=True) CHUNK_MAX_TOKENS = _env_int("CHUNK_MAX_TOKENS", default=900) CHUNK_OVERLAP_SENTS = _env_int("CHUNK_OVERLAP_SENTS", default=0) +IDENTITY_PAISES_ES = _env_int("IDENTITY_PAISES_ES", default=58) + _ABBR = ("Sr", "Sra", "Dr", "Dra", "Ing", "Lic", "pág", "etc") _ABBR_MARK = "§" @@ -216,6 +218,8 @@ def ensure_indexes(conn): def ensure_pending(conn, lang_to: str, enqueue_limit: int): + if enqueue_limit <= 0: + return with conn.cursor() as cur: cur.execute( """ @@ -236,7 +240,44 @@ def ensure_pending(conn, lang_to: str, enqueue_limit: int): conn.commit() +def ensure_identity_spanish(conn, lang_to: str, enqueue_limit: int): + lang_to = normalize_lang(lang_to, "es") or "es" + if lang_to != "es": + return + if enqueue_limit <= 0: + return + + LOG.info( + "Creando traducciones identidad ES para pais_id=%s (hasta %s noticias)…", + IDENTITY_PAISES_ES, + enqueue_limit, + ) + + with conn.cursor() as cur: + cur.execute( + """ + INSERT INTO traducciones (noticia_id, lang_from, lang_to, titulo_trad, resumen_trad, status) + SELECT sub.id, 'es', %s, sub.titulo, sub.resumen, 'done' + FROM ( + SELECT n.id, n.titulo, n.resumen + FROM noticias n + LEFT JOIN traducciones t + ON t.noticia_id = n.id AND t.lang_to = %s + WHERE t.id IS NULL + AND n.pais_id = %s + ORDER BY n.fecha DESC NULLS LAST, n.id + LIMIT %s + ) AS sub; + """, + (lang_to, lang_to, IDENTITY_PAISES_ES, enqueue_limit), + ) + conn.commit() + + def fetch_pending_batch(conn, lang_to: str, batch_size: int): + if batch_size <= 0: + return [] + with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur: cur.execute( """ @@ -865,7 +906,10 @@ def main(): ensure_indexes(conn) for lt in TARGET_LANGS: lt = normalize_lang(lt, "es") or "es" + + ensure_identity_spanish(conn, lt, ENQUEUE_MAX) ensure_pending(conn, lt, ENQUEUE_MAX) + while True: rows = fetch_pending_batch(conn, lt, BATCH_SIZE) if not rows: