Initial clean commit

jlimolina 2026-01-13 13:39:51 +01:00
commit 6784d81c2c
141 changed files with 25219 additions and 0 deletions

utils/__init__.py (Normal file, +7 lines)

@@ -0,0 +1,7 @@
"""Utils package for authentication and other utilities."""
# Import helper functions for backward compatibility
from .helpers import safe_html, format_date, country_flag
__all__ = ['safe_html', 'format_date', 'country_flag']
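
The re-exports mean existing call sites can keep importing these helpers from the package root rather than from utils.helpers; a trivial sketch:

from utils import safe_html, format_date, country_flag  # same objects as in utils.helpers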

utils/auth.py (Normal file, +146 lines)

@@ -0,0 +1,146 @@
"""
Authentication utilities for user management.
Provides password hashing, verification, and authentication decorators.
"""
import bcrypt
from functools import wraps
from flask import session, redirect, url_for, flash, request
from db import get_conn
from psycopg2 import extras
def hash_password(password: str) -> str:
"""Hash a password using bcrypt.
Args:
password: Plain text password
Returns:
Hashed password string
"""
salt = bcrypt.gensalt(rounds=12)
return bcrypt.hashpw(password.encode('utf-8'), salt).decode('utf-8')
def verify_password(password: str, password_hash: str) -> bool:
"""Verify a password against its hash.
Args:
password: Plain text password to verify
password_hash: Bcrypt hash to check against
Returns:
True if password matches, False otherwise
"""
try:
return bcrypt.checkpw(password.encode('utf-8'), password_hash.encode('utf-8'))
except Exception:
return False
def get_current_user():
"""Get the currently authenticated user from session.
Returns:
User dict with id, username, email, etc. or None if not authenticated
"""
user_id = session.get('user_id')
if not user_id:
return None
try:
with get_conn() as conn:
with conn.cursor(cursor_factory=extras.DictCursor) as cur:
cur.execute("""
SELECT id, username, email, created_at, last_login, is_active, avatar_url
FROM usuarios
WHERE id = %s AND is_active = TRUE
""", (user_id,))
user = cur.fetchone()
return dict(user) if user else None
except Exception:
return None
def is_authenticated() -> bool:
"""Check if current user is authenticated.
Returns:
True if user is logged in, False otherwise
"""
return 'user_id' in session and session.get('user_id') is not None
def login_required(f):
"""Decorator to require authentication for a route.
Usage:
@app.route('/protected')
@login_required
def protected_route():
return "You can only see this if logged in"
"""
@wraps(f)
def decorated_function(*args, **kwargs):
if not is_authenticated():
flash('Por favor inicia sesión para acceder a esta página.', 'warning')
return redirect(url_for('auth.login', next=request.url))
return f(*args, **kwargs)
return decorated_function
def validate_username(username: str) -> tuple[bool, str]:
"""Validate username format.
Args:
username: Username to validate
Returns:
Tuple of (is_valid, error_message)
"""
if not username or len(username) < 3:
return False, "El nombre de usuario debe tener al menos 3 caracteres"
if len(username) > 50:
return False, "El nombre de usuario no puede tener más de 50 caracteres"
if not username.replace('_', '').replace('-', '').isalnum():
return False, "El nombre de usuario solo puede contener letras, números, guiones y guiones bajos"
return True, ""
def validate_password(password: str) -> tuple[bool, str]:
"""Validate password strength.
Args:
password: Password to validate
Returns:
Tuple of (is_valid, error_message)
"""
if not password or len(password) < 6:
return False, "La contraseña debe tener al menos 6 caracteres"
if len(password) > 128:
return False, "La contraseña no puede tener más de 128 caracteres"
return True, ""
def validate_email(email: str) -> tuple[bool, str]:
"""Validate email format.
Args:
email: Email to validate
Returns:
Tuple of (is_valid, error_message)
"""
    try:
        from email_validator import validate_email as validate_email_lib, EmailNotValidError
    except ImportError:
        # Fallback to a basic regex if email-validator is not available.
        # (Importing inside the same try as the validation call would leave
        # EmailNotValidError unbound in the except clause when the import fails.)
        import re
        if re.match(r'^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$', email):
            return True, ""
        return False, "Email inválido"
    try:
        validate_email_lib(email)
        return True, ""
    except EmailNotValidError as e:
        return False, f"Email inválido: {str(e)}"
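
For orientation, a minimal usage sketch of these helpers (illustrative only; it assumes bcrypt is installed and that the app also registers the auth blueprint whose login view login_required redirects to):

from flask import Flask
from utils.auth import hash_password, verify_password, validate_password, login_required

app = Flask(__name__)
app.secret_key = "change-me"  # session support is required by login_required

@app.route("/dashboard")
@login_required
def dashboard():
    return "Only visible when logged in"

def register_user(password: str) -> str:
    """Validate the password and return the bcrypt hash to store."""
    ok, error = validate_password(password)
    if not ok:
        raise ValueError(error)
    return hash_password(password)

# Round trip: the stored hash verifies against the original password
stored_hash = register_user("s3creta")
assert verify_password("s3creta", stored_hash)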

utils/feed_analysis.py (Normal file, +285 lines)

@@ -0,0 +1,285 @@
"""
Feed Analysis and Categorization Utilities
Provides functions to automatically detect language, suggest country and category
"""
import re
from typing import Dict, Optional, Tuple
import logging
from urllib.parse import urlparse
import requests
logger = logging.getLogger(__name__)
# Language to country mapping (primary countries for each language)
LANGUAGE_COUNTRY_MAP = {
'es': 'España',
'en': 'Reino Unido',
'fr': 'Francia',
'de': 'Alemania',
'it': 'Italia',
'pt': 'Portugal',
'nl': 'Países Bajos',
'pl': 'Polonia',
'ru': 'Rusia',
'zh': 'China',
'ja': 'Japón',
'ko': 'Corea del Sur',
'ar': 'Arabia Saudita',
'tr': 'Turquía',
'ca': 'España', # Catalan
'eu': 'España', # Basque
'gl': 'España', # Galician
}
# Domain to country mapping
DOMAIN_COUNTRY_MAP = {
'.es': 'España',
'.uk': 'Reino Unido',
'.co.uk': 'Reino Unido',
'.fr': 'Francia',
'.de': 'Alemania',
'.it': 'Italia',
'.pt': 'Portugal',
'.br': 'Brasil',
'.mx': 'México',
'.ar': 'Argentina',
'.cl': 'Chile',
'.co': 'Colombia',
'.pe': 'Perú',
'.ve': 'Venezuela',
'.us': 'Estados Unidos',
'.ca': 'Canadá',
'.au': 'Australia',
'.nz': 'Nueva Zelanda',
'.in': 'India',
'.cn': 'China',
'.jp': 'Japón',
'.kr': 'Corea del Sur',
'.ru': 'Rusia',
'.nl': 'Países Bajos',
'.be': 'Bélgica',
'.ch': 'Suiza',
'.at': 'Austria',
'.se': 'Suecia',
'.no': 'Noruega',
'.dk': 'Dinamarca',
'.fi': 'Finlandia',
'.pl': 'Polonia',
'.gr': 'Grecia',
'.tr': 'Turquía',
}
# Category keywords (Spanish)
CATEGORY_KEYWORDS = {
'Política': [
'politica', 'gobierno', 'elecciones', 'parlamento', 'congreso',
'ministerio', 'partido', 'votacion', 'democracia', 'legislativo'
],
'Economía': [
'economia', 'finanzas', 'bolsa', 'mercado', 'empresa', 'negocio',
'banco', 'dinero', 'comercio', 'industria', 'pib', 'inflacion'
],
'Tecnología': [
'tecnologia', 'tech', 'digital', 'software', 'hardware', 'internet',
'startup', 'innovacion', 'ciencia', 'ai', 'inteligencia artificial'
],
'Deportes': [
'deportes', 'futbol', 'basket', 'tenis', 'olimpicos', 'liga',
'champions', 'competicion', 'atleta', 'deporte'
],
'Cultura': [
'cultura', 'arte', 'museo', 'exposicion', 'literatura', 'libros',
'teatro', 'musica', 'cine', 'film', 'festival'
],
'Sociedad': [
'sociedad', 'social', 'comunidad', 'gente', 'vida', 'familia',
'educacion', 'salud', 'sanidad', 'medicina'
],
    'Internacional': [
        'internacional', 'mundo', 'global', 'extranjero', 'exterior',
        'foreign', 'world'
    ],
    'Nacional': [
        'nacional', 'espana', 'pais', 'domestic', 'local'
    ],
}
def detect_country_from_url(url: str) -> Optional[str]:
"""
Detect country from URL domain
Returns country name or None
"""
try:
parsed = urlparse(url)
domain = parsed.netloc.lower()
# Check domain extensions
for tld, country in DOMAIN_COUNTRY_MAP.items():
if domain.endswith(tld):
return country
# Check for country names in domain
domain_parts = domain.split('.')
for part in domain_parts:
for tld, country in DOMAIN_COUNTRY_MAP.items():
if tld.strip('.') == part:
return country
return None
except Exception as e:
logger.error(f"Error detecting country from URL {url}: {e}")
return None
def detect_country_from_language(language: str) -> Optional[str]:
"""
Get primary country for a language code
"""
if not language:
return None
# Extract first 2 characters
lang_code = language[:2].lower()
return LANGUAGE_COUNTRY_MAP.get(lang_code)
def suggest_category_from_text(text: str) -> Tuple[Optional[str], float]:
"""
Suggest category based on text analysis
Returns (category_name, confidence_score)
"""
if not text:
return None, 0.0
text_lower = text.lower()
scores = {}
# Count keyword matches for each category
for category, keywords in CATEGORY_KEYWORDS.items():
score = 0
for keyword in keywords:
# Count occurrences
count = len(re.findall(r'\b' + re.escape(keyword) + r'\b', text_lower))
score += count
if score > 0:
scores[category] = score
if not scores:
return None, 0.0
# Get category with highest score
best_category = max(scores.items(), key=lambda x: x[1])
total_keywords = sum(scores.values())
confidence = best_category[1] / total_keywords if total_keywords > 0 else 0.0
return best_category[0], confidence
def analyze_feed(feed_metadata: Dict) -> Dict:
"""
Comprehensive feed analysis to detect country and suggest category
Args:
feed_metadata: Dictionary with feed metadata from get_feed_metadata()
Returns:
Dictionary with analysis results:
{
'detected_country': 'España',
'country_source': 'domain' or 'language',
'suggested_category': 'Política',
'category_confidence': 0.75,
'language': 'es',
'analysis_notes': 'Detected from .es domain'
}
"""
analysis = {
'detected_country': None,
'country_source': None,
'suggested_category': None,
'category_confidence': 0.0,
'language': None,
'analysis_notes': []
}
# Extract data from metadata
feed_url = feed_metadata.get('url', '')
feed_title = feed_metadata.get('title', '')
feed_description = feed_metadata.get('description', '')
feed_language = feed_metadata.get('language', '')
# Detect language
if feed_language:
analysis['language'] = feed_language[:2].lower()
analysis['analysis_notes'].append(f"Language: {feed_language}")
# Detect country from domain
country_from_domain = detect_country_from_url(feed_url)
if country_from_domain:
analysis['detected_country'] = country_from_domain
analysis['country_source'] = 'domain'
analysis['analysis_notes'].append(f"Country from domain: {country_from_domain}")
# If no country from domain, try language
if not analysis['detected_country'] and analysis['language']:
country_from_lang = detect_country_from_language(analysis['language'])
if country_from_lang:
analysis['detected_country'] = country_from_lang
analysis['country_source'] = 'language'
analysis['analysis_notes'].append(f"Country from language: {country_from_lang}")
# Suggest category from title and description
combined_text = f"{feed_title} {feed_description}"
category, confidence = suggest_category_from_text(combined_text)
if category:
analysis['suggested_category'] = category
analysis['category_confidence'] = confidence
analysis['analysis_notes'].append(
f"Suggested category: {category} (confidence: {confidence:.2%})"
)
# Join notes
analysis['analysis_notes'] = ' | '.join(analysis['analysis_notes'])
return analysis
def get_country_id_by_name(conn, country_name: str) -> Optional[int]:
"""Get country ID from database by name"""
if not country_name:
return None
try:
with conn.cursor() as cur:
cur.execute(
"SELECT id FROM paises WHERE LOWER(nombre) = LOWER(%s)",
(country_name,)
)
result = cur.fetchone()
return result[0] if result else None
except Exception as e:
logger.error(f"Error getting country ID for {country_name}: {e}")
return None
def get_category_id_by_name(conn, category_name: str) -> Optional[int]:
"""Get category ID from database by name"""
if not category_name:
return None
try:
with conn.cursor() as cur:
cur.execute(
"SELECT id FROM categorias WHERE LOWER(nombre) = LOWER(%s)",
(category_name,)
)
result = cur.fetchone()
return result[0] if result else None
except Exception as e:
logger.error(f"Error getting category ID for {category_name}: {e}")
return None
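
A quick sketch of the analysis pipeline above, driven by a hand-built metadata dict in place of get_feed_metadata() output (the feed values are illustrative):

from utils.feed_analysis import analyze_feed

metadata = {
    "url": "https://www.ejemplo.es/rss/politica.xml",
    "title": "Ejemplo Diario - Política",
    "description": "Noticias de gobierno, elecciones y parlamento",
    "language": "es-ES",
}
result = analyze_feed(metadata)
print(result["detected_country"])    # "España", detected from the .es domain
print(result["suggested_category"])  # "Política", from the keyword counts
print(result["analysis_notes"])      # pipe-separated trace of the decisions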

utils/feed_discovery.py (Normal file, +263 lines)

@@ -0,0 +1,263 @@
"""
Feed Discovery Utility
Provides functions to automatically discover RSS/Atom feeds from URLs.
"""
import feedfinder2
import feedparser
import requests
from typing import Any, Dict, List, Optional
import logging
logger = logging.getLogger(__name__)
def discover_feeds(url: str, timeout: int = 15) -> List[Dict[str, str]]:
"""
Discover RSS/Atom feeds from a given URL, including link text context.
Args:
url: The URL to search for feeds
timeout: Request timeout in seconds
Returns:
List of dictionaries with feed information:
[
{
'url': 'feed_url',
'title': 'Feed Title',
'context_label': 'Text on the link that pointed here',
'type': 'rss' or 'atom',
'valid': True/False
}
]
"""
discovered_feeds = []
feed_context_map = {}
try:
# 1. Fetch content yourself to parse context (link names)
logger.info(f"Fetching content from: {url}")
headers = {'User-Agent': 'Mozilla/5.0 (compatible; RSS2-Feed-Discovery/1.0; +http://localhost)'}
response = requests.get(url, timeout=timeout, headers=headers)
if response.status_code == 200:
from bs4 import BeautifulSoup
from urllib.parse import urljoin
soup = BeautifulSoup(response.content, 'html.parser')
# Find <link> tags in head
for link in soup.find_all('link', rel='alternate'):
if link.get('type') in ['application/rss+xml', 'application/atom+xml']:
href = link.get('href')
title = link.get('title')
if href:
abs_url = urljoin(url, href)
if title:
feed_context_map[abs_url] = title
# Find <a> tags that might be feeds
for a in soup.find_all('a', href=True):
href = a['href']
# Simple heuristic for potential RSS links
if any(x in href.lower() for x in ['.rss', '.xml', 'feed', 'rss']):
abs_url = urljoin(url, href)
text = a.get_text(strip=True)
title = a.get('title')
# Prefer text, then title
context = text if text else title
if context:
                        # <a> text on index pages is often more descriptive ("Politics", "Sports")
                        # than the <link> title (usually "Site Name RSS"), but only record it
                        # when no context has been captured for this URL yet.
if abs_url not in feed_context_map:
feed_context_map[abs_url] = context
# 2. Use feedfinder2 for robust discovery
logger.info(f"Discovering feeds from: {url}")
feed_urls = feedfinder2.find_feeds(url)
if not feed_urls:
            # Fallback: feedfinder2 applies strict rules and may have missed feeds
            # that were found manually in the HTML, so add those candidates too.
for mapped_url in feed_context_map.keys():
if mapped_url not in feed_urls:
feed_urls.append(mapped_url)
if not feed_urls:
logger.warning(f"No feeds found for URL: {url}")
return []
logger.info(f"Found {len(feed_urls)} potential feeds")
# 3. Validate and merge context
for feed_url in feed_urls:
feed_info = validate_feed(feed_url, timeout=timeout)
if feed_info:
                # Attach the context label discovered in the HTML, if any
context = feed_context_map.get(feed_url)
if not context:
# Try trailing slash variations
context = feed_context_map.get(feed_url.rstrip('/'))
feed_info['context_label'] = context or ""
discovered_feeds.append(feed_info)
# Sort by validity (valid feeds first)
discovered_feeds.sort(key=lambda x: x['valid'], reverse=True)
return discovered_feeds
except Exception as e:
logger.error(f"Error discovering feeds from {url}: {e}")
return []
def validate_feed(feed_url: str, timeout: int = 10) -> Optional[Dict[str, str]]:
"""
Validate and extract information from a feed URL.
Args:
feed_url: The feed URL to validate
timeout: Request timeout in seconds
Returns:
Dictionary with feed information if valid, None otherwise
"""
try:
# Try to parse the feed
response = requests.get(
feed_url,
timeout=timeout,
headers={'User-Agent': 'RSS2-Feed-Discovery/1.0'}
)
if response.status_code != 200:
logger.warning(f"Feed returned status {response.status_code}: {feed_url}")
return {
'url': feed_url,
'title': 'Unknown Feed',
'type': 'unknown',
'valid': False,
'error': f'HTTP {response.status_code}'
}
# Parse the feed
feed = feedparser.parse(response.content)
if feed.bozo and not feed.entries:
# Feed has errors and no entries
logger.warning(f"Invalid feed (no entries): {feed_url}")
return {
'url': feed_url,
'title': 'Invalid Feed',
'type': 'unknown',
'valid': False,
'error': 'No entries found'
}
# Extract feed information
feed_type = feed.get('version', 'unknown')
if feed_type.startswith('rss'):
feed_format = 'rss'
elif feed_type.startswith('atom'):
feed_format = 'atom'
else:
feed_format = 'unknown'
# Get feed title
title = feed.feed.get('title', 'Untitled Feed')
# Get feed description
description = feed.feed.get('description', '') or feed.feed.get('subtitle', '')
return {
'url': feed_url,
'title': title,
'description': description,
'type': feed_format,
'version': feed_type,
'valid': True,
'entry_count': len(feed.entries)
}
except requests.Timeout:
logger.error(f"Timeout validating feed: {feed_url}")
return {
'url': feed_url,
'title': 'Timeout',
'type': 'unknown',
'valid': False,
'error': 'Request timeout'
}
except Exception as e:
logger.error(f"Error validating feed {feed_url}: {e}")
return {
'url': feed_url,
'title': 'Error',
'type': 'unknown',
'valid': False,
'error': str(e)
}
def get_feed_metadata(feed_url: str, timeout: int = 10) -> Optional[Dict[str, Any]]:
"""
Get detailed metadata from a feed URL.
Args:
feed_url: The feed URL
timeout: Request timeout in seconds
Returns:
Dictionary with detailed feed metadata
"""
try:
response = requests.get(
feed_url,
timeout=timeout,
headers={'User-Agent': 'RSS2-Feed-Discovery/1.0'}
)
if response.status_code != 200:
return None
feed = feedparser.parse(response.content)
if feed.bozo and not feed.entries:
return None
# Extract comprehensive metadata
metadata = {
'title': feed.feed.get('title', ''),
'description': feed.feed.get('description', '') or feed.feed.get('subtitle', ''),
'link': feed.feed.get('link', ''),
'language': feed.feed.get('language', ''),
'updated': feed.feed.get('updated', ''),
'image_url': '',
'entry_count': len(feed.entries),
'entries': []
}
# Extract image if available
if hasattr(feed.feed, 'image'):
metadata['image_url'] = feed.feed.image.get('href', '')
# Get first 5 entries as preview
for entry in feed.entries[:5]:
metadata['entries'].append({
'title': entry.get('title', ''),
'link': entry.get('link', ''),
'published': entry.get('published', '')
})
return metadata
except Exception as e:
logger.error(f"Error getting feed metadata for {feed_url}: {e}")
return None
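
A brief usage sketch for the discovery helpers (the target URL is illustrative; network access plus the feedfinder2, feedparser and beautifulsoup4 dependencies are assumed):

from utils.feed_discovery import discover_feeds, get_feed_metadata

feeds = discover_feeds("https://example.org", timeout=10)
for feed in feeds:
    marker = "✓" if feed["valid"] else "✗"
    print(f'{marker} {feed["title"]} ({feed["type"]}) -> {feed["url"]}')

# Inspect the first valid feed in more detail
if feeds and feeds[0]["valid"]:
    metadata = get_feed_metadata(feeds[0]["url"])
    if metadata:
        print(metadata["title"], metadata["language"], metadata["entry_count"])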

utils/helpers.py (Normal file, +212 lines)

@@ -0,0 +1,212 @@
from __future__ import annotations
import time
from datetime import datetime
from typing import Optional
import bleach
from markupsafe import Markup
def safe_html(texto: Optional[str]) -> str:
if not texto:
return ""
# Sanitize content to prevent layout breakage (e.g. unclosed divs)
allowed_tags = ['b', 'i', 'strong', 'em', 'p', 'br', 'span', 'a']
allowed_attrs = {'a': ['href', 'target', 'rel']}
cleaned = bleach.clean(texto, tags=allowed_tags, attributes=allowed_attrs, strip=True)
return Markup(cleaned)
def normalize_url_py(u: Optional[str]) -> Optional[str]:
if not u:
return None
u = u.strip()
if not u:
return None
if "://" not in u:
u = "http://" + u
u = u.split("#", 1)[0]
try:
from urllib.parse import (
urlsplit,
urlunsplit,
parse_qsl,
urlencode,
)
except ImportError:
return u
try:
parts = urlsplit(u)
except Exception:
return u
scheme = parts.scheme.lower()
netloc = parts.netloc.lower()
if "@" in netloc:
auth, host = netloc.rsplit("@", 1)
else:
auth, host = None, netloc
if ":" in host:
hostname, port = host.split(":", 1)
else:
hostname, port = host, None
hostname = hostname.strip()
if port:
port = port.strip()
if (scheme == "http" and port == "80") or (scheme == "https" and port == "443"):
port = None
if port:
host = f"{hostname}:{port}"
else:
host = hostname
if auth:
host = f"{auth}@{host}"
query_list = parse_qsl(parts.query, keep_blank_values=True)
query_filtered = [
(k, v)
for (k, v) in query_list
if not (k.startswith("utm_") or k in ("gclid", "fbclid"))
]
query = urlencode(query_filtered)
path = parts.path
while "//" in path:
path = path.replace("//", "/")
cleaned = urlunsplit((scheme, host, path, query, ""))
return cleaned
def parse_rss_datetime(s: Optional[str]) -> Optional[datetime]:
if not s:
return None
s = s.strip()
if not s:
return None
formats = [
"%a, %d %b %Y %H:%M:%S %z",
"%Y-%m-%dT%H:%M:%S%z",
"%Y-%m-%dT%H:%M:%S.%f%z",
"%a, %d %b %Y %H:%M:%S GMT",
"%Y-%m-%d %H:%M:%S",
]
for fmt in formats:
try:
return datetime.strptime(s, fmt)
except Exception:
pass
try:
from email.utils import parsedate_to_datetime
dt = parsedate_to_datetime(s)
return dt
except Exception:
return None
def unix_time() -> int:
return int(time.time())
def format_date(value, format="%Y-%m-%d %H:%M"):
if value is None:
return ""
if not isinstance(value, datetime):
return str(value)
return value.strftime(format)
# Country name (Spanish) to ISO 3166-1 alpha-2 code mapping
COUNTRY_ISO = {
"afganistán": "AF", "albania": "AL", "alemania": "DE", "andorra": "AD",
"angola": "AO", "antigua y barbuda": "AG", "arabia saudita": "SA",
"argelia": "DZ", "argentina": "AR", "armenia": "AM", "australia": "AU",
"austria": "AT", "azerbaiyán": "AZ", "bahamas": "BS", "bangladés": "BD",
"barbados": "BB", "baréin": "BH", "bélgica": "BE", "belice": "BZ",
"benín": "BJ", "bielorrusia": "BY", "birmania": "MM", "bolivia": "BO",
"bosnia y herzegovina": "BA", "botsuana": "BW", "brasil": "BR",
"brunéi": "BN", "bulgaria": "BG", "burkina faso": "BF", "burundi": "BI",
"bután": "BT", "cabo verde": "CV", "camboya": "KH", "camerún": "CM",
"canadá": "CA", "catar": "QA", "chad": "TD", "chile": "CL", "china": "CN",
"chipre": "CY", "colombia": "CO", "comoras": "KM", "corea del norte": "KP",
"corea del sur": "KR", "costa de marfil": "CI", "costa rica": "CR",
"croacia": "HR", "cuba": "CU", "dinamarca": "DK", "dominica": "DM",
"ecuador": "EC", "egipto": "EG", "el salvador": "SV",
"emiratos árabes unidos": "AE", "eritrea": "ER", "eslovaquia": "SK",
"eslovenia": "SI", "españa": "ES", "estados unidos": "US", "estonia": "EE",
"esuatini": "SZ", "etiopía": "ET", "filipinas": "PH", "finlandia": "FI",
"fiyi": "FJ", "francia": "FR", "gabón": "GA", "gambia": "GM",
"georgia": "GE", "ghana": "GH", "granada": "GD", "grecia": "GR",
"guatemala": "GT", "guinea": "GN", "guinea-bisáu": "GW",
"guinea ecuatorial": "GQ", "guyana": "GY", "haití": "HT", "honduras": "HN",
"hungría": "HU", "india": "IN", "indonesia": "ID", "irak": "IQ",
"irán": "IR", "irlanda": "IE", "islandia": "IS", "islas marshall": "MH",
"islas salomón": "SB", "israel": "IL", "italia": "IT", "jamaica": "JM",
"japón": "JP", "jordania": "JO", "kazajistán": "KZ", "kenia": "KE",
"kirguistán": "KG", "kiribati": "KI", "kuwait": "KW", "laos": "LA",
"lesoto": "LS", "letonia": "LV", "líbano": "LB", "liberia": "LR",
"libia": "LY", "liechtenstein": "LI", "lituania": "LT", "luxemburgo": "LU",
"macedonia del norte": "MK", "madagascar": "MG", "malasia": "MY",
"malaui": "MW", "maldivas": "MV", "malí": "ML", "malta": "MT",
"marruecos": "MA", "mauricio": "MU", "mauritania": "MR", "méxico": "MX",
"micronesia": "FM", "moldavia": "MD", "mónaco": "MC", "mongolia": "MN",
"montenegro": "ME", "mozambique": "MZ", "namibia": "NA", "nauru": "NR",
"nepal": "NP", "nicaragua": "NI", "níger": "NE", "nigeria": "NG",
"noruega": "NO", "nueva zelanda": "NZ", "omán": "OM", "países bajos": "NL",
"pakistán": "PK", "palaos": "PW", "palestina": "PS", "panamá": "PA",
"papúa nueva guinea": "PG", "paraguay": "PY", "perú": "PE", "polonia": "PL",
"portugal": "PT", "reino unido": "GB", "república centroafricana": "CF",
"república checa": "CZ", "república del congo": "CG",
"república democrática del congo": "CD", "república dominicana": "DO",
"ruanda": "RW", "rumanía": "RO", "rusia": "RU", "samoa": "WS",
"san cristóbal y nieves": "KN", "san marino": "SM",
"san vicente y las granadinas": "VC", "santa lucía": "LC",
"santo tomé y príncipe": "ST", "senegal": "SN", "serbia": "RS",
"seychelles": "SC", "sierra leona": "SL", "singapur": "SG", "siria": "SY",
"somalia": "SO", "sri lanka": "LK", "sudáfrica": "ZA", "sudán": "SD",
"sudán del sur": "SS", "suecia": "SE", "suiza": "CH", "surinam": "SR",
"tailandia": "TH", "tanzania": "TZ", "tayikistán": "TJ",
"timor oriental": "TL", "togo": "TG", "tonga": "TO",
"trinidad y tobago": "TT", "túnez": "TN", "turkmenistán": "TM",
"turquía": "TR", "tuvalu": "TV", "ucrania": "UA", "uganda": "UG",
"uruguay": "UY", "uzbekistán": "UZ", "vanuatu": "VU", "vaticano": "VA",
"venezuela": "VE", "vietnam": "VN", "yemen": "YE", "yibuti": "DJ",
"zambia": "ZM", "zimbabue": "ZW",
}
def country_flag(country_name: Optional[str]) -> str:
"""Convert country name to flag emoji using regional indicator symbols."""
if not country_name:
return ""
name = country_name.strip().lower()
iso_code = COUNTRY_ISO.get(name)
if not iso_code:
return ""
# Convert ISO code to flag emoji using regional indicator symbols
# A=🇦 is U+1F1E6, B=🇧 is U+1F1E7, etc.
return "".join(chr(0x1F1E6 + ord(c) - ord('A')) for c in iso_code.upper())
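
These helpers are pure functions, so a short sketch is enough to show the intended behaviour (the URL and date values are illustrative):

from utils.helpers import (
    normalize_url_py, parse_rss_datetime, format_date, country_flag, safe_html
)

print(normalize_url_py("HTTPS://Example.COM:443/a//b?utm_source=x&id=7#frag"))
# -> https://example.com/a/b?id=7  (default port, tracking params and fragment removed)

dt = parse_rss_datetime("Tue, 13 Jan 2026 13:39:51 +0100")
print(format_date(dt))          # 2026-01-13 13:39
print(country_flag("España"))   # 🇪🇸
print(safe_html('<p onclick="x()">Hola <b>mundo</b></p>'))
# -> <p>Hola <b>mundo</b></p>  (disallowed attributes are stripped)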

utils/qdrant_search.py (Normal file, +197 lines)

@@ -0,0 +1,197 @@
"""
Semantic search utility backed by Qdrant.
Provides fast vector searches over news items.
"""
import os
import time
from typing import List, Dict, Any, Optional
from qdrant_client import QdrantClient
from sentence_transformers import SentenceTransformer
# Configuration
QDRANT_HOST = os.environ.get("QDRANT_HOST", "localhost")
QDRANT_PORT = int(os.environ.get("QDRANT_PORT", "6333"))
QDRANT_COLLECTION = os.environ.get("QDRANT_COLLECTION_NAME", "news_vectors")
EMB_MODEL = os.environ.get("EMB_MODEL", "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2")
# Singletons for the shared clients
_qdrant_client: Optional[QdrantClient] = None
_embedding_model: Optional[SentenceTransformer] = None
def get_qdrant_client() -> QdrantClient:
"""
    Return the Qdrant client (singleton).
    Includes a health check and basic error handling.
"""
global _qdrant_client
if _qdrant_client is None:
try:
print(f"🔌 Conectando a Qdrant en {QDRANT_HOST}:{QDRANT_PORT}")
_qdrant_client = QdrantClient(host=QDRANT_HOST, port=QDRANT_PORT, timeout=5)
# Health check
collections = _qdrant_client.get_collections()
print(f"✅ Qdrant conectado. Colecciones: {[c.name for c in collections.collections]}")
except Exception as e:
print(f"❌ Error conectando a Qdrant: {e}")
_qdrant_client = None
raise
return _qdrant_client
def get_embedding_model() -> SentenceTransformer:
"""
    Return the sentence-embedding model (singleton).
"""
global _embedding_model
if _embedding_model is None:
_embedding_model = SentenceTransformer(EMB_MODEL, device='cpu')
return _embedding_model
def semantic_search(
query: str,
limit: int = 20,
score_threshold: float = 0.5,
filters: Optional[Dict[str, Any]] = None
) -> List[Dict[str, Any]]:
"""
    Run a semantic search against Qdrant.
"""
start_total = time.time()
try:
        # Generate the embedding for the query
t0 = time.time()
model = get_embedding_model()
query_vector = model.encode(query, convert_to_numpy=True).tolist()
t1 = time.time()
print(f"⏱️ [Timing] Generar embedding de query: {t1 - t0:.4f}s")
        # Run the search
try:
client = get_qdrant_client()
except Exception as conn_error:
print(f"⚠️ No se pudo conectar a Qdrant: {conn_error}")
            return []  # Return an empty list so the caller can fall back
search_params = {
"collection_name": QDRANT_COLLECTION,
"query_vector": query_vector,
"limit": limit,
"score_threshold": score_threshold
}
        # Add filters if provided
if filters:
from qdrant_client.models import Filter, FieldCondition, MatchValue
conditions = []
for key, value in filters.items():
if value is not None:
conditions.append(
FieldCondition(key=key, match=MatchValue(value=value))
)
if conditions:
search_params["query_filter"] = Filter(must=conditions)
t2 = time.time()
results = client.search(**search_params)
t3 = time.time()
print(f"⏱️ [Timing] Búsqueda en Qdrant: {t3 - t2:.4f}s")
print(f"⏱️ [Timing] Total semantic_search: {t3 - start_total:.4f}s")
print(f"✅ Qdrant retornó {len(results)} resultados")
        # Format the results
formatted_results = []
for hit in results:
formatted_results.append({
"score": hit.score,
"news_id": hit.payload.get("news_id"),
"traduccion_id": hit.payload.get("traduccion_id"),
"titulo": hit.payload.get("titulo", ""),
"resumen": hit.payload.get("resumen", ""),
"url": hit.payload.get("url", ""),
"fecha": hit.payload.get("fecha"),
"fuente_nombre": hit.payload.get("fuente_nombre", ""),
"categoria_id": hit.payload.get("categoria_id"),
"pais_id": hit.payload.get("pais_id"),
"lang": hit.payload.get("lang", "es")
})
return formatted_results
except Exception as e:
print(f"❌ Error en búsqueda semántica: {e}")
import traceback
traceback.print_exc()
return []
def hybrid_search(
query: str,
limit: int = 20,
semantic_weight: float = 0.7,
filters: Optional[Dict[str, Any]] = None
) -> List[Dict[str, Any]]:
"""
    Hybrid search: combines semantic search (Qdrant) with traditional search.
    Args:
        query: Search text
        limit: Maximum number of results
        semantic_weight: Weight given to the semantic component (0-1)
        filters: Additional filters
    Returns:
        List of combined results
"""
    # For now, use semantic search only
    # TODO: combine with the PostgreSQL search later if needed
return semantic_search(query, limit=limit, filters=filters)
def search_by_keywords(
keywords: List[str],
limit: int = 100,
score_threshold: float = 0.4
) -> List[Dict[str, Any]]:
"""
    Search across multiple keywords.
    Useful for the conflict monitor.
    Args:
        keywords: List of keywords
        limit: Maximum number of results per keyword
        score_threshold: Minimum similarity threshold
    Returns:
        List of unique results
"""
all_results = {}
for keyword in keywords:
if not keyword.strip():
continue
results = semantic_search(
query=keyword,
limit=limit,
score_threshold=score_threshold
)
        # Add to the results, keeping the best score per news item
for result in results:
news_id = result['news_id']
if news_id not in all_results or result['score'] > all_results[news_id]['score']:
all_results[news_id] = result
    # Sort by score, descending
sorted_results = sorted(
all_results.values(),
key=lambda x: x['score'],
reverse=True
)
return sorted_results[:limit]
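
A minimal sketch of how these search helpers are called, assuming a reachable Qdrant instance whose news_vectors collection has already been populated with embeddings from the same model (QDRANT_HOST, QDRANT_PORT and EMB_MODEL can be overridden through the environment):

from utils.qdrant_search import semantic_search, search_by_keywords

# Free-text semantic query, optionally constrained by payload fields
results = semantic_search(
    "acuerdo comercial entre la UE y Mercosur",
    limit=10,
    filters={"lang": "es"},
)
for hit in results:
    print(f'{hit["score"]:.3f}  {hit["titulo"]}  ({hit["fuente_nombre"]})')

# Keyword sweep as used by the conflict monitor: the best score per news item is kept
monitored = search_by_keywords(["alto el fuego", "negociaciones de paz"], limit=50)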

utils/wiki.py (Normal file, +139 lines)

@@ -0,0 +1,139 @@
import requests
import logging
from cache import cache_get, cache_set
from db import get_read_conn, get_write_conn
logger = logging.getLogger(__name__)
# Cache for 24 hours
CACHE_TTL = 86400
def fetch_wiki_data(name, entity_type=None):
"""
Fetch image URL AND summary from Wikipedia API for any entity.
Returns tuple: (image_url, summary)
"""
# 1. Check Cache
cache_key = f"wiki:data:{name.lower()}"
cached_data = cache_get(cache_key)
if cached_data is not None:
# Cache stores dict: {"image": url, "summary": text}
if isinstance(cached_data, dict):
return cached_data.get("image"), cached_data.get("summary")
        # Legacy cache entries stored only the image URL as a string; use it and skip the summary
if isinstance(cached_data, str) and cached_data != "NO_IMAGE":
return cached_data, None
return None, None
# 2. Check Database
try:
with get_read_conn() as conn:
with conn.cursor() as cur:
cur.execute("SELECT image_url, summary, summary_es FROM entity_images WHERE entity_name = %s", (name,))
row = cur.fetchone()
if row:
image_url, summary, summary_es = row
# Prefer the translated summary if it exists
final_summary = summary_es if summary_es else summary
# Update cache and return
cache_value = {"image": image_url, "summary": final_summary}
cache_set(cache_key, cache_value, ttl_seconds=CACHE_TTL)
return image_url, final_summary
except Exception as e:
logger.error(f"DB read error for {name}: {e}")
# 3. Fetch from Wikipedia
summary_en = None
summary_es = None
status_es = 'none'
image_url, summary = _query_wikipedia_api_full(name, lang='es')
if summary:
summary_es = summary
status_es = 'done'
else:
# Try English if Spanish failed
img_en, summ_en = _query_wikipedia_api_full(name, lang='en')
if summ_en:
summary = summ_en
summary_en = summ_en
status_es = 'pending'
if not image_url:
image_url = img_en
# 4. Persist to Database (found or not)
try:
with get_write_conn() as conn:
with conn.cursor() as cur:
cur.execute("""
INSERT INTO entity_images (entity_name, image_url, summary, summary_en, summary_es, status_es, last_checked)
VALUES (%s, %s, %s, %s, %s, %s, NOW())
ON CONFLICT (entity_name) DO UPDATE
SET image_url = EXCLUDED.image_url,
summary = EXCLUDED.summary,
summary_en = EXCLUDED.summary_en,
summary_es = EXCLUDED.summary_es,
status_es = EXCLUDED.status_es,
last_checked = NOW()
""", (name, image_url, summary, summary_en, summary_es, status_es))
conn.commit()
except Exception as e:
logger.error(f"DB write error for {name}: {e}")
# 5. Cache Result
cache_value = {"image": image_url, "summary": summary}
cache_set(cache_key, cache_value, ttl_seconds=CACHE_TTL)
return image_url, summary
def _query_wikipedia_api_full(query, lang='es'):
"""
Query Wikipedia API for thumbnail and summary.
"""
try:
url = f"https://{lang}.wikipedia.org/w/api.php"
params = {
"action": "query",
"format": "json",
"prop": "pageimages|extracts",
"piprop": "thumbnail",
"pithumbsize": 300, # Larger size requested
"exintro": 1,
"explaintext": 1,
"exchars": 400, # Limit chars
"titles": query,
"redirects": 1,
"origin": "*"
}
# Wikipedia requires a User-Agent
headers = {
"User-Agent": "NewsEntityStats/1.0 (internal_tool; contact@example.com)"
}
response = requests.get(url, params=params, headers=headers, timeout=2) # Fast timeout
data = response.json()
pages = data.get("query", {}).get("pages", {})
for page_id, page_data in pages.items():
if page_id == "-1":
continue # Not found
image_url = None
if "thumbnail" in page_data:
image_url = page_data["thumbnail"]["source"]
summary = page_data.get("extract")
if summary and "may refer to:" in summary: # Disambiguation page
summary = None
if image_url or summary:
return image_url, summary
except Exception as e:
logger.error(f"Error fetching wiki data for {query} ({lang}): {e}")
return None, None
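
A short usage sketch for the Wikipedia helper, assuming the project's cache and db modules (imported at the top of this file) are importable and the entity_images table exists:

from utils.wiki import fetch_wiki_data

image_url, summary = fetch_wiki_data("Margaret Thatcher")
if image_url:
    print("Thumbnail:", image_url)
if summary:
    print("Summary:", summary[:200])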