# rss/app.py
# 2025-06-09 11:48:00 +02:00
#
# 334 lines
# 18 KiB
# Python

# -*- coding: utf-8 -*-
import os
import sys
import hashlib
import re
import csv
import math
from io import StringIO
from datetime import datetime, timedelta
import logging
import atexit
from flask import Flask, render_template, request, redirect, url_for, Response, flash
from apscheduler.schedulers.background import BackgroundScheduler
import psycopg2
import psycopg2.extras
import feedparser
import bleach
# Send log records to stdout at INFO level and above.
logging.basicConfig(
    stream=sys.stdout,
    level=logging.INFO,
    format='[%(asctime)s] %(levelname)s in %(module)s: %(message)s',
)

app = Flask(__name__)
# When SECRET_KEY is not set in the environment a random key is generated
# for this process only, so it changes on every start.
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', os.urandom(24))

# Connection settings come from the environment so real credentials do not
# have to be committed to the repository. The defaults are the values that
# were previously hard-coded, so existing deployments keep working.
DB_CONFIG = {
    "host": os.environ.get("DB_HOST", "localhost"),
    "port": int(os.environ.get("DB_PORT", 5432)),
    "dbname": os.environ.get("DB_NAME", "rss"),
    "user": os.environ.get("DB_USER", "rss"),
    "password": os.environ.get("DB_PASSWORD", "x"),
}

# Failure count at which sumar_fallo_feed() deactivates a feed.
MAX_FALLOS = 5
class _ClosingConnection(psycopg2.extensions.connection):
    """Connection that is also closed when its ``with`` block exits.

    psycopg2's own context manager only commits (or rolls back) the
    transaction and leaves the connection open. Every caller in this module
    uses a fresh connection per ``with`` block, so without this the sockets
    are only released when the object is garbage-collected.
    """

    def __exit__(self, exc_type, exc_value, traceback):
        try:
            # Commit on success / roll back on error, as psycopg2 does.
            return super().__exit__(exc_type, exc_value, traceback)
        finally:
            self.close()


def get_conn():
    """Open a new PostgreSQL connection configured from ``DB_CONFIG``.

    Returns:
        A psycopg2 connection. Used as ``with get_conn() as conn:`` it
        commits on success, rolls back on an exception, and is closed when
        the block exits.

    Raises:
        psycopg2.Error: if the connection cannot be established.
    """
    return psycopg2.connect(connection_factory=_ClosingConnection, **DB_CONFIG)
@app.template_filter('safe_html')
def safe_html(text):
    """Jinja filter: sanitize HTML with bleach, keeping a small tag whitelist.

    Args:
        text: HTML fragment to clean; any falsy value yields "".

    Returns:
        The cleaned string. Tags outside the whitelist are stripped
        (``strip=True``) rather than escaped.
    """
    if text:
        return bleach.clean(
            text,
            tags={'a', 'b', 'strong', 'i', 'em', 'p', 'br', 'img'},
            attributes={'a': ['href', 'title'], 'img': ['src', 'alt']},
            strip=True,
        )
    return ""
