# rss/app.py
import os
import sys
import csv
import math
from io import StringIO, BytesIO
from datetime import datetime
import logging
import atexit
import zipfile
from contextlib import contextmanager
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
from flask import Flask, render_template, request, redirect, url_for, Response, flash, make_response, abort
import psycopg2
import psycopg2.extras
import psycopg2.pool
import bleach
from feed_processor import process_single_feed
from url_processor import process_newspaper_url
logging.basicConfig(stream=sys.stdout, level=logging.INFO, format='[%(asctime)s] %(levelname)s in %(module)s: %(message)s')
app = Flask(__name__)
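# NOTE: the os.urandom() fallback yields a new key on every start, which invalidates
# session cookies (and therefore flashed messages) across restarts and between
# worker processes; set SECRET_KEY explicitly in production.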
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', os.urandom(24))
DB_CONFIG = {
"host": os.environ.get("DB_HOST", "localhost"),
"port": int(os.environ.get("DB_PORT", 5432)),
"dbname": os.environ.get("DB_NAME", "rss"),
"user": os.environ.get("DB_USER", "rss"),
"password": os.environ.get("DB_PASS", "x")
}
MAX_WORKERS = int(os.environ.get("RSS_MAX_WORKERS", 20))
SINGLE_FEED_TIMEOUT = int(os.environ.get("RSS_FEED_TIMEOUT", 30))
MAX_FALLOS = int(os.environ.get("RSS_MAX_FAILURES", 5))
NEWS_PER_PAGE = int(os.environ.get("NEWS_PER_PAGE", 20))
DEFAULT_TRANSLATION_LANG = os.environ.get("DEFAULT_TRANSLATION_LANG", "es").strip().lower()
DEFAULT_LANG = os.environ.get("DEFAULT_LANG", DEFAULT_TRANSLATION_LANG).strip().lower()
WEB_TRANSLATED_DEFAULT = os.environ.get("WEB_TRANSLATED_DEFAULT", "1").strip().lower() in ("1", "true", "yes")
db_pool = None
try:
    # Flask's dev server and typical WSGI deployments are multi-threaded, so use
    # the thread-safe pool variant rather than SimpleConnectionPool.
    db_pool = psycopg2.pool.ThreadedConnectionPool(minconn=1, maxconn=10, **DB_CONFIG)
app.logger.info("Pool de conexiones a la base de datos creado exitosamente.")
except psycopg2.OperationalError as e:
logging.error(f"FATAL: No se pudo conectar a la base de datos para crear el pool: {e}")
@contextmanager
def get_conn():
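    """Yield a pooled connection, committing on success and rolling back on error.

    Minimal usage sketch:

        with get_conn() as conn:
            with conn.cursor() as cur:
                cur.execute("SELECT 1")

    The connection is always returned to the pool, even when an exception is raised.
    """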
if not db_pool:
raise ConnectionError("El pool de la base de datos no está disponible.")
conn = None
try:
conn = db_pool.getconn()
yield conn
conn.commit()
except Exception as e:
if conn:
conn.rollback()
raise e
finally:
if conn:
db_pool.putconn(conn)
@atexit.register
def shutdown_hooks():
if db_pool:
db_pool.closeall()
app.logger.info("Pool de conexiones de la base de datos cerrado.")
@app.template_filter('safe_html')
def safe_html(text):
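    """Jinja filter: sanitize feed-supplied HTML down to a small whitelist of tags and attributes."""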
if not text:
return ""
return bleach.clean(
text,
tags={'a', 'b', 'strong', 'i', 'em', 'p', 'br', 'ul', 'ol', 'li', 'blockquote', 'h3', 'h4'},
attributes={'a': ['href', 'title', 'rel', 'target']},
strip=True
)
def _get_form_dependencies(cursor):
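    """Return the (categorias, paises) rows shared by the feed and URL add/edit forms."""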
cursor.execute("SELECT id, nombre FROM categorias ORDER BY nombre")
categorias = cursor.fetchall()
cursor.execute("SELECT id, nombre, continente_id FROM paises ORDER BY nombre")
paises = cursor.fetchall()
return categorias, paises
def _get_lang_and_flags():
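    """Resolve the UI language and translation flags for the current request.

    Precedence is ?lang= > "lang" cookie > DEFAULT_LANG. Returns
    (lang, use_translation, set_cookie); set_cookie is True when the language
    came from the query string and should be persisted in the response cookie.
    """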
qlang = request.args.get("lang", "").strip().lower()
cookie_lang = (request.cookies.get("lang") or "").strip().lower()
lang = qlang or cookie_lang or DEFAULT_LANG or "es"
force_orig = request.args.get("orig") == "1"
use_translation = (not force_orig) and WEB_TRANSLATED_DEFAULT
return lang, use_translation, bool(qlang)
def _build_news_query(args, *, count=False, limit=None, offset=None, lang="es", use_translation=True):
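    """Build the news listing SQL and its parameter list from the request args.

    With count=True, a COUNT(*) query for pagination is returned; otherwise the
    page SELECT, optionally joined (LEFT JOIN LATERAL) to the newest finished
    translation for `lang`. Parameters are collected in placeholder order:
    ts_rank (SELECT), lateral lang (FROM), filters (WHERE), then LIMIT/OFFSET.
    """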
select_rank_params = []
from_params = []
where_params = []
tail_params = []
conditions = []
q = args.get("q", "").strip()
cat_id = args.get("categoria_id")
cont_id = args.get("continente_id")
pais_id = args.get("pais_id")
fecha_filtro = args.get("fecha")
sql_from = """
FROM noticias n
LEFT JOIN categorias c ON n.categoria_id = c.id
LEFT JOIN paises p ON n.pais_id = p.id
LEFT JOIN continentes co ON p.continente_id = co.id
"""
if (not count) and use_translation:
sql_from += """
LEFT JOIN LATERAL (
SELECT id AS traduccion_id, titulo_trad, resumen_trad
FROM traducciones
WHERE traducciones.noticia_id = n.id
AND traducciones.lang_to = %s
AND traducciones.status = 'done'
ORDER BY id DESC
LIMIT 1
) t ON TRUE
"""
from_params.append(lang)
if q:
conditions.append("n.tsv @@ plainto_tsquery('spanish', %s)")
where_params.append(q)
if cat_id:
conditions.append("n.categoria_id = %s")
where_params.append(cat_id)
if pais_id:
conditions.append("n.pais_id = %s")
where_params.append(pais_id)
elif cont_id:
conditions.append("p.continente_id = %s")
where_params.append(cont_id)
    if fecha_filtro:
        try:
            fecha_obj = datetime.strptime(fecha_filtro, '%Y-%m-%d')
            conditions.append("n.fecha::date = %s")
            where_params.append(fecha_obj.date())
        except ValueError:
            # home() builds this query twice (count + page); flash the warning only once.
            if count:
                flash("Formato de fecha no válido. Use AAAA-MM-DD.", "error")
where_clause = " WHERE " + " AND ".join(conditions) if conditions else ""
if count:
sql_count = "SELECT COUNT(*) " + sql_from + where_clause
sql_params = from_params + where_params
return sql_count, sql_params
if use_translation:
select_cols = """
SELECT
            t.traduccion_id,
n.fecha,
n.titulo AS titulo_original,
n.resumen AS resumen_original,
t.titulo_trad AS titulo_traducido,
t.resumen_trad AS resumen_traducido,
COALESCE(t.titulo_trad, n.titulo) AS titulo,
COALESCE(t.resumen_trad, n.resumen) AS resumen,
n.url, n.imagen_url, n.fuente_nombre,
c.nombre AS categoria, p.nombre AS pais, co.nombre AS continente,
(t.titulo_trad IS NOT NULL OR t.resumen_trad IS NOT NULL) AS tiene_traduccion
"""
else:
select_cols = """
SELECT
NULL::int AS traduccion_id,
n.fecha,
n.titulo AS titulo_original,
n.resumen AS resumen_original,
NULL::text AS titulo_traducido,
NULL::text AS resumen_traducido,
n.titulo AS titulo,
n.resumen AS resumen,
n.url, n.imagen_url, n.fuente_nombre,
c.nombre AS categoria, p.nombre AS pais, co.nombre AS continente,
FALSE AS tiene_traduccion
"""
order_clause = " ORDER BY n.fecha DESC NULLS LAST"
if q:
select_cols = select_cols.replace(
"SELECT",
"SELECT ts_rank(n.tsv, plainto_tsquery('spanish', %s)) AS rank,"
)
select_rank_params.append(q)
order_clause = " ORDER BY rank DESC, n.fecha DESC NULLS LAST"
if limit is not None:
order_clause += " LIMIT %s"
tail_params.append(limit)
if offset is not None:
order_clause += " OFFSET %s"
tail_params.append(offset)
sql_page = select_cols + sql_from + where_clause + order_clause
sql_params = select_rank_params + from_params + where_params + tail_params
return sql_page, sql_params
@app.route("/")
def home():
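    """News listing with filters, full-text search, pagination and optional translations.

    XMLHttpRequest callers receive only the rendered list partial; both paths
    persist the "lang" cookie when ?lang= was passed explicitly.
    """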
noticias, categorias, continentes, paises = [], [], [], []
q = request.args.get("q", "").strip()
cat_id = request.args.get("categoria_id")
cont_id = request.args.get("continente_id")
pais_id = request.args.get("pais_id")
fecha_filtro = request.args.get("fecha")
lang, use_tr, set_cookie = _get_lang_and_flags()
page = request.args.get("page", default=1, type=int)
per_page = request.args.get("per_page", default=NEWS_PER_PAGE, type=int)
if per_page is None or per_page <= 0:
per_page = NEWS_PER_PAGE
    per_page = max(10, min(per_page, 100))
if page is None or page <= 0:
page = 1
offset = (page - 1) * per_page
total_results = 0
total_pages = 0
tags_por_trad = {}
try:
with get_conn() as conn:
with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
cursor.execute("SELECT id, nombre FROM categorias ORDER BY nombre")
categorias = cursor.fetchall()
cursor.execute("SELECT id, nombre FROM continentes ORDER BY nombre")
continentes = cursor.fetchall()
cursor.execute("SELECT id, nombre, continente_id FROM paises ORDER BY nombre")
paises = cursor.fetchall()
sql_count, params_count = _build_news_query(
request.args, count=True, lang=lang, use_translation=use_tr
)
cursor.execute(sql_count, tuple(params_count))
total_results = cursor.fetchone()[0] or 0
total_pages = math.ceil(total_results / per_page) if total_results else 0
sql_page, params_page = _build_news_query(
request.args,
count=False,
limit=per_page,
offset=offset,
lang=lang,
use_translation=use_tr
)
cursor.execute(sql_page, tuple(params_page))
noticias = cursor.fetchall()
                # Load the tags for each translation (if any)
tr_ids = [row['traduccion_id'] for row in noticias if row.get('traduccion_id')]
if tr_ids:
cursor.execute("""
SELECT tn.traduccion_id, tg.valor, tg.tipo
FROM tags_noticia tn
JOIN tags tg ON tg.id = tn.tag_id
WHERE tn.traduccion_id = ANY(%s)
ORDER BY tg.tipo, tg.valor
""", (tr_ids,))
for trid, valor, tipo in cursor.fetchall():
tags_por_trad.setdefault(trid, []).append((valor, tipo))
except psycopg2.Error as db_err:
app.logger.error(f"[DB ERROR] Al leer noticias: {db_err}", exc_info=True)
flash("Error de base de datos al cargar las noticias.", "error")
ctx = dict(
noticias=noticias, categorias=categorias, continentes=continentes, paises=paises,
        cat_id=int(cat_id) if cat_id and cat_id.isdigit() else None,
        cont_id=int(cont_id) if cont_id and cont_id.isdigit() else None,
        pais_id=int(pais_id) if pais_id and pais_id.isdigit() else None, fecha_filtro=fecha_filtro, q=q,
page=page, per_page=per_page, total_pages=total_pages, total_results=total_results,
lang=lang, use_tr=use_tr,
tags_por_trad=tags_por_trad
)
    template = '_noticias_list.html' if request.headers.get('X-Requested-With') == 'XMLHttpRequest' else "noticias.html"
    resp = make_response(render_template(template, **ctx))
    if set_cookie:
        resp.set_cookie("lang", lang, max_age=60*60*24*365)
    return resp
@app.get("/noticia/<int:tr_id>")
def noticia(tr_id):
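    """Detail view for one translation (traducciones.id); 404 when it does not exist."""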
with get_conn() as conn, conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
cur.execute(
"""
SELECT
t.id,
n.id AS noticia_id,
n.fecha,
n.titulo AS titulo_original,
n.resumen AS cuerpo_original,
t.titulo_trad AS titulo_traducido,
t.resumen_trad AS cuerpo_traducido,
n.url AS fuente_url,
n.fuente_nombre,
p.nombre AS pais,
co.nombre AS continente,
c.nombre AS categoria,
t.lang_to,
t.status
FROM traducciones t
JOIN noticias n ON n.id = t.noticia_id
LEFT JOIN paises p ON n.pais_id = p.id
LEFT JOIN continentes co ON p.continente_id = co.id
LEFT JOIN categorias c ON n.categoria_id = c.id
WHERE t.id = %s
""",
(tr_id,)
)
row = cur.fetchone()
if not row:
abort(404)
return render_template("noticia.html", r=row)
@app.route("/dashboard")
def dashboard():
stats = {'feeds_totales': 0, 'noticias_totales': 0, 'feeds_caidos': 0}
top_tags = []
try:
with get_conn() as conn:
            # DictCursor here so the template can reference t.valor / t.tipo / t.apariciones by name
with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
cursor.execute("SELECT COUNT(*) FROM feeds")
stats['feeds_totales'] = cursor.fetchone()[0]
cursor.execute("SELECT COUNT(*) FROM noticias")
stats['noticias_totales'] = cursor.fetchone()[0]
cursor.execute("SELECT COUNT(*) FROM feeds WHERE activo = FALSE")
stats['feeds_caidos'] = cursor.fetchone()[0]
cursor.execute("""
SELECT valor, tipo, apariciones
FROM v_tag_counts_24h
ORDER BY apariciones DESC, valor
LIMIT 20
""")
top_tags = cursor.fetchall()
except psycopg2.Error as db_err:
app.logger.error(f"[DB ERROR] Al calcular estadísticas: {db_err}")
flash("Error al conectar con la base de datos.", "error")
return render_template("dashboard.html", stats=stats, top_tags=top_tags)
@app.route("/feeds/manage")
def manage_feeds():
page = request.args.get('page', 1, type=int)
per_page = 20
offset = (page - 1) * per_page
feeds_list, total_feeds = [], 0
try:
with get_conn() as conn:
with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
cursor.execute("SELECT COUNT(*) FROM feeds")
total_feeds = cursor.fetchone()[0]
cursor.execute("""
SELECT f.id, f.nombre, f.url, c.nombre as categoria, p.nombre as pais, f.idioma, f.activo, f.fallos
FROM feeds f
LEFT JOIN categorias c ON f.categoria_id = c.id
LEFT JOIN paises p ON f.pais_id = p.id
ORDER BY f.nombre LIMIT %s OFFSET %s
""", (per_page, offset))
feeds_list = cursor.fetchall()
except psycopg2.Error as db_err:
app.logger.error(f"[DB ERROR] Al obtener lista de feeds: {db_err}")
flash("Error al obtener la lista de feeds.", "error")
total_pages = math.ceil(total_feeds / per_page) if total_feeds > 0 else 0
return render_template("feeds_list.html", feeds=feeds_list, page=page, total_pages=total_pages, total_feeds=total_feeds)
@app.route("/feeds/add", methods=['GET', 'POST'])
def add_feed():
if request.method == 'POST':
nombre = request.form.get("nombre")
try:
with get_conn() as conn:
with conn.cursor() as cursor:
categoria_id = int(request.form.get("categoria_id")) if request.form.get("categoria_id") else None
pais_id = int(request.form.get("pais_id")) if request.form.get("pais_id") else None
cursor.execute(
"INSERT INTO feeds (nombre, descripcion, url, categoria_id, pais_id, idioma) VALUES (%s, %s, %s, %s, %s, %s)",
(nombre, request.form.get("descripcion"), request.form.get("url"), categoria_id, pais_id, (request.form.get("idioma", "").strip() or None))
)
flash(f"Feed '{nombre}' añadido correctamente.", "success")
except psycopg2.Error as db_err:
app.logger.error(f"[DB ERROR] Al agregar feed: {db_err}", exc_info=True)
flash(f"Error al añadir el feed: {db_err}", "error")
return redirect(url_for("manage_feeds"))
categorias, paises = [], []
try:
with get_conn() as conn:
with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
categorias, paises = _get_form_dependencies(cursor)
except psycopg2.Error as db_err:
app.logger.error(f"[DB ERROR] Al cargar formulario: {db_err}")
flash("No se pudieron cargar las categorías o países.", "error")
return render_template("add_feed.html", categorias=categorias, paises=paises)
@app.route("/feeds/edit/<int:feed_id>", methods=["GET", "POST"])
def edit_feed(feed_id):
if request.method == "POST":
try:
with get_conn() as conn:
with conn.cursor() as cursor:
categoria_id = int(request.form.get("categoria_id")) if request.form.get("categoria_id") else None
pais_id = int(request.form.get("pais_id")) if request.form.get("pais_id") else None
idioma = request.form.get("idioma", "").strip() or None
activo = "activo" in request.form
cursor.execute(
"UPDATE feeds SET nombre=%s, descripcion=%s, url=%s, categoria_id=%s, pais_id=%s, idioma=%s, activo=%s WHERE id=%s",
(request.form.get("nombre"), request.form.get("descripcion"), request.form.get("url"), categoria_id, pais_id, idioma, activo, feed_id)
)
flash("Feed actualizado correctamente.", "success")
except psycopg2.Error as db_err:
app.logger.error(f"[DB ERROR] Al actualizar feed: {db_err}", exc_info=True)
flash(f"Error al actualizar el feed: {db_err}", "error")
return redirect(url_for("manage_feeds"))
feed, categorias, paises = None, [], []
try:
with get_conn() as conn:
with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
cursor.execute("SELECT * FROM feeds WHERE id = %s", (feed_id,))
feed = cursor.fetchone()
if not feed:
flash("No se encontró el feed solicitado.", "error")
return redirect(url_for("manage_feeds"))
categorias, paises = _get_form_dependencies(cursor)
except psycopg2.Error as db_err:
flash("Error al cargar el feed para editar.", "error")
app.logger.error(f"Error al cargar feed {feed_id} para editar: {db_err}")
return redirect(url_for("manage_feeds"))
return render_template("edit_feed.html", feed=feed, categorias=categorias, paises=paises)
@app.route("/feeds/delete/<int:feed_id>")
def delete_feed(feed_id):
try:
with get_conn() as conn:
with conn.cursor() as cursor:
cursor.execute("DELETE FROM feeds WHERE id=%s", (feed_id,))
flash("Feed eliminado correctamente.", "success")
except psycopg2.Error as db_err:
app.logger.error(f"[DB ERROR] Al eliminar feed: {db_err}", exc_info=True)
flash(f"Error al eliminar el feed: {db_err}", "error")
return redirect(url_for("manage_feeds"))
@app.route("/feeds/reactivar/<int:feed_id>")
def reactivar_feed(feed_id):
try:
with get_conn() as conn:
with conn.cursor() as cursor:
cursor.execute("UPDATE feeds SET activo = TRUE, fallos = 0 WHERE id = %s", (feed_id,))
flash("Feed reactivado.", "success")
except psycopg2.Error as db_err:
flash(f"Error al reactivar feed: {db_err}", "error")
return redirect(url_for("manage_feeds"))
@app.route("/urls/manage")
def manage_urls():
fuentes = []
try:
with get_conn() as conn:
with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
cursor.execute("""
SELECT f.id, f.nombre, f.url, c.nombre as categoria, p.nombre as pais, f.idioma
FROM fuentes_url f
LEFT JOIN categorias c ON f.categoria_id = c.id
LEFT JOIN paises p ON f.pais_id = p.id
ORDER BY f.nombre
""")
fuentes = cursor.fetchall()
except psycopg2.Error as db_err:
app.logger.error(f"[DB ERROR] Al obtener lista de fuentes URL: {db_err}")
flash("Error al obtener la lista de fuentes URL.", "error")
return render_template("urls_list.html", fuentes=fuentes)
@app.route("/urls/add", methods=['GET', 'POST'])
def add_url_source():
if request.method == 'POST':
nombre = request.form.get("nombre")
try:
with get_conn() as conn:
with conn.cursor() as cursor:
categoria_id = int(request.form.get("categoria_id")) if request.form.get("categoria_id") else None
pais_id = int(request.form.get("pais_id")) if request.form.get("pais_id") else None
idioma = request.form.get("idioma", "es").strip().lower()
cursor.execute(
"INSERT INTO fuentes_url (nombre, url, categoria_id, pais_id, idioma) VALUES (%s, %s, %s, %s, %s)",
(nombre, request.form.get("url"), categoria_id, pais_id, idioma)
)
flash(f"Fuente URL '{nombre}' añadida correctamente.", "success")
except psycopg2.Error as db_err:
app.logger.error(f"[DB ERROR] Al agregar fuente URL: {db_err}", exc_info=True)
flash(f"Error al añadir la fuente URL: {db_err}", "error")
return redirect(url_for("manage_urls"))
categorias, paises = [], []
try:
with get_conn() as conn:
with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
categorias, paises = _get_form_dependencies(cursor)
except psycopg2.Error as db_err:
app.logger.error(f"[DB ERROR] Al cargar formulario: {db_err}")
flash("No se pudieron cargar las categorías o países.", "error")
return render_template("add_url_source.html", categorias=categorias, paises=paises)
@app.route("/urls/edit/<int:url_id>", methods=["GET", "POST"])
def edit_url_source(url_id):
if request.method == "POST":
try:
with get_conn() as conn:
with conn.cursor() as cursor:
categoria_id = int(request.form.get("categoria_id")) if request.form.get("categoria_id") else None
pais_id = int(request.form.get("pais_id")) if request.form.get("pais_id") else None
idioma = request.form.get("idioma", "es").strip().lower()
cursor.execute(
"UPDATE fuentes_url SET nombre=%s, url=%s, categoria_id=%s, pais_id=%s, idioma=%s WHERE id=%s",
(request.form.get("nombre"), request.form.get("url"), categoria_id, pais_id, idioma, url_id)
)
flash("Fuente URL actualizada correctamente.", "success")
except psycopg2.Error as db_err:
app.logger.error(f"[DB ERROR] Al actualizar fuente URL: {db_err}", exc_info=True)
flash(f"Error al actualizar la fuente URL: {db_err}", "error")
return redirect(url_for("manage_urls"))
fuente, categorias, paises = None, [], []
try:
with get_conn() as conn:
with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
cursor.execute("SELECT * FROM fuentes_url WHERE id = %s", (url_id,))
fuente = cursor.fetchone()
if not fuente:
flash("No se encontró la fuente URL solicitada.", "error")
return redirect(url_for("manage_urls"))
categorias, paises = _get_form_dependencies(cursor)
except psycopg2.Error as db_err:
flash("Error al cargar la fuente URL para editar.", "error")
app.logger.error(f"Error al cargar fuente URL {url_id} para editar: {db_err}")
return redirect(url_for("manage_urls"))
return render_template("edit_url_source.html", fuente=fuente, categorias=categorias, paises=paises)
@app.route("/urls/delete/<int:url_id>")
def delete_url_source(url_id):
try:
with get_conn() as conn:
with conn.cursor() as cursor:
cursor.execute("DELETE FROM fuentes_url WHERE id=%s", (url_id,))
flash("Fuente URL eliminada correctamente.", "success")
except psycopg2.Error as db_err:
flash(f"Error al eliminar la fuente URL: {db_err}", "error")
return redirect(url_for("manage_urls"))
def fetch_and_store_all():
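    """Run one full capture cycle: RSS feeds, URL sources, then a bulk DB update.

    Part 1 fetches all active RSS feeds in parallel, Part 2 scrapes the URL
    sources, and Part 3 applies everything in one transaction: failure counters,
    conditional-GET headers (ETag / Last-Modified) and the news inserts.
    """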
logging.info("--- INICIANDO CICLO DE CAPTURA GLOBAL (RSS y URL) ---")
todas_las_noticias = []
feeds_fallidos = []
feeds_exitosos = []
feeds_para_actualizar_headers = []
logging.info("=> Parte 1: Procesando Feeds RSS...")
feeds_to_process = []
try:
with get_conn() as conn:
with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
cursor.execute("SELECT id, nombre, url, categoria_id, pais_id, last_etag, last_modified FROM feeds WHERE activo = TRUE")
feeds_to_process = cursor.fetchall()
logging.info(f"Encontrados {len(feeds_to_process)} feeds RSS activos para procesar.")
except psycopg2.Error as db_err:
logging.error(f"Error de BD al obtener feeds RSS: {db_err}")
return
if feeds_to_process:
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
future_to_feed = {executor.submit(process_single_feed, dict(feed)): feed for feed in feeds_to_process}
for future in tqdm(as_completed(future_to_feed), total=len(feeds_to_process), desc="Procesando Fuentes RSS"):
original_feed_data = future_to_feed[future]
feed_id = original_feed_data['id']
try:
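                    # as_completed() only yields finished futures, so this timeout is
                    # effectively a no-op; any real per-feed limit has to be enforced
                    # inside process_single_feed itself.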
_, noticias_encontradas, new_etag, new_modified, success = future.result(timeout=SINGLE_FEED_TIMEOUT)
if success:
feeds_exitosos.append(feed_id)
if noticias_encontradas:
todas_las_noticias.extend(noticias_encontradas)
if (new_etag and new_etag != original_feed_data.get('last_etag')) or \
(new_modified and new_modified != original_feed_data.get('last_modified')):
feeds_para_actualizar_headers.append({'id': feed_id, 'etag': new_etag, 'modified': new_modified})
else:
feeds_fallidos.append(feed_id)
except Exception as exc:
logging.error(f"Excepción en feed {original_feed_data['url']} (ID: {feed_id}): {exc}")
feeds_fallidos.append(feed_id)
noticias_desde_rss_count = len(todas_las_noticias)
logging.info(f"=> Parte 1 Finalizada. Noticias desde RSS: {noticias_desde_rss_count}. Éxitos: {len(feeds_exitosos)}. Fallos: {len(feeds_fallidos)}.")
logging.info("=> Parte 2: Procesando Fuentes URL...")
urls_to_process = []
try:
with get_conn() as conn:
with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
cursor.execute("SELECT * FROM fuentes_url")
urls_to_process = cursor.fetchall()
logging.info(f"Encontradas {len(urls_to_process)} fuentes URL para scrapear.")
except Exception as e:
logging.error(f"Error de BD al obtener fuentes URL: {e}")
if urls_to_process:
with ThreadPoolExecutor(max_workers=10) as executor:
future_to_url = {
executor.submit(
process_newspaper_url,
source['nombre'], source['url'], source['categoria_id'],
source['pais_id'], source['idioma']
): source for source in urls_to_process
}
for future in tqdm(as_completed(future_to_url), total=len(urls_to_process), desc="Procesando Fuentes URL"):
source = future_to_url[future]
try:
noticias_encontradas, _ = future.result()
if noticias_encontradas:
todas_las_noticias.extend(noticias_encontradas)
except Exception as exc:
logging.error(f"Fallo al procesar la fuente URL {source['nombre']}: {exc}")
noticias_desde_urls_count = len(todas_las_noticias) - noticias_desde_rss_count
logging.info(f"=> Parte 2 Finalizada. Noticias encontradas desde URLs: {noticias_desde_urls_count}.")
logging.info("=> Parte 3: Actualizando la base de datos...")
if not any([todas_las_noticias, feeds_fallidos, feeds_exitosos, feeds_para_actualizar_headers]):
logging.info("No se encontraron nuevas noticias ni cambios en los feeds. Nada que actualizar.")
logging.info("--- CICLO DE CAPTURA GLOBAL FINALIZADO ---")
return
try:
with get_conn() as conn:
with conn.cursor() as cursor:
if feeds_fallidos:
cursor.execute("UPDATE feeds SET fallos = fallos + 1 WHERE id IN %s", (tuple(feeds_fallidos),))
cursor.execute("UPDATE feeds SET activo = FALSE WHERE fallos >= %s AND id IN %s", (MAX_FALLOS, tuple(feeds_fallidos)))
logging.info(f"Incrementado contador de fallos para {len(feeds_fallidos)} feeds.")
if feeds_exitosos:
cursor.execute("UPDATE feeds SET fallos = 0 WHERE id IN %s", (tuple(feeds_exitosos),))
logging.info(f"Reseteado contador de fallos para {len(feeds_exitosos)} feeds.")
if feeds_para_actualizar_headers:
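                    # Bulk-update the conditional-GET headers (ETag / Last-Modified) in a single statement.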
psycopg2.extras.execute_values(
cursor,
"UPDATE feeds SET last_etag = data.etag, last_modified = data.modified FROM (VALUES %s) AS data(id, etag, modified) WHERE feeds.id = data.id",
[(f['id'], f['etag'], f['modified']) for f in feeds_para_actualizar_headers]
)
logging.info(f"Actualizados headers para {len(feeds_para_actualizar_headers)} feeds.")
if todas_las_noticias:
logging.info(f"Intentando insertar/ignorar {len(todas_las_noticias)} noticias en total.")
insert_query = """
INSERT INTO noticias (id, titulo, resumen, url, fecha, imagen_url, fuente_nombre, categoria_id, pais_id)
VALUES %s
ON CONFLICT (url) DO NOTHING;
"""
psycopg2.extras.execute_values(cursor, insert_query, todas_las_noticias, page_size=200)
logging.info(f"Inserción de noticias finalizada. {cursor.rowcount} filas podrían haber sido afectadas.")
logging.info("=> Parte 3 Finalizada. Base de datos actualizada correctamente.")
except Exception as e:
logging.error(f"Error de BD en la actualización masiva final: {e}", exc_info=True)
logging.info("--- CICLO DE CAPTURA GLOBAL FINALIZADO ---")
@app.route("/backup_feeds")
def backup_feeds():
try:
with get_conn() as conn:
with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
cursor.execute("""
SELECT f.id, f.nombre, f.descripcion, f.url, f.categoria_id, c.nombre AS categoria,
f.pais_id, p.nombre AS pais, f.idioma, f.activo, f.fallos
FROM feeds f
LEFT JOIN categorias c ON f.categoria_id = c.id
LEFT JOIN paises p ON f.pais_id = p.id
ORDER BY f.id
""")
feeds_ = cursor.fetchall()
if not feeds_:
flash("No hay feeds para exportar.", "warning")
return redirect(url_for("dashboard"))
fieldnames = list(feeds_[0].keys())
output = StringIO()
writer = csv.DictWriter(output, fieldnames=fieldnames)
writer.writeheader()
writer.writerows([dict(feed) for feed in feeds_])
return Response(output.getvalue(), mimetype="text/csv", headers={"Content-Disposition": "attachment;filename=feeds_backup.csv"})
except Exception as e:
app.logger.error(f"[ERROR] Al hacer backup de feeds: {e}", exc_info=True)
flash(f"Error interno al generar el backup: {e}", "error")
return redirect(url_for("dashboard"))
@app.route("/backup_urls")
def backup_urls():
try:
with get_conn() as conn:
with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
cursor.execute("""
SELECT f.id, f.nombre, f.url, f.categoria_id, c.nombre AS categoria, f.pais_id, p.nombre AS pais, f.idioma
FROM fuentes_url f
LEFT JOIN categorias c ON f.categoria_id = c.id
LEFT JOIN paises p ON f.pais_id = p.id
ORDER BY f.id
""")
fuentes = cursor.fetchall()
if not fuentes:
flash("No hay fuentes URL para exportar.", "warning")
return redirect(url_for("dashboard"))
fieldnames = list(fuentes[0].keys())
output = StringIO()
writer = csv.DictWriter(output, fieldnames=fieldnames)
writer.writeheader()
writer.writerows([dict(fuente) for fuente in fuentes])
return Response(
output.getvalue(),
mimetype="text/csv",
headers={"Content-Disposition": "attachment;filename=fuentes_url_backup.csv"}
)
except Exception as e:
app.logger.error(f"[ERROR] Al hacer backup de fuentes URL: {e}", exc_info=True)
flash(f"Error interno al generar el backup de fuentes URL: {e}", "error")
return redirect(url_for("dashboard"))
@app.route("/backup_noticias")
def backup_noticias():
try:
with get_conn() as conn:
with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
cursor.execute("""
SELECT n.id, n.titulo, n.resumen, n.url, n.fecha, n.imagen_url, n.fuente_nombre,
c.nombre AS categoria, p.nombre AS pais, co.nombre AS continente
FROM noticias n
LEFT JOIN categorias c ON n.categoria_id = c.id
LEFT JOIN paises p ON n.pais_id = p.id
LEFT JOIN continentes co ON p.continente_id = co.id
ORDER BY n.fecha DESC
""")
noticias = cursor.fetchall()
if not noticias:
flash("No hay noticias para exportar.", "warning")
return redirect(url_for("dashboard"))
fieldnames_noticias = list(noticias[0].keys())
output = StringIO()
writer = csv.DictWriter(output, fieldnames=fieldnames_noticias)
writer.writeheader()
writer.writerows([dict(noticia) for noticia in noticias])
return Response(output.getvalue(), mimetype="text/csv", headers={"Content-Disposition": "attachment;filename=noticias_backup.csv"})
except Exception as e:
app.logger.error(f"[ERROR] Al hacer backup de noticias: {e}", exc_info=True)
flash(f"Error interno al generar el backup: {e}", "error")
return redirect(url_for("dashboard"))
@app.route("/backup_completo")
def backup_completo():
try:
memory_buffer = BytesIO()
with zipfile.ZipFile(memory_buffer, 'w', zipfile.ZIP_DEFLATED) as zipf:
with get_conn() as conn:
with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
cursor.execute("SELECT * FROM feeds ORDER BY id")
feeds_data = cursor.fetchall()
if feeds_data:
output_feeds = StringIO()
writer_feeds = csv.DictWriter(output_feeds, fieldnames=list(feeds_data[0].keys()))
writer_feeds.writeheader()
writer_feeds.writerows([dict(f) for f in feeds_data])
zipf.writestr("feeds.csv", output_feeds.getvalue())
cursor.execute("SELECT * FROM fuentes_url ORDER BY id")
fuentes_data = cursor.fetchall()
if fuentes_data:
output_fuentes = StringIO()
writer_fuentes = csv.DictWriter(output_fuentes, fieldnames=list(fuentes_data[0].keys()))
writer_fuentes.writeheader()
writer_fuentes.writerows([dict(f) for f in fuentes_data])
zipf.writestr("fuentes_url.csv", output_fuentes.getvalue())
cursor.execute("SELECT * FROM noticias ORDER BY fecha DESC")
noticias_data = cursor.fetchall()
if noticias_data:
output_noticias = StringIO()
writer_noticias = csv.DictWriter(output_noticias, fieldnames=list(noticias_data[0].keys()))
writer_noticias.writeheader()
writer_noticias.writerows([dict(n) for n in noticias_data])
zipf.writestr("noticias.csv", output_noticias.getvalue())
memory_buffer.seek(0)
        return Response(memory_buffer.getvalue(), mimetype="application/zip", headers={"Content-Disposition": "attachment;filename=rss_backup_completo.zip"})
except Exception as e:
app.logger.error(f"[ERROR] Al hacer backup completo: {e}", exc_info=True)
flash(f"Error interno al generar el backup: {e}", "error")
return redirect(url_for("dashboard"))
@app.route("/restore_feeds", methods=["GET", "POST"])
def restore_feeds():
if request.method == "POST":
file = request.files.get("file")
        if not file or not file.filename.lower().endswith(".csv"):
flash("Archivo no válido. Sube un .csv.", "error")
return redirect(url_for("restore_feeds"))
try:
file_stream = StringIO(file.read().decode("utf-8", errors='ignore'))
reader = csv.DictReader(file_stream)
rows = list(reader)
n_ok, n_err = 0, 0
with get_conn() as conn:
for row in rows:
with conn.cursor() as cursor:
try:
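                        # Per-row SAVEPOINT: a bad row rolls back and is skipped without
                        # aborting the surrounding transaction.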
cursor.execute("SAVEPOINT restore_feed_row")
activo = str(row.get("activo", "")).strip().lower() in ["1", "true", "t", "yes", "on"]
cat_id = int(row["categoria_id"]) if row.get("categoria_id") and row["categoria_id"].strip() else None
pais_id = int(row["pais_id"]) if row.get("pais_id") and row["pais_id"].strip() else None
cursor.execute(
"""
INSERT INTO feeds (id, nombre, descripcion, url, categoria_id, pais_id, idioma, activo, fallos)
VALUES (%(id)s, %(nombre)s, %(descripcion)s, %(url)s, %(categoria_id)s, %(pais_id)s, %(idioma)s, %(activo)s, %(fallos)s)
ON CONFLICT (id) DO UPDATE SET
nombre=EXCLUDED.nombre, descripcion=EXCLUDED.descripcion, url=EXCLUDED.url, categoria_id=EXCLUDED.categoria_id,
pais_id=EXCLUDED.pais_id, idioma=EXCLUDED.idioma, activo=EXCLUDED.activo, fallos=EXCLUDED.fallos;
""",
{
"id": int(row["id"]), "nombre": row.get("nombre"), "descripcion": row.get("descripcion") or "", "url": row.get("url"),
"categoria_id": cat_id, "pais_id": pais_id, "idioma": row.get("idioma") or None, "activo": activo,
"fallos": int(row.get("fallos", 0) or 0)
}
)
n_ok += 1
cursor.execute("RELEASE SAVEPOINT restore_feed_row")
except Exception as e:
cursor.execute("ROLLBACK TO SAVEPOINT restore_feed_row")
n_err += 1
app.logger.error(f"Error procesando fila (se omite): {row} - Error: {e}")
flash(f"Restauración completada. Feeds procesados: {n_ok}. Errores: {n_err}.", "success" if n_err == 0 else "warning")
except Exception as e:
app.logger.error(f"Error al restaurar feeds desde CSV: {e}", exc_info=True)
flash(f"Ocurrió un error general al procesar el archivo: {e}", "error")
return redirect(url_for("dashboard"))
return render_template("restore_feeds.html")
@app.route("/restore_urls", methods=["GET", "POST"])
def restore_urls():
if request.method == "POST":
file = request.files.get("file")
        if not file or not file.filename.lower().endswith(".csv"):
flash("Archivo no válido. Sube un .csv.", "error")
return redirect(url_for("restore_urls"))
try:
file_stream = StringIO(file.read().decode("utf-8", errors='ignore'))
reader = csv.DictReader(file_stream)
rows = list(reader)
n_ok, n_err = 0, 0
with get_conn() as conn:
for row in rows:
with conn.cursor() as cursor:
try:
cursor.execute("SAVEPOINT restore_url_row")
cat_id = int(row["categoria_id"]) if row.get("categoria_id") and row["categoria_id"].strip() else None
pais_id = int(row["pais_id"]) if row.get("pais_id") and row["pais_id"].strip() else None
cursor.execute(
"""
INSERT INTO fuentes_url (id, nombre, url, categoria_id, pais_id, idioma)
VALUES (%(id)s, %(nombre)s, %(url)s, %(categoria_id)s, %(pais_id)s, %(idioma)s)
ON CONFLICT (id) DO UPDATE SET
nombre=EXCLUDED.nombre, url=EXCLUDED.url, categoria_id=EXCLUDED.categoria_id,
pais_id=EXCLUDED.pais_id, idioma=EXCLUDED.idioma;
""",
{
"id": int(row["id"]),
"nombre": row.get("nombre"),
"url": row.get("url"),
"categoria_id": cat_id,
"pais_id": pais_id,
"idioma": row.get("idioma") or None
}
)
n_ok += 1
cursor.execute("RELEASE SAVEPOINT restore_url_row")
except Exception as e:
cursor.execute("ROLLBACK TO SAVEPOINT restore_url_row")
n_err += 1
app.logger.error(f"Error procesando fila de fuente URL (se omite): {row} - Error: {e}")
flash(f"Restauración de Fuentes URL completada. Procesadas: {n_ok}. Errores: {n_err}.", "success" if n_err == 0 else "warning")
except Exception as e:
app.logger.error(f"Error al restaurar fuentes URL desde CSV: {e}", exc_info=True)
flash(f"Ocurrió un error general al procesar el archivo: {e}", "error")
return redirect(url_for("dashboard"))
return render_template("restore_urls.html")
if __name__ == "__main__":
if not db_pool:
app.logger.error("La aplicación no puede arrancar sin una conexión a la base de datos.")
sys.exit(1)
app.run(host="0.0.0.0", port=8001, debug=True)