# -*- coding: utf-8 -*-
"""Flask RSS aggregator — PostgreSQL version.

(Copyright yours 😉, with stability, security and DB-management improvements.)
"""
import csv
import hashlib
import logging
import os
import re
import sys
from contextlib import closing, contextmanager
from datetime import datetime
from io import StringIO

import bleach  # HTML sanitising (XSS prevention)
import feedparser
import psycopg2
import psycopg2.extras
from apscheduler.schedulers.background import BackgroundScheduler
from flask import Flask, render_template, request, redirect, url_for, Response, flash

# Logging configuration
logging.basicConfig(
    stream=sys.stdout,
    level=logging.INFO,
    format='[%(asctime)s] %(levelname)s in %(module)s: %(message)s',
)

app = Flask(__name__)

# Flash messages are stored in the signed session cookie, so a secret key is
# required.  When SECRET_KEY is unset (or empty) a random per-process key is
# used, which means sessions do not survive a restart.
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY') or os.urandom(24)

# ---------------------------------------------------------------------------
# PostgreSQL database configuration
# ---------------------------------------------------------------------------
DB_CONFIG = {
    "host": "localhost",
    "port": 5432,
    "dbname": "rss",
    "user": "rss",
    "password": "x",
}


def get_conn():
    """Return a new psycopg2 connection built from ``DB_CONFIG``."""
    return psycopg2.connect(**DB_CONFIG)


@contextmanager
def _db_cursor(**cursor_kwargs):
    """Yield a cursor inside a transaction and always close the connection.

    psycopg2's ``with connection:`` only commits (or rolls back on error); it
    does not close the connection.  Wrapping it in ``closing`` releases the
    connection once the transaction has ended.

    Args:
        **cursor_kwargs: forwarded to ``connection.cursor()``
            (e.g. ``cursor_factory``).
    """
    with closing(get_conn()) as conn, conn, conn.cursor(**cursor_kwargs) as cursor:
        yield cursor


# Number of failures after which a feed is deactivated.
MAX_FALLOS = 5


# ======================================
# Template filter for safe HTML
# ======================================
@app.template_filter('safe_html')
def safe_html(text):
    """Sanitise *text* with bleach, keeping only a small tag/attribute allowlist.

    Returns an empty string for falsy input.  Disallowed tags are stripped
    (``strip=True``) rather than escaped.
    """
    if not text:
        return ""
    allowed_tags = {'a', 'abbr', 'b', 'strong', 'i', 'em', 'p', 'br', 'img'}
    allowed_attrs = {'a': ['href', 'title'], 'img': ['src', 'alt']}
    return bleach.clean(text, tags=allowed_tags, attributes=allowed_attrs, strip=True)


# ======================================
# Application routes
# ======================================
@app.route("/")
def home():
    """Render the latest 50 news items, optionally filtered.

    Query parameters ``categoria_id``, ``continente_id`` and ``pais_id`` are
    parsed as integers; missing or non-numeric values are treated as "no
    filter" instead of raising.  A country filter takes precedence over a
    continent filter.  On a database error the page is rendered with whatever
    was loaded so far and an error flash.
    """
    noticias, categorias, continentes, paises = [], [], [], []
    cat_id = request.args.get("categoria_id", type=int)
    cont_id = request.args.get("continente_id", type=int)
    pais_id = request.args.get("pais_id", type=int)

    try:
        with _db_cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
            cursor.execute("SELECT id, nombre FROM categorias ORDER BY nombre")
            categorias = cursor.fetchall()
            cursor.execute("SELECT id, nombre FROM continentes ORDER BY nombre")
            continentes = cursor.fetchall()

            # Narrow the country list to the selected continent, if any.
            if cont_id is not None:
                cursor.execute(
                    "SELECT id, nombre, continente_id FROM paises WHERE continente_id = %s ORDER BY nombre",
                    (cont_id,),
                )
            else:
                cursor.execute("SELECT id, nombre, continente_id FROM paises ORDER BY nombre")
            paises = cursor.fetchall()

            sql_params = []
            sql_base = """
                SELECT n.fecha, n.titulo, n.resumen, n.url, n.imagen_url,
                       c.nombre AS categoria, p.nombre AS pais, co.nombre AS continente
                FROM noticias n
                LEFT JOIN categorias c ON n.categoria_id = c.id
                LEFT JOIN paises p ON n.pais_id = p.id
                LEFT JOIN continentes co ON p.continente_id = co.id
            """
            conditions = []
            if cat_id is not None:
                conditions.append("n.categoria_id = %s")
                sql_params.append(cat_id)
            if pais_id is not None:
                conditions.append("n.pais_id = %s")
                sql_params.append(pais_id)
            elif cont_id is not None:
                conditions.append("p.continente_id = %s")
                sql_params.append(cont_id)

            # Only fixed fragments are concatenated; values go through %s.
            if conditions:
                sql_base += " WHERE " + " AND ".join(conditions)
            sql_final = sql_base + " ORDER BY n.fecha DESC NULLS LAST LIMIT 50"
            cursor.execute(sql_final, tuple(sql_params))
            noticias = cursor.fetchall()
    except psycopg2.Error as db_err:
        app.logger.exception("[DB ERROR] Al leer noticias: %s", db_err)
        flash("Error de base de datos al cargar las noticias.", "error")

    return render_template(
        "noticias.html",
        noticias=noticias,
        categorias=categorias,
        continentes=continentes,
        paises=paises,
        cat_id=cat_id,
        cont_id=cont_id,
        pais_id=pais_id,
    )