@app.route("/")
def home():
    """Render the 50 most recent news items, optionally filtered.

    Optional integer query parameters: ``categoria_id``, ``continente_id``
    and ``pais_id``. A country filter takes precedence over a continent
    filter for the news query; the continent is still used to narrow the
    list of countries offered to the template.

    Database errors are logged and flashed; the page is then rendered with
    whatever data was loaded before the failure.
    """
    noticias, categorias, continentes, paises = [], [], [], []
    # type=int returns None for missing, empty or non-numeric values, so a
    # malformed query string can neither reach the SQL as text nor blow up a
    # later int() conversion with a 500.
    cat_id = request.args.get("categoria_id", type=int)
    cont_id = request.args.get("continente_id", type=int)
    pais_id = request.args.get("pais_id", type=int)
    try:
        with get_conn() as conn:
            with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
                cursor.execute("SELECT id, nombre FROM categorias ORDER BY nombre")
                categorias = cursor.fetchall()
                cursor.execute("SELECT id, nombre FROM continentes ORDER BY nombre")
                continentes = cursor.fetchall()
                if cont_id is not None:
                    cursor.execute("SELECT id, nombre, continente_id FROM paises WHERE continente_id = %s ORDER BY nombre", (cont_id,))
                else:
                    cursor.execute("SELECT id, nombre, continente_id FROM paises ORDER BY nombre")
                paises = cursor.fetchall()

                # Build the WHERE clause from fixed fragments; user values
                # are only ever passed as bound parameters.
                sql_params, conditions = [], []
                sql_base = "SELECT n.fecha, n.titulo, n.resumen, n.url, n.imagen_url, c.nombre AS categoria, p.nombre AS pais, co.nombre AS continente FROM noticias n LEFT JOIN categorias c ON n.categoria_id = c.id LEFT JOIN paises p ON n.pais_id = p.id LEFT JOIN continentes co ON p.continente_id = co.id"
                if cat_id is not None:
                    conditions.append("n.categoria_id = %s")
                    sql_params.append(cat_id)
                if pais_id is not None:
                    conditions.append("n.pais_id = %s")
                    sql_params.append(pais_id)
                elif cont_id is not None:
                    conditions.append("p.continente_id = %s")
                    sql_params.append(cont_id)
                if conditions:
                    sql_base += " WHERE " + " AND ".join(conditions)
                sql_final = sql_base + " ORDER BY n.fecha DESC NULLS LAST LIMIT 50"
                cursor.execute(sql_final, tuple(sql_params))
                noticias = cursor.fetchall()
    except psycopg2.Error as db_err:
        app.logger.error(f"[DB ERROR] Al leer noticias: {db_err}", exc_info=True)
        flash("Error de base de datos al cargar las noticias.", "error")
    return render_template(
        "noticias.html",
        noticias=noticias,
        categorias=categorias,
        continentes=continentes,
        paises=paises,
        cat_id=cat_id,
        cont_id=cont_id,
        pais_id=pais_id,
    )
@app.route("/feeds")
def dashboard():
    """Render the dashboard with feed and news counters.

    Each counter stays at 0 if its query was not reached because of a
    database error; the error is logged and flashed.
    """
    stats = {'feeds_totales': 0, 'noticias_totales': 0, 'feeds_caidos': 0}
    # (stats key, query) pairs, executed in this order.
    consultas = (
        ('feeds_totales', "SELECT COUNT(*) FROM feeds;"),
        ('noticias_totales', "SELECT COUNT(*) FROM noticias;"),
        ('feeds_caidos', "SELECT COUNT(*) FROM feeds WHERE activo = FALSE;"),
    )
    try:
        with get_conn() as conn, conn.cursor() as cursor:
            for clave, consulta in consultas:
                cursor.execute(consulta)
                stats[clave] = cursor.fetchone()[0]
    except psycopg2.Error as db_err:
        app.logger.error(f"[DB ERROR] Al calcular estadísticas del dashboard: {db_err}")
        flash("Error al conectar con la base de datos para mostrar el resumen.", "error")
    return render_template("dashboard.html", stats=stats)
@app.route("/feeds/manage")
def manage_feeds():
    """Render one page (10 rows) of the feed list, ordered by name.

    Query parameters:
        page: 1-based page number; missing or non-numeric values fall back
            to 1 and values below 1 are clamped to 1.
    """
    # Clamp to the first page: page <= 0 would yield a negative OFFSET,
    # which PostgreSQL rejects.
    page = max(request.args.get('page', 1, type=int), 1)
    per_page = 10
    offset = (page - 1) * per_page
    feeds_list = []
    total_feeds = 0
    try:
        with get_conn() as conn:
            with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
                cursor.execute("SELECT COUNT(*) FROM feeds")
                total_feeds = cursor.fetchone()[0]
                cursor.execute("SELECT * FROM feeds ORDER BY nombre LIMIT %s OFFSET %s", (per_page, offset))
                feeds_list = cursor.fetchall()
    except psycopg2.Error as db_err:
        app.logger.error(f"[DB ERROR] Al obtener lista de feeds: {db_err}")
        flash("Error al obtener la lista de feeds.", "error")
    total_pages = math.ceil(total_feeds / per_page)
    return render_template("feeds_list.html", feeds=feeds_list, page=page, total_pages=total_pages, total_feeds=total_feeds)
