rss2/routers/auth.py
2026-01-13 13:39:51 +01:00

203 lines
8 KiB
Python

"""
Authentication router - User registration, login, and logout.
"""
from flask import Blueprint, request, render_template, redirect, url_for, session, flash
from psycopg2 import extras, IntegrityError
from db import get_conn
from utils.auth import (
hash_password, verify_password, is_authenticated,
validate_username, validate_password, validate_email
)
from datetime import datetime
auth_bp = Blueprint("auth", __name__, url_prefix="/auth")
def migrate_anonymous_favorites(session_id: str, user_id: int):
"""Migrate anonymous favorites to user account.
Args:
session_id: Anonymous session ID
user_id: User ID to migrate favorites to
"""
if not session_id:
return
with get_conn() as conn:
with conn.cursor() as cur:
# Migrate favorites, avoiding duplicates
cur.execute("""
UPDATE favoritos
SET user_id = %s, session_id = NULL
WHERE session_id = %s
AND noticia_id NOT IN (
SELECT noticia_id FROM favoritos WHERE user_id = %s
)
""", (user_id, session_id, user_id))
# Delete any remaining duplicates
cur.execute("""
DELETE FROM favoritos
WHERE session_id = %s
""", (session_id,))
conn.commit()
# ============================================================
# Registration
# ============================================================
@auth_bp.route("/register", methods=["GET", "POST"])
def register():
"""User registration page and handler."""
if is_authenticated():
return redirect(url_for('account.index'))
if request.method == "POST":
username = request.form.get("username", "").strip()
email = request.form.get("email", "").strip().lower()
password = request.form.get("password", "")
password_confirm = request.form.get("password_confirm", "")
# Validation
valid_username, username_error = validate_username(username)
if not valid_username:
flash(username_error, "danger")
return render_template("register.html", username=username, email=email)
valid_email, email_error = validate_email(email)
if not valid_email:
flash(email_error, "danger")
return render_template("register.html", username=username, email=email)
valid_password, password_error = validate_password(password)
if not valid_password:
flash(password_error, "danger")
return render_template("register.html", username=username, email=email)
if password != password_confirm:
flash("Las contraseñas no coinciden", "danger")
return render_template("register.html", username=username, email=email)
# Create user
try:
password_hash = hash_password(password)
with get_conn() as conn:
with conn.cursor() as cur:
cur.execute("""
INSERT INTO usuarios (username, email, password_hash, last_login)
VALUES (%s, %s, %s, NOW())
RETURNING id
""", (username, email, password_hash))
user_id = cur.fetchone()[0]
conn.commit()
# Auto-login after registration
old_session_id = session.get('user_session')
session['user_id'] = user_id
session['username'] = username
# Migrate anonymous favorites if any
if old_session_id:
migrate_anonymous_favorites(old_session_id, user_id)
session.pop('user_session', None)
flash(f"¡Bienvenido {username}! Tu cuenta ha sido creada exitosamente.", "success")
return redirect(url_for('account.index'))
except IntegrityError as e:
if 'username' in str(e):
flash("Este nombre de usuario ya está en uso", "danger")
elif 'email' in str(e):
flash("Este email ya está registrado", "danger")
else:
flash("Error al crear la cuenta. Por favor intenta de nuevo.", "danger")
return render_template("register.html", username=username, email=email)
except Exception as e:
flash("Error al crear la cuenta. Por favor intenta de nuevo.", "danger")
return render_template("register.html", username=username, email=email)
return render_template("register.html")
# ============================================================
# Login
# ============================================================
@auth_bp.route("/login", methods=["GET", "POST"])
def login():
"""User login page and handler."""
if is_authenticated():
return redirect(url_for('account.index'))
if request.method == "POST":
username_or_email = request.form.get("username", "").strip()
password = request.form.get("password", "")
if not username_or_email or not password:
flash("Por favor ingresa tu usuario/email y contraseña", "danger")
return render_template("login.html", username=username_or_email)
try:
with get_conn() as conn:
with conn.cursor(cursor_factory=extras.DictCursor) as cur:
# Try login with username or email
cur.execute("""
SELECT id, username, email, password_hash, is_active, avatar_url
FROM usuarios
WHERE (username = %s OR email = %s) AND is_active = TRUE
""", (username_or_email, username_or_email.lower()))
user = cur.fetchone()
if not user:
flash("Usuario o contraseña incorrectos", "danger")
return render_template("login.html", username=username_or_email)
if not verify_password(password, user['password_hash']):
flash("Usuario o contraseña incorrectos", "danger")
return render_template("login.html", username=username_or_email)
# Update last login
cur.execute("""
UPDATE usuarios SET last_login = NOW() WHERE id = %s
""", (user['id'],))
conn.commit()
# Create session
old_session_id = session.get('user_session')
session['user_id'] = user['id']
session['username'] = user['username']
session['avatar_url'] = user.get('avatar_url')
# Migrate anonymous favorites
if old_session_id:
migrate_anonymous_favorites(old_session_id, user['id'])
session.pop('user_session', None)
flash(f"¡Bienvenido de vuelta, {user['username']}!", "success")
# Redirect to 'next' parameter if exists
next_page = request.args.get('next')
if next_page and next_page.startswith('/'):
return redirect(next_page)
return redirect(url_for('account.index'))
except Exception as e:
flash("Error al iniciar sesión. Por favor intenta de nuevo.", "danger")
return render_template("login.html", username=username_or_email)
return render_template("login.html")
# ============================================================
# Logout
# ============================================================
@auth_bp.route("/logout", methods=["POST", "GET"])
def logout():
"""Log out the current user."""
username = session.get('username', 'Usuario')
session.clear()
flash(f"Hasta luego, {username}. Has cerrado sesión exitosamente.", "info")
return redirect(url_for('home.index'))