diff --git a/app.py b/app.py index 56df327..848dbfa 100644 --- a/app.py +++ b/app.py @@ -93,7 +93,6 @@ def home(): paises = cursor.fetchall() sql_params, conditions = [], [] - # --- CORRECCIÓN: SE AÑADE 'fuente_nombre' AL SELECT --- sql_base = "SELECT n.fecha, n.titulo, n.resumen, n.url, n.imagen_url, n.fuente_nombre, c.nombre AS categoria, p.nombre AS pais, co.nombre AS continente FROM noticias n LEFT JOIN categorias c ON n.categoria_id = c.id LEFT JOIN paises p ON n.pais_id = p.id LEFT JOIN continentes co ON p.continente_id = co.id" if q: @@ -352,69 +351,131 @@ def delete_url_source(url_id): flash(f"Error al eliminar la fuente URL: {db_err}", "error") return redirect(url_for("manage_urls")) -# --- PROCESAMIENTO DE URLS --- -@app.route("/scrape_url", methods=['GET', 'POST']) -def scrape_url(): - if request.method == 'POST': - source_id = request.form.get("source_id") - if not source_id: - flash("Debes seleccionar una fuente para procesar.", "error") - return redirect(url_for('scrape_url')) +# --- TAREA DE FONDO (CORREGIDA Y REFACTORIZADA) --- - source = None +def fetch_and_store_all(): + """ + Tarea de fondo única y cohesiva que recolecta noticias tanto de Feeds RSS como de Fuentes URL, + y luego actualiza la base de datos en una sola transacción. + """ + with app.app_context(): + logging.info("--- INICIANDO CICLO DE CAPTURA GLOBAL (RSS y URL) ---") + + todas_las_noticias = [] + feeds_fallidos = [] + feeds_exitosos = [] + feeds_para_actualizar_headers = [] + + # --- 1. PROCESAR FEEDS RSS --- + logging.info("=> Parte 1: Procesando Feeds RSS...") + feeds_to_process = [] try: with get_conn() as conn: with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor: - cursor.execute("SELECT * FROM fuentes_url WHERE id = %s", (source_id,)) - source = cursor.fetchone() + cursor.execute("SELECT id, nombre, url, categoria_id, pais_id, last_etag, last_modified FROM feeds WHERE activo = TRUE") + feeds_to_process = cursor.fetchall() + logging.info(f"Encontrados {len(feeds_to_process)} feeds RSS activos para procesar.") except psycopg2.Error as db_err: - app.logger.error(f"[DB ERROR] Al buscar fuente URL: {db_err}", exc_info=True) - flash("Error de base de datos al buscar la fuente.", "error") - return redirect(url_for('scrape_url')) + logging.error(f"Error de BD al obtener feeds RSS: {db_err}") + return - if not source: - flash("La fuente seleccionada no existe.", "error") - return redirect(url_for('scrape_url')) + if feeds_to_process: + with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: + future_to_feed = {executor.submit(process_single_feed, dict(feed)): feed for feed in feeds_to_process} + for future in tqdm(as_completed(future_to_feed), total=len(feeds_to_process), desc="Procesando Feeds RSS"): + original_feed_data = future_to_feed[future] + feed_id = original_feed_data['id'] + try: + _, noticias_encontradas, new_etag, new_modified, success = future.result(timeout=SINGLE_FEED_TIMEOUT) + if success: + feeds_exitosos.append(feed_id) + if noticias_encontradas: + todas_las_noticias.extend(noticias_encontradas) + if (new_etag and new_etag != original_feed_data.get('last_etag')) or \ + (new_modified and new_modified != original_feed_data.get('last_modified')): + feeds_para_actualizar_headers.append({'id': feed_id, 'etag': new_etag, 'modified': new_modified}) + else: + feeds_fallidos.append(feed_id) + except Exception as exc: + logging.error(f"Excepción en feed {original_feed_data['url']} (ID: {feed_id}): {exc}") + feeds_fallidos.append(feed_id) + + noticias_desde_rss_count = 
len(todas_las_noticias) + logging.info(f"=> Parte 1 Finalizada. Noticias desde RSS: {noticias_desde_rss_count}. Éxitos: {len(feeds_exitosos)}. Fallos: {len(feeds_fallidos)}.") - lista_noticias, message = process_newspaper_url(source['nombre'], source['url'], source['categoria_id'], source['pais_id'], source['idioma']) + # --- 2. PROCESAR FUENTES URL --- + logging.info("=> Parte 2: Procesando Fuentes URL...") + urls_to_process = [] + try: + with get_conn() as conn: + with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor: + cursor.execute("SELECT * FROM fuentes_url") + urls_to_process = cursor.fetchall() + logging.info(f"Encontradas {len(urls_to_process)} fuentes URL para scrapear.") + except Exception as e: + logging.error(f"Error de BD al obtener fuentes URL: {e}") - if lista_noticias: - try: - with get_conn() as conn: - with conn.cursor() as cursor: + if urls_to_process: + for source in tqdm(urls_to_process, desc="Procesando Fuentes URL"): + try: + noticias_encontradas, _ = process_newspaper_url( + source['nombre'], source['url'], source['categoria_id'], + source['pais_id'], source['idioma'] + ) + if noticias_encontradas: + todas_las_noticias.extend(noticias_encontradas) + except Exception as e: + logging.error(f"Fallo al procesar la fuente URL {source['nombre']}: {e}") + + noticias_desde_urls_count = len(todas_las_noticias) - noticias_desde_rss_count + logging.info(f"=> Parte 2 Finalizada. Noticias encontradas desde URLs: {noticias_desde_urls_count}.") + + # --- 3. ACTUALIZAR BD --- + logging.info("=> Parte 3: Actualizando la base de datos...") + if not any([todas_las_noticias, feeds_fallidos, feeds_exitosos, feeds_para_actualizar_headers]): + logging.info("No se encontraron nuevas noticias ni cambios en los feeds. Nada que actualizar.") + logging.info("--- CICLO DE CAPTURA GLOBAL FINALIZADO ---") + return + + try: + with get_conn() as conn: + with conn.cursor() as cursor: + if feeds_fallidos: + cursor.execute("UPDATE feeds SET fallos = fallos + 1 WHERE id IN %s", (tuple(feeds_fallidos),)) + cursor.execute("UPDATE feeds SET activo = FALSE WHERE fallos >= %s AND id IN %s", (MAX_FALLOS, tuple(feeds_fallidos))) + logging.info(f"Incrementado contador de fallos para {len(feeds_fallidos)} feeds.") + if feeds_exitosos: + cursor.execute("UPDATE feeds SET fallos = 0 WHERE id IN %s", (tuple(feeds_exitosos),)) + logging.info(f"Reseteado contador de fallos para {len(feeds_exitosos)} feeds.") + + if feeds_para_actualizar_headers: + psycopg2.extras.execute_values( + cursor, + "UPDATE feeds SET last_etag = data.etag, last_modified = data.modified FROM (VALUES %s) AS data(id, etag, modified) WHERE feeds.id = data.id", + [(f['id'], f['etag'], f['modified']) for f in feeds_para_actualizar_headers] + ) + logging.info(f"Actualizados headers para {len(feeds_para_actualizar_headers)} feeds.") + + if todas_las_noticias: + logging.info(f"Intentando insertar/ignorar {len(todas_las_noticias)} noticias en total.") insert_query = """ INSERT INTO noticias (id, titulo, resumen, url, fecha, imagen_url, fuente_nombre, categoria_id, pais_id) VALUES %s - ON CONFLICT (url) DO UPDATE SET - titulo = EXCLUDED.titulo, - resumen = EXCLUDED.resumen, - fecha = EXCLUDED.fecha, - imagen_url = EXCLUDED.imagen_url; + ON CONFLICT (url) DO NOTHING; """ - psycopg2.extras.execute_values(cursor, insert_query, lista_noticias) - flash(f"Se encontraron y guardaron {len(lista_noticias)} noticias desde '{source['nombre']}'.", "success") - return redirect(url_for("home")) - except psycopg2.Error as db_err: - 
app.logger.error(f"[DB ERROR] Al insertar noticias scrapeadas: {db_err}", exc_info=True) - flash(f"Error de base de datos al guardar las noticias: {db_err}", "error") - else: - flash(message, "warning") - - return redirect(url_for('scrape_url')) + psycopg2.extras.execute_values(cursor, insert_query, todas_las_noticias, page_size=200) + logging.info(f"Inserción de noticias finalizada. {cursor.rowcount} filas podrían haber sido afectadas.") + + logging.info("=> Parte 3 Finalizada. Base de datos actualizada correctamente.") + except Exception as e: + logging.error(f"Error de BD en la actualización masiva final: {e}", exc_info=True) - fuentes = [] - try: - with get_conn() as conn: - with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor: - cursor.execute("SELECT id, nombre FROM fuentes_url ORDER BY nombre") - fuentes = cursor.fetchall() - except psycopg2.Error as db_err: - flash("Error al cargar las fuentes de URL.", "error") + logging.info("--- CICLO DE CAPTURA GLOBAL FINALIZADO ---") - return render_template("scrape_url.html", fuentes=fuentes) -# --- BACKUP Y RESTORE --- +# --- SECCIÓN DE BACKUPS Y RESTAURACIÓN --- + @app.route("/backup_feeds") def backup_feeds(): try: @@ -425,7 +486,7 @@ def backup_feeds(): if not feeds_: flash("No hay feeds para exportar.", "warning") return redirect(url_for("dashboard")) - + fieldnames = list(feeds_[0].keys()) output = StringIO() writer = csv.DictWriter(output, fieldnames=fieldnames) @@ -437,6 +498,40 @@ def backup_feeds(): flash(f"Error interno al generar el backup: {e}", "error") return redirect(url_for("dashboard")) +@app.route("/backup_urls") +def backup_urls(): + try: + with get_conn() as conn: + with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor: + cursor.execute(""" + SELECT f.id, f.nombre, f.url, f.categoria_id, c.nombre AS categoria, f.pais_id, p.nombre AS pais, f.idioma + FROM fuentes_url f + LEFT JOIN categorias c ON f.categoria_id = c.id + LEFT JOIN paises p ON f.pais_id = p.id + ORDER BY f.id + """) + fuentes = cursor.fetchall() + + if not fuentes: + flash("No hay fuentes URL para exportar.", "warning") + return redirect(url_for("dashboard")) + + fieldnames = list(fuentes[0].keys()) + output = StringIO() + writer = csv.DictWriter(output, fieldnames=fieldnames) + writer.writeheader() + writer.writerows([dict(fuente) for fuente in fuentes]) + + return Response( + output.getvalue(), + mimetype="text/csv", + headers={"Content-Disposition": "attachment;filename=fuentes_url_backup.csv"} + ) + except Exception as e: + app.logger.error(f"[ERROR] Al hacer backup de fuentes URL: {e}", exc_info=True) + flash(f"Error interno al generar el backup de fuentes URL: {e}", "error") + return redirect(url_for("dashboard")) + @app.route("/backup_noticias") def backup_noticias(): try: @@ -447,7 +542,7 @@ def backup_noticias(): if not noticias: flash("No hay noticias para exportar.", "warning") return redirect(url_for("dashboard")) - + fieldnames_noticias = list(noticias[0].keys()) output = StringIO() writer = csv.DictWriter(output, fieldnames=fieldnames_noticias) @@ -466,25 +561,33 @@ def backup_completo(): with zipfile.ZipFile(memory_buffer, 'w', zipfile.ZIP_DEFLATED) as zipf: with get_conn() as conn: with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor: - cursor.execute("SELECT f.id, f.nombre, f.descripcion, f.url, f.categoria_id, c.nombre AS categoria, f.pais_id, p.nombre AS pais, f.idioma, f.activo, f.fallos FROM feeds f LEFT JOIN categorias c ON f.categoria_id = c.id LEFT JOIN paises p ON f.pais_id = p.id 
ORDER BY f.id") + cursor.execute("SELECT * FROM feeds ORDER BY id") feeds_data = cursor.fetchall() if feeds_data: - fieldnames_feeds = list(feeds_data[0].keys()) - output = StringIO() - writer = csv.DictWriter(output, fieldnames=fieldnames_feeds) - writer.writeheader() - writer.writerows([dict(f) for f in feeds_data]) - zipf.writestr("feeds.csv", output.getvalue()) + output_feeds = StringIO() + writer_feeds = csv.DictWriter(output_feeds, fieldnames=list(feeds_data[0].keys())) + writer_feeds.writeheader() + writer_feeds.writerows([dict(f) for f in feeds_data]) + zipf.writestr("feeds.csv", output_feeds.getvalue()) - cursor.execute("SELECT n.id, n.titulo, n.resumen, n.url, n.fecha, n.imagen_url, n.fuente_nombre, c.nombre AS categoria, p.nombre AS pais, co.nombre AS continente FROM noticias n LEFT JOIN categorias c ON n.categoria_id = c.id LEFT JOIN paises p ON n.pais_id = p.id LEFT JOIN continentes co ON p.continente_id = co.id ORDER BY n.fecha DESC") + cursor.execute("SELECT * FROM fuentes_url ORDER BY id") + fuentes_data = cursor.fetchall() + if fuentes_data: + output_fuentes = StringIO() + writer_fuentes = csv.DictWriter(output_fuentes, fieldnames=list(fuentes_data[0].keys())) + writer_fuentes.writeheader() + writer_fuentes.writerows([dict(f) for f in fuentes_data]) + zipf.writestr("fuentes_url.csv", output_fuentes.getvalue()) + + cursor.execute("SELECT * FROM noticias ORDER BY fecha DESC") noticias_data = cursor.fetchall() if noticias_data: - fieldnames_noticias = list(noticias_data[0].keys()) - output = StringIO() - writer = csv.DictWriter(output, fieldnames=fieldnames_noticias) - writer.writeheader() - writer.writerows([dict(n) for n in noticias_data]) - zipf.writestr("noticias.csv", output.getvalue()) + output_noticias = StringIO() + writer_noticias = csv.DictWriter(output_noticias, fieldnames=list(noticias_data[0].keys())) + writer_noticias.writeheader() + writer_noticias.writerows([dict(n) for n in noticias_data]) + zipf.writestr("noticias.csv", output_noticias.getvalue()) + memory_buffer.seek(0) return Response(memory_buffer, mimetype="application/zip", headers={"Content-Disposition": "attachment;filename=rss_backup_completo.zip"}) except Exception as e: @@ -520,9 +623,11 @@ def restore_feeds(): nombre=EXCLUDED.nombre, descripcion=EXCLUDED.descripcion, url=EXCLUDED.url, categoria_id=EXCLUDED.categoria_id, pais_id=EXCLUDED.pais_id, idioma=EXCLUDED.idioma, activo=EXCLUDED.activo, fallos=EXCLUDED.fallos; """, - {"id": int(row["id"]), "nombre": row.get("nombre"), "descripcion": row.get("descripcion") or "", "url": row.get("url"), - "categoria_id": cat_id, "pais_id": pais_id, "idioma": row.get("idioma") or None, "activo": activo, - "fallos": int(row.get("fallos", 0) or 0)} + { + "id": int(row["id"]), "nombre": row.get("nombre"), "descripcion": row.get("descripcion") or "", "url": row.get("url"), + "categoria_id": cat_id, "pais_id": pais_id, "idioma": row.get("idioma") or None, "activo": activo, + "fallos": int(row.get("fallos", 0) or 0) + } ) n_ok += 1 cursor.execute("RELEASE SAVEPOINT restore_feed_row") @@ -537,100 +642,80 @@ def restore_feeds(): return redirect(url_for("dashboard")) return render_template("restore_feeds.html") - -# --- TAREA DE FONDO --- - -def fetch_and_store(): - with app.app_context(): - logging.info("--- INICIANDO CICLO DE CAPTURA ---") - feeds_to_process = [] - try: - with get_conn() as conn: - with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor: - logging.info("Paso 1: Obteniendo lista de feeds...") - # --- CORRECCIÓN: Se añade 'nombre' al 
SELECT --- - cursor.execute("SELECT id, nombre, url, categoria_id, pais_id, last_etag, last_modified FROM feeds WHERE activo = TRUE") - feeds_to_process = cursor.fetchall() - logging.info(f"Paso 2: {len(feeds_to_process)} feeds para procesar.") - except psycopg2.Error as db_err: - logging.error(f"Error de BD al obtener feeds: {db_err}") - return - - if not feeds_to_process: - logging.info("No hay feeds activos para procesar.") - return - - feeds_fallidos, feeds_exitosos, todas_las_noticias, feeds_para_actualizar_headers = [], [], [], [] - logging.info(f"Paso 3: Iniciando procesamiento paralelo ({MAX_WORKERS} workers)...") - - with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: - future_to_feed = {executor.submit(process_single_feed, dict(feed)): feed for feed in feeds_to_process} - progress_bar = tqdm(as_completed(future_to_feed), total=len(feeds_to_process), desc="Procesando Feeds") - for future in progress_bar: - original_feed_data = future_to_feed[future] - feed_id = original_feed_data['id'] - try: - _, noticias_encontradas, new_etag, new_modified, success = future.result(timeout=SINGLE_FEED_TIMEOUT) - if success: - feeds_exitosos.append(feed_id) - if noticias_encontradas: todas_las_noticias.extend(noticias_encontradas) - if (new_etag is not None and new_etag != original_feed_data.get('last_etag')) or \ - (new_modified is not None and new_modified != original_feed_data.get('last_modified')): - feeds_para_actualizar_headers.append({'id': feed_id, 'etag': new_etag, 'modified': new_modified}) - else: - feeds_fallidos.append(feed_id) - except TimeoutError: - logging.error(f"!!! TIMEOUT en feed {original_feed_data['url']} (ID: {feed_id})") - feeds_fallidos.append(feed_id) - except Exception as exc: - logging.error(f"Excepción en feed {original_feed_data['url']} (ID: {feed_id}): {exc}", exc_info=True) - feeds_fallidos.append(feed_id) - - logging.info(f"Paso 4: Procesamiento finalizado. Noticias nuevas: {len(todas_las_noticias)}, Feeds fallidos: {len(feeds_fallidos)}, Feeds actualizados: {len(feeds_para_actualizar_headers)}.") - if not any([todas_las_noticias, feeds_fallidos, feeds_exitosos, feeds_para_actualizar_headers]): - logging.info("Sin cambios que aplicar en la base de datos.") - return +@app.route("/restore_urls", methods=["GET", "POST"]) +def restore_urls(): + if request.method == "POST": + file = request.files.get("file") + if not file or not file.filename.endswith(".csv"): + flash("Archivo no válido. 
Sube un .csv.", "error") + return redirect(url_for("restore_urls")) try: + file_stream = StringIO(file.read().decode("utf-8", errors='ignore')) + reader = csv.DictReader(file_stream) + rows = list(reader) + n_ok, n_err = 0, 0 with get_conn() as conn: - logging.info("Paso 5: Actualizando BD...") + for row in rows: + with conn.cursor() as cursor: + try: + cursor.execute("SAVEPOINT restore_url_row") + cat_id = int(row["categoria_id"]) if row.get("categoria_id") and row["categoria_id"].strip() else None + pais_id = int(row["pais_id"]) if row.get("pais_id") and row["pais_id"].strip() else None + cursor.execute( + """ + INSERT INTO fuentes_url (id, nombre, url, categoria_id, pais_id, idioma) + VALUES (%(id)s, %(nombre)s, %(url)s, %(categoria_id)s, %(pais_id)s, %(idioma)s) + ON CONFLICT (id) DO UPDATE SET + nombre=EXCLUDED.nombre, url=EXCLUDED.url, categoria_id=EXCLUDED.categoria_id, + pais_id=EXCLUDED.pais_id, idioma=EXCLUDED.idioma; + """, + { + "id": int(row["id"]), + "nombre": row.get("nombre"), + "url": row.get("url"), + "categoria_id": cat_id, + "pais_id": pais_id, + "idioma": row.get("idioma") or None + } + ) + n_ok += 1 + cursor.execute("RELEASE SAVEPOINT restore_url_row") + except Exception as e: + cursor.execute("ROLLBACK TO SAVEPOINT restore_url_row") + n_err += 1 + app.logger.error(f"Error procesando fila de fuente URL (se omite): {row} - Error: {e}") + + flash(f"Restauración de Fuentes URL completada. Procesadas: {n_ok}. Errores: {n_err}.", "success" if n_err == 0 else "warning") + except Exception as e: + app.logger.error(f"Error al restaurar fuentes URL desde CSV: {e}", exc_info=True) + flash(f"Ocurrió un error general al procesar el archivo: {e}", "error") + + return redirect(url_for("dashboard")) + + return render_template("restore_urls.html") - if feeds_fallidos or feeds_exitosos: - with conn.cursor() as cursor_feeds_status: - if feeds_fallidos: - cursor_feeds_status.execute("UPDATE feeds SET fallos = fallos + 1 WHERE id IN %s", (tuple(feeds_fallidos),)) - cursor_feeds_status.execute("UPDATE feeds SET activo = FALSE WHERE fallos >= %s AND id IN %s", (MAX_FALLOS, tuple(feeds_fallidos))) - if feeds_exitosos: - cursor_feeds_status.execute("UPDATE feeds SET fallos = 0 WHERE id IN %s", (tuple(feeds_exitosos),)) - if feeds_para_actualizar_headers: - with conn.cursor() as cursor_headers: - psycopg2.extras.execute_values( - cursor_headers, - "UPDATE feeds SET last_etag = data.etag, last_modified = data.modified FROM (VALUES %s) AS data(id, etag, modified) WHERE feeds.id = data.id", - [(f['id'], f['etag'], f['modified']) for f in feeds_para_actualizar_headers] - ) - - if todas_las_noticias: - logging.info(f"Intentando insertar {len(todas_las_noticias)} noticias en la base de datos.") - with conn.cursor() as cursor_news_insert: - # --- CORRECCIÓN: Se añade 'fuente_nombre' a la consulta INSERT --- - psycopg2.extras.execute_values( - cursor_news_insert, - "INSERT INTO noticias (id, titulo, resumen, url, fecha, imagen_url, fuente_nombre, categoria_id, pais_id) VALUES %s ON CONFLICT (url) DO NOTHING", - todas_las_noticias - ) - rows_inserted = cursor_news_insert.rowcount - logging.info(f"Se insertaron/omitieron {rows_inserted} noticias (ON CONFLICT DO NOTHING).") - - logging.info("--- CICLO DE CAPTURA FINALIZADO ---") - except psycopg2.Error as db_err: - logging.error(f"Error de BD en actualización masiva: {db_err}", exc_info=True) +# --- RUTA DE UTILIDAD PARA PRUEBAS --- +# MOVIDA FUERA DEL BLOQUE if __name__ == '__main__' PARA QUE GUNICORN LA RECONOZCA +@app.route("/run-fetch") +def 
run_fetch_now(): + """Ejecuta la tarea de recolección manualmente para pruebas.""" + try: + # Idealmente, esto debería correr en un hilo separado para no bloquear la respuesta + # pero para una ejecución manual simple, está bien así. + fetch_and_store_all() + flash("Tarea de fondo de recolección ejecutada manualmente.", "info") + except Exception as e: + flash(f"Error al ejecutar la tarea de fondo: {e}", "error") + app.logger.error(f"Error en la ejecución manual de la tarea de fondo: {e}", exc_info=True) + return redirect(url_for('dashboard')) if __name__ == "__main__": if not db_pool: app.logger.error("La aplicación no puede arrancar sin una conexión a la base de datos.") sys.exit(1) - app.run(host="0.0.0.0", port=5000, debug=True) - + + # El app.run solo se usa para el desarrollo local. Gunicorn no ejecuta esta parte. + app.run(host="0.0.0.0", port=8000, debug=True) diff --git a/templates/base.html b/templates/base.html index 2151478..0ddbe22 100644 --- a/templates/base.html +++ b/templates/base.html @@ -9,7 +9,7 @@ - +
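
`process_single_feed` itself is not touched by this diff, but the refactored RSS pass in `fetch_and_store_all` still unpacks a five-value result from it and batches any changed `last_etag`/`last_modified` headers for a single `execute_values` UPDATE. For reference, a minimal sketch of the conditional-GET contract that implies, assuming the feeds are fetched with feedparser; `build_noticias_from_entries` is a hypothetical stand-in for the app's real entry-to-row mapping, not a function from this codebase:

```python
import feedparser

def process_single_feed(feed):
    """Sketch of the contract unpacked in fetch_and_store_all:
    returns (feed_id, noticias, new_etag, new_modified, success)."""
    try:
        parsed = feedparser.parse(
            feed["url"],
            etag=feed.get("last_etag"),
            modified=feed.get("last_modified"),
        )
        if getattr(parsed, "status", None) == 304:
            # Server says nothing changed: report success with no new items and
            # keep the stored headers, so no header update is queued.
            return feed["id"], [], feed.get("last_etag"), feed.get("last_modified"), True
        if parsed.bozo and not parsed.entries:
            # Unparseable response with no usable entries: count as a failure.
            return feed["id"], [], None, None, False
        noticias = build_noticias_from_entries(feed, parsed.entries)  # hypothetical helper
        return (
            feed["id"],
            noticias,
            getattr(parsed, "etag", None),      # only present if the server sent one
            getattr(parsed, "modified", None),
            True,
        )
    except Exception:
        return feed["id"], [], None, None, False
```

Returning the previously stored headers on a 304 keeps the `new_etag != last_etag` comparison in `fetch_and_store_all` false, so unchanged feeds never enter `feeds_para_actualizar_headers`.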
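
The diff consolidates the two collection jobs into a single `fetch_and_store_all`, but the scheduler wiring that invokes it periodically is outside these hunks. One way it could be driven, assuming APScheduler is available to the project (the 30-minute interval and the `max_instances`/`coalesce` settings are illustrative, not values taken from the app):

```python
from apscheduler.schedulers.background import BackgroundScheduler

def start_background_scheduler(interval_minutes=30):
    """Run fetch_and_store_all on a fixed interval in a daemon thread."""
    scheduler = BackgroundScheduler(daemon=True)
    # max_instances=1 prevents overlapping runs when a capture cycle takes
    # longer than the interval; coalesce merges any missed runs into one.
    scheduler.add_job(
        fetch_and_store_all,
        trigger="interval",
        minutes=interval_minutes,
        max_instances=1,
        coalesce=True,
    )
    scheduler.start()
    return scheduler
```

Note that under Gunicorn with several workers this would start one scheduler per worker process, so it is usually started from a single dedicated process or guarded so only one worker schedules the job.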
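
The new `/run-fetch` route runs `fetch_and_store_all()` inline, and its own comment concedes that a separate thread would avoid blocking the HTTP response. A minimal sketch of that variant of the route, reusing app.py's existing Flask imports (`daemon=True` and the thread name are illustrative choices):

```python
import threading

@app.route("/run-fetch")
def run_fetch_now():
    """Kick off the collection task without blocking the HTTP response."""
    # fetch_and_store_all already opens its own app.app_context(), so it can
    # safely run on a worker thread outside the request context.
    worker = threading.Thread(target=fetch_and_store_all, name="manual-fetch", daemon=True)
    worker.start()
    flash("Tarea de recolección lanzada en segundo plano.", "info")
    return redirect(url_for("dashboard"))
```

The trade-off is that any failure is only visible in the logs rather than flashed back to the user, which may be acceptable for a manual test endpoint.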