@app.route("/feeds/add", methods=['GET', 'POST'])
def add_feed():
    """Show the "add feed" form (GET) or insert a new feed (POST).

    On POST the feed is inserted from the submitted form fields and the user
    is redirected to the dashboard with a success or error flash message.
    On GET the categories and countries are loaded for the form.
    """
    if request.method == 'POST':
        nombre = request.form.get("nombre")
        # Blank optional fields are stored as NULL instead of being sent as
        # '' (same convention restore_feeds uses for these columns).
        categoria_id = request.form.get("categoria_id") or None
        pais_id = request.form.get("pais_id") or None
        idioma = request.form.get("idioma", "").strip() or None
        try:
            with get_conn() as conn:
                with conn.cursor() as cursor:
                    cursor.execute(
                        "INSERT INTO feeds (nombre, descripcion, url, categoria_id, pais_id, idioma) VALUES (%s, %s, %s, %s, %s, %s)",
                        (nombre, request.form.get("descripcion"), request.form.get("url"), categoria_id, pais_id, idioma)
                    )
            flash(f"Feed '{nombre}' añadido correctamente.", "success")
        except psycopg2.Error as db_err:
            app.logger.error(f"[DB ERROR] Al agregar feed: {db_err}", exc_info=True)
            flash(f"Error al añadir el feed: {db_err}", "error")
        return redirect(url_for("dashboard"))

    categorias, paises = [], []
    try:
        with get_conn() as conn:
            with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
                cursor.execute("SELECT id, nombre FROM categorias ORDER BY nombre")
                categorias = cursor.fetchall()
                cursor.execute("SELECT id, nombre FROM paises ORDER BY nombre")
                paises = cursor.fetchall()
    except psycopg2.Error as db_err:
        app.logger.error(f"[DB ERROR] Al cargar formulario para añadir feed: {db_err}")
        flash("No se pudieron cargar las categorías o países.", "error")
    return render_template("add_feed.html", categorias=categorias, paises=paises)
@app.route("/edit/<int:feed_id>", methods=["GET", "POST"])
def edit_feed(feed_id):
    """Show the edit form for a feed (GET) or save its changes (POST).

    Args:
        feed_id: primary key of the feed, taken from the URL.

    On POST the row is updated and the user is redirected to the feed list.
    On GET the feed, the categories and the countries are loaded for the
    form. An unknown ``feed_id`` redirects to the feed list with an error.
    """
    feed, categorias, paises = None, [], []
    try:
        with get_conn() as conn:
            with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
                if request.method == "POST":
                    idioma = request.form.get("idioma", "").strip() or None
                    # The checkbox is only present in the form when ticked.
                    activo = "activo" in request.form
                    # Blank optional fields are stored as NULL, not ''.
                    categoria_id = request.form.get("categoria_id") or None
                    pais_id = request.form.get("pais_id") or None
                    cursor.execute(
                        """UPDATE feeds SET nombre=%s, descripcion=%s, url=%s, categoria_id=%s, pais_id=%s, idioma=%s, activo=%s WHERE id=%s""",
                        (request.form.get("nombre"), request.form.get("descripcion"), request.form.get("url"), categoria_id, pais_id, idioma, activo, feed_id)
                    )
                    # Do not report success when no row matched the id.
                    if cursor.rowcount == 0:
                        flash("No se encontró el feed solicitado.", "error")
                    else:
                        flash("Feed actualizado correctamente.", "success")
                    return redirect(url_for("manage_feeds"))
                cursor.execute("SELECT * FROM feeds WHERE id = %s", (feed_id,))
                feed = cursor.fetchone()
                cursor.execute("SELECT id, nombre FROM categorias ORDER BY nombre")
                categorias = cursor.fetchall()
                cursor.execute("SELECT id, nombre, continente_id FROM paises ORDER BY nombre")
                paises = cursor.fetchall()
    except psycopg2.Error as db_err:
        app.logger.error(f"[DB ERROR] Al editar feed: {db_err}", exc_info=True)
        flash(f"Error al editar el feed: {db_err}", "error")
        return redirect(url_for("manage_feeds"))
    if not feed:
        flash("No se encontró el feed solicitado.", "error")
        return redirect(url_for("manage_feeds"))
    return render_template("edit_feed.html", feed=feed, categorias=categorias, paises=paises)
@app.route("/delete/<int:feed_id>")
def delete_feed(feed_id):
    """Delete the feed with the given id and return to the feed list.

    Args:
        feed_id: primary key of the feed, taken from the URL.

    NOTE(review): the route declares no ``methods``, so this state-changing
    action is reachable with a plain GET; consider restricting it to POST.
    """
    sql = "DELETE FROM feeds WHERE id=%s"
    try:
        with get_conn() as conn, conn.cursor() as cursor:
            cursor.execute(sql, (feed_id,))
    except psycopg2.Error as db_err:
        app.logger.error(f"[DB ERROR] Al eliminar feed: {db_err}", exc_info=True)
        flash(f"Error al eliminar el feed: {db_err}", "error")
    else:
        flash("Feed eliminado correctamente.", "success")
    return redirect(url_for("manage_feeds"))
