Update of 2025-06-13 at 22:54:59
This commit is contained in:
parent 5846d1310d
commit 075a438fe2
3 changed files with 397 additions and 45 deletions

app.py (75 changes)
@@ -1,8 +1,6 @@
 # app.py - Versión final solo para la web
 import os
 import sys
-import hashlib # Keep if used elsewhere (e.g., if generating IDs in other parts of the app), otherwise remove
-import hashlib
 import csv
 import math
 from io import StringIO, BytesIO
@@ -20,17 +18,13 @@ import psycopg2.extras
 import psycopg2.pool
 import bleach
-# Import the processing function from the new module
 from feed_processor import process_single_feed
 # --- Configuración de Logging ---
 logging.basicConfig(stream=sys.stdout, level=logging.INFO, format='[%(asctime)s] %(levelname)s in %(module)s: %(message)s')
 # --- Inicialización de la App Flask ---
 app = Flask(__name__)
 app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', os.urandom(24))
 # --- Configuración de la Base de Datos y Constantes ---
 DB_CONFIG = {
     "host": os.environ.get("DB_HOST", "localhost"),
     "port": int(os.environ.get("DB_PORT", 5432)),
@@ -39,21 +33,16 @@ DB_CONFIG = {
     "password": os.environ.get("DB_PASS", "x")
 }
-# Define worker constants here or in a separate config
-MAX_WORKERS = int(os.environ.get("RSS_MAX_WORKERS", 20)) # Changed default to 20 concurrent workers
-SINGLE_FEED_TIMEOUT = int(os.environ.get("RSS_FEED_TIMEOUT", 30)) # Example: 30 seconds per feed process
-MAX_FALLOS = int(os.environ.get("RSS_MAX_FAILURES", 5)) # Number of failures before deactivating a feed
+MAX_WORKERS = int(os.environ.get("RSS_MAX_WORKERS", 20))
+SINGLE_FEED_TIMEOUT = int(os.environ.get("RSS_FEED_TIMEOUT", 30))
+MAX_FALLOS = int(os.environ.get("RSS_MAX_FAILURES", 5))
 # --- Pool de Conexiones a la Base de Datos ---
 db_pool = None
 try:
     # Aumentamos el pool para dar cabida a los workers del servidor web
     db_pool = psycopg2.pool.SimpleConnectionPool(minconn=1, maxconn=10, **DB_CONFIG)
     app.logger.info("Pool de conexiones a la base de datos creado exitosamente.")
 except psycopg2.OperationalError as e:
     logging.error(f"FATAL: No se pudo conectar a la base de datos para crear el pool: {e}")
-    # Consider sys.exit(1) here if DB connection is absolutely critical for app startup
 @contextmanager
 def get_conn():
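Note: the hunk above only shows the top of get_conn(). For context, the usual shape of a context manager over psycopg2.pool.SimpleConnectionPool is sketched below; this is a minimal, hypothetical reconstruction (connection parameters are placeholders), not the literal body of this app's helper.

from contextlib import contextmanager
import psycopg2.pool

# Hypothetical pool; the app builds its own from DB_CONFIG.
db_pool = psycopg2.pool.SimpleConnectionPool(
    minconn=1, maxconn=10,
    host="localhost", port=5432, dbname="rss", user="rss", password="x")

@contextmanager
def get_conn():
    conn = None
    try:
        conn = db_pool.getconn()          # borrow a connection from the pool
        yield conn
        conn.commit()                     # commit if the caller raised nothing
    except Exception:
        if conn: conn.rollback()          # undo the transaction on any error
        raise
    finally:
        if conn: db_pool.putconn(conn)    # always return the connection to the pool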
@@ -69,14 +58,12 @@ def get_conn():
     finally:
         if conn: db_pool.putconn(conn)
 # --- Hook de Cierre ---
 @atexit.register
 def shutdown_hooks():
     if db_pool:
         db_pool.closeall()
         app.logger.info("Pool de conexiones de la base de datos cerrado.")
 # --- Filtros y Rutas ---
 @app.template_filter('safe_html')
 def safe_html(text):
     if not text: return ""
@@ -279,13 +266,16 @@ def backup_feeds():
         if not feeds_:
             flash("No hay feeds para exportar.", "warning")
             return redirect(url_for("dashboard"))
+        fieldnames = list(feeds_[0].keys())
         output = StringIO()
-        writer = csv.DictWriter(output, fieldnames=feeds_[0].keys())
+        writer = csv.DictWriter(output, fieldnames=fieldnames)
         writer.writeheader()
-        writer.writerows(feeds_)
+        writer.writerows([dict(feed) for feed in feeds_])
         return Response(output.getvalue(), mimetype="text/csv", headers={"Content-Disposition": "attachment;filename=feeds_backup.csv"})
     except Exception as e:
         app.logger.error(f"[ERROR] Al hacer backup de feeds: {e}", exc_info=True)
         flash(f"Error interno al generar el backup: {e}", "error")
         return redirect(url_for("dashboard"))
 @app.route("/backup_noticias")
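Note: the change above materializes the header list once and converts each database row to a plain dict before writing. csv.DictWriter expects mapping rows, and psycopg2 DictCursor rows (DictRow) are list/dict hybrids, so dict(row) keeps the writer unambiguous. A standalone sketch of the same pattern, with made-up rows standing in for the query results:

import csv
from io import StringIO

# Stand-in rows; in the app these come from a psycopg2 DictCursor.
feeds_ = [
    {"id": 1, "nombre": "Feed A", "url": "https://example.com/a.xml"},
    {"id": 2, "nombre": "Feed B", "url": "https://example.com/b.xml"},
]

fieldnames = list(feeds_[0].keys())                 # freeze the column order once
output = StringIO()
writer = csv.DictWriter(output, fieldnames=fieldnames)
writer.writeheader()
writer.writerows([dict(feed) for feed in feeds_])   # one plain dict per row
print(output.getvalue())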
@@ -298,13 +288,16 @@ def backup_noticias():
         if not noticias:
             flash("No hay noticias para exportar.", "warning")
             return redirect(url_for("dashboard"))
+        fieldnames_noticias = list(noticias[0].keys())
         output = StringIO()
-        writer = csv.DictWriter(output, fieldnames=noticias[0].keys())
+        writer = csv.DictWriter(output, fieldnames=fieldnames_noticias)
         writer.writeheader()
-        writer.writerows(noticias)
+        writer.writerows([dict(noticia) for noticia in noticias])
         return Response(output.getvalue(), mimetype="text/csv", headers={"Content-Disposition": "attachment;filename=noticias_backup.csv"})
     except Exception as e:
         app.logger.error(f"[ERROR] Al hacer backup de noticias: {e}", exc_info=True)
         flash(f"Error interno al generar el backup: {e}", "error")
         return redirect(url_for("dashboard"))
 @app.route("/backup_completo")
@@ -317,23 +310,27 @@ def backup_completo():
             cursor.execute("SELECT f.id, f.nombre, f.descripcion, f.url, f.categoria_id, c.nombre AS categoria, f.pais_id, p.nombre AS pais, f.idioma, f.activo, f.fallos FROM feeds f LEFT JOIN categorias c ON f.categoria_id = c.id LEFT JOIN paises p ON f.pais_id = p.id ORDER BY f.id")
             feeds_data = cursor.fetchall()
             if feeds_data:
+                fieldnames_feeds = list(feeds_data[0].keys())
                 output = StringIO()
-                writer = csv.DictWriter(output, fieldnames=feeds_data[0].keys())
+                writer = csv.DictWriter(output, fieldnames=fieldnames_feeds)
                 writer.writeheader()
-                writer.writerows(feeds_data)
+                writer.writerows([dict(f) for f in feeds_data])
                 zipf.writestr("feeds.csv", output.getvalue())
             cursor.execute("SELECT n.id, n.titulo, n.resumen, n.url, n.fecha, n.imagen_url, c.nombre AS categoria, p.nombre AS pais, co.nombre AS continente FROM noticias n LEFT JOIN categorias c ON n.categoria_id = c.id LEFT JOIN paises p ON n.pais_id = p.id LEFT JOIN continentes co ON p.continente_id = co.id ORDER BY n.fecha DESC")
             noticias_data = cursor.fetchall()
             if noticias_data:
+                fieldnames_noticias = list(noticias_data[0].keys())
                 output = StringIO()
-                writer = csv.DictWriter(output, fieldnames=noticias_data[0].keys())
+                writer = csv.DictWriter(output, fieldnames=fieldnames_noticias)
                 writer.writeheader()
-                writer.writerows(noticias_data)
+                writer.writerows([dict(n) for n in noticias_data])
                 zipf.writestr("noticias.csv", output.getvalue())
         memory_buffer.seek(0)
         return Response(memory_buffer, mimetype="application/zip", headers={"Content-Disposition": "attachment;filename=rss_backup_completo.zip"})
     except Exception as e:
         app.logger.error(f"[ERROR] Al hacer backup completo: {e}", exc_info=True)
         flash(f"Error interno al generar el backup: {e}", "error")
         return redirect(url_for("dashboard"))
 @app.route("/restore_feeds", methods=["GET", "POST"])
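Note: backup_completo assembles the ZIP entirely in memory before returning it. The underlying BytesIO + zipfile.writestr pattern, reduced to a standalone sketch (file name and CSV content are illustrative):

import zipfile
from io import BytesIO

memory_buffer = BytesIO()
with zipfile.ZipFile(memory_buffer, "w", zipfile.ZIP_DEFLATED) as zipf:
    # writestr adds a file to the archive directly from an in-memory string.
    zipf.writestr("feeds.csv", "id,nombre\n1,Feed A\n")

memory_buffer.seek(0)    # rewind before handing the buffer to the Response
print(len(memory_buffer.getvalue()), "bytes of zip data")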
@@ -381,9 +378,6 @@ def restore_feeds():
             return redirect(url_for("dashboard"))
     return render_template("restore_feeds.html")
-# --- fetch_and_store function (modified slightly) ---
 def fetch_and_store():
     with app.app_context():
         logging.info("--- INICIANDO CICLO DE CAPTURA ---")
@@ -406,19 +400,17 @@ def fetch_and_store():
         feeds_fallidos, feeds_exitosos, todas_las_noticias, feeds_para_actualizar_headers = [], [], [], []
         logging.info(f"Paso 3: Iniciando procesamiento paralelo ({MAX_WORKERS} workers)...")
-        # Pass the dict form of feed data
         with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
             future_to_feed = {executor.submit(process_single_feed, dict(feed)): feed for feed in feeds_to_process}
             progress_bar = tqdm(as_completed(future_to_feed), total=len(feeds_to_process), desc="Procesando Feeds")
             for future in progress_bar:
-                original_feed_data = future_to_feed[future] # This is the original DictRow
+                original_feed_data = future_to_feed[future]
                 feed_id = original_feed_data['id']
                 try:
                     _, noticias_encontradas, new_etag, new_modified, success = future.result(timeout=SINGLE_FEED_TIMEOUT)
                     if success:
                         feeds_exitosos.append(feed_id)
                         if noticias_encontradas: todas_las_noticias.extend(noticias_encontradas)
-                        # Only add if etag/modified actually changed or were initially None
                         if (new_etag is not None and new_etag != original_feed_data.get('last_etag')) or \
                            (new_modified is not None and new_modified != original_feed_data.get('last_modified')):
                             feeds_para_actualizar_headers.append({'id': feed_id, 'etag': new_etag, 'modified': new_modified})
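Note: the loop above submits one process_single_feed call per feed, collects results with as_completed, and bounds each result() with SINGLE_FEED_TIMEOUT. A self-contained sketch of that fan-out pattern follows; the worker and the feed rows are stand-ins (the real worker returns a 5-tuple whose last element is a success flag, mirroring how the result is unpacked above), and tqdm is omitted.

from concurrent.futures import ThreadPoolExecutor, as_completed

MAX_WORKERS, SINGLE_FEED_TIMEOUT = 4, 30

def process_single_feed(feed):
    # Stand-in worker: (feed, found_items, etag, modified, success).
    return feed, [f"noticia de {feed['url']}"], None, None, True

feeds_to_process = [{"id": 1, "url": "https://example.com/a.xml"},
                    {"id": 2, "url": "https://example.com/b.xml"}]

exitosos, fallidos, noticias = [], [], []
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
    future_to_feed = {executor.submit(process_single_feed, dict(f)): f for f in feeds_to_process}
    for future in as_completed(future_to_feed):
        feed_id = future_to_feed[future]["id"]
        try:
            _, encontradas, _, _, success = future.result(timeout=SINGLE_FEED_TIMEOUT)
            if success:
                exitosos.append(feed_id)
                noticias.extend(encontradas)
            else:
                fallidos.append(feed_id)
        except Exception:    # a timeout or worker exception counts as a failure
            fallidos.append(feed_id)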
@@ -437,33 +429,28 @@ def fetch_and_store():
             return
     try:
-        with get_conn() as conn: # This connection is for the entire transaction (commits/rolls back everything together)
+        with get_conn() as conn:
             logging.info("Paso 5: Actualizando BD...")
-            # --- Update feed status (fallos, activo) ---
-            if feeds_fallidos or feeds_exitosos: # Only create cursor if there's work
+            if feeds_fallidos or feeds_exitosos:
                 with conn.cursor() as cursor_feeds_status:
                     if feeds_fallidos:
                         cursor_feeds_status.execute("UPDATE feeds SET fallos = fallos + 1 WHERE id IN %s", (tuple(feeds_fallidos),))
                         cursor_feeds_status.execute("UPDATE feeds SET activo = FALSE WHERE fallos >= %s AND id IN %s", (MAX_FALLOS, tuple(feeds_fallidos)))
                     if feeds_exitosos:
                         cursor_feeds_status.execute("UPDATE feeds SET fallos = 0 WHERE id IN %s", (tuple(feeds_exitosos),))
-                    # cursor_feeds_status is implicitly closed here
-            # --- Update feed headers (etag, modified) ---
-            if feeds_para_actualizar_headers: # Only create cursor if there's work
+            if feeds_para_actualizar_headers:
                 with conn.cursor() as cursor_headers:
                     psycopg2.extras.execute_values(
                         cursor_headers,
                         "UPDATE feeds SET last_etag = data.etag, last_modified = data.modified FROM (VALUES %s) AS data(id, etag, modified) WHERE feeds.id = data.id",
                         [(f['id'], f['etag'], f['modified']) for f in feeds_para_actualizar_headers]
                     )
-                    # cursor_headers is implicitly closed here
-            # --- Insert new news articles ---
-            if todas_las_noticias: # Only create cursor if there's work
+            if todas_las_noticias:
                 logging.info(f"Intentando insertar {len(todas_las_noticias)} noticias en la base de datos.")
-                with conn.cursor() as cursor_news_insert: # A fresh cursor specifically for news insertion
+                with conn.cursor() as cursor_news_insert:
                     psycopg2.extras.execute_values(
                         cursor_news_insert,
                         "INSERT INTO noticias (id, titulo, resumen, url, fecha, imagen_url, categoria_id, pais_id) VALUES %s ON CONFLICT (id) DO NOTHING",
@@ -471,16 +458,14 @@ def fetch_and_store():
                     )
                     rows_inserted = cursor_news_insert.rowcount
                     logging.info(f"Se insertaron/omitieron {rows_inserted} noticias (ON CONFLICT DO NOTHING).")
-                    # cursor_news_insert is implicitly closed here
-                    logging.info("--- CICLO DE CAPTURA FINALIZADO ---")
+            logging.info("--- CICLO DE CAPTURA FINALIZADO ---")
     except psycopg2.Error as db_err:
         logging.error(f"Error de BD en actualización masiva: {db_err}", exc_info=True)
 # --- Arranque de la Aplicación (SOLO PARA DESARROLLO LOCAL) ---
 if __name__ == "__main__":
     if not db_pool:
         app.logger.error("La aplicación no puede arrancar sin una conexión a la base de datos.")
         sys.exit(1)
     app.run(host="0.0.0.0", port=5000, debug=True)
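Note: both bulk statements in this cycle go through psycopg2.extras.execute_values, which expands a single VALUES %s placeholder into many row tuples in one round trip, and ON CONFLICT (id) DO NOTHING silently skips duplicate ids. A minimal sketch, assuming a reachable database with an existing noticias(id, titulo, url) table; connection parameters are placeholders:

import psycopg2
import psycopg2.extras

conn = psycopg2.connect(host="localhost", port=5432, dbname="rss", user="rss", password="x")
rows = [("a1", "Título 1", "https://example.com/1"),
        ("a2", "Título 2", "https://example.com/2")]

with conn.cursor() as cur:
    # The %s after VALUES is replaced by every tuple in `rows` in a single statement.
    psycopg2.extras.execute_values(
        cur,
        "INSERT INTO noticias (id, titulo, url) VALUES %s ON CONFLICT (id) DO NOTHING",
        rows)
    print(f"{cur.rowcount} filas afectadas")
conn.commit()
conn.close()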