# app.py — Flask RSS/news aggregator (approx. 901 lines, 42 KiB)
import os
|
||
import sys
|
||
import csv
|
||
import math
|
||
from io import StringIO, BytesIO
|
||
from datetime import datetime
|
||
import logging
|
||
import atexit
|
||
import zipfile
|
||
from contextlib import contextmanager
|
||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||
from tqdm import tqdm
|
||
|
||
from flask import Flask, render_template, request, redirect, url_for, Response, flash, make_response
|
||
import psycopg2
|
||
import psycopg2.extras
|
||
import psycopg2.pool
|
||
import bleach
|
||
|
||
from feed_processor import process_single_feed
|
||
from url_processor import process_newspaper_url
|
||
|
||
# Log to stdout so container/orchestrator log collectors capture everything.
logging.basicConfig(stream=sys.stdout, level=logging.INFO, format='[%(asctime)s] %(levelname)s in %(module)s: %(message)s')

app = Flask(__name__)
# NOTE: falls back to a random per-process key; sessions/flash messages then
# reset on every restart — set SECRET_KEY in production.
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', os.urandom(24))

# PostgreSQL connection settings, all overridable via environment variables.
DB_CONFIG = {
    "host": os.environ.get("DB_HOST", "localhost"),
    "port": int(os.environ.get("DB_PORT", 5432)),
    "dbname": os.environ.get("DB_NAME", "rss"),
    "user": os.environ.get("DB_USER", "rss"),
    "password": os.environ.get("DB_PASS", "x")
}

MAX_WORKERS = int(os.environ.get("RSS_MAX_WORKERS", 20))           # RSS fetch thread-pool size
SINGLE_FEED_TIMEOUT = int(os.environ.get("RSS_FEED_TIMEOUT", 30))  # seconds to wait per feed future
MAX_FALLOS = int(os.environ.get("RSS_MAX_FAILURES", 5))            # consecutive failures before a feed is deactivated

# Configurable page size (clamped to 10-100 at request time for safety)
NEWS_PER_PAGE = int(os.environ.get("NEWS_PER_PAGE", 20))

# Default language / translation behaviour
DEFAULT_TRANSLATION_LANG = os.environ.get("DEFAULT_TRANSLATION_LANG", "es").strip().lower()
DEFAULT_LANG = os.environ.get("DEFAULT_LANG", DEFAULT_TRANSLATION_LANG).strip().lower()
WEB_TRANSLATED_DEFAULT = os.environ.get("WEB_TRANSLATED_DEFAULT", "1").strip().lower() in ("1", "true", "yes")

# Shared connection pool. Stays None when the database is unreachable at
# import time, in which case get_conn() raises ConnectionError per request.
db_pool = None
try:
    db_pool = psycopg2.pool.SimpleConnectionPool(minconn=1, maxconn=10, **DB_CONFIG)
    app.logger.info("Pool de conexiones a la base de datos creado exitosamente.")
except psycopg2.OperationalError as e:
    logging.error(f"FATAL: No se pudo conectar a la base de datos para crear el pool: {e}")
|
||
|
||
@contextmanager
def get_conn():
    """Yield a pooled database connection with commit/rollback semantics.

    Commits on clean exit of the ``with`` body, rolls back on any exception,
    and always returns the connection to the pool.

    Raises:
        ConnectionError: if the pool was never created (DB down at startup).
    """
    if not db_pool:
        raise ConnectionError("El pool de la base de datos no está disponible.")
    conn = None
    try:
        conn = db_pool.getconn()
        yield conn
        conn.commit()
    except Exception:
        if conn:
            conn.rollback()
        # Bare raise (was `raise e`) re-raises the active exception without
        # re-anchoring the traceback at this line.
        raise
    finally:
        if conn:
            db_pool.putconn(conn)
|
||
|
||
@atexit.register
def shutdown_hooks():
    """Close every pooled database connection at interpreter exit."""
    if not db_pool:
        return
    db_pool.closeall()
    app.logger.info("Pool de conexiones de la base de datos cerrado.")
|
||
|
||
@app.template_filter('safe_html')
def safe_html(text):
    """Jinja filter: sanitize *text*, keeping only a small inline-HTML whitelist."""
    if text:
        allowed_tags = {'a', 'b', 'strong', 'i', 'em', 'p', 'br'}
        allowed_attrs = {'a': ['href', 'title']}
        return bleach.clean(text, tags=allowed_tags, attributes=allowed_attrs, strip=True)
    return ""
|
||
|
||
def _get_form_dependencies(cursor):
|
||
cursor.execute("SELECT id, nombre FROM categorias ORDER BY nombre")
|
||
categorias = cursor.fetchall()
|
||
cursor.execute("SELECT id, nombre, continente_id FROM paises ORDER BY nombre")
|
||
paises = cursor.fetchall()
|
||
return categorias, paises
|
||
|
||
def _get_lang_and_flags():
    """
    Work out the preferred language and whether translations should be shown.

    ``?orig=1`` forces the original text; ``?lang=xx`` overrides the language
    (the caller persists it in a cookie when the third element is True).
    """
    query_lang = request.args.get("lang", "").strip().lower()
    saved_lang = (request.cookies.get("lang") or "").strip().lower()
    preferred = query_lang or saved_lang or DEFAULT_LANG or "es"

    wants_original = request.args.get("orig") == "1"
    translate = WEB_TRANSLATED_DEFAULT and not wants_original
    return preferred, translate, bool(query_lang)
