Update of 2025-06-18 at 17:08:45

jlimolina 2025-06-18 17:08:45 +02:00
parent eb72ec9e56
commit 78c01fd61b
7 changed files with 368 additions and 239 deletions

app.py · 391 · View file

@@ -93,7 +93,6 @@ def home():
    paises = cursor.fetchall()
    sql_params, conditions = [], []
    # --- CORRECCIÓN: SE AÑADE 'fuente_nombre' AL SELECT ---
    sql_base = "SELECT n.fecha, n.titulo, n.resumen, n.url, n.imagen_url, n.fuente_nombre, c.nombre AS categoria, p.nombre AS pais, co.nombre AS continente FROM noticias n LEFT JOIN categorias c ON n.categoria_id = c.id LEFT JOIN paises p ON n.pais_id = p.id LEFT JOIN continentes co ON p.continente_id = co.id"
    if q:
@@ -352,69 +351,131 @@ def delete_url_source(url_id):
        flash(f"Error al eliminar la fuente URL: {db_err}", "error")
    return redirect(url_for("manage_urls"))
# --- TAREA DE FONDO (CORREGIDA Y REFACTORIZADA) ---
def fetch_and_store_all():
    """
    Tarea de fondo única y cohesiva que recolecta noticias tanto de Feeds RSS como de Fuentes URL,
    y luego actualiza la base de datos en una sola transacción.
    """
    with app.app_context():
        logging.info("--- INICIANDO CICLO DE CAPTURA GLOBAL (RSS y URL) ---")
        todas_las_noticias = []
        feeds_fallidos = []
        feeds_exitosos = []
        feeds_para_actualizar_headers = []

        # --- 1. PROCESAR FEEDS RSS ---
        logging.info("=> Parte 1: Procesando Feeds RSS...")
        feeds_to_process = []
        try:
            with get_conn() as conn:
                with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
                    cursor.execute("SELECT id, nombre, url, categoria_id, pais_id, last_etag, last_modified FROM feeds WHERE activo = TRUE")
                    feeds_to_process = cursor.fetchall()
            logging.info(f"Encontrados {len(feeds_to_process)} feeds RSS activos para procesar.")
        except psycopg2.Error as db_err:
            logging.error(f"Error de BD al obtener feeds RSS: {db_err}")
            return

        if feeds_to_process:
            with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
                future_to_feed = {executor.submit(process_single_feed, dict(feed)): feed for feed in feeds_to_process}
                for future in tqdm(as_completed(future_to_feed), total=len(feeds_to_process), desc="Procesando Feeds RSS"):
                    original_feed_data = future_to_feed[future]
                    feed_id = original_feed_data['id']
                    try:
                        _, noticias_encontradas, new_etag, new_modified, success = future.result(timeout=SINGLE_FEED_TIMEOUT)
                        if success:
                            feeds_exitosos.append(feed_id)
                            if noticias_encontradas:
                                todas_las_noticias.extend(noticias_encontradas)
                            if (new_etag and new_etag != original_feed_data.get('last_etag')) or \
                               (new_modified and new_modified != original_feed_data.get('last_modified')):
                                feeds_para_actualizar_headers.append({'id': feed_id, 'etag': new_etag, 'modified': new_modified})
                        else:
                            feeds_fallidos.append(feed_id)
                    except Exception as exc:
                        logging.error(f"Excepción en feed {original_feed_data['url']} (ID: {feed_id}): {exc}")
                        feeds_fallidos.append(feed_id)

        noticias_desde_rss_count = len(todas_las_noticias)
        logging.info(f"=> Parte 1 Finalizada. Noticias desde RSS: {noticias_desde_rss_count}. Éxitos: {len(feeds_exitosos)}. Fallos: {len(feeds_fallidos)}.")

        # --- 2. PROCESAR FUENTES URL ---
        logging.info("=> Parte 2: Procesando Fuentes URL...")
        urls_to_process = []
        try:
            with get_conn() as conn:
                with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
                    cursor.execute("SELECT * FROM fuentes_url")
                    urls_to_process = cursor.fetchall()
            logging.info(f"Encontradas {len(urls_to_process)} fuentes URL para scrapear.")
        except Exception as e:
            logging.error(f"Error de BD al obtener fuentes URL: {e}")

        if urls_to_process:
            for source in tqdm(urls_to_process, desc="Procesando Fuentes URL"):
                try:
                    noticias_encontradas, _ = process_newspaper_url(
                        source['nombre'], source['url'], source['categoria_id'],
                        source['pais_id'], source['idioma']
                    )
                    if noticias_encontradas:
                        todas_las_noticias.extend(noticias_encontradas)
                except Exception as e:
                    logging.error(f"Fallo al procesar la fuente URL {source['nombre']}: {e}")

        noticias_desde_urls_count = len(todas_las_noticias) - noticias_desde_rss_count
        logging.info(f"=> Parte 2 Finalizada. Noticias encontradas desde URLs: {noticias_desde_urls_count}.")

        # --- 3. ACTUALIZAR BD ---
        logging.info("=> Parte 3: Actualizando la base de datos...")
        if not any([todas_las_noticias, feeds_fallidos, feeds_exitosos, feeds_para_actualizar_headers]):
            logging.info("No se encontraron nuevas noticias ni cambios en los feeds. Nada que actualizar.")
            logging.info("--- CICLO DE CAPTURA GLOBAL FINALIZADO ---")
            return

        try:
            with get_conn() as conn:
                with conn.cursor() as cursor:
                    if feeds_fallidos:
                        cursor.execute("UPDATE feeds SET fallos = fallos + 1 WHERE id IN %s", (tuple(feeds_fallidos),))
                        cursor.execute("UPDATE feeds SET activo = FALSE WHERE fallos >= %s AND id IN %s", (MAX_FALLOS, tuple(feeds_fallidos)))
                        logging.info(f"Incrementado contador de fallos para {len(feeds_fallidos)} feeds.")
                    if feeds_exitosos:
                        cursor.execute("UPDATE feeds SET fallos = 0 WHERE id IN %s", (tuple(feeds_exitosos),))
                        logging.info(f"Reseteado contador de fallos para {len(feeds_exitosos)} feeds.")
                    if feeds_para_actualizar_headers:
                        psycopg2.extras.execute_values(
                            cursor,
                            "UPDATE feeds SET last_etag = data.etag, last_modified = data.modified FROM (VALUES %s) AS data(id, etag, modified) WHERE feeds.id = data.id",
                            [(f['id'], f['etag'], f['modified']) for f in feeds_para_actualizar_headers]
                        )
                        logging.info(f"Actualizados headers para {len(feeds_para_actualizar_headers)} feeds.")
                    if todas_las_noticias:
                        logging.info(f"Intentando insertar/ignorar {len(todas_las_noticias)} noticias en total.")
                        insert_query = """
                            INSERT INTO noticias (id, titulo, resumen, url, fecha, imagen_url, fuente_nombre, categoria_id, pais_id)
                            VALUES %s
                            ON CONFLICT (url) DO NOTHING;
                        """
                        psycopg2.extras.execute_values(cursor, insert_query, todas_las_noticias, page_size=200)
                        logging.info(f"Inserción de noticias finalizada. {cursor.rowcount} filas podrían haber sido afectadas.")
            logging.info("=> Parte 3 Finalizada. Base de datos actualizada correctamente.")
        except Exception as e:
            logging.error(f"Error de BD en la actualización masiva final: {e}", exc_info=True)

        logging.info("--- CICLO DE CAPTURA GLOBAL FINALIZADO ---")
# --- SECCIÓN DE BACKUPS Y RESTAURACIÓN ---
@app.route("/backup_feeds")
def backup_feeds():
try:
@ -425,7 +486,7 @@ def backup_feeds():
if not feeds_:
flash("No hay feeds para exportar.", "warning")
return redirect(url_for("dashboard"))
fieldnames = list(feeds_[0].keys())
output = StringIO()
writer = csv.DictWriter(output, fieldnames=fieldnames)
@ -437,6 +498,40 @@ def backup_feeds():
flash(f"Error interno al generar el backup: {e}", "error")
return redirect(url_for("dashboard"))
@app.route("/backup_urls")
def backup_urls():
try:
with get_conn() as conn:
with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
cursor.execute("""
SELECT f.id, f.nombre, f.url, f.categoria_id, c.nombre AS categoria, f.pais_id, p.nombre AS pais, f.idioma
FROM fuentes_url f
LEFT JOIN categorias c ON f.categoria_id = c.id
LEFT JOIN paises p ON f.pais_id = p.id
ORDER BY f.id
""")
fuentes = cursor.fetchall()
if not fuentes:
flash("No hay fuentes URL para exportar.", "warning")
return redirect(url_for("dashboard"))
fieldnames = list(fuentes[0].keys())
output = StringIO()
writer = csv.DictWriter(output, fieldnames=fieldnames)
writer.writeheader()
writer.writerows([dict(fuente) for fuente in fuentes])
return Response(
output.getvalue(),
mimetype="text/csv",
headers={"Content-Disposition": "attachment;filename=fuentes_url_backup.csv"}
)
except Exception as e:
app.logger.error(f"[ERROR] Al hacer backup de fuentes URL: {e}", exc_info=True)
flash(f"Error interno al generar el backup de fuentes URL: {e}", "error")
return redirect(url_for("dashboard"))
@app.route("/backup_noticias")
def backup_noticias():
try:
@ -447,7 +542,7 @@ def backup_noticias():
if not noticias:
flash("No hay noticias para exportar.", "warning")
return redirect(url_for("dashboard"))
fieldnames_noticias = list(noticias[0].keys())
output = StringIO()
writer = csv.DictWriter(output, fieldnames=fieldnames_noticias)
@@ -466,25 +561,33 @@ def backup_completo():
        with zipfile.ZipFile(memory_buffer, 'w', zipfile.ZIP_DEFLATED) as zipf:
            with get_conn() as conn:
                with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
                    cursor.execute("SELECT * FROM feeds ORDER BY id")
                    feeds_data = cursor.fetchall()
                    if feeds_data:
                        output_feeds = StringIO()
                        writer_feeds = csv.DictWriter(output_feeds, fieldnames=list(feeds_data[0].keys()))
                        writer_feeds.writeheader()
                        writer_feeds.writerows([dict(f) for f in feeds_data])
                        zipf.writestr("feeds.csv", output_feeds.getvalue())
                    cursor.execute("SELECT * FROM fuentes_url ORDER BY id")
                    fuentes_data = cursor.fetchall()
                    if fuentes_data:
                        output_fuentes = StringIO()
                        writer_fuentes = csv.DictWriter(output_fuentes, fieldnames=list(fuentes_data[0].keys()))
                        writer_fuentes.writeheader()
                        writer_fuentes.writerows([dict(f) for f in fuentes_data])
                        zipf.writestr("fuentes_url.csv", output_fuentes.getvalue())
                    cursor.execute("SELECT * FROM noticias ORDER BY fecha DESC")
                    noticias_data = cursor.fetchall()
                    if noticias_data:
                        output_noticias = StringIO()
                        writer_noticias = csv.DictWriter(output_noticias, fieldnames=list(noticias_data[0].keys()))
                        writer_noticias.writeheader()
                        writer_noticias.writerows([dict(n) for n in noticias_data])
                        zipf.writestr("noticias.csv", output_noticias.getvalue())
        memory_buffer.seek(0)
        return Response(memory_buffer, mimetype="application/zip", headers={"Content-Disposition": "attachment;filename=rss_backup_completo.zip"})
    except Exception as e:
@@ -520,9 +623,11 @@ def restore_feeds():
                                nombre=EXCLUDED.nombre, descripcion=EXCLUDED.descripcion, url=EXCLUDED.url, categoria_id=EXCLUDED.categoria_id,
                                pais_id=EXCLUDED.pais_id, idioma=EXCLUDED.idioma, activo=EXCLUDED.activo, fallos=EXCLUDED.fallos;
                                """,
                                {
                                    "id": int(row["id"]), "nombre": row.get("nombre"), "descripcion": row.get("descripcion") or "", "url": row.get("url"),
                                    "categoria_id": cat_id, "pais_id": pais_id, "idioma": row.get("idioma") or None, "activo": activo,
                                    "fallos": int(row.get("fallos", 0) or 0)
                                }
                            )
                            n_ok += 1
                            cursor.execute("RELEASE SAVEPOINT restore_feed_row")
@@ -537,100 +642,80 @@ def restore_feeds():
        return redirect(url_for("dashboard"))
    return render_template("restore_feeds.html")
@app.route("/restore_urls", methods=["GET", "POST"])
def restore_urls():
    if request.method == "POST":
        file = request.files.get("file")
        if not file or not file.filename.endswith(".csv"):
            flash("Archivo no válido. Sube un .csv.", "error")
            return redirect(url_for("restore_urls"))
        try:
            file_stream = StringIO(file.read().decode("utf-8", errors='ignore'))
            reader = csv.DictReader(file_stream)
            rows = list(reader)
            n_ok, n_err = 0, 0
            with get_conn() as conn:
                for row in rows:
                    with conn.cursor() as cursor:
                        try:
                            cursor.execute("SAVEPOINT restore_url_row")
                            cat_id = int(row["categoria_id"]) if row.get("categoria_id") and row["categoria_id"].strip() else None
                            pais_id = int(row["pais_id"]) if row.get("pais_id") and row["pais_id"].strip() else None
                            cursor.execute(
                                """
                                INSERT INTO fuentes_url (id, nombre, url, categoria_id, pais_id, idioma)
                                VALUES (%(id)s, %(nombre)s, %(url)s, %(categoria_id)s, %(pais_id)s, %(idioma)s)
                                ON CONFLICT (id) DO UPDATE SET
                                    nombre=EXCLUDED.nombre, url=EXCLUDED.url, categoria_id=EXCLUDED.categoria_id,
                                    pais_id=EXCLUDED.pais_id, idioma=EXCLUDED.idioma;
                                """,
                                {
                                    "id": int(row["id"]),
                                    "nombre": row.get("nombre"),
                                    "url": row.get("url"),
                                    "categoria_id": cat_id,
                                    "pais_id": pais_id,
                                    "idioma": row.get("idioma") or None
                                }
                            )
                            n_ok += 1
                            cursor.execute("RELEASE SAVEPOINT restore_url_row")
                        except Exception as e:
                            cursor.execute("ROLLBACK TO SAVEPOINT restore_url_row")
                            n_err += 1
                            app.logger.error(f"Error procesando fila de fuente URL (se omite): {row} - Error: {e}")
            flash(f"Restauración de Fuentes URL completada. Procesadas: {n_ok}. Errores: {n_err}.", "success" if n_err == 0 else "warning")
        except Exception as e:
            app.logger.error(f"Error al restaurar fuentes URL desde CSV: {e}", exc_info=True)
            flash(f"Ocurrió un error general al procesar el archivo: {e}", "error")
        return redirect(url_for("dashboard"))
    return render_template("restore_urls.html")
# --- RUTA DE UTILIDAD PARA PRUEBAS ---
# MOVIDA FUERA DEL BLOQUE if __name__ == '__main__' PARA QUE GUNICORN LA RECONOZCA
@app.route("/run-fetch")
def run_fetch_now():
    """Ejecuta la tarea de recolección manualmente para pruebas."""
    try:
        # Idealmente, esto debería correr en un hilo separado para no bloquear la respuesta,
        # pero para una ejecución manual simple, está bien así.
        fetch_and_store_all()
        flash("Tarea de fondo de recolección ejecutada manualmente.", "info")
    except Exception as e:
        flash(f"Error al ejecutar la tarea de fondo: {e}", "error")
        app.logger.error(f"Error en la ejecución manual de la tarea de fondo: {e}", exc_info=True)
    return redirect(url_for('dashboard'))
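As the comment above notes, the synchronous call blocks the HTTP response until the whole capture cycle finishes. A minimal non-blocking variant (a sketch, not part of this commit; the extra route name is hypothetical) could hand the work to a daemon thread:

# Sketch only: fire-and-forget variant of /run-fetch (not part of this commit).
import threading

@app.route("/run-fetch-async")  # hypothetical route name for illustration
def run_fetch_async():
    """Lanza fetch_and_store_all en un hilo aparte para no bloquear la respuesta."""
    threading.Thread(target=fetch_and_store_all, daemon=True).start()
    flash("Tarea de recolección lanzada en segundo plano.", "info")
    return redirect(url_for('dashboard'))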
if __name__ == "__main__":
    if not db_pool:
        app.logger.error("La aplicación no puede arrancar sin una conexión a la base de datos.")
        sys.exit(1)
    # El app.run solo se usa para el desarrollo local. Gunicorn no ejecuta esta parte.
    app.run(host="0.0.0.0", port=8000, debug=True)