rss/app.py

# app.py - Final version, web-only
import os
import sys
import csv
import math
from io import StringIO, BytesIO
from datetime import datetime
import logging
import atexit
import zipfile
from contextlib import contextmanager
from concurrent.futures import ThreadPoolExecutor, as_completed, TimeoutError as FuturesTimeoutError
from tqdm import tqdm
from flask import Flask, render_template, request, redirect, url_for, Response, flash
import psycopg2
import psycopg2.extras
import psycopg2.pool
import bleach
# Feed parsing/processing lives in its own module
from feed_processor import process_single_feed
# --- Logging Configuration ---
logging.basicConfig(stream=sys.stdout, level=logging.INFO, format='[%(asctime)s] %(levelname)s in %(module)s: %(message)s')
# --- Flask App Initialization ---
app = Flask(__name__)
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', os.urandom(24))
# --- Database Configuration and Constants ---
DB_CONFIG = {
"host": os.environ.get("DB_HOST", "localhost"),
"port": int(os.environ.get("DB_PORT", 5432)),
"dbname": os.environ.get("DB_NAME", "rss"),
"user": os.environ.get("DB_USER", "rss"),
"password": os.environ.get("DB_PASS", "x")
}
# Worker constants, overridable via environment variables
MAX_WORKERS = int(os.environ.get("RSS_MAX_WORKERS", 20)) # Concurrent feed-processing workers
SINGLE_FEED_TIMEOUT = int(os.environ.get("RSS_FEED_TIMEOUT", 30)) # Seconds to wait for a feed's result
MAX_FALLOS = int(os.environ.get("RSS_MAX_FAILURES", 5)) # Number of failures before deactivating a feed
# --- Database Connection Pool ---
db_pool = None
try:
    # Size the pool to accommodate the web server's workers
db_pool = psycopg2.pool.SimpleConnectionPool(minconn=1, maxconn=10, **DB_CONFIG)
app.logger.info("Pool de conexiones a la base de datos creado exitosamente.")
except psycopg2.OperationalError as e:
logging.error(f"FATAL: No se pudo conectar a la base de datos para crear el pool: {e}")
# Consider sys.exit(1) here if DB connection is absolutely critical for app startup
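# get_conn() lends a pooled connection as a context manager: the block's work
# is committed on success, rolled back on any exception, and the connection is
# always returned to the pool.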
@contextmanager
def get_conn():
if not db_pool: raise ConnectionError("El pool de la base de datos no está disponible.")
conn = None
try:
conn = db_pool.getconn()
yield conn
conn.commit()
except Exception as e:
if conn: conn.rollback()
raise e
finally:
if conn: db_pool.putconn(conn)
# --- Shutdown Hook ---
@atexit.register
def shutdown_hooks():
if db_pool:
db_pool.closeall()
app.logger.info("Pool de conexiones de la base de datos cerrado.")
# --- Filters and Routes ---
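# Jinja filter: sanitize feed-provided HTML down to a small whitelist of
# inline tags and the href/title attributes on links; everything else is
# stripped outright (strip=True) rather than escaped.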
@app.template_filter('safe_html')
def safe_html(text):
if not text: return ""
return bleach.clean(text, tags={'a', 'b', 'strong', 'i', 'em', 'p', 'br'}, attributes={'a': ['href', 'title']}, strip=True)
@app.route("/")
def home():
    cat_id = request.args.get("categoria_id", type=int)
    cont_id = request.args.get("continente_id", type=int)
    pais_id = request.args.get("pais_id", type=int)
    fecha_filtro = request.args.get("fecha")
q = request.args.get("q", "").strip()
noticias, categorias, continentes, paises = [], [], [], []
try:
with get_conn() as conn:
with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
cursor.execute("SELECT id, nombre FROM categorias ORDER BY nombre")
categorias = cursor.fetchall()
cursor.execute("SELECT id, nombre FROM continentes ORDER BY nombre")
continentes = cursor.fetchall()
cursor.execute("SELECT id, nombre, continente_id FROM paises ORDER BY nombre")
paises = cursor.fetchall()
sql_params, conditions = [], []
                sql_base = """
                    SELECT n.fecha, n.titulo, n.resumen, n.url, n.imagen_url,
                           c.nombre AS categoria, p.nombre AS pais, co.nombre AS continente
                    FROM noticias n
                    LEFT JOIN categorias c ON n.categoria_id = c.id
                    LEFT JOIN paises p ON n.pais_id = p.id
                    LEFT JOIN continentes co ON p.continente_id = co.id"""
                if q:
                    # plainto_tsquery parses raw user input safely and ANDs the
                    # terms, so characters such as ':' or '!' cannot break the query.
                    conditions.append("n.tsv @@ plainto_tsquery('spanish', %s)")
                    sql_params.append(q)
if cat_id: conditions.append("n.categoria_id = %s"); sql_params.append(cat_id)
if pais_id: conditions.append("n.pais_id = %s"); sql_params.append(pais_id)
elif cont_id: conditions.append("p.continente_id = %s"); sql_params.append(cont_id)
if fecha_filtro:
try:
fecha_obj = datetime.strptime(fecha_filtro, '%Y-%m-%d')
conditions.append("n.fecha::date = %s")
sql_params.append(fecha_obj.date())
except ValueError:
flash("Formato de fecha no válido. Use AAAA-MM-DD.", "error")
if conditions: sql_base += " WHERE " + " AND ".join(conditions)
order_clause = " ORDER BY n.fecha DESC NULLS LAST"
                if q:
                    order_clause = " ORDER BY ts_rank(n.tsv, plainto_tsquery('spanish', %s)) DESC, n.fecha DESC"
                    sql_params.append(q)
sql_final = sql_base + order_clause + " LIMIT 50"
cursor.execute(sql_final, tuple(sql_params))
noticias = cursor.fetchall()
except psycopg2.Error as db_err:
app.logger.error(f"[DB ERROR] Al leer noticias: {db_err}", exc_info=True)
flash("Error de base de datos al cargar las noticias.", "error")
if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
return render_template('_noticias_list.html', noticias=noticias)
return render_template("noticias.html",
noticias=noticias, categorias=categorias, continentes=continentes, paises=paises,
cat_id=int(cat_id) if cat_id else None, cont_id=int(cont_id) if cont_id else None,
pais_id=int(pais_id) if pais_id else None, fecha_filtro=fecha_filtro, q=q)
@app.route("/feeds")
def dashboard():
stats = {'feeds_totales': 0, 'noticias_totales': 0, 'feeds_caidos': 0}
try:
with get_conn() as conn:
with conn.cursor() as cursor:
cursor.execute("SELECT COUNT(*) FROM feeds")
stats['feeds_totales'] = cursor.fetchone()[0]
cursor.execute("SELECT COUNT(*) FROM noticias")
stats['noticias_totales'] = cursor.fetchone()[0]
cursor.execute("SELECT COUNT(*) FROM feeds WHERE activo = FALSE")
stats['feeds_caidos'] = cursor.fetchone()[0]
except psycopg2.Error as db_err:
app.logger.error(f"[DB ERROR] Al calcular estadísticas: {db_err}")
flash("Error al conectar con la base de datos.", "error")
return render_template("dashboard.html", stats=stats)
@app.route("/feeds/manage")
def manage_feeds():
page = request.args.get('page', 1, type=int)
per_page = 20
offset = (page - 1) * per_page
feeds_list, total_feeds = [], 0
try:
with get_conn() as conn:
with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
cursor.execute("SELECT COUNT(*) FROM feeds")
total_feeds = cursor.fetchone()[0]
cursor.execute("SELECT * FROM feeds ORDER BY nombre LIMIT %s OFFSET %s", (per_page, offset))
feeds_list = cursor.fetchall()
except psycopg2.Error as db_err:
app.logger.error(f"[DB ERROR] Al obtener lista de feeds: {db_err}")
flash("Error al obtener la lista de feeds.", "error")
total_pages = math.ceil(total_feeds / per_page) if total_feeds > 0 else 0
return render_template("feeds_list.html", feeds=feeds_list, page=page, total_pages=total_pages, total_feeds=total_feeds)
def _get_form_dependencies(cursor):
cursor.execute("SELECT id, nombre FROM categorias ORDER BY nombre")
categorias = cursor.fetchall()
cursor.execute("SELECT id, nombre, continente_id FROM paises ORDER BY nombre")
paises = cursor.fetchall()
return categorias, paises
@app.route("/feeds/add", methods=['GET', 'POST'])
def add_feed():
if request.method == 'POST':
nombre = request.form.get("nombre")
try:
with get_conn() as conn:
with conn.cursor() as cursor:
categoria_id = int(request.form.get("categoria_id")) if request.form.get("categoria_id") else None
pais_id = int(request.form.get("pais_id")) if request.form.get("pais_id") else None
cursor.execute(
"INSERT INTO feeds (nombre, descripcion, url, categoria_id, pais_id, idioma) VALUES (%s, %s, %s, %s, %s, %s)",
(nombre, request.form.get("descripcion"), request.form.get("url"), categoria_id, pais_id, (request.form.get("idioma", "").strip() or None))
)
flash(f"Feed '{nombre}' añadido correctamente.", "success")
except psycopg2.Error as db_err:
app.logger.error(f"[DB ERROR] Al agregar feed: {db_err}", exc_info=True)
flash(f"Error al añadir el feed: {db_err}", "error")
return redirect(url_for("dashboard"))
categorias, paises = [], []
try:
with get_conn() as conn:
with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
categorias, paises = _get_form_dependencies(cursor)
except psycopg2.Error as db_err:
app.logger.error(f"[DB ERROR] Al cargar formulario: {db_err}")
flash("No se pudieron cargar las categorías o países.", "error")
return render_template("add_feed.html", categorias=categorias, paises=paises)
@app.route("/edit/<int:feed_id>", methods=["GET", "POST"])
def edit_feed(feed_id):
if request.method == "POST":
try:
with get_conn() as conn:
with conn.cursor() as cursor:
categoria_id = int(request.form.get("categoria_id")) if request.form.get("categoria_id") else None
pais_id = int(request.form.get("pais_id")) if request.form.get("pais_id") else None
idioma = request.form.get("idioma", "").strip() or None
activo = "activo" in request.form
cursor.execute(
"UPDATE feeds SET nombre=%s, descripcion=%s, url=%s, categoria_id=%s, pais_id=%s, idioma=%s, activo=%s WHERE id=%s",
(request.form.get("nombre"), request.form.get("descripcion"), request.form.get("url"), categoria_id, pais_id, idioma, activo, feed_id)
)
flash("Feed actualizado correctamente.", "success")
except psycopg2.Error as db_err:
app.logger.error(f"[DB ERROR] Al actualizar feed: {db_err}", exc_info=True)
flash(f"Error al actualizar el feed: {db_err}", "error")
return redirect(url_for("manage_feeds"))
try:
with get_conn() as conn:
with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
cursor.execute("SELECT * FROM feeds WHERE id = %s", (feed_id,))
feed = cursor.fetchone()
if not feed:
flash("No se encontró el feed solicitado.", "error")
return redirect(url_for("manage_feeds"))
categorias, paises = _get_form_dependencies(cursor)
except psycopg2.Error as db_err:
app.logger.error(f"[DB ERROR] Al cargar feed para editar: {db_err}", exc_info=True)
flash("Error al cargar el feed para editar.", "error")
return redirect(url_for("manage_feeds"))
return render_template("edit_feed.html", feed=feed, categorias=categorias, paises=paises)
@app.route("/delete/<int:feed_id>")
def delete_feed(feed_id):
try:
with get_conn() as conn:
with conn.cursor() as cursor:
cursor.execute("DELETE FROM feeds WHERE id=%s", (feed_id,))
flash("Feed eliminado correctamente.", "success")
except psycopg2.Error as db_err:
app.logger.error(f"[DB ERROR] Al eliminar feed: {db_err}", exc_info=True)
flash(f"Error al eliminar el feed: {db_err}", "error")
return redirect(url_for("manage_feeds"))
@app.route("/reactivar_feed/<int:feed_id>")
def reactivar_feed(feed_id):
try:
with get_conn() as conn:
with conn.cursor() as cursor:
cursor.execute("UPDATE feeds SET activo = TRUE, fallos = 0 WHERE id = %s", (feed_id,))
flash("Feed reactivado.", "success")
except psycopg2.Error as db_err:
flash(f"Error al reactivar feed: {db_err}", "error")
return redirect(url_for("manage_feeds"))
@app.route("/backup_feeds")
def backup_feeds():
try:
with get_conn() as conn:
with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
cursor.execute("SELECT f.id, f.nombre, f.descripcion, f.url, f.categoria_id, c.nombre AS categoria, f.pais_id, p.nombre AS pais, f.idioma, f.activo, f.fallos FROM feeds f LEFT JOIN categorias c ON f.categoria_id = c.id LEFT JOIN paises p ON f.pais_id = p.id ORDER BY f.id")
feeds_ = cursor.fetchall()
if not feeds_:
flash("No hay feeds para exportar.", "warning")
return redirect(url_for("dashboard"))
output = StringIO()
writer = csv.DictWriter(output, fieldnames=feeds_[0].keys())
writer.writeheader()
writer.writerows(feeds_)
return Response(output.getvalue(), mimetype="text/csv", headers={"Content-Disposition": "attachment;filename=feeds_backup.csv"})
except Exception as e:
app.logger.error(f"[ERROR] Al hacer backup de feeds: {e}", exc_info=True)
return redirect(url_for("dashboard"))
@app.route("/backup_noticias")
def backup_noticias():
try:
with get_conn() as conn:
with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
cursor.execute("SELECT n.id, n.titulo, n.resumen, n.url, n.fecha, n.imagen_url, c.nombre AS categoria, p.nombre AS pais, co.nombre AS continente FROM noticias n LEFT JOIN categorias c ON n.categoria_id = c.id LEFT JOIN paises p ON n.pais_id = p.id LEFT JOIN continentes co ON p.continente_id = co.id ORDER BY n.fecha DESC")
noticias = cursor.fetchall()
if not noticias:
flash("No hay noticias para exportar.", "warning")
return redirect(url_for("dashboard"))
output = StringIO()
writer = csv.DictWriter(output, fieldnames=noticias[0].keys())
writer.writeheader()
writer.writerows(noticias)
return Response(output.getvalue(), mimetype="text/csv", headers={"Content-Disposition": "attachment;filename=noticias_backup.csv"})
except Exception as e:
app.logger.error(f"[ERROR] Al hacer backup de noticias: {e}", exc_info=True)
return redirect(url_for("dashboard"))
@app.route("/backup_completo")
def backup_completo():
try:
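        # Build the ZIP entirely in memory: each table is serialized to CSV
        # with the same queries as the individual backups and stored as a
        # separate file inside the archive.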
memory_buffer = BytesIO()
with zipfile.ZipFile(memory_buffer, 'w', zipfile.ZIP_DEFLATED) as zipf:
with get_conn() as conn:
with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
cursor.execute("SELECT f.id, f.nombre, f.descripcion, f.url, f.categoria_id, c.nombre AS categoria, f.pais_id, p.nombre AS pais, f.idioma, f.activo, f.fallos FROM feeds f LEFT JOIN categorias c ON f.categoria_id = c.id LEFT JOIN paises p ON f.pais_id = p.id ORDER BY f.id")
feeds_data = cursor.fetchall()
if feeds_data:
output = StringIO()
writer = csv.DictWriter(output, fieldnames=feeds_data[0].keys())
writer.writeheader()
writer.writerows(feeds_data)
zipf.writestr("feeds.csv", output.getvalue())
cursor.execute("SELECT n.id, n.titulo, n.resumen, n.url, n.fecha, n.imagen_url, c.nombre AS categoria, p.nombre AS pais, co.nombre AS continente FROM noticias n LEFT JOIN categorias c ON n.categoria_id = c.id LEFT JOIN paises p ON n.pais_id = p.id LEFT JOIN continentes co ON p.continente_id = co.id ORDER BY n.fecha DESC")
noticias_data = cursor.fetchall()
if noticias_data:
output = StringIO()
writer = csv.DictWriter(output, fieldnames=noticias_data[0].keys())
writer.writeheader()
writer.writerows(noticias_data)
zipf.writestr("noticias.csv", output.getvalue())
        return Response(memory_buffer.getvalue(), mimetype="application/zip",
                        headers={"Content-Disposition": "attachment;filename=rss_backup_completo.zip"})
except Exception as e:
app.logger.error(f"[ERROR] Al hacer backup completo: {e}", exc_info=True)
return redirect(url_for("dashboard"))
@app.route("/restore_feeds", methods=["GET", "POST"])
def restore_feeds():
if request.method == "POST":
file = request.files.get("file")
        if not file or not file.filename.lower().endswith(".csv"):
flash("Archivo no válido. Sube un .csv.", "error")
return redirect(url_for("restore_feeds"))
try:
file_stream = StringIO(file.read().decode("utf-8", errors='ignore'))
reader = csv.DictReader(file_stream)
rows = list(reader)
n_ok, n_err = 0, 0
with get_conn() as conn:
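                # Each row gets its own SAVEPOINT, so one malformed row is
                # rolled back and skipped without aborting the surrounding
                # transaction.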
for row in rows:
with conn.cursor() as cursor:
try:
cursor.execute("SAVEPOINT restore_feed_row")
activo = str(row.get("activo", "")).strip().lower() in ["1", "true", "t", "yes", "on"]
cat_id = int(row["categoria_id"]) if row.get("categoria_id") and row["categoria_id"].strip() else None
pais_id = int(row["pais_id"]) if row.get("pais_id") and row["pais_id"].strip() else None
cursor.execute(
"""
INSERT INTO feeds (id, nombre, descripcion, url, categoria_id, pais_id, idioma, activo, fallos)
VALUES (%(id)s, %(nombre)s, %(descripcion)s, %(url)s, %(categoria_id)s, %(pais_id)s, %(idioma)s, %(activo)s, %(fallos)s)
ON CONFLICT (id) DO UPDATE SET
nombre=EXCLUDED.nombre, descripcion=EXCLUDED.descripcion, url=EXCLUDED.url, categoria_id=EXCLUDED.categoria_id,
pais_id=EXCLUDED.pais_id, idioma=EXCLUDED.idioma, activo=EXCLUDED.activo, fallos=EXCLUDED.fallos;
""",
{"id": int(row["id"]), "nombre": row.get("nombre"), "descripcion": row.get("descripcion") or "", "url": row.get("url"),
"categoria_id": cat_id, "pais_id": pais_id, "idioma": row.get("idioma") or None, "activo": activo,
"fallos": int(row.get("fallos", 0) or 0)}
)
n_ok += 1
cursor.execute("RELEASE SAVEPOINT restore_feed_row")
except Exception as e:
cursor.execute("ROLLBACK TO SAVEPOINT restore_feed_row")
n_err += 1
app.logger.error(f"Error procesando fila (se omite): {row} - Error: {e}")
flash(f"Restauración completada. Feeds procesados: {n_ok}. Errores: {n_err}.", "success" if n_err == 0 else "warning")
except Exception as e:
app.logger.error(f"Error al restaurar feeds desde CSV: {e}", exc_info=True)
flash(f"Ocurrió un error general al procesar el archivo: {e}", "error")
return redirect(url_for("dashboard"))
return render_template("restore_feeds.html")
# --- Capture cycle: fetch active feeds in parallel and store results ---
def fetch_and_store():
with app.app_context():
logging.info("--- INICIANDO CICLO DE CAPTURA ---")
feeds_to_process = []
try:
with get_conn() as conn:
with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
logging.info("Paso 1: Obteniendo lista de feeds...")
cursor.execute("SELECT id, url, categoria_id, pais_id, last_etag, last_modified FROM feeds WHERE activo = TRUE")
feeds_to_process = cursor.fetchall()
logging.info(f"Paso 2: {len(feeds_to_process)} feeds para procesar.")
except psycopg2.Error as db_err:
logging.error(f"Error de BD al obtener feeds: {db_err}")
return
if not feeds_to_process:
logging.info("No hay feeds activos para procesar.")
return
feeds_fallidos, feeds_exitosos, todas_las_noticias, feeds_para_actualizar_headers = [], [], [], []
logging.info(f"Paso 3: Iniciando procesamiento paralelo ({MAX_WORKERS} workers)...")
# Pass the dict form of feed data
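    # Each future is mapped back to its source row so results and failures can
    # be attributed to a feed id; rows are converted to plain dicts before
    # crossing the thread boundary.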
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
future_to_feed = {executor.submit(process_single_feed, dict(feed)): feed for feed in feeds_to_process}
progress_bar = tqdm(as_completed(future_to_feed), total=len(feeds_to_process), desc="Procesando Feeds")
for future in progress_bar:
original_feed_data = future_to_feed[future] # This is the original DictRow
feed_id = original_feed_data['id']
try:
                    # as_completed only yields finished futures, so this
                    # timeout is a last-resort safety net rather than a hard
                    # per-feed limit.
                    _, noticias_encontradas, new_etag, new_modified, success = future.result(timeout=SINGLE_FEED_TIMEOUT)
if success:
feeds_exitosos.append(feed_id)
if noticias_encontradas: todas_las_noticias.extend(noticias_encontradas)
# Only add if etag/modified actually changed or were initially None
if (new_etag is not None and new_etag != original_feed_data.get('last_etag')) or \
(new_modified is not None and new_modified != original_feed_data.get('last_modified')):
feeds_para_actualizar_headers.append({'id': feed_id, 'etag': new_etag, 'modified': new_modified})
else:
feeds_fallidos.append(feed_id)
                except FuturesTimeoutError:
logging.error(f"!!! TIMEOUT en feed {original_feed_data['url']} (ID: {feed_id})")
feeds_fallidos.append(feed_id)
except Exception as exc:
logging.error(f"Excepción en feed {original_feed_data['url']} (ID: {feed_id}): {exc}", exc_info=True)
feeds_fallidos.append(feed_id)
logging.info(f"Paso 4: Procesamiento finalizado. Noticias nuevas: {len(todas_las_noticias)}, Feeds fallidos: {len(feeds_fallidos)}, Feeds actualizados: {len(feeds_para_actualizar_headers)}.")
if not any([todas_las_noticias, feeds_fallidos, feeds_exitosos, feeds_para_actualizar_headers]):
logging.info("Sin cambios que aplicar en la base de datos.")
return
try:
with get_conn() as conn: # This connection is for the entire transaction (commits/rolls back everything together)
logging.info("Paso 5: Actualizando BD...")
# --- Update feed status (fallos, activo) ---
if feeds_fallidos or feeds_exitosos: # Only create cursor if there's work
with conn.cursor() as cursor_feeds_status:
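                        # psycopg2 adapts a Python tuple parameter to a
                        # parenthesized SQL list, which is what the
                        # "id IN %s" placeholders below expect.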
if feeds_fallidos:
cursor_feeds_status.execute("UPDATE feeds SET fallos = fallos + 1 WHERE id IN %s", (tuple(feeds_fallidos),))
cursor_feeds_status.execute("UPDATE feeds SET activo = FALSE WHERE fallos >= %s AND id IN %s", (MAX_FALLOS, tuple(feeds_fallidos)))
if feeds_exitosos:
cursor_feeds_status.execute("UPDATE feeds SET fallos = 0 WHERE id IN %s", (tuple(feeds_exitosos),))
# cursor_feeds_status is implicitly closed here
# --- Update feed headers (etag, modified) ---
if feeds_para_actualizar_headers: # Only create cursor if there's work
with conn.cursor() as cursor_headers:
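                        # execute_values expands the single VALUES %s
                        # placeholder into one multi-row literal, so all
                        # header updates run as one UPDATE ... FROM
                        # (VALUES ...) statement.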
psycopg2.extras.execute_values(
cursor_headers,
"UPDATE feeds SET last_etag = data.etag, last_modified = data.modified FROM (VALUES %s) AS data(id, etag, modified) WHERE feeds.id = data.id",
[(f['id'], f['etag'], f['modified']) for f in feeds_para_actualizar_headers]
)
# cursor_headers is implicitly closed here
# --- Insert new news articles ---
if todas_las_noticias: # Only create cursor if there's work
logging.info(f"Intentando insertar {len(todas_las_noticias)} noticias en la base de datos.")
with conn.cursor() as cursor_news_insert: # A fresh cursor specifically for news insertion
psycopg2.extras.execute_values(
cursor_news_insert,
"INSERT INTO noticias (id, titulo, resumen, url, fecha, imagen_url, categoria_id, pais_id) VALUES %s ON CONFLICT (id) DO NOTHING",
todas_las_noticias
)
                        # NOTE: execute_values sends rows in pages, so rowcount
                        # only reflects the last page; treat this count as
                        # approximate.
                        rows_inserted = cursor_news_insert.rowcount
                        logging.info(f"Se insertaron {rows_inserted} noticias nuevas (duplicados omitidos por ON CONFLICT DO NOTHING).")
# cursor_news_insert is implicitly closed here
logging.info("--- CICLO DE CAPTURA FINALIZADO ---")
except psycopg2.Error as db_err:
logging.error(f"Error de BD en actualización masiva: {db_err}", exc_info=True)
# --- Application Startup (LOCAL DEVELOPMENT ONLY) ---
if __name__ == "__main__":
if not db_pool:
app.logger.error("La aplicación no puede arrancar sin una conexión a la base de datos.")
sys.exit(1)
app.run(host="0.0.0.0", port=5000, debug=True)