# Changelog: added .gitignore, fixed the systemd unit, increased the Gunicorn
# timeout, and resolved a conflict with the scheduler.
# -*- coding: utf-8 -*-
"""Flask RSS aggregator — PostgreSQL edition.

Includes stability, security and DB-management improvements over the
original version.
"""

import csv
import hashlib
import logging
import os
import re
import sys
from datetime import datetime
from io import StringIO

import bleach  # HTML sanitisation (XSS prevention)
import feedparser
import psycopg2
import psycopg2.extras
from apscheduler.schedulers.background import BackgroundScheduler
from flask import Flask, render_template, request, redirect, url_for, Response, flash

# Log to stdout so systemd/journald (or Gunicorn) captures everything.
logging.basicConfig(stream=sys.stdout, level=logging.INFO, format='[%(asctime)s] %(levelname)s in %(module)s: %(message)s')

app = Flask(__name__)
# Required for flash() messages. NOTE(review): the os.urandom(24) fallback is
# regenerated on every restart and differs per Gunicorn worker — set SECRET_KEY
# in the environment for production so sessions/flashes stay valid.
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', os.urandom(24))

# ---------------------------------------------------------------------------
# PostgreSQL database configuration
# ---------------------------------------------------------------------------
# Overridable via environment variables so credentials don't have to live in
# source control; the defaults preserve the previous hard-coded values.
DB_CONFIG = {
    "host": os.environ.get("RSS_DB_HOST", "localhost"),
    "port": int(os.environ.get("RSS_DB_PORT", "5432")),
    "dbname": os.environ.get("RSS_DB_NAME", "rss"),
    "user": os.environ.get("RSS_DB_USER", "rss"),
    "password": os.environ.get("RSS_DB_PASSWORD", "x"),
}
def get_conn():
    """Open and return a fresh PostgreSQL connection built from DB_CONFIG."""
    connection = psycopg2.connect(**DB_CONFIG)
    return connection
# Maximum consecutive failures before a feed is automatically disabled.
MAX_FALLOS = 5
# ======================================
# Template filter for safe HTML
# ======================================
@app.template_filter('safe_html')
def safe_html(text):
    """Sanitise HTML to prevent XSS, keeping only a small tag whitelist."""
    if not text:
        return ""
    return bleach.clean(
        text,
        tags={'a', 'abbr', 'b', 'strong', 'i', 'em', 'p', 'br', 'img'},
        attributes={'a': ['href', 'title'], 'img': ['src', 'alt']},
        strip=True,
    )
# ======================================
# Application routes
# ======================================
@app.route("/")
def home():
    """Render the news page, optionally filtered by category/continent/country.

    Query parameters (all optional): ``categoria_id``, ``continente_id``,
    ``pais_id``. They are parsed with ``type=int`` so a non-numeric value
    becomes ``None`` instead of raising ValueError (a 500) at render time,
    which the original ``int(cat_id) if cat_id else None`` did.
    """
    noticias, categorias, continentes, paises = [], [], [], []
    # type=int coerces safely: invalid or missing values come back as None.
    cat_id = request.args.get("categoria_id", type=int)
    cont_id = request.args.get("continente_id", type=int)
    pais_id = request.args.get("pais_id", type=int)

    try:
        with get_conn() as conn:
            with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
                cursor.execute("SELECT id, nombre FROM categorias ORDER BY nombre")
                categorias = cursor.fetchall()
                cursor.execute("SELECT id, nombre FROM continentes ORDER BY nombre")
                continentes = cursor.fetchall()

                # Narrow the country list to the selected continent, if any.
                if cont_id:
                    cursor.execute("SELECT id, nombre, continente_id FROM paises WHERE continente_id = %s ORDER BY nombre", (cont_id,))
                else:
                    cursor.execute("SELECT id, nombre, continente_id FROM paises ORDER BY nombre")
                paises = cursor.fetchall()

                sql_params = []
                sql_base = """
                    SELECT n.fecha, n.titulo, n.resumen, n.url, n.imagen_url,
                           c.nombre AS categoria, p.nombre AS pais, co.nombre AS continente
                    FROM noticias n
                    LEFT JOIN categorias c ON n.categoria_id = c.id
                    LEFT JOIN paises p ON n.pais_id = p.id
                    LEFT JOIN continentes co ON p.continente_id = co.id
                """

                conditions = []
                if cat_id:
                    conditions.append("n.categoria_id = %s")
                    sql_params.append(cat_id)
                # A country filter implies its continent, so it wins over cont_id.
                if pais_id:
                    conditions.append("n.pais_id = %s")
                    sql_params.append(pais_id)
                elif cont_id:
                    conditions.append("p.continente_id = %s")
                    sql_params.append(cont_id)

                if conditions:
                    sql_base += " WHERE " + " AND ".join(conditions)

                # Newest first; undated rows sort last so they don't bury fresh news.
                sql_final = sql_base + " ORDER BY n.fecha DESC NULLS LAST LIMIT 50"
                cursor.execute(sql_final, tuple(sql_params))
                noticias = cursor.fetchall()

    except psycopg2.Error as db_err:
        app.logger.error(f"[DB ERROR] Al leer noticias: {db_err}", exc_info=True)
        flash("Error de base de datos al cargar las noticias.", "error")

    # Already ints (or None) thanks to type=int above — no re-conversion needed.
    return render_template("noticias.html", noticias=noticias, categorias=categorias, continentes=continentes, paises=paises,
                           cat_id=cat_id, cont_id=cont_id, pais_id=pais_id)
# The remaining management routes, with the same improved connection handling.

@app.route("/feeds")
def feeds():
    """Feed-management page: all feeds plus the lookup tables for the form."""
    feeds_, categorias, continentes, paises = [], [], [], []
    try:
        with get_conn() as conn, conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
            cursor.execute("""
                SELECT f.id, f.nombre, f.descripcion, f.url, f.categoria_id, f.pais_id,
                       f.activo, f.fallos, c.nombre as cat_nom, p.nombre as pais_nom
                FROM feeds f
                LEFT JOIN categorias c ON f.categoria_id = c.id
                LEFT JOIN paises p ON f.pais_id = p.id
                ORDER BY f.nombre
            """)
            feeds_ = cursor.fetchall()
            # Fetch the three lookup tables in one uniform pass.
            lookup_queries = (
                "SELECT id, nombre FROM categorias ORDER BY nombre",
                "SELECT id, nombre FROM continentes ORDER BY nombre",
                "SELECT id, nombre, continente_id FROM paises ORDER BY nombre",
            )
            results = []
            for query in lookup_queries:
                cursor.execute(query)
                results.append(cursor.fetchall())
            categorias, continentes, paises = results
    except psycopg2.Error as db_err:
        app.logger.error(f"[DB ERROR] Al leer feeds: {db_err}", exc_info=True)
        flash("Error de base de datos al cargar la gestión de feeds.", "error")
    return render_template("index.html", feeds=feeds_, categorias=categorias, continentes=continentes, paises=paises)
@app.route("/add", methods=["POST"])
def add_feed():
    """Create a new feed from the management form, then redirect to /feeds."""
    nombre = request.form.get("nombre")
    try:
        with get_conn() as conn:
            with conn.cursor() as cursor:
                cursor.execute(
                    "INSERT INTO feeds (nombre, descripcion, url, categoria_id, pais_id, idioma) VALUES (%s, %s, %s, %s, %s, %s)",
                    (
                        nombre,
                        request.form.get("descripcion"),
                        request.form.get("url"),
                        # An unselected <select> posts "" — store NULL instead so the
                        # integer FK columns don't reject it (mirrors the existing
                        # `idioma` handling).
                        request.form.get("categoria_id") or None,
                        request.form.get("pais_id") or None,
                        request.form.get("idioma") or None,
                    ),
                )
        # Flash success only after the transaction has committed (conn.__exit__).
        flash(f"Feed '{nombre}' añadido correctamente.", "success")
    except psycopg2.Error as db_err:
        app.logger.error(f"[DB ERROR] Al agregar feed: {db_err}", exc_info=True)
        flash(f"Error al añadir el feed: {db_err}", "error")
    return redirect(url_for("feeds"))
@app.route("/edit/<int:feed_id>", methods=["GET", "POST"])
def edit_feed(feed_id):
    """Edit an existing feed. GET renders the form; POST persists the changes."""
    feed, categorias, paises = None, [], []
    try:
        with get_conn() as conn:
            with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
                if request.method == "POST":
                    # Checkboxes are present in the form only when ticked.
                    activo = "activo" in request.form
                    cursor.execute(
                        """UPDATE feeds SET nombre=%s, descripcion=%s, url=%s, categoria_id=%s, pais_id=%s, idioma=%s, activo=%s WHERE id=%s""",
                        (
                            request.form.get("nombre"),
                            request.form.get("descripcion"),
                            request.form.get("url"),
                            # "" from an empty <select> would break the integer FK
                            # columns — store NULL, mirroring the idioma handling.
                            request.form.get("categoria_id") or None,
                            request.form.get("pais_id") or None,
                            request.form.get("idioma") or None,
                            activo,
                            feed_id,
                        ),
                    )
                    flash("Feed actualizado correctamente.", "success")
                    return redirect(url_for("feeds"))
                # GET: load the feed plus the lookup tables for the form.
                cursor.execute("SELECT * FROM feeds WHERE id = %s", (feed_id,))
                feed = cursor.fetchone()
                cursor.execute("SELECT id, nombre FROM categorias ORDER BY nombre")
                categorias = cursor.fetchall()
                cursor.execute("SELECT id, nombre, continente_id FROM paises ORDER BY nombre")
                paises = cursor.fetchall()
    except psycopg2.Error as db_err:
        app.logger.error(f"[DB ERROR] Al editar feed: {db_err}", exc_info=True)
        flash(f"Error al editar el feed: {db_err}", "error")
        return redirect(url_for("feeds"))
    if not feed:
        flash("No se encontró el feed solicitado.", "error")
        return redirect(url_for("feeds"))
    return render_template("edit_feed.html", feed=feed, categorias=categorias, paises=paises)
@app.route("/delete/<int:feed_id>")
def delete_feed(feed_id):
    """Delete a feed by primary key and return to the management page."""
    try:
        with get_conn() as conn, conn.cursor() as cursor:
            cursor.execute("DELETE FROM feeds WHERE id=%s", (feed_id,))
            flash("Feed eliminado correctamente.", "success")
    except psycopg2.Error as db_err:
        app.logger.error(f"[DB ERROR] Al eliminar feed: {db_err}", exc_info=True)
        flash(f"Error al eliminar el feed: {db_err}", "error")
    return redirect(url_for("feeds"))
@app.route("/reactivar_feed/<int:feed_id>")
def reactivar_feed(feed_id):
    """Re-enable an auto-disabled feed and zero its failure counter."""
    try:
        with get_conn() as conn, conn.cursor() as cursor:
            cursor.execute("UPDATE feeds SET activo = TRUE, fallos = 0 WHERE id = %s", (feed_id,))
            flash("Feed reactivado y contador de fallos reseteado.", "success")
    except psycopg2.Error as db_err:
        app.logger.error(f"[DB ERROR] Al reactivar feed: {db_err}", exc_info=True)
        flash(f"Error al reactivar el feed: {db_err}", "error")
    return redirect(url_for("feeds"))
# ================================
# Feed-processing logic
# ================================
def sumar_fallo_feed(cursor, feed_id, max_fallos=None):
    """Increment a feed's failure counter; disable the feed at the threshold.

    Args:
        cursor: open DB-API cursor (transaction is managed by the caller).
        feed_id: primary key of the feed that just failed.
        max_fallos: failure threshold; defaults to the module-level MAX_FALLOS
            (parameterised so callers/tests can override the hard-coded limit).

    Returns:
        The new failure count after the increment.
    """
    if max_fallos is None:
        max_fallos = MAX_FALLOS
    cursor.execute("UPDATE feeds SET fallos = fallos + 1 WHERE id = %s RETURNING fallos", (feed_id,))
    fallos = cursor.fetchone()[0]
    if fallos >= max_fallos:
        # Too many consecutive failures: stop polling this feed until an
        # operator re-enables it via /reactivar_feed.
        cursor.execute("UPDATE feeds SET activo = FALSE WHERE id = %s", (feed_id,))
    return fallos
def resetear_fallos_feed(cursor, feed_id):
    """Clear the failure counter for a feed (called after a successful fetch)."""
    reset_sql = "UPDATE feeds SET fallos = 0 WHERE id = %s"
    cursor.execute(reset_sql, (feed_id,))
def fetch_and_store():
    """Poll every active feed and insert its entries into `noticias`.

    Runs from the background scheduler: all failures are logged and contained
    so a single bad feed or entry never aborts the whole cycle. The whole run
    shares one connection/transaction, committed by the connection context
    manager on success.
    """
    with app.app_context():
        app.logger.info("Iniciando ciclo de actualización de feeds...")
        try:
            with get_conn() as conn:
                with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
                    cursor.execute("SELECT id, url, categoria_id, pais_id FROM feeds WHERE activo = TRUE")
                    feeds_to_process = cursor.fetchall()

                    for feed in feeds_to_process:
                        try:
                            app.logger.info(f"Procesando feed: {feed['url']}")
                            parsed = feedparser.parse(feed['url'])

                            # feedparser sets `bozo` when the XML is malformed.
                            if getattr(parsed, "bozo", False):
                                app.logger.warning(f"[BOZO] Feed mal formado: {feed['url']}")
                                sumar_fallo_feed(cursor, feed['id'])
                                continue

                            resetear_fallos_feed(cursor, feed['id'])

                            for entry in parsed.entries:
                                try:
                                    link = entry.get("link")
                                    if not link:
                                        continue

                                    # Stable id derived from the URL makes inserts
                                    # idempotent (ON CONFLICT DO NOTHING below).
                                    # md5 is fine: it's a key, not a security hash.
                                    noticia_id = hashlib.md5(link.encode()).hexdigest()
                                    titulo = entry.get("title", "")
                                    resumen = entry.get("summary", "")

                                    imagen_url = ""
                                    if "media_content" in entry and entry.media_content:
                                        imagen_url = entry.media_content[0].get("url", "")
                                    elif "<img" in resumen:
                                        img_search = re.search(r'src="([^"]+)"', resumen)
                                        if img_search:
                                            imagen_url = img_search.group(1)

                                    # BUGFIX: feedparser can expose *_parsed keys whose
                                    # value is None; the old `"published_parsed" in entry`
                                    # check then crashed on datetime(*None[:6]). Check the
                                    # value, not just key presence.
                                    fecha_publicacion = None
                                    parsed_time = entry.get("published_parsed") or entry.get("updated_parsed")
                                    if parsed_time:
                                        fecha_publicacion = datetime(*parsed_time[:6])

                                    cursor.execute(
                                        """
                                        INSERT INTO noticias (id, titulo, resumen, url, fecha, imagen_url, categoria_id, pais_id)
                                        VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
                                        ON CONFLICT (id) DO NOTHING
                                        """,
                                        (noticia_id, titulo, resumen, link, fecha_publicacion, imagen_url, feed['categoria_id'], feed['pais_id'])
                                    )
                                except Exception as entry_err:
                                    # One broken entry must not sink the rest of the feed.
                                    app.logger.error(f"Error procesando entrada de feed {feed['url']}: {entry_err}", exc_info=True)
                        except Exception as e:
                            app.logger.error(f"[PARSE ERROR] En feed {feed['url']}: {e}", exc_info=True)
                            sumar_fallo_feed(cursor, feed['id'])

                    app.logger.info("Ciclo de feeds completado.")
        except psycopg2.Error as db_err:
            app.logger.error(f"[DB ERROR] Fallo en el ciclo de actualización de feeds: {db_err}", exc_info=True)
# ---------------------------------------------------------------------------
# Application launcher + scheduler
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    # Werkzeug's debug reloader imports this module twice; WERKZEUG_RUN_MAIN is
    # 'true' only in the reloader child, so the scheduler is started exactly
    # once, in the parent process, and survives code reloads.
    if os.environ.get('WERKZEUG_RUN_MAIN') != 'true':
        scheduler = BackgroundScheduler(daemon=True)
        scheduler.add_job(fetch_and_store, "interval", minutes=2, id="rss_job", misfire_grace_time=60)
        scheduler.start()
        app.logger.info("Scheduler iniciado correctamente.")

        # Shut the scheduler down cleanly on interpreter exit. Registered
        # inside the guard so `scheduler` is always defined where referenced
        # (outside the guard it would be a NameError in the reloader child).
        import atexit
        atexit.register(scheduler.shutdown)

    app.run(host="0.0.0.0", port=5000, debug=True, use_reloader=True)