@app.route("/feeds")
def feeds():
    """Render the feed-management page with all feeds and lookup lists."""
    feeds_, categorias, continentes, paises = [], [], [], []
    try:
        with _db_cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
            cursor.execute("""
                SELECT f.id, f.nombre, f.descripcion, f.url, f.categoria_id, f.pais_id,
                       f.activo, f.fallos, c.nombre as cat_nom, p.nombre as pais_nom
                FROM feeds f
                LEFT JOIN categorias c ON f.categoria_id = c.id
                LEFT JOIN paises p ON f.pais_id = p.id
                ORDER BY f.nombre
            """)
            feeds_ = cursor.fetchall()
            cursor.execute("SELECT id, nombre FROM categorias ORDER BY nombre")
            categorias = cursor.fetchall()
            cursor.execute("SELECT id, nombre FROM continentes ORDER BY nombre")
            continentes = cursor.fetchall()
            cursor.execute("SELECT id, nombre, continente_id FROM paises ORDER BY nombre")
            paises = cursor.fetchall()
    except psycopg2.Error as db_err:
        app.logger.exception("[DB ERROR] Al leer feeds: %s", db_err)
        flash("Error de base de datos al cargar la gestión de feeds.", "error")
    return render_template(
        "index.html",
        feeds=feeds_,
        categorias=categorias,
        continentes=continentes,
        paises=paises,
    )


@app.route("/add", methods=["POST"])
def add_feed():
    """Insert a new feed from the submitted form and redirect to the feed list.

    Empty ``categoria_id`` / ``pais_id`` / ``idioma`` form values are stored
    as NULL instead of being sent to the database as empty strings.
    """
    nombre = request.form.get("nombre")
    try:
        with _db_cursor() as cursor:
            cursor.execute(
                "INSERT INTO feeds (nombre, descripcion, url, categoria_id, pais_id, idioma) VALUES (%s, %s, %s, %s, %s, %s)",
                (
                    nombre,
                    request.form.get("descripcion"),
                    request.form.get("url"),
                    request.form.get("categoria_id") or None,
                    request.form.get("pais_id") or None,
                    request.form.get("idioma") or None,
                ),
            )
        # Reached only after the transaction has been committed.
        flash(f"Feed '{nombre}' añadido correctamente.", "success")
    except psycopg2.Error as db_err:
        app.logger.exception("[DB ERROR] Al agregar feed: %s", db_err)
        flash(f"Error al añadir el feed: {db_err}", "error")
    return redirect(url_for("feeds"))