@app.route("/reactivar_feed/<int:feed_id>")
def reactivar_feed(feed_id):
    """Mark a feed as active again and reset its failure counter.

    Args:
        feed_id: primary key of the feed, taken from the URL.

    Always redirects to the feed list, flashing success or the DB error.
    """
    sql = "UPDATE feeds SET activo = TRUE, fallos = 0 WHERE id = %s"
    try:
        with get_conn() as conn, conn.cursor() as cursor:
            cursor.execute(sql, (feed_id,))
    except psycopg2.Error as db_err:
        app.logger.error(f"[DB ERROR] Al reactivar feed: {db_err}", exc_info=True)
        flash(f"Error al reactivar el feed: {db_err}", "error")
    else:
        flash("Feed reactivado y contador de fallos reseteado.", "success")
    return redirect(url_for("manage_feeds"))
@app.route("/backup_feeds")
def backup_feeds():
    """Export every feed (with category and country names) as a CSV download.

    Returns:
        A ``text/csv`` attachment named ``feeds_backup.csv``, or a redirect
        to the dashboard when there are no feeds or any error occurs.
    """
    try:
        with get_conn() as conn, conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
            cursor.execute("""
SELECT f.id, f.nombre, f.descripcion, f.url, f.categoria_id, c.nombre AS categoria,
f.pais_id, p.nombre AS pais, f.idioma, f.activo, f.fallos
FROM feeds f
LEFT JOIN categorias c ON f.categoria_id = c.id
LEFT JOIN paises p ON f.pais_id = p.id
ORDER BY f.id
""")
            filas = cursor.fetchall()
            if not filas:
                flash("No hay feeds para exportar.", "warning")
                return redirect(url_for("dashboard"))

            # Column order of the CSV follows the SELECT list.
            columnas = [columna[0] for columna in cursor.description]
            with StringIO() as buffer:
                escritor = csv.DictWriter(buffer, fieldnames=columnas)
                escritor.writeheader()
                for fila in filas:
                    escritor.writerow(dict(fila))
                contenido = buffer.getvalue()
            cabeceras = {"Content-Disposition": "attachment;filename=feeds_backup.csv"}
            return Response(contenido, mimetype="text/csv", headers=cabeceras)
    except Exception as e:
        app.logger.error(f"[ERROR] Al hacer backup de feeds: {e}", exc_info=True)
        flash("Error al generar el backup.", "error")
        return redirect(url_for("dashboard"))
def _feed_params_from_row(row):
    """Convert one CSV row (dict of strings) into the upsert parameters.

    Raises:
        TypeError, ValueError: if ``id``, ``categoria_id``, ``pais_id`` or
            ``fallos`` is missing (``id`` only) or not an integer.
    """
    activo_val = str(row.get("activo", "")).strip().lower()
    return {
        "id": int(row.get("id")),
        "nombre": row.get("nombre"),
        "descripcion": row.get("descripcion") or "",
        "url": row.get("url"),
        "categoria_id": int(row["categoria_id"]) if row.get("categoria_id") else None,
        "pais_id": int(row["pais_id"]) if row.get("pais_id") else None,
        "idioma": row.get("idioma") or None,
        "activo": activo_val in ["1", "true", "t", "yes", "on"],
        # An empty or missing cell counts as zero failures.
        "fallos": int(row.get("fallos") or 0),
    }


