diff --git a/feed_processor.py b/feed_processor.py index 60e408f..e58d612 100644 --- a/feed_processor.py +++ b/feed_processor.py @@ -1,5 +1,5 @@ import hashlib -import re # <-- CORRECCIÓN: Se importa el módulo de expresiones regulares +import re from datetime import datetime import logging import feedparser @@ -7,7 +7,7 @@ from bs4 import BeautifulSoup import requests import xml.sax._exceptions -NETWORK_TIMEOUT = 15 # segundos +NETWORK_TIMEOUT = 15 def process_single_feed(feed_data): """ @@ -55,14 +55,12 @@ def process_single_feed(feed_data): noticia_id = hashlib.md5(link.encode()).hexdigest() titulo = entry.get("title", "Sin título") - # --- MEJORA: Se prioriza el contenido completo sobre el resumen para obtener más texto --- resumen_html = "" if hasattr(entry, 'content') and entry.content: resumen_html = entry.content[0].value elif hasattr(entry, 'summary'): resumen_html = entry.summary - # --- CORRECCIÓN: Limpia los códigos de media personalizados y otros artefactos --- if resumen_html: resumen_html = re.sub(r'\[\[\{.*?\}\]\]', '', resumen_html) @@ -84,8 +82,17 @@ def process_single_feed(feed_data): fecha_publicacion = datetime(*entry.updated_parsed[:6]) noticias_encontradas.append( - (noticia_id, titulo, resumen_texto_plano, link, fecha_publicacion, - imagen_url, feed_nombre, feed_data['categoria_id'], feed_data['pais_id']) + ( + noticia_id, + titulo, + resumen_texto_plano, + link, + fecha_publicacion, + imagen_url, + feed_nombre, + feed_data['categoria_id'], + feed_data['pais_id'] + ) ) new_etag = response.headers.get('ETag') @@ -100,3 +107,4 @@ def process_single_feed(feed_data): logging.error(f"Excepción inesperada al procesar el feed {feed_url} (ID: {feed_id}): {e}", exc_info=True) return feed_id, noticias_encontradas, new_etag, new_modified, success + diff --git a/ner_worker.py b/ner_worker.py index a6f6f9f..5ced07f 100644 --- a/ner_worker.py +++ b/ner_worker.py @@ -36,12 +36,11 @@ _ws_re = re.compile(r"\s+") HTML_TRASH_PATTERNS = [ r"<[^>]+>", r"&[a-z]+;", - r"&#\d+;?", # entidades numéricas tipo … + r"&#\d+;?", r'width="\d+"', r'height="\d+"', ] -# Palabras/sintagmas demasiado genéricos o claramente ruido GENERIC_BAD_TAGS = { "república", "estado", @@ -110,7 +109,6 @@ TOPIC_MAX_PER_DOC = 15 def _looks_like_attr_or_path(text_lower: str) -> bool: - """Detecta cosas tipo rutas, atributos HTML, ids raros, etc.""" if text_lower.startswith("/"): return True if "http://" in text_lower or "https://" in text_lower: @@ -125,20 +123,16 @@ def _looks_like_attr_or_path(text_lower: str) -> bool: return True if re.search(r"&#\d+;?", text_lower): return True - # cosas tipo atributo=valor if "=" in text_lower and " " not in text_lower.strip(): return True - # cadenas largas sin espacios (ids, hashes…) if re.fullmatch(r"[a-z0-9_]{15,}", text_lower.replace("-", "").replace(" ", "")): return True - # palabra única con guión suele ser ruta/slug: wp-content, internal-photos… if "-" in text_lower and " " not in text_lower: return True return False def clean_tag_text(text: str) -> str | None: - """Limpieza para entidades (PERSON/ORG/GPE/LOC).""" if not text: return None text = BeautifulSoup(text, "html.parser").get_text() @@ -172,7 +166,6 @@ def clean_tag_text(text: str) -> str | None: def clean_topic_text(text: str) -> str | None: - """Limpieza para posibles 'temas' (noun_chunks).""" if not text: return None text = BeautifulSoup(text, "html.parser").get_text() @@ -184,11 +177,9 @@ def clean_topic_text(text: str) -> str | None: return None lower = text.lower() - if _looks_like_attr_or_path(lower): return None - # tokenizamos en minúsculas y quitamos puntuación tokens = [ t.strip(string.punctuation) for t in lower.split() @@ -197,13 +188,11 @@ def clean_topic_text(text: str) -> str | None: if not tokens: return None - # quitamos artículo inicial si lo hay if tokens and tokens[0] in ARTICLES: tokens = tokens[1:] if not tokens: return None - # reconstruimos texto normalizado sin artículo norm = " ".join(tokens).strip() if len(norm) < TOPIC_MIN_CHARS: return None @@ -211,15 +200,12 @@ def clean_topic_text(text: str) -> str | None: if norm in GENERIC_BAD_TAGS: return None - # límite máximo de palabras if len(tokens) > TOPIC_MAX_WORDS: return None - # todos stopwords => fuera if all(t in STOPWORDS for t in tokens): return None - # sólo números/fechas if re.fullmatch(r"[0-9\s\.,\-:/]+", norm): return None @@ -239,7 +225,6 @@ def extract_entities_and_topics(nlp, text: str) -> Tuple[List[Tuple[str, str]], doc = nlp(text) - # Entidades "clásicas" for ent in doc.ents: tipo = ENT_LABELS.get(ent.label_) if not tipo: @@ -249,7 +234,6 @@ def extract_entities_and_topics(nlp, text: str) -> Tuple[List[Tuple[str, str]], continue ents.append((val, tipo)) - # Candidatos a "tema" a partir de noun_chunks topic_counter: Counter[str] = Counter() for chunk in doc.noun_chunks: @@ -265,7 +249,6 @@ def extract_entities_and_topics(nlp, text: str) -> Tuple[List[Tuple[str, str]], continue topics.append((val, "tema")) - # quitamos duplicados ents = list(set(ents)) topics = list(set(topics)) return ents, topics diff --git a/related_worker.py b/related_worker.py index a9b8b08..dc1d4b2 100644 --- a/related_worker.py +++ b/related_worker.py @@ -20,10 +20,10 @@ DB = dict( password=os.environ.get("DB_PASS", "x"), ) -TOPK = int(os.environ.get("RELATED_TOPK", 10)) -BATCH_IDS = int(os.environ.get("RELATED_BATCH_IDS", 200)) -SLEEP_IDLE = float(os.environ.get("RELATED_SLEEP", 10)) -MIN_SCORE = float(os.environ.get("RELATED_MIN_SCORE", 0.0)) +TOPK = int(os.environ.get("RELATED_TOPK", 10)) +BATCH_IDS = int(os.environ.get("RELATED_BATCH_IDS", 200)) +SLEEP_IDLE = float(os.environ.get("RELATED_SLEEP", 10)) +MIN_SCORE = float(os.environ.get("RELATED_MIN_SCORE", 0.0)) WINDOW_HOURS = int(os.environ.get("RELATED_WINDOW_H", 0)) @@ -31,9 +31,6 @@ def get_conn(): return psycopg2.connect(**DB) -# --------------------------------------------------------- -# Cargar embeddings SOLO de traducciones en español (lang_to='es') -# --------------------------------------------------------- def _fetch_all_embeddings(cur): base_sql = """ SELECT e.traduccion_id, e.vec @@ -49,8 +46,8 @@ def _fetch_all_embeddings(cur): params.append(f"{WINDOW_HOURS} hours") cur.execute(base_sql, params) - rows = cur.fetchall() + if not rows: return [], None @@ -66,10 +63,7 @@ def _fetch_all_embeddings(cur): if not ids: return [], None - # Convertimos a matriz numpy mat = np.array(vecs, dtype=np.float32) - - # Normalizamos (evita división por 0) norms = np.linalg.norm(mat, axis=1, keepdims=True) norms[norms == 0] = 1e-8 mat = mat / norms @@ -77,9 +71,6 @@ def _fetch_all_embeddings(cur): return ids, mat -# --------------------------------------------------------- -# Obtiene IDs pendientes -# --------------------------------------------------------- def _fetch_pending_ids(cur, limit) -> List[int]: cur.execute( """ @@ -99,44 +90,24 @@ def _fetch_pending_ids(cur, limit) -> List[int]: return [r[0] for r in cur.fetchall()] -# --------------------------------------------------------- -# TOP-K usando NumPy (súper rápido) -# --------------------------------------------------------- -def _topk_numpy( - idx: int, - ids_all: List[int], - mat: np.ndarray, - K: int -) -> List[Tuple[int, float]]: - - # vector de la noticia central - q = mat[idx] # (dim,) - - # similitudes coseno: dot product (matriz · vector) +def _topk_numpy(idx: int, ids_all: List[int], mat: np.ndarray, K: int) -> List[Tuple[int, float]]: + q = mat[idx] sims = np.dot(mat, q) - - # eliminar self-match sims[idx] = -999.0 - # filtramos por score mínimo if MIN_SCORE > 0: mask = sims >= MIN_SCORE sims = np.where(mask, sims, -999.0) - # obtenemos los índices top-k (mucho más rápido que ordenar todo) if K >= len(sims): top_idx = np.argsort(-sims) else: part = np.argpartition(-sims, K)[:K] top_idx = part[np.argsort(-sims[part])] - out = [(ids_all[j], float(sims[j])) for j in top_idx[:K]] - return out + return [(ids_all[j], float(sims[j])) for j in top_idx[:K]] -# --------------------------------------------------------- -# Inserta en la tabla related_noticias -# --------------------------------------------------------- def _insert_related(cur, tr_id: int, pairs: List[Tuple[int, float]]): if not pairs: return @@ -152,9 +123,6 @@ def _insert_related(cur, tr_id: int, pairs: List[Tuple[int, float]]): ) -# --------------------------------------------------------- -# Procesar IDs objetivo -# --------------------------------------------------------- def build_for_ids(conn, target_ids: List[int]) -> int: with conn.cursor() as cur: ids_all, mat = _fetch_all_embeddings(cur) @@ -162,7 +130,6 @@ def build_for_ids(conn, target_ids: List[int]) -> int: if not ids_all or mat is None: return 0 - # Mapa ID → index pos = {tid: i for i, tid in enumerate(ids_all)} processed = 0 @@ -181,9 +148,6 @@ def build_for_ids(conn, target_ids: List[int]) -> int: return processed -# --------------------------------------------------------- -# MAIN -# --------------------------------------------------------- def main(): logging.info( "Iniciando related_worker (TOPK=%s, BATCH_IDS=%s, MIN_SCORE=%.3f, WINDOW_H=%s)", @@ -192,6 +156,7 @@ def main(): MIN_SCORE, WINDOW_HOURS, ) + while True: try: with get_conn() as conn, conn.cursor() as cur: diff --git a/scheduler.py b/scheduler.py index b6b517f..255173f 100644 --- a/scheduler.py +++ b/scheduler.py @@ -1,5 +1,3 @@ -# /home/x/rss/scheduler.py - import time import logging import atexit @@ -7,33 +5,23 @@ from datetime import datetime, timedelta import sys from apscheduler.schedulers.background import BackgroundScheduler -# Importamos la app (para el contexto) y la función de captura desde nuestro fichero principal -# --- CORRECCIÓN 1: Se cambió 'fetch_and_store' por 'fetch_and_store_all' --- from app import app, fetch_and_store_all -# Es importante configurar el logging también para este proceso logging.basicConfig(stream=sys.stdout, level=logging.INFO, format='[%(asctime)s] %(levelname)s in %(module)s: %(message)s') -# Creamos el planificador con la zona horaria UTC para evitar ambigüedades scheduler = BackgroundScheduler(daemon=True, timezone="UTC") -# --- Apagado robusto con atexit --- def shutdown_scheduler(): """Función para detener el planificador de forma segura al salir.""" if scheduler.running: scheduler.shutdown() logging.info("Scheduler del worker detenido.") -# Registramos la función de apagado para que se ejecute al terminar el proceso atexit.register(shutdown_scheduler) if __name__ == '__main__': - # Usamos el contexto de la aplicación para asegurar que la tarea tiene acceso a todo lo necesario with app.app_context(): - # Añadimos la tarea para que se ejecute cada 3 minutos - # La primera ejecución será a los 10 segundos de arrancar el worker scheduler.add_job( - # --- CORRECCIÓN 2: Se cambió 'fetch_and_store' por 'fetch_and_store_all' --- fetch_and_store_all, "interval", minutes=10, @@ -41,18 +29,14 @@ if __name__ == '__main__': next_run_time=datetime.utcnow() + timedelta(seconds=10) ) - # Arrancamos el planificador scheduler.start() logging.info("Worker del Scheduler iniciado. Tarea programada cada 3 minutos.") logging.info("El proceso se mantendrá en ejecución para permitir que se ejecuten las tareas.") - # Este bucle infinito simplemente mantiene vivo el script del worker. - # El apagado ahora es manejado por atexit. try: while True: time.sleep(60) except (KeyboardInterrupt, SystemExit): logging.info("Apagando el worker...") - diff --git a/url_processor.py b/url_processor.py index 83e97a5..f89514a 100644 --- a/url_processor.py +++ b/url_processor.py @@ -11,22 +11,16 @@ def _process_individual_article(article_url, config): Está diseñada para ser ejecutada en un hilo separado. """ try: - # Es crucial crear un nuevo objeto Article dentro de cada hilo. article = newspaper.Article(article_url, config=config) article.download() - - # Un artículo necesita ser parseado para tener título, texto, etc. article.parse() - # Si no se pudo obtener título o texto, no es un artículo válido. if not article.title or not article.text: return None - # El método nlp() es necesario para el resumen. article.nlp() return article except Exception: - # Ignoramos errores en artículos individuales (p.ej., enlaces rotos, etc.) return None def process_newspaper_url(source_name, url, categoria_id, pais_id, idioma='es'): @@ -41,39 +35,31 @@ def process_newspaper_url(source_name, url, categoria_id, pais_id, idioma='es'): try: config = Config() config.browser_user_agent = 'RssApp/1.0 (Scraper)' - config.request_timeout = 15 # Timeout más corto para artículos individuales. - config.memoize_articles = False # No guardar en caché para obtener siempre lo último. + config.request_timeout = 15 + config.memoize_articles = False - # Usamos el idioma proporcionado para mejorar la extracción source = newspaper.build(url, config=config, language=idioma) - # Limitar el número de artículos para no sobrecargar el servidor. articles_to_process = source.articles[:25] logging.info(f"Fuente construida. Procesando {len(articles_to_process)} artículos en paralelo...") - # Usamos un ThreadPoolExecutor para procesar los artículos concurrentemente. with ThreadPoolExecutor(max_workers=10) as executor: - # Creamos un futuro para cada URL de artículo. future_to_article = {executor.submit(_process_individual_article, article.url, config): article for article in articles_to_process} for future in as_completed(future_to_article): processed_article = future.result() - # Si el artículo se procesó correctamente, lo añadimos a la lista. if processed_article: noticia_id = hashlib.md5(processed_article.url.encode()).hexdigest() if processed_article.summary: resumen = processed_article.summary else: - # Fallback a un extracto del texto si no hay resumen. resumen = (processed_article.text[:400] + '...') if len(processed_article.text) > 400 else processed_article.text fecha = processed_article.publish_date if processed_article.publish_date else datetime.now() - # --- LÍNEA CLAVE --- - # Añadimos 'source_name' a la tupla de datos todas_las_noticias.append(( noticia_id, processed_article.title,