@app.route("/edit/<int:feed_id>", methods=["GET", "POST"])
def edit_feed(feed_id):
    """Show the edit form for a feed (GET) or apply the submitted changes (POST).

    Redirects to the feed list after a successful update, on any database
    error, or when the requested feed does not exist.
    """
    is_post = request.method == "POST"
    feed, categorias, paises = None, [], []
    try:
        with _db_cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
            if is_post:
                # A checkbox is only present in the form data when ticked.
                activo = "activo" in request.form
                cursor.execute(
                    """UPDATE feeds SET nombre=%s, descripcion=%s, url=%s, categoria_id=%s,
                       pais_id=%s, idioma=%s, activo=%s WHERE id=%s""",
                    (
                        request.form.get("nombre"),
                        request.form.get("descripcion"),
                        request.form.get("url"),
                        request.form.get("categoria_id") or None,
                        request.form.get("pais_id") or None,
                        request.form.get("idioma") or None,
                        activo,
                        feed_id,
                    ),
                )
            else:
                cursor.execute("SELECT * FROM feeds WHERE id = %s", (feed_id,))
                feed = cursor.fetchone()
                cursor.execute("SELECT id, nombre FROM categorias ORDER BY nombre")
                categorias = cursor.fetchall()
                cursor.execute("SELECT id, nombre, continente_id FROM paises ORDER BY nombre")
                paises = cursor.fetchall()
    except psycopg2.Error as db_err:
        app.logger.exception("[DB ERROR] Al editar feed: %s", db_err)
        flash(f"Error al editar el feed: {db_err}", "error")
        return redirect(url_for("feeds"))

    if is_post:
        # Flash success only once the UPDATE has actually been committed.
        flash("Feed actualizado correctamente.", "success")
        return redirect(url_for("feeds"))

    if not feed:
        flash("No se encontró el feed solicitado.", "error")
        return redirect(url_for("feeds"))
    return render_template("edit_feed.html", feed=feed, categorias=categorias, paises=paises)


# NOTE(review): this destructive action is reachable via GET; consider
# switching to POST once the templates that link to it are updated.
@app.route("/delete/<int:feed_id>")
def delete_feed(feed_id):
    """Delete the feed with the given id and redirect to the feed list."""
    try:
        with _db_cursor() as cursor:
            cursor.execute("DELETE FROM feeds WHERE id=%s", (feed_id,))
        flash("Feed eliminado correctamente.", "success")
    except psycopg2.Error as db_err:
        app.logger.exception("[DB ERROR] Al eliminar feed: %s", db_err)
        flash(f"Error al eliminar el feed: {db_err}", "error")
    return redirect(url_for("feeds"))


@app.route("/reactivar_feed/<int:feed_id>")
def reactivar_feed(feed_id):
    """Re-enable a feed and reset its failure counter to zero."""
    try:
        with _db_cursor() as cursor:
            cursor.execute("UPDATE feeds SET activo = TRUE, fallos = 0 WHERE id = %s", (feed_id,))
        flash("Feed reactivado y contador de fallos reseteado.", "success")
    except psycopg2.Error as db_err:
        app.logger.exception("[DB ERROR] Al reactivar feed: %s", db_err)
        flash(f"Error al reactivar el feed: {db_err}", "error")
    return redirect(url_for("feeds"))


# ===================================================================
# [BEGIN ADDED CODE] Feed backup and restore
#
=================================================================== @app.route("/backup_feeds") def backup_feeds(): """Exporta todos los feeds a un archivo CSV.""" try: with get_conn() as conn: with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor: cursor.execute(""" SELECT f.id, f.nombre, f.descripcion, f.url, f.categoria_id, c.nombre AS categoria, f.pais_id, p.nombre AS pais, f.idioma, f.activo, f.fallos FROM feeds f LEFT JOIN categorias c ON f.categoria_id = c.id LEFT JOIN paises p ON f.pais_id = p.id ORDER BY f.id """) feeds_ = cursor.fetchall() if not feeds_: flash("No hay feeds para exportar.", "warning") return redirect(url_for("feeds")) si = StringIO() writer = csv.DictWriter(si, fieldnames=[desc[0] for desc in cursor.description]) writer.writeheader() writer.writerows([dict(row) for row in feeds_]) output = si.getvalue() si.close() return Response( output, mimetype="text/csv", headers={"Content-Disposition": "attachment;filename=feeds_backup.csv"}, ) except Exception as e: app.logger.error(f"[ERROR] Al hacer backup de feeds: {e}", exc_info=True) flash("Error al generar el backup.", "error") return redirect(url_for("feeds")) @app.route("/restore_feeds", methods=["GET", "POST"]) def restore_feeds(): """Muestra el formulario de restauración y procesa el archivo CSV subido.""" if request.method == "POST": file = request.files.get("file") if not file or not file.filename.endswith(".csv"): flash("Archivo no válido. 
Por favor, sube un archivo .csv.", "error") return redirect(url_for("restore_feeds")) try: file_stream = StringIO(file.read().decode("utf-8")) reader = csv.DictReader(file_stream) rows = list(reader) n_ok, n_err = 0, 0 with get_conn() as conn: with conn.cursor() as cursor: for row in rows: try: activo_val = str(row.get("activo", "")).strip().lower() activo = activo_val in ["1", "true", "t", "yes", "on"] cursor.execute( """ INSERT INTO feeds (id, nombre, descripcion, url, categoria_id, pais_id, idioma, activo, fallos) VALUES (%(id)s, %(nombre)s, %(descripcion)s, %(url)s, %(categoria_id)s, %(pais_id)s, %(idioma)s, %(activo)s, %(fallos)s) ON CONFLICT (id) DO UPDATE SET nombre = EXCLUDED.nombre, descripcion = EXCLUDED.descripcion, url = EXCLUDED.url, categoria_id = EXCLUDED.categoria_id, pais_id = EXCLUDED.pais_id, idioma = EXCLUDED.idioma, activo = EXCLUDED.activo, fallos = EXCLUDED.fallos; """, { "id": int(row.get("id")), "nombre": row.get("nombre"), "descripcion": row.get("descripcion") or "", "url": row.get("url"), "categoria_id": int(row["categoria_id"]) if row.get("categoria_id") else None, "pais_id": int(row["pais_id"]) if row.get("pais_id") else None, "idioma": row.get("idioma") or None, "activo": activo, "fallos": int(row.get("fallos", 0)), } ) n_ok += 1 except Exception as e: n_err += 1 app.logger.error(f"Error procesando fila del CSV: {row} - Error: {e}") flash(f"Restauración completada. Feeds procesados: {n_ok}. 
Errores: {n_err}.", "success" if n_err == 0 else "warning") except Exception as e: app.logger.error(f"Error al restaurar feeds desde CSV: {e}", exc_info=True) flash(f"Ocurrió un error general al procesar el archivo: {e}", "error") return redirect(url_for("feeds")) return render_template("restore_feeds.html") # =================================================================== # [FIN DEL CÓDIGO AÑADIDO] # =================================================================== # ================================ # Lógica de procesado de feeds # ================================ def sumar_fallo_feed(cursor, feed_id): cursor.execute("UPDATE feeds SET fallos = fallos + 1 WHERE id = %s RETURNING fallos", (feed_id,)) fallos = cursor.fetchone()[0] if fallos >= MAX_FALLOS: cursor.execute("UPDATE feeds SET activo = FALSE WHERE id = %s", (feed_id,)) return fallos def resetear_fallos_feed(cursor, feed_id): cursor.execute("UPDATE feeds SET fallos = 0 WHERE id = %s", (feed_id,)) def fetch_and_store(): with app.app_context(): app.logger.info("Iniciando ciclo de actualización de feeds...") try: with get_conn() as conn: with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor: cursor.execute("SELECT id, url, categoria_id, pais_id FROM feeds WHERE activo = TRUE") feeds_to_process = cursor.fetchall() for feed in feeds_to_process: try: app.logger.info(f"Procesando feed: {feed['url']}") parsed = feedparser.parse(feed['url']) if getattr(parsed, "bozo", False): app.logger.warning(f"[BOZO] Feed mal formado: {feed['url']}") sumar_fallo_feed(cursor, feed['id']) continue resetear_fallos_feed(cursor, feed['id']) for entry in parsed.entries: try: link = entry.get("link") if not link: continue noticia_id = hashlib.md5(link.encode()).hexdigest() titulo = entry.get("title", "") resumen = entry.get("summary", "") imagen_url = "" if "media_content" in entry and entry.media_content: imagen_url = entry.media_content[0].get("url", "") elif "