@app.route("/restore_feeds", methods=["GET", "POST"])
def restore_feeds():
    """Show the restore form (GET) or upsert feeds from an uploaded CSV (POST).

    Rows are matched on ``id``: existing feeds are overwritten, unknown ids
    are inserted. A bad row is logged and counted without affecting the
    other rows. The user is redirected to the dashboard with a summary.

    NOTE(review): rows are written with explicit ids; if ``feeds.id`` is
    backed by a sequence, nothing here advances it — confirm whether a
    ``setval`` is needed after a restore.
    """
    if request.method == "POST":
        file = request.files.get("file")
        # Accept the extension case-insensitively (".CSV" is still a CSV).
        if not file or not file.filename.lower().endswith(".csv"):
            flash("Archivo no válido. Por favor, sube un archivo .csv.", "error")
            return redirect(url_for("restore_feeds"))
        try:
            # utf-8-sig drops a leading BOM, which would otherwise become
            # part of the first header name and make every "id" lookup fail.
            file_stream = StringIO(file.read().decode("utf-8-sig"))
            reader = csv.DictReader(file_stream)
            rows = list(reader)
            n_ok, n_err = 0, 0
            with get_conn() as conn:
                with conn.cursor() as cursor:
                    for row in rows:
                        try:
                            params = _feed_params_from_row(row)
                            # A failed statement aborts the whole PostgreSQL
                            # transaction; the savepoint confines the damage
                            # to this row so earlier and later rows survive.
                            cursor.execute("SAVEPOINT restore_row")
                            try:
                                cursor.execute(
                                    """
INSERT INTO feeds (id, nombre, descripcion, url, categoria_id, pais_id, idioma, activo, fallos)
VALUES (%(id)s, %(nombre)s, %(descripcion)s, %(url)s, %(categoria_id)s, %(pais_id)s, %(idioma)s, %(activo)s, %(fallos)s)
ON CONFLICT (id) DO UPDATE SET
nombre = EXCLUDED.nombre, descripcion = EXCLUDED.descripcion, url = EXCLUDED.url,
categoria_id = EXCLUDED.categoria_id, pais_id = EXCLUDED.pais_id, idioma = EXCLUDED.idioma,
activo = EXCLUDED.activo, fallos = EXCLUDED.fallos;
""",
                                    params,
                                )
                            except psycopg2.Error:
                                cursor.execute("ROLLBACK TO SAVEPOINT restore_row")
                                raise
                            cursor.execute("RELEASE SAVEPOINT restore_row")
                            n_ok += 1
                        except Exception as e:
                            n_err += 1
                            app.logger.error(f"Error procesando fila del CSV: {row} - Error: {e}")
            flash(f"Restauración completada. Feeds procesados: {n_ok}. Errores: {n_err}.", "success" if n_err == 0 else "warning")
        except Exception as e:
            app.logger.error(f"Error al restaurar feeds desde CSV: {e}", exc_info=True)
            flash(f"Ocurrió un error general al procesar el archivo: {e}", "error")
        return redirect(url_for("dashboard"))
    return render_template("restore_feeds.html")
def sumar_fallo_feed(cursor, feed_id, max_fallos=None):
    """Increment a feed's failure counter and deactivate it at the limit.

    Args:
        cursor: open DB-API cursor; the caller owns the transaction.
        feed_id: primary key of the feed.
        max_fallos: failure count at which the feed is deactivated.
            Defaults to the module-level ``MAX_FALLOS``.

    Returns:
        The new failure count, or ``None`` if no feed with that id exists
        (for example, it was deleted while a fetch cycle was running).
    """
    cursor.execute("UPDATE feeds SET fallos = fallos + 1 WHERE id = %s RETURNING fallos", (feed_id,))
    row = cursor.fetchone()
    if row is None:
        # No row was updated: nothing to count and nothing to deactivate.
        return None
    fallos = row[0]
    limite = MAX_FALLOS if max_fallos is None else max_fallos
    if fallos >= limite:
        cursor.execute("UPDATE feeds SET activo = FALSE WHERE id = %s", (feed_id,))
    return fallos
def resetear_fallos_feed(cursor, feed_id):
    """Set the failure counter of the given feed back to zero.

    Args:
        cursor: open DB-API cursor; the caller owns the transaction.
        feed_id: primary key of the feed.
    """
    consulta = "UPDATE feeds SET fallos = 0 WHERE id = %s"
    parametros = (feed_id,)
    cursor.execute(consulta, parametros)
def _imagen_de_entrada(entry, resumen):
    """Return the entry's image URL (media content first, then <img>), or ""."""
    if "media_content" in entry and entry.media_content:
        return entry.media_content[0].get("url", "")
    if "<img" in resumen:
        img_search = re.search(r'src="([^"]+)"', resumen)
        if img_search:
            return img_search.group(1)
    return ""


def _guardar_entrada(cursor, feed, entry):
    """Insert one feed entry into ``noticias``; entries without a link are skipped."""
    link = entry.get("link")
    if not link:
        return
    # The news id is derived from the link, so a re-fetched entry hits
    # ON CONFLICT and is left untouched.
    noticia_id = hashlib.md5(link.encode()).hexdigest()
    titulo = entry.get("title", "")
    resumen = entry.get("summary", "")
    imagen_url = _imagen_de_entrada(entry, resumen)
    fecha_publicacion = None
    if "published_parsed" in entry and entry.published_parsed:
        fecha_publicacion = datetime(*entry.published_parsed[:6])
    elif "updated_parsed" in entry and entry.updated_parsed:
        fecha_publicacion = datetime(*entry.updated_parsed[:6])
    # A failed INSERT aborts the surrounding PostgreSQL transaction; the
    # savepoint keeps one bad entry from discarding the rest of the feed.
    cursor.execute("SAVEPOINT sp_noticia")
    try:
        cursor.execute(
            "INSERT INTO noticias (id, titulo, resumen, url, fecha, imagen_url, categoria_id, pais_id) VALUES (%s, %s, %s, %s, %s, %s, %s, %s) ON CONFLICT (id) DO NOTHING",
            (noticia_id, titulo, resumen, link, fecha_publicacion, imagen_url, feed['categoria_id'], feed['pais_id'])
        )
    except psycopg2.Error:
        cursor.execute("ROLLBACK TO SAVEPOINT sp_noticia")
        raise
    cursor.execute("RELEASE SAVEPOINT sp_noticia")


def _procesar_feed(cursor, feed):
    """Download and parse one feed, then store its entries (no commit here)."""
    app.logger.info(f"Procesando feed: {feed['url']}")
    parsed = feedparser.parse(feed['url'])
    if getattr(parsed, "bozo", False):
        app.logger.warning(f"[BOZO] Feed mal formado: {feed['url']} - Excepción: {parsed.bozo_exception}")
        # feedparser raises the bozo flag for any irregularity, including
        # ones it recovered from. Only count a failure when nothing usable
        # was parsed; otherwise keep the entries it did manage to read.
        if not parsed.entries:
            sumar_fallo_feed(cursor, feed['id'])
            return
    resetear_fallos_feed(cursor, feed['id'])
    for entry in parsed.entries:
        try:
            _guardar_entrada(cursor, feed, entry)
        except Exception as entry_err:
            app.logger.error(f"Error en entrada de feed {feed['url']}: {entry_err}")


def fetch_and_store():
    """Fetch every active feed and store its new entries in ``noticias``.

    Intended to be run by the background scheduler. Each feed is committed
    on its own, so a failure in one feed neither rolls back the others nor
    keeps a transaction open while the remaining feeds are downloaded.
    A feed that fails has its failure counter incremented (which may
    deactivate it); a feed that parses has the counter reset.
    """
    with app.app_context():
        app.logger.info("Iniciando ciclo de actualización de feeds...")
        try:
            with get_conn() as conn:
                with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
                    cursor.execute("SELECT id, url, categoria_id, pais_id FROM feeds WHERE activo = TRUE")
                    feeds_to_process = cursor.fetchall()
                    # Close the read transaction before the slow network I/O.
                    conn.commit()
                    if not feeds_to_process:
                        app.logger.info("No hay feeds activos para procesar.")
                        return
                    for feed in feeds_to_process:
                        try:
                            _procesar_feed(cursor, feed)
                            conn.commit()
                        except Exception as e:
                            app.logger.error(f"[PARSE ERROR] En feed {feed['url']}: {e}")
                            # Clear any aborted transaction state first, so
                            # the failure counter update can actually run.
                            conn.rollback()
                            sumar_fallo_feed(cursor, feed['id'])
                            conn.commit()
            app.logger.info("Ciclo de feeds completado.")
        except psycopg2.Error as db_err:
            app.logger.error(f"[DB ERROR] Fallo en ciclo de actualización: {db_err}")
# Background refresh job: first run 20 seconds after import, then every
# 15 minutes. The scheduler is shut down when the interpreter exits.
run_time = datetime.now() + timedelta(seconds=20)
scheduler = BackgroundScheduler(daemon=True)
scheduler.add_job(
    fetch_and_store,
    trigger="interval",
    minutes=15,
    id="rss_job",
    next_run_time=run_time,
)
scheduler.start()
atexit.register(scheduler.shutdown)
app.logger.info("Scheduler configurado. Primera ejecución en 20 segundos.")
if __name__ == "__main__":
    # The server listens on all interfaces, and Flask's debug mode exposes an
    # interactive debugger that can execute arbitrary code. Debug is therefore
    # off unless explicitly requested with FLASK_DEBUG=1 (local development).
    debug = os.environ.get("FLASK_DEBUG", "").strip().lower() in ("1", "true", "yes", "on")
    # The reloader stays disabled so the scheduler above is started only once.
    app.run(host="0.0.0.0", port=5000, debug=debug, use_reloader=False)