|
||
|
||
def _build_news_query(args, *, count=False, limit=None, offset=None, lang="es", use_translation=True):
    """
    Build the news SQL statement and its parameter list from the request args.

    count=True  => SELECT COUNT(*)
    count=False => SELECT columns plus ORDER BY and LIMIT/OFFSET.
    When use_translation=True, a LEFT JOIN LATERAL pulls the latest finished
    translation (status='done', lang_to=lang) per news row.

    NOTE: parameters are collected in separate buckets and concatenated at the
    end so their order matches the order of %s placeholders in the final SQL:
    rank (SELECT) -> lateral join lang (FROM) -> filters (WHERE) -> LIMIT/OFFSET.
    Do not reorder the concatenation without reordering the SQL sections.
    """
    # One parameter bucket per SQL section (see ordering note above).
    select_rank_params = []
    from_params = []
    where_params = []
    tail_params = []

    conditions = []

    q = args.get("q", "").strip()
    cat_id = args.get("categoria_id")
    cont_id = args.get("continente_id")
    pais_id = args.get("pais_id")
    fecha_filtro = args.get("fecha")

    # Base FROM clause shared by count and page queries.
    sql_from = """
        FROM noticias n
        LEFT JOIN categorias c ON n.categoria_id = c.id
        LEFT JOIN paises p ON n.pais_id = p.id
        LEFT JOIN continentes co ON p.continente_id = co.id
    """

    # Translation LEFT JOIN LATERAL (page SELECT only; COUNT does not need it).
    if (not count) and use_translation:
        sql_from += """
        LEFT JOIN LATERAL (
            SELECT titulo_trad, resumen_trad
            FROM traducciones
            WHERE traducciones.noticia_id = n.id
            AND traducciones.lang_to = %s
            AND traducciones.status = 'done'
            ORDER BY id DESC
            LIMIT 1
        ) t ON TRUE
        """
        from_params.append(lang)

    # Dynamic WHERE conditions
    if q:
        # Full-text match against the original (Spanish) tsvector.
        conditions.append("n.tsv @@ plainto_tsquery('spanish', %s)")
        where_params.append(q)

    if cat_id:
        conditions.append("n.categoria_id = %s")
        where_params.append(cat_id)

    if pais_id:
        conditions.append("n.pais_id = %s")
        where_params.append(pais_id)
    elif cont_id:
        # Country filter takes precedence over continent when both are sent.
        conditions.append("p.continente_id = %s")
        where_params.append(cont_id)

    if fecha_filtro:
        try:
            fecha_obj = datetime.strptime(fecha_filtro, '%Y-%m-%d')
            conditions.append("n.fecha::date = %s")
            where_params.append(fecha_obj.date())
        except ValueError:
            # Malformed date: warn the user and simply drop the date filter.
            # NOTE(review): flash() here requires an active request context —
            # this helper is only safe to call from within a request.
            flash("Formato de fecha no válido. Use AAAA-MM-DD.", "error")

    where_clause = " WHERE " + " AND ".join(conditions) if conditions else ""

    if count:
        # Total count (translations not needed).
        sql_count = "SELECT COUNT(*) " + sql_from + where_clause
        sql_params = from_params + where_params  # from_params is empty when count=True
        return sql_count, sql_params

    # Column selection for a result page
    if use_translation:
        select_cols = """
        SELECT n.fecha,
        COALESCE(t.titulo_trad, n.titulo) AS titulo,
        COALESCE(t.resumen_trad, n.resumen) AS resumen,
        n.url, n.imagen_url, n.fuente_nombre,
        c.nombre AS categoria, p.nombre AS pais, co.nombre AS continente,
        (t.titulo_trad IS NOT NULL OR t.resumen_trad IS NOT NULL) AS usa_trad
        """
    else:
        select_cols = """
        SELECT n.fecha, n.titulo, n.resumen,
        n.url, n.imagen_url, n.fuente_nombre,
        c.nombre AS categoria, p.nombre AS pais, co.nombre AS continente,
        FALSE AS usa_trad
        """

    order_clause = " ORDER BY n.fecha DESC NULLS LAST"

    if q:
        # Relevance ranking; this injects the FIRST placeholder of the query,
        # which is why select_rank_params leads the concatenation below.
        select_cols = select_cols.replace(
            "SELECT",
            "SELECT ts_rank(n.tsv, plainto_tsquery('spanish', %s)) AS rank,"
        )
        select_rank_params.append(q)
        order_clause = " ORDER BY rank DESC, n.fecha DESC NULLS LAST"

    # Pagination
    if limit is not None:
        order_clause += " LIMIT %s"
        tail_params.append(limit)
    if offset is not None:
        order_clause += " OFFSET %s"
        tail_params.append(offset)

    sql_page = select_cols + sql_from + where_clause + order_clause
    # Concatenation order must mirror placeholder order in sql_page.
    sql_params = select_rank_params + from_params + where_params + tail_params
    return sql_page, sql_params
|
||
|
||
def _to_int_or_none(value):
    """Parse *value* as int, returning None for missing or malformed input."""
    try:
        return int(value)
    except (TypeError, ValueError):
        return None


@app.route("/")
def home():
    """Render the filtered, paginated news listing.

    Returns the full page, or only the list partial for AJAX requests
    (detected via the X-Requested-With header). Persists a ``lang`` cookie
    when the user explicitly picked a language via ``?lang=xx``.
    """
    noticias, categorias, continentes, paises = [], [], [], []

    # Current filter state (echoed back to the UI)
    q = request.args.get("q", "").strip()
    cat_id = request.args.get("categoria_id")
    cont_id = request.args.get("continente_id")
    pais_id = request.args.get("pais_id")
    fecha_filtro = request.args.get("fecha")

    # Language / translation preferences
    lang, use_tr, set_cookie = _get_lang_and_flags()

    # Pagination, clamped to sane bounds (10-100 per page)
    page = request.args.get("page", default=1, type=int)
    per_page = request.args.get("per_page", default=NEWS_PER_PAGE, type=int)
    if per_page is None or per_page <= 0:
        per_page = NEWS_PER_PAGE
    per_page = 100 if per_page > 100 else (10 if per_page < 10 else per_page)
    if page is None or page <= 0:
        page = 1
    offset = (page - 1) * per_page

    total_results = 0
    total_pages = 0

    try:
        with get_conn() as conn:
            with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
                # Dropdown data for the filter UI
                cursor.execute("SELECT id, nombre FROM categorias ORDER BY nombre")
                categorias = cursor.fetchall()
                cursor.execute("SELECT id, nombre FROM continentes ORDER BY nombre")
                continentes = cursor.fetchall()
                cursor.execute("SELECT id, nombre, continente_id FROM paises ORDER BY nombre")
                paises = cursor.fetchall()

                # 1) Total count (no translation join needed)
                sql_count, params_count = _build_news_query(
                    request.args, count=True, lang=lang, use_translation=use_tr
                )
                cursor.execute(sql_count, tuple(params_count))
                total_results = cursor.fetchone()[0] or 0
                total_pages = math.ceil(total_results / per_page) if total_results else 0

                # 2) Current page (COALESCEd to translations when enabled)
                sql_page, params_page = _build_news_query(
                    request.args,
                    count=False,
                    limit=per_page,
                    offset=offset,
                    lang=lang,
                    use_translation=use_tr
                )
                cursor.execute(sql_page, tuple(params_page))
                noticias = cursor.fetchall()

    except psycopg2.Error as db_err:
        app.logger.error(f"[DB ERROR] Al leer noticias: {db_err}", exc_info=True)
        flash("Error de base de datos al cargar las noticias.", "error")

    # BUG FIX: int(cat_id) previously raised ValueError (HTTP 500) on any
    # non-numeric query parameter; malformed ids now behave like "no filter".
    ctx = dict(
        noticias=noticias, categorias=categorias, continentes=continentes, paises=paises,
        cat_id=_to_int_or_none(cat_id), cont_id=_to_int_or_none(cont_id),
        pais_id=_to_int_or_none(pais_id), fecha_filtro=fecha_filtro, q=q,
        page=page, per_page=per_page, total_pages=total_pages, total_results=total_results,
        lang=lang, use_tr=use_tr
    )

    # Single response path for both AJAX partial and full render
    # (previously duplicated cookie-setting logic).
    is_ajax = request.headers.get('X-Requested-With') == 'XMLHttpRequest'
    template = '_noticias_list.html' if is_ajax else "noticias.html"
    resp = make_response(render_template(template, **ctx))
    if set_cookie:
        resp.set_cookie("lang", lang, max_age=60*60*24*365)
    return resp
|
||
|
||
@app.route("/dashboard")
def dashboard():
    """Show aggregate counters for feeds and stored news items."""
    stats = {'feeds_totales': 0, 'noticias_totales': 0, 'feeds_caidos': 0}
    counters = (
        ('feeds_totales', "SELECT COUNT(*) FROM feeds"),
        ('noticias_totales', "SELECT COUNT(*) FROM noticias"),
        ('feeds_caidos', "SELECT COUNT(*) FROM feeds WHERE activo = FALSE"),
    )
    try:
        with get_conn() as conn:
            with conn.cursor() as cursor:
                for stat_key, count_sql in counters:
                    cursor.execute(count_sql)
                    stats[stat_key] = cursor.fetchone()[0]
    except psycopg2.Error as db_err:
        app.logger.error(f"[DB ERROR] Al calcular estadísticas: {db_err}")
        flash("Error al conectar con la base de datos.", "error")
    return render_template("dashboard.html", stats=stats)
|
||
|
||
@app.route("/feeds/manage")
def manage_feeds():
    """Paginated listing of every configured RSS feed."""
    page = request.args.get('page', 1, type=int)
    per_page = 20
    offset = (page - 1) * per_page
    feeds_list, total_feeds = [], 0
    try:
        with get_conn() as conn:
            with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
                cursor.execute("SELECT COUNT(*) FROM feeds")
                total_feeds = cursor.fetchone()[0]
                listing_sql = """
                    SELECT f.id, f.nombre, f.url, c.nombre as categoria, p.nombre as pais, f.idioma, f.activo, f.fallos
                    FROM feeds f
                    LEFT JOIN categorias c ON f.categoria_id = c.id
                    LEFT JOIN paises p ON f.pais_id = p.id
                    ORDER BY f.nombre LIMIT %s OFFSET %s
                """
                cursor.execute(listing_sql, (per_page, offset))
                feeds_list = cursor.fetchall()
    except psycopg2.Error as db_err:
        app.logger.error(f"[DB ERROR] Al obtener lista de feeds: {db_err}")
        flash("Error al obtener la lista de feeds.", "error")
    total_pages = math.ceil(total_feeds / per_page) if total_feeds > 0 else 0
    return render_template("feeds_list.html", feeds=feeds_list, page=page,
                           total_pages=total_pages, total_feeds=total_feeds)
|
||
|
||
@app.route("/feeds/add", methods=['GET', 'POST'])
def add_feed():
    """Create a new RSS feed (POST) or show the creation form (GET)."""
    if request.method == 'POST':
        form = request.form
        nombre = form.get("nombre")
        try:
            with get_conn() as conn:
                with conn.cursor() as cursor:
                    raw_cat = form.get("categoria_id")
                    raw_pais = form.get("pais_id")
                    categoria_id = int(raw_cat) if raw_cat else None
                    pais_id = int(raw_pais) if raw_pais else None
                    idioma = form.get("idioma", "").strip() or None
                    cursor.execute(
                        "INSERT INTO feeds (nombre, descripcion, url, categoria_id, pais_id, idioma) VALUES (%s, %s, %s, %s, %s, %s)",
                        (nombre, form.get("descripcion"), form.get("url"), categoria_id, pais_id, idioma)
                    )
                    flash(f"Feed '{nombre}' añadido correctamente.", "success")
        except psycopg2.Error as db_err:
            app.logger.error(f"[DB ERROR] Al agregar feed: {db_err}", exc_info=True)
            flash(f"Error al añadir el feed: {db_err}", "error")
        return redirect(url_for("manage_feeds"))

    # GET: load dropdown data for the form
    categorias, paises = [], []
    try:
        with get_conn() as conn:
            with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
                categorias, paises = _get_form_dependencies(cursor)
    except psycopg2.Error as db_err:
        app.logger.error(f"[DB ERROR] Al cargar formulario: {db_err}")
        flash("No se pudieron cargar las categorías o países.", "error")
    return render_template("add_feed.html", categorias=categorias, paises=paises)
|
||
|
||
@app.route("/feeds/edit/<int:feed_id>", methods=["GET", "POST"])
def edit_feed(feed_id):
    """Update an existing feed (POST) or show it in the edit form (GET)."""
    if request.method == "POST":
        form = request.form
        try:
            with get_conn() as conn:
                with conn.cursor() as cursor:
                    raw_cat = form.get("categoria_id")
                    raw_pais = form.get("pais_id")
                    categoria_id = int(raw_cat) if raw_cat else None
                    pais_id = int(raw_pais) if raw_pais else None
                    idioma = form.get("idioma", "").strip() or None
                    activo = "activo" in form  # checkbox: present only when checked
                    cursor.execute(
                        "UPDATE feeds SET nombre=%s, descripcion=%s, url=%s, categoria_id=%s, pais_id=%s, idioma=%s, activo=%s WHERE id=%s",
                        (form.get("nombre"), form.get("descripcion"), form.get("url"), categoria_id, pais_id, idioma, activo, feed_id)
                    )
                    flash("Feed actualizado correctamente.", "success")
        except psycopg2.Error as db_err:
            app.logger.error(f"[DB ERROR] Al actualizar feed: {db_err}", exc_info=True)
            flash(f"Error al actualizar el feed: {db_err}", "error")
        return redirect(url_for("manage_feeds"))

    # GET: fetch the feed plus the form's dropdown data
    feed, categorias, paises = None, [], []
    try:
        with get_conn() as conn:
            with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
                cursor.execute("SELECT * FROM feeds WHERE id = %s", (feed_id,))
                feed = cursor.fetchone()
                if not feed:
                    flash("No se encontró el feed solicitado.", "error")
                    return redirect(url_for("manage_feeds"))
                categorias, paises = _get_form_dependencies(cursor)
    except psycopg2.Error as db_err:
        flash("Error al cargar el feed para editar.", "error")
        app.logger.error(f"Error al cargar feed {feed_id} para editar: {db_err}")
        return redirect(url_for("manage_feeds"))
    return render_template("edit_feed.html", feed=feed, categorias=categorias, paises=paises)
|
||
|
||
@app.route("/feeds/delete/<int:feed_id>")
def delete_feed(feed_id):
    """Permanently remove a feed by primary key, then return to the listing."""
    try:
        with get_conn() as conn, conn.cursor() as cursor:
            cursor.execute("DELETE FROM feeds WHERE id=%s", (feed_id,))
            flash("Feed eliminado correctamente.", "success")
    except psycopg2.Error as db_err:
        app.logger.error(f"[DB ERROR] Al eliminar feed: {db_err}", exc_info=True)
        flash(f"Error al eliminar el feed: {db_err}", "error")
    return redirect(url_for("manage_feeds"))
|
||
|
||
@app.route("/feeds/reactivar/<int:feed_id>")
def reactivar_feed(feed_id):
    """Re-enable a feed disabled after repeated failures.

    Resets the failure counter so the fetcher picks the feed up again.
    """
    try:
        with get_conn() as conn:
            with conn.cursor() as cursor:
                cursor.execute("UPDATE feeds SET activo = TRUE, fallos = 0 WHERE id = %s", (feed_id,))
                flash("Feed reactivado.", "success")
    except psycopg2.Error as db_err:
        # Consistency fix: log failures server-side like the other feed
        # mutations do (previously this handler only flashed the user).
        app.logger.error(f"[DB ERROR] Al reactivar feed: {db_err}", exc_info=True)
        flash(f"Error al reactivar feed: {db_err}", "error")
    return redirect(url_for("manage_feeds"))
|
||
|
||
@app.route("/urls/manage")
def manage_urls():
    """List every newspaper-URL scraping source."""
    fuentes = []
    listing_sql = """
        SELECT f.id, f.nombre, f.url, c.nombre as categoria, p.nombre as pais, f.idioma
        FROM fuentes_url f
        LEFT JOIN categorias c ON f.categoria_id = c.id
        LEFT JOIN paises p ON f.pais_id = p.id
        ORDER BY f.nombre
    """
    try:
        with get_conn() as conn:
            with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
                cursor.execute(listing_sql)
                fuentes = cursor.fetchall()
    except psycopg2.Error as db_err:
        app.logger.error(f"[DB ERROR] Al obtener lista de fuentes URL: {db_err}")
        flash("Error al obtener la lista de fuentes URL.", "error")
    return render_template("urls_list.html", fuentes=fuentes)
|
||
|
||
@app.route("/urls/add", methods=['GET', 'POST'])
def add_url_source():
    """Create a new newspaper-URL source (POST) or show the form (GET)."""
    if request.method == 'POST':
        form = request.form
        nombre = form.get("nombre")
        try:
            with get_conn() as conn:
                with conn.cursor() as cursor:
                    raw_cat = form.get("categoria_id")
                    raw_pais = form.get("pais_id")
                    categoria_id = int(raw_cat) if raw_cat else None
                    pais_id = int(raw_pais) if raw_pais else None
                    idioma = form.get("idioma", "es").strip().lower()
                    cursor.execute(
                        "INSERT INTO fuentes_url (nombre, url, categoria_id, pais_id, idioma) VALUES (%s, %s, %s, %s, %s)",
                        (nombre, form.get("url"), categoria_id, pais_id, idioma)
                    )
                    flash(f"Fuente URL '{nombre}' añadida correctamente.", "success")
        except psycopg2.Error as db_err:
            app.logger.error(f"[DB ERROR] Al agregar fuente URL: {db_err}", exc_info=True)
            flash(f"Error al añadir la fuente URL: {db_err}", "error")
        return redirect(url_for("manage_urls"))

    # GET: load dropdown data for the form
    categorias, paises = [], []
    try:
        with get_conn() as conn:
            with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
                categorias, paises = _get_form_dependencies(cursor)
    except psycopg2.Error as db_err:
        app.logger.error(f"[DB ERROR] Al cargar formulario: {db_err}")
        flash("No se pudieron cargar las categorías o países.", "error")
    return render_template("add_url_source.html", categorias=categorias, paises=paises)
|
||
|
||
@app.route("/urls/edit/<int:url_id>", methods=["GET", "POST"])
def edit_url_source(url_id):
    """Update a newspaper-URL source (POST) or show it in the edit form (GET)."""
    if request.method == "POST":
        form = request.form
        try:
            with get_conn() as conn:
                with conn.cursor() as cursor:
                    raw_cat = form.get("categoria_id")
                    raw_pais = form.get("pais_id")
                    categoria_id = int(raw_cat) if raw_cat else None
                    pais_id = int(raw_pais) if raw_pais else None
                    idioma = form.get("idioma", "es").strip().lower()
                    cursor.execute(
                        "UPDATE fuentes_url SET nombre=%s, url=%s, categoria_id=%s, pais_id=%s, idioma=%s WHERE id=%s",
                        (form.get("nombre"), form.get("url"), categoria_id, pais_id, idioma, url_id)
                    )
                    flash("Fuente URL actualizada correctamente.", "success")
        except psycopg2.Error as db_err:
            app.logger.error(f"[DB ERROR] Al actualizar fuente URL: {db_err}", exc_info=True)
            flash(f"Error al actualizar la fuente URL: {db_err}", "error")
        return redirect(url_for("manage_urls"))

    # GET: fetch the source plus the form's dropdown data
    fuente, categorias, paises = None, [], []
    try:
        with get_conn() as conn:
            with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
                cursor.execute("SELECT * FROM fuentes_url WHERE id = %s", (url_id,))
                fuente = cursor.fetchone()
                if not fuente:
                    flash("No se encontró la fuente URL solicitada.", "error")
                    return redirect(url_for("manage_urls"))
                categorias, paises = _get_form_dependencies(cursor)
    except psycopg2.Error as db_err:
        flash("Error al cargar la fuente URL para editar.", "error")
        app.logger.error(f"Error al cargar fuente URL {url_id} para editar: {db_err}")
        return redirect(url_for("manage_urls"))
    return render_template("edit_url_source.html", fuente=fuente, categorias=categorias, paises=paises)
|
||
|
||
@app.route("/urls/delete/<int:url_id>")
def delete_url_source(url_id):
    """Permanently remove a newspaper-URL source by primary key."""
    try:
        with get_conn() as conn:
            with conn.cursor() as cursor:
                cursor.execute("DELETE FROM fuentes_url WHERE id=%s", (url_id,))
                flash("Fuente URL eliminada correctamente.", "success")
    except psycopg2.Error as db_err:
        # Consistency fix: delete_feed logs its DB errors with exc_info;
        # this handler previously flashed the user but logged nothing.
        app.logger.error(f"[DB ERROR] Al eliminar fuente URL: {db_err}", exc_info=True)
        flash(f"Error al eliminar la fuente URL: {db_err}", "error")
    return redirect(url_for("manage_urls"))
|
||
|
||
def fetch_and_store_all():
    """
    Run one full capture cycle: fetch every active RSS feed and every scraped
    URL source in parallel, then apply all database updates (failure counters,
    conditional-GET headers, new news rows) in one final transaction.

    Intended to be invoked from a scheduler/worker process, not a request.
    """
    logging.info("--- INICIANDO CICLO DE CAPTURA GLOBAL (RSS y URL) ---")
    todas_las_noticias = []                 # news tuples accumulated from all sources
    feeds_fallidos = []                     # feed ids whose fetch failed this cycle
    feeds_exitosos = []                     # feed ids fetched successfully
    feeds_para_actualizar_headers = []      # feeds with changed ETag/Last-Modified

    # --- Part 1: RSS feeds ---
    logging.info("=> Parte 1: Procesando Feeds RSS...")
    feeds_to_process = []
    try:
        with get_conn() as conn:
            with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
                cursor.execute("SELECT id, nombre, url, categoria_id, pais_id, last_etag, last_modified FROM feeds WHERE activo = TRUE")
                feeds_to_process = cursor.fetchall()
                logging.info(f"Encontrados {len(feeds_to_process)} feeds RSS activos para procesar.")
    except psycopg2.Error as db_err:
        # Without the feed list there is nothing useful to do: abort the cycle.
        logging.error(f"Error de BD al obtener feeds RSS: {db_err}")
        return

    if feeds_to_process:
        with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
            future_to_feed = {executor.submit(process_single_feed, dict(feed)): feed for feed in feeds_to_process}
            for future in tqdm(as_completed(future_to_feed), total=len(feeds_to_process), desc="Procesando Fuentes RSS"):
                original_feed_data = future_to_feed[future]
                feed_id = original_feed_data['id']
                try:
                    _, noticias_encontradas, new_etag, new_modified, success = future.result(timeout=SINGLE_FEED_TIMEOUT)
                    if success:
                        feeds_exitosos.append(feed_id)
                        if noticias_encontradas:
                            todas_las_noticias.extend(noticias_encontradas)
                        # Queue changed conditional-GET headers for the batch
                        # UPDATE in part 3 (avoids one UPDATE per feed here).
                        if (new_etag and new_etag != original_feed_data.get('last_etag')) or \
                           (new_modified and new_modified != original_feed_data.get('last_modified')):
                            feeds_para_actualizar_headers.append({'id': feed_id, 'etag': new_etag, 'modified': new_modified})
                    else:
                        feeds_fallidos.append(feed_id)
                except Exception as exc:
                    # A timeout or worker crash counts as a failure for this feed.
                    logging.error(f"Excepción en feed {original_feed_data['url']} (ID: {feed_id}): {exc}")
                    feeds_fallidos.append(feed_id)

    noticias_desde_rss_count = len(todas_las_noticias)
    logging.info(f"=> Parte 1 Finalizada. Noticias desde RSS: {noticias_desde_rss_count}. Éxitos: {len(feeds_exitosos)}. Fallos: {len(feeds_fallidos)}.")

    # --- Part 2: scraped URL sources ---
    logging.info("=> Parte 2: Procesando Fuentes URL...")
    urls_to_process = []
    try:
        with get_conn() as conn:
            with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
                cursor.execute("SELECT * FROM fuentes_url")
                urls_to_process = cursor.fetchall()
                logging.info(f"Encontradas {len(urls_to_process)} fuentes URL para scrapear.")
    except Exception as e:
        # Non-fatal: RSS results from part 1 can still be stored below.
        logging.error(f"Error de BD al obtener fuentes URL: {e}")

    # Parallel capture via the newspaper scraper
    if urls_to_process:
        with ThreadPoolExecutor(max_workers=10) as executor:
            future_to_url = {
                executor.submit(
                    process_newspaper_url,
                    source['nombre'], source['url'], source['categoria_id'],
                    source['pais_id'], source['idioma']
                ): source for source in urls_to_process
            }
            for future in tqdm(as_completed(future_to_url), total=len(urls_to_process), desc="Procesando Fuentes URL"):
                source = future_to_url[future]
                try:
                    noticias_encontradas, _ = future.result()
                    if noticias_encontradas:
                        todas_las_noticias.extend(noticias_encontradas)
                except Exception as exc:
                    logging.error(f"Fallo al procesar la fuente URL {source['nombre']}: {exc}")

    noticias_desde_urls_count = len(todas_las_noticias) - noticias_desde_rss_count
    logging.info(f"=> Parte 2 Finalizada. Noticias encontradas desde URLs: {noticias_desde_urls_count}.")

    # --- Part 3: apply all database updates in one transaction ---
    logging.info("=> Parte 3: Actualizando la base de datos...")
    if not any([todas_las_noticias, feeds_fallidos, feeds_exitosos, feeds_para_actualizar_headers]):
        logging.info("No se encontraron nuevas noticias ni cambios en los feeds. Nada que actualizar.")
        logging.info("--- CICLO DE CAPTURA GLOBAL FINALIZADO ---")
        return

    try:
        with get_conn() as conn:
            with conn.cursor() as cursor:
                if feeds_fallidos:
                    # Increment first, then deactivate feeds that crossed the
                    # threshold — order matters within this transaction.
                    cursor.execute("UPDATE feeds SET fallos = fallos + 1 WHERE id IN %s", (tuple(feeds_fallidos),))
                    cursor.execute("UPDATE feeds SET activo = FALSE WHERE fallos >= %s AND id IN %s", (MAX_FALLOS, tuple(feeds_fallidos)))
                    logging.info(f"Incrementado contador de fallos para {len(feeds_fallidos)} feeds.")

                if feeds_exitosos:
                    # A successful fetch clears the failure streak.
                    cursor.execute("UPDATE feeds SET fallos = 0 WHERE id IN %s", (tuple(feeds_exitosos),))
                    logging.info(f"Reseteado contador de fallos para {len(feeds_exitosos)} feeds.")

                if feeds_para_actualizar_headers:
                    # Batch-update conditional-GET headers in a single statement.
                    psycopg2.extras.execute_values(
                        cursor,
                        "UPDATE feeds SET last_etag = data.etag, last_modified = data.modified FROM (VALUES %s) AS data(id, etag, modified) WHERE feeds.id = data.id",
                        [(f['id'], f['etag'], f['modified']) for f in feeds_para_actualizar_headers]
                    )
                    logging.info(f"Actualizados headers para {len(feeds_para_actualizar_headers)} feeds.")

                if todas_las_noticias:
                    logging.info(f"Intentando insertar/ignorar {len(todas_las_noticias)} noticias en total.")
                    # Duplicate URLs are silently skipped via ON CONFLICT.
                    insert_query = """
                        INSERT INTO noticias (id, titulo, resumen, url, fecha, imagen_url, fuente_nombre, categoria_id, pais_id)
                        VALUES %s
                        ON CONFLICT (url) DO NOTHING;
                    """
                    psycopg2.extras.execute_values(cursor, insert_query, todas_las_noticias, page_size=200)
                    logging.info(f"Inserción de noticias finalizada. {cursor.rowcount} filas podrían haber sido afectadas.")

                logging.info("=> Parte 3 Finalizada. Base de datos actualizada correctamente.")
    except Exception as e:
        logging.error(f"Error de BD en la actualización masiva final: {e}", exc_info=True)

    logging.info("--- CICLO DE CAPTURA GLOBAL FINALIZADO ---")
|
||
|
||
# --- Funciones de Backup y Restore (sin cambios) ---
|
||
|
||
@app.route("/backup_feeds")
def backup_feeds():
    """Download every configured feed (with category/country names) as CSV."""
    export_sql = """
        SELECT f.id, f.nombre, f.descripcion, f.url, f.categoria_id, c.nombre AS categoria,
               f.pais_id, p.nombre AS pais, f.idioma, f.activo, f.fallos
        FROM feeds f
        LEFT JOIN categorias c ON f.categoria_id = c.id
        LEFT JOIN paises p ON f.pais_id = p.id
        ORDER BY f.id
    """
    try:
        with get_conn() as conn:
            with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
                cursor.execute(export_sql)
                feeds_ = cursor.fetchall()
                if not feeds_:
                    flash("No hay feeds para exportar.", "warning")
                    return redirect(url_for("dashboard"))

                buffer = StringIO()
                writer = csv.DictWriter(buffer, fieldnames=list(feeds_[0].keys()))
                writer.writeheader()
                writer.writerows([dict(feed) for feed in feeds_])
                return Response(buffer.getvalue(), mimetype="text/csv",
                                headers={"Content-Disposition": "attachment;filename=feeds_backup.csv"})
    except Exception as e:
        app.logger.error(f"[ERROR] Al hacer backup de feeds: {e}", exc_info=True)
        flash(f"Error interno al generar el backup: {e}", "error")
        return redirect(url_for("dashboard"))
|
||
|
||
@app.route("/backup_urls")
def backup_urls():
    """Download every newspaper-URL source (with category/country names) as CSV."""
    export_sql = """
        SELECT f.id, f.nombre, f.url, f.categoria_id, c.nombre AS categoria, f.pais_id, p.nombre AS pais, f.idioma
        FROM fuentes_url f
        LEFT JOIN categorias c ON f.categoria_id = c.id
        LEFT JOIN paises p ON f.pais_id = p.id
        ORDER BY f.id
    """
    try:
        with get_conn() as conn:
            with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
                cursor.execute(export_sql)
                fuentes = cursor.fetchall()
                if not fuentes:
                    flash("No hay fuentes URL para exportar.", "warning")
                    return redirect(url_for("dashboard"))

                buffer = StringIO()
                writer = csv.DictWriter(buffer, fieldnames=list(fuentes[0].keys()))
                writer.writeheader()
                writer.writerows([dict(fuente) for fuente in fuentes])
                return Response(
                    buffer.getvalue(),
                    mimetype="text/csv",
                    headers={"Content-Disposition": "attachment;filename=fuentes_url_backup.csv"}
                )
    except Exception as e:
        app.logger.error(f"[ERROR] Al hacer backup de fuentes URL: {e}", exc_info=True)
        flash(f"Error interno al generar el backup de fuentes URL: {e}", "error")
        return redirect(url_for("dashboard"))
|
||
|
||
@app.route("/backup_noticias")
def backup_noticias():
    """Download every stored news item (with joined names) as CSV."""
    export_sql = """
        SELECT n.id, n.titulo, n.resumen, n.url, n.fecha, n.imagen_url, n.fuente_nombre,
               c.nombre AS categoria, p.nombre AS pais, co.nombre AS continente
        FROM noticias n
        LEFT JOIN categorias c ON n.categoria_id = c.id
        LEFT JOIN paises p ON n.pais_id = p.id
        LEFT JOIN continentes co ON p.continente_id = co.id
        ORDER BY n.fecha DESC
    """
    try:
        with get_conn() as conn:
            with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
                cursor.execute(export_sql)
                noticias = cursor.fetchall()
                if not noticias:
                    flash("No hay noticias para exportar.", "warning")
                    return redirect(url_for("dashboard"))

                buffer = StringIO()
                writer = csv.DictWriter(buffer, fieldnames=list(noticias[0].keys()))
                writer.writeheader()
                writer.writerows([dict(noticia) for noticia in noticias])
                return Response(buffer.getvalue(), mimetype="text/csv",
                                headers={"Content-Disposition": "attachment;filename=noticias_backup.csv"})
    except Exception as e:
        app.logger.error(f"[ERROR] Al hacer backup de noticias: {e}", exc_info=True)
        flash(f"Error interno al generar el backup: {e}", "error")
        return redirect(url_for("dashboard"))
|
||
|
||
def _rows_to_csv(rows):
    """Serialize DictRow records to CSV text, header taken from the first row."""
    output = StringIO()
    writer = csv.DictWriter(output, fieldnames=list(rows[0].keys()))
    writer.writeheader()
    writer.writerows([dict(row) for row in rows])
    return output.getvalue()


@app.route("/backup_completo")
def backup_completo():
    """Export feeds, URL sources and news as CSV files bundled in one ZIP."""
    # (table -> filename) pairs written into the archive; empty tables are skipped.
    exports = (
        ("feeds.csv", "SELECT * FROM feeds ORDER BY id"),
        ("fuentes_url.csv", "SELECT * FROM fuentes_url ORDER BY id"),
        ("noticias.csv", "SELECT * FROM noticias ORDER BY fecha DESC"),
    )
    try:
        memory_buffer = BytesIO()
        with zipfile.ZipFile(memory_buffer, 'w', zipfile.ZIP_DEFLATED) as zipf:
            with get_conn() as conn:
                with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
                    for filename, query in exports:
                        cursor.execute(query)
                        rows = cursor.fetchall()
                        if rows:
                            zipf.writestr(filename, _rows_to_csv(rows))

        # BUG FIX: return the ZIP as bytes. Passing the BytesIO object made
        # Flask iterate it line-by-line, which is incorrect for binary payloads.
        return Response(
            memory_buffer.getvalue(),
            mimetype="application/zip",
            headers={"Content-Disposition": "attachment;filename=rss_backup_completo.zip"}
        )
    except Exception as e:
        app.logger.error(f"[ERROR] Al hacer backup completo: {e}", exc_info=True)
        flash(f"Error interno al generar el backup: {e}", "error")
        return redirect(url_for("dashboard"))
|
||
|
||
@app.route("/restore_feeds", methods=["GET", "POST"])
def restore_feeds():
    """Restore the feeds table from an uploaded CSV backup.

    GET renders the upload form. POST upserts each CSV row
    (INSERT ... ON CONFLICT (id) DO UPDATE) inside its own savepoint, so a
    malformed row is logged and skipped without aborting the whole import.
    Always redirects to the dashboard after a POST, with a summary flash.
    """
    if request.method == "POST":
        file = request.files.get("file")
        # Fix: case-insensitive extension check, so "BACKUP.CSV" is accepted too.
        if not file or not file.filename.lower().endswith(".csv"):
            flash("Archivo no válido. Sube un .csv.", "error")
            return redirect(url_for("restore_feeds"))

        try:
            # Decode leniently: backups may contain stray non-UTF-8 bytes.
            file_stream = StringIO(file.read().decode("utf-8", errors='ignore'))
            reader = csv.DictReader(file_stream)
            rows = list(reader)
            n_ok, n_err = 0, 0
            with get_conn() as conn:
                for row in rows:
                    with conn.cursor() as cursor:
                        try:
                            cursor.execute("SAVEPOINT restore_feed_row")
                            # Normalize CSV text values into proper column types.
                            activo = str(row.get("activo", "")).strip().lower() in ["1", "true", "t", "yes", "on"]
                            cat_id = int(row["categoria_id"]) if row.get("categoria_id") and row["categoria_id"].strip() else None
                            pais_id = int(row["pais_id"]) if row.get("pais_id") and row["pais_id"].strip() else None
                            cursor.execute(
                                """
                                INSERT INTO feeds (id, nombre, descripcion, url, categoria_id, pais_id, idioma, activo, fallos)
                                VALUES (%(id)s, %(nombre)s, %(descripcion)s, %(url)s, %(categoria_id)s, %(pais_id)s, %(idioma)s, %(activo)s, %(fallos)s)
                                ON CONFLICT (id) DO UPDATE SET
                                    nombre=EXCLUDED.nombre, descripcion=EXCLUDED.descripcion, url=EXCLUDED.url, categoria_id=EXCLUDED.categoria_id,
                                    pais_id=EXCLUDED.pais_id, idioma=EXCLUDED.idioma, activo=EXCLUDED.activo, fallos=EXCLUDED.fallos;
                                """,
                                {
                                    "id": int(row["id"]), "nombre": row.get("nombre"), "descripcion": row.get("descripcion") or "", "url": row.get("url"),
                                    "categoria_id": cat_id, "pais_id": pais_id, "idioma": row.get("idioma") or None, "activo": activo,
                                    "fallos": int(row.get("fallos", 0) or 0)
                                }
                            )
                            n_ok += 1
                            cursor.execute("RELEASE SAVEPOINT restore_feed_row")
                        except Exception as e:
                            # Undo only this row and continue with the rest.
                            cursor.execute("ROLLBACK TO SAVEPOINT restore_feed_row")
                            n_err += 1
                            app.logger.error(f"Error procesando fila (se omite): {row} - Error: {e}")
            flash(f"Restauración completada. Feeds procesados: {n_ok}. Errores: {n_err}.", "success" if n_err == 0 else "warning")
        except Exception as e:
            app.logger.error(f"Error al restaurar feeds desde CSV: {e}", exc_info=True)
            flash(f"Ocurrió un error general al procesar el archivo: {e}", "error")
        return redirect(url_for("dashboard"))

    return render_template("restore_feeds.html")
|
||
|
||
@app.route("/restore_urls", methods=["GET", "POST"])
def restore_urls():
    """Restore the fuentes_url table from an uploaded CSV backup.

    GET renders the upload form. POST upserts each CSV row
    (INSERT ... ON CONFLICT (id) DO UPDATE) inside its own savepoint, so a
    malformed row is logged and skipped without aborting the whole import.
    Always redirects to the dashboard after a POST, with a summary flash.
    """
    if request.method == "POST":
        file = request.files.get("file")
        # Fix: case-insensitive extension check, so "BACKUP.CSV" is accepted too.
        if not file or not file.filename.lower().endswith(".csv"):
            flash("Archivo no válido. Sube un .csv.", "error")
            return redirect(url_for("restore_urls"))

        try:
            # Decode leniently: backups may contain stray non-UTF-8 bytes.
            file_stream = StringIO(file.read().decode("utf-8", errors='ignore'))
            reader = csv.DictReader(file_stream)
            rows = list(reader)
            n_ok, n_err = 0, 0
            with get_conn() as conn:
                for row in rows:
                    with conn.cursor() as cursor:
                        try:
                            cursor.execute("SAVEPOINT restore_url_row")
                            # Normalize CSV text values into proper column types.
                            cat_id = int(row["categoria_id"]) if row.get("categoria_id") and row["categoria_id"].strip() else None
                            pais_id = int(row["pais_id"]) if row.get("pais_id") and row["pais_id"].strip() else None
                            cursor.execute(
                                """
                                INSERT INTO fuentes_url (id, nombre, url, categoria_id, pais_id, idioma)
                                VALUES (%(id)s, %(nombre)s, %(url)s, %(categoria_id)s, %(pais_id)s, %(idioma)s)
                                ON CONFLICT (id) DO UPDATE SET
                                    nombre=EXCLUDED.nombre, url=EXCLUDED.url, categoria_id=EXCLUDED.categoria_id,
                                    pais_id=EXCLUDED.pais_id, idioma=EXCLUDED.idioma;
                                """,
                                {
                                    "id": int(row["id"]),
                                    "nombre": row.get("nombre"),
                                    "url": row.get("url"),
                                    "categoria_id": cat_id,
                                    "pais_id": pais_id,
                                    "idioma": row.get("idioma") or None
                                }
                            )
                            n_ok += 1
                            cursor.execute("RELEASE SAVEPOINT restore_url_row")
                        except Exception as e:
                            # Undo only this row and continue with the rest.
                            cursor.execute("ROLLBACK TO SAVEPOINT restore_url_row")
                            n_err += 1
                            app.logger.error(f"Error procesando fila de fuente URL (se omite): {row} - Error: {e}")
            flash(f"Restauración de Fuentes URL completada. Procesadas: {n_ok}. Errores: {n_err}.", "success" if n_err == 0 else "warning")
        except Exception as e:
            app.logger.error(f"Error al restaurar fuentes URL desde CSV: {e}", exc_info=True)
            flash(f"Ocurrió un error general al procesar el archivo: {e}", "error")
        return redirect(url_for("dashboard"))

    return render_template("restore_urls.html")
|
||
|
||
|
||
if __name__ == "__main__":
    # Refuse to start if the connection pool failed to initialize at import
    # time — every route depends on the database.
    if not db_pool:
        app.logger.error("La aplicación no puede arrancar sin una conexión a la base de datos.")
        sys.exit(1)
    # NOTE(review): debug=True enables the Werkzeug debugger/reloader; this
    # entry point is for development only — use a WSGI server in production.
    app.run(host="0.0.0.0", port=8001, debug=True)
|
||
|