@app.route("/eventos_pais")
def eventos_pais():
    """Event (news-cluster) listing filtered by country.

    Query params:
        pais_id: numeric country id (optional; without it the view is empty).
        page:    1-based page number, 30 events per page.
        lang:    target language of the translations to display.
    """
    # Defensive parsing: previously a non-numeric pais_id/page raised
    # ValueError inside int() and produced an HTTP 500; now the filter is
    # ignored / page falls back to 1.
    raw_pais = request.args.get("pais_id") or None
    try:
        pais_id = int(raw_pais) if raw_pais else None
    except ValueError:
        pais_id = None
    try:
        page = max(int(request.args.get("page", 1) or 1), 1)
    except ValueError:
        page = 1
    per_page = 30
    offset = (page - 1) * per_page
    lang = (request.args.get("lang") or DEFAULT_TRANSLATION_LANG or DEFAULT_LANG).lower()[:5]

    with get_conn() as conn:
        conn.autocommit = True
        paises = get_paises(conn)

        eventos = []
        total_eventos = 0
        noticias_por_evento = {}
        pais_nombre = None

        if pais_id is not None:
            with conn.cursor(cursor_factory=extras.DictCursor) as cur:
                # 1) Events having at least one translation whose news item
                #    belongs to the requested country.
                cur.execute(
                    """
                    SELECT
                        e.id,
                        e.titulo,
                        e.fecha_inicio,
                        e.fecha_fin,
                        e.n_noticias,
                        MAX(p.nombre) AS pais_nombre
                    FROM eventos e
                    JOIN traducciones t ON t.evento_id = e.id
                    JOIN noticias n ON n.id = t.noticia_id
                    JOIN paises p ON p.id = n.pais_id
                    WHERE n.pais_id = %s
                    GROUP BY e.id, e.titulo, e.fecha_inicio, e.fecha_fin, e.n_noticias
                    ORDER BY e.fecha_inicio DESC NULLS LAST, e.id DESC
                    LIMIT %s OFFSET %s;
                    """,
                    (pais_id, per_page, offset),
                )
                eventos = cur.fetchall()

                # 2) Total distinct events for the country.  COUNT(...) always
                #    yields exactly one row, so no rowcount guard is needed.
                cur.execute(
                    """
                    SELECT COUNT(DISTINCT e.id)
                    FROM eventos e
                    JOIN traducciones t ON t.evento_id = e.id
                    JOIN noticias n ON n.id = t.noticia_id
                    WHERE n.pais_id = %s;
                    """,
                    (pais_id,),
                )
                total_eventos = cur.fetchone()[0]

                # 3) Load the translated news attached to those events.
                if eventos:
                    evento_ids = [e["id"] for e in eventos]
                    cur.execute(
                        """
                        SELECT
                            t.evento_id,
                            n.id AS noticia_id,
                            n.url,
                            n.fecha,
                            n.imagen_url,
                            n.fuente_nombre,
                            n.titulo AS titulo_orig,
                            n.resumen AS resumen_orig,
                            t.id AS traduccion_id,
                            t.titulo_trad AS titulo_trad,
                            t.resumen_trad AS resumen_trad,
                            p.nombre AS pais_nombre
                        FROM traducciones t
                        JOIN noticias n ON n.id = t.noticia_id
                        LEFT JOIN paises p ON p.id = n.pais_id
                        WHERE t.evento_id = ANY(%s)
                          AND t.status = 'done'
                          AND t.lang_to = %s
                        ORDER BY t.evento_id, n.fecha DESC;
                        """,
                        (evento_ids, lang),
                    )
                    noticias_por_evento = {e["id"]: [] for e in eventos}
                    for r in cur.fetchall():
                        noticias_por_evento.setdefault(r["evento_id"], []).append(r)

                    # Every event in this view belongs to the filtered country.
                    pais_nombre = eventos[0]["pais_nombre"]
                else:
                    # No events: take the country name from the general list.
                    pais_nombre = next(
                        (p["nombre"] for p in paises if p["id"] == pais_id),
                        None,
                    )

    total_pages = (total_eventos + per_page - 1) // per_page  # ceil division

    return render_template(
        "eventos_pais.html",
        paises=paises,
        eventos=eventos,
        noticias_por_evento=noticias_por_evento,
        pais_id=pais_id,
        pais_nombre=pais_nombre,
        total_eventos=total_eventos,
        total_pages=total_pages,
        page=page,
        lang=lang,
    )


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8001, debug=True)


# --- cluster_worker.py (module header; the module continues on later lines) ---
import os
import time
import logging
from typing import List, Dict, Any, Optional

import numpy as np
import psycopg2
import psycopg2.extras

logging.basicConfig(
    level=logging.INFO,
    format='[eventos] %(asctime)s %(levelname)s: %(message)s'
)
log = logging.getLogger(__name__)

# Postgres connection settings, overridable via environment variables.
DB = dict(
    host=os.environ.get("DB_HOST", "localhost"),
    port=int(os.environ.get("DB_PORT", 5432)),
    dbname=os.environ.get("DB_NAME", "rss"),
    user=os.environ.get("DB_USER", "rss"),
    password=os.environ.get("DB_PASS", "x"),
)
# Languages whose translations participate in event clustering.
EVENT_LANGS = [
    s.strip().lower()
    for s in os.environ.get("EVENT_LANGS", "es").split(",")
    if s.strip()
]

EVENT_BATCH_IDS = int(os.environ.get("EVENT_BATCH_IDS", "200"))
EVENT_SLEEP_IDLE = float(os.environ.get("EVENT_SLEEP_IDLE", "5.0"))
EVENT_DIST_THRESHOLD = float(os.environ.get("EVENT_DIST_THRESHOLD", "0.25"))


def get_conn():
    """Open a fresh Postgres connection using the environment-driven settings."""
    return psycopg2.connect(**DB)


def ensure_schema(conn):
    """Create the eventos table, evento_id column, indexes and trigger if missing.

    Every statement is idempotent (IF NOT EXISTS / CREATE OR REPLACE / DROP IF
    EXISTS), so running this on every worker start-up is safe.
    """
    ddl_statements = (
        """
            CREATE TABLE IF NOT EXISTS eventos (
                id SERIAL PRIMARY KEY,
                creado_en TIMESTAMP NOT NULL DEFAULT NOW(),
                actualizado_en TIMESTAMP NOT NULL DEFAULT NOW(),
                centroid JSONB NOT NULL,
                total_traducciones INTEGER NOT NULL DEFAULT 1
            );
            """,
        """
            ALTER TABLE traducciones
            ADD COLUMN IF NOT EXISTS evento_id INTEGER REFERENCES eventos(id);
            """,
        """
            CREATE INDEX IF NOT EXISTS idx_traducciones_evento
            ON traducciones(evento_id);
            """,
        """
            CREATE INDEX IF NOT EXISTS idx_traducciones_evento_fecha
            ON traducciones(evento_id, noticia_id);
            """,
        """
            CREATE OR REPLACE FUNCTION actualizar_evento_modificado()
            RETURNS TRIGGER AS $$
            BEGIN
                NEW.actualizado_en = NOW();
                RETURN NEW;
            END;
            $$ LANGUAGE plpgsql;
            """,
        "DROP TRIGGER IF EXISTS trg_evento_modificado ON eventos;",
        """
            CREATE TRIGGER trg_evento_modificado
            BEFORE UPDATE ON eventos
            FOR EACH ROW
            EXECUTE FUNCTION actualizar_evento_modificado();
            """,
    )
    with conn.cursor() as cur:
        for statement in ddl_statements:
            cur.execute(statement)
    conn.commit()


def fetch_pending_traducciones(conn) -> List[int]:
    """Return ids of 'done' translations (with an embedding) not yet in an event.

    Newest first, capped at EVENT_BATCH_IDS per call.
    """
    with conn.cursor() as cur:
        cur.execute(
            """
            SELECT t.id
            FROM traducciones t
            JOIN embeddings e ON e.traduccion_id = t.id
            WHERE t.status = 'done'
              AND t.evento_id IS NULL
              AND t.lang_to = ANY(%s)
            ORDER BY t.id DESC
            LIMIT %s;
            """,
            (EVENT_LANGS, EVENT_BATCH_IDS),
        )
        return [row[0] for row in cur.fetchall()]
def fetch_embeddings_for(conn, tr_ids: List[int]) -> Dict[int, np.ndarray]:
    """Load embedding vectors for the given translation ids.

    Returns a map translation_id -> float32 vector; rows whose stored vector
    is NULL/empty are silently skipped.
    """
    if not tr_ids:
        return {}
    with conn.cursor() as cur:
        cur.execute(
            """
            SELECT traduccion_id, vec
            FROM embeddings
            WHERE traduccion_id = ANY(%s);
            """,
            (tr_ids,),
        )
        fetched = cur.fetchall()

    vectors: Dict[int, np.ndarray] = {}
    for raw_id, raw_vec in fetched:
        if not raw_vec:
            continue
        vec = np.array([float(v or 0.0) for v in raw_vec], dtype="float32")
        if vec.size == 0:
            continue
        vectors[int(raw_id)] = vec
    return vectors


def fetch_centroids(conn) -> List[Dict[str, Any]]:
    """Load every event centroid as a {'id', 'vec', 'n'} dict, ordered by id."""
    with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
        cur.execute(
            """
            SELECT id, centroid, total_traducciones
            FROM eventos
            ORDER BY id;
            """
        )
        fetched = cur.fetchall()

    result: List[Dict[str, Any]] = []
    for row in fetched:
        raw = row["centroid"]
        # Centroids are stored as JSONB arrays; anything else is skipped.
        if not isinstance(raw, (list, tuple)):
            continue
        vec = np.array([float(v or 0.0) for v in raw], dtype="float32")
        if vec.size == 0:
            continue
        result.append(
            {
                "id": int(row["id"]),
                "vec": vec,
                "n": int(row["total_traducciones"] or 1),
            }
        )
    return result


def cosine_distance(a: np.ndarray, b: np.ndarray) -> float:
    """Cosine distance in [0, 2]; degenerate (zero-norm) inputs map to 1.0."""
    denom = float(np.linalg.norm(a)) * float(np.linalg.norm(b))
    if denom <= 0.0:
        return 1.0
    # Clamp to [-1, 1] to guard against floating-point round-off.
    cos = max(-1.0, min(1.0, float(np.dot(a, b)) / denom))
    return 1.0 - cos
def _create_event(
    conn,
    tr_id: int,
    vec: np.ndarray,
    centroids: List[Dict[str, Any]],
) -> None:
    """Insert a brand-new event whose centroid is *vec* and link tr_id to it.

    Also appends the new centroid to the in-memory *centroids* cache so later
    assignments within the same batch can match against it.
    """
    from psycopg2.extras import Json

    centroid_list = [float(x) for x in vec.tolist()]
    with conn.cursor() as cur:
        cur.execute(
            """
            INSERT INTO eventos (centroid, total_traducciones)
            VALUES (%s, %s)
            RETURNING id;
            """,
            (Json(centroid_list), 1),
        )
        new_id = cur.fetchone()[0]
        cur.execute(
            "UPDATE traducciones SET evento_id = %s WHERE id = %s;",
            (new_id, tr_id),
        )
    centroids.append({"id": new_id, "vec": vec.copy(), "n": 1})


def assign_to_event(
    conn,
    tr_id: int,
    vec: np.ndarray,
    centroids: List[Dict[str, Any]],
) -> None:
    """Assign translation tr_id to the nearest event, or create a new one.

    Online (incremental) clustering: if the closest centroid is within
    EVENT_DIST_THRESHOLD the translation joins that event and the centroid is
    updated as a running mean; otherwise a new single-member event is created.
    (The create-new-event code used to be duplicated verbatim in the
    empty-centroids branch and the no-match branch; it now lives once in
    _create_event — the empty list simply falls through the loop unmatched.)
    """
    from psycopg2.extras import Json

    if vec is None or vec.size == 0:
        return

    # Find the nearest existing centroid, if any.
    best_idx: Optional[int] = None
    best_dist: float = 1.0
    for i, c in enumerate(centroids):
        d = cosine_distance(vec, c["vec"])
        if d < best_dist:
            best_dist = d
            best_idx = i

    if best_idx is None or best_dist > EVENT_DIST_THRESHOLD:
        # No centroid close enough (or none at all): open a new event.
        _create_event(conn, tr_id, vec, centroids)
        return

    c = centroids[best_idx]
    n_old = c["n"]
    new_n = n_old + 1
    # Running mean over all member vectors of the event.
    new_vec = (c["vec"] * n_old + vec) / float(new_n)
    c["vec"] = new_vec
    c["n"] = new_n

    centroid_list = [float(x) for x in new_vec.tolist()]
    with conn.cursor() as cur:
        cur.execute(
            """
            UPDATE eventos
            SET centroid = %s,
                total_traducciones = total_traducciones + 1
            WHERE id = %s;
            """,
            (Json(centroid_list), c["id"]),
        )
        cur.execute(
            "UPDATE traducciones SET evento_id = %s WHERE id = %s;",
            (c["id"], tr_id),
        )


def main():
    """Worker loop: batch-assign pending translations to event clusters."""
    from contextlib import closing

    log.info(
        "Iniciando cluster_worker eventos "
        "(EVENT_LANGS=%s, BATCH_IDS=%s, DIST_THRESHOLD=%.3f, SLEEP=%.1fs)",
        ",".join(EVENT_LANGS),
        EVENT_BATCH_IDS,
        EVENT_DIST_THRESHOLD,
        EVENT_SLEEP_IDLE,
    )

    while True:
        try:
            # closing() is required: psycopg2's `with conn:` only terminates
            # the transaction — it does NOT close the connection — so the
            # original loop leaked one server connection per iteration.
            with closing(get_conn()) as conn, conn:
                ensure_schema(conn)

                pending_ids = fetch_pending_traducciones(conn)
                if not pending_ids:
                    time.sleep(EVENT_SLEEP_IDLE)
                    continue

                log.info("Traducciones pendientes de asignar evento: %d", len(pending_ids))

                emb_by_tr = fetch_embeddings_for(conn, pending_ids)
                if not emb_by_tr:
                    log.warning("No se encontraron embeddings para las traducciones pendientes.")
                    time.sleep(EVENT_SLEEP_IDLE)
                    continue

                centroids = fetch_centroids(conn)
                log.info("Centroides cargados: %d", len(centroids))

                processed = 0
                for tr_id in pending_ids:
                    vec = emb_by_tr.get(tr_id)
                    if vec is None:
                        continue
                    assign_to_event(conn, tr_id, vec, centroids)
                    processed += 1

                conn.commit()
                log.info("Asignación de eventos completada. Traducciones procesadas: %d", processed)

        except Exception as e:
            log.exception("Error en cluster_worker: %s", e)
            time.sleep(EVENT_SLEEP_IDLE)


if __name__ == "__main__":
    main()
+ assign_to_event(conn, tr_id, vec, centroids) + processed += 1 + + conn.commit() + log.info("Asignación de eventos completada. Traducciones procesadas: %d", processed) + + except Exception as e: + log.exception("Error en cluster_worker: %s", e) + time.sleep(EVENT_SLEEP_IDLE) + + +if __name__ == "__main__": + main() + diff --git a/docker-compose.yml b/docker-compose.yml index d81aec3..ed5d8f0 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -182,6 +182,24 @@ services: condition: service_healthy restart: always + cluster: + build: + context: . + args: + TORCH_CUDA: cu121 + container_name: rss_cluster + command: bash -lc "python cluster_worker.py" + environment: + - DB_HOST=db + - DB_PORT=5432 + - DB_NAME=${DB_NAME} + - DB_USER=${DB_USER} + - DB_PASS=${DB_PASS} + depends_on: + db: + condition: service_healthy + restart: always + networks: default: name: rss_default diff --git a/init-db/09-eventos.sql b/init-db/09-eventos.sql new file mode 100644 index 0000000..39c6443 --- /dev/null +++ b/init-db/09-eventos.sql @@ -0,0 +1,89 @@ +BEGIN; + +-- ============================================= +-- Sistema de eventos (clustering incremental) +-- ============================================= + +-- --------------------------------------------- +-- 1. TABLA DE EVENTOS (CLUSTERS) +-- --------------------------------------------- +CREATE TABLE IF NOT EXISTS eventos ( + id BIGSERIAL PRIMARY KEY, + creado_en TIMESTAMPTZ NOT NULL DEFAULT NOW(), + actualizado_en TIMESTAMPTZ NOT NULL DEFAULT NOW(), + + -- Datos "semánticos" del evento (para la web) + titulo TEXT, + fecha_inicio TIMESTAMPTZ, + fecha_fin TIMESTAMPTZ, + n_noticias INTEGER NOT NULL DEFAULT 0, + + -- Datos de clustering + centroid JSONB NOT NULL, + total_traducciones INTEGER NOT NULL DEFAULT 1 +); + +-- --------------------------------------------- +-- 2. 
-- ---------------------------------------------
-- 2. evento_id column on traducciones
-- ---------------------------------------------
ALTER TABLE traducciones
    ADD COLUMN IF NOT EXISTS evento_id BIGINT REFERENCES eventos(id);

-- ---------------------------------------------
-- 3. Event <-> news <-> translation link table
-- ---------------------------------------------
CREATE TABLE IF NOT EXISTS eventos_noticias (
    evento_id     BIGINT   NOT NULL REFERENCES eventos(id)      ON DELETE CASCADE,
    noticia_id    CHAR(32) NOT NULL REFERENCES noticias(id)     ON DELETE CASCADE,
    traduccion_id BIGINT   NOT NULL REFERENCES traducciones(id) ON DELETE CASCADE,
    PRIMARY KEY (evento_id, traduccion_id)
);

-- ---------------------------------------------
-- 4. Helpful indexes
-- ---------------------------------------------

-- Look up translations by event
CREATE INDEX IF NOT EXISTS idx_traducciones_evento
    ON traducciones(evento_id);

CREATE INDEX IF NOT EXISTS idx_traducciones_evento_fecha
    ON traducciones(evento_id, noticia_id);

-- NOTE(review): if traducciones.id is the primary key (likely), this index is
-- redundant with the implicit PK index — confirm and consider dropping it.
CREATE INDEX IF NOT EXISTS idx_trad_id
    ON traducciones(id);

-- Order events by start date (newest first, NULLs last)
CREATE INDEX IF NOT EXISTS idx_eventos_fecha_inicio
    ON eventos (fecha_inicio DESC NULLS LAST);

-- Event <-> news / translation relation lookups
CREATE INDEX IF NOT EXISTS idx_eventos_noticias_evento
    ON eventos_noticias (evento_id);

CREATE INDEX IF NOT EXISTS idx_eventos_noticias_noticia
    ON eventos_noticias (noticia_id);

CREATE INDEX IF NOT EXISTS idx_eventos_noticias_traduccion
    ON eventos_noticias (traduccion_id);
-- ---------------------------------------------
-- 5. Trigger keeping "actualizado_en" fresh on every row update
-- ---------------------------------------------
CREATE OR REPLACE FUNCTION actualizar_evento_modificado()
RETURNS TRIGGER AS $$
BEGIN
    NEW.actualizado_en = NOW();
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

-- Recreate idempotently so this script can be re-run safely.
DROP TRIGGER IF EXISTS trg_evento_modificado ON eventos;

CREATE TRIGGER trg_evento_modificado
BEFORE UPDATE ON eventos
FOR EACH ROW
EXECUTE FUNCTION actualizar_evento_modificado();

COMMIT;
@@ -44,6 +75,7 @@