import os import time import logging import re import psycopg2 import psycopg2.extras import spacy logging.basicConfig(level=logging.INFO, format='[%(asctime)s] %(levelname)s: %(message)s') DB = dict( host=os.environ.get("DB_HOST", "localhost"), port=int(os.environ.get("DB_PORT", 5432)), dbname=os.environ.get("DB_NAME", "rss"), user=os.environ.get("DB_USER", "rss"), password=os.environ.get("DB_PASS", "x"), ) # Idioma de las traducciones que vamos a etiquetar NER_LANG = os.environ.get("NER_LANG", "es").strip().lower() # Tamaño de lote de traducciones a procesar por iteración BATCH = int(os.environ.get("NER_BATCH", 64)) # Mapeo de etiquetas de spaCy -> tipos de nuestro esquema ENT_LABELS = { "PERSON": "persona", "ORG": "organizacion", "GPE": "lugar", "LOC": "lugar", } # Normaliza el valor del tag (quita espacios extra, colapsa espacios internos) _ws_re = re.compile(r"\s+") def _clean_value(s: str) -> str: if not s: return "" s = s.strip() s = _ws_re.sub(" ", s) return s def get_conn(): return psycopg2.connect(**DB) def main(): # Nota: asumimos español porque el contenedor instala es_core_news_md en el Dockerfile. # Si quisieras soportar más idiomas, instala el modelo correspondiente y haz un mapping. nlp = spacy.load("es_core_news_md", disable=["parser", "lemmatizer", "textcat"]) logging.info("spaCy cargado: es_core_news_md") while True: try: with get_conn() as conn, conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur: # Tomamos traducciones 'done' hacia NER_LANG que aún no tengan ninguna relación en tags_noticia cur.execute( """ WITH pend AS ( SELECT t.id, t.titulo_trad, t.resumen_trad FROM traducciones t LEFT JOIN tags_noticia tn ON tn.traduccion_id = t.id WHERE t.status = 'done' AND t.lang_to = %s GROUP BY t.id, t.titulo_trad, t.resumen_trad HAVING COUNT(tn.tag_id) = 0 ORDER BY t.id DESC LIMIT %s ) SELECT * FROM pend; """, (NER_LANG, BATCH), ) rows = cur.fetchall() if not rows: time.sleep(5) continue logging.info(f"Procesando {len(rows)} traducciones para NER...") new_links = 0 new_tags = 0 for r in rows: text = f"{r['titulo_trad'] or ''}\n{r['resumen_trad'] or ''}".strip() if not text: continue doc = nlp(text) ents = [] for ent in doc.ents: tipo = ENT_LABELS.get(ent.label_) if not tipo: continue val = _clean_value(ent.text) # filtros simples if len(val) < 2: continue ents.append((val, tipo)) if not ents: continue # Insertamos (o actualizamos si ya existe) el tag y luego la relación # IMPORTANTE: requiere UNIQUE(valor, tipo) en 'tags' y UNIQUE(traduccion_id, tag_id) en 'tags_noticia' for valor, tipo in set(ents): try: cur.execute( """ INSERT INTO tags (valor, tipo) VALUES (%s, %s) ON CONFLICT (valor, tipo) DO UPDATE SET valor = EXCLUDED.valor RETURNING id """, (valor, tipo), ) tag_id = cur.fetchone()[0] # Intenta crear la relación; si existe (por UNIQUE), se ignora cur.execute( """ INSERT INTO tags_noticia (traduccion_id, tag_id) VALUES (%s, %s) ON CONFLICT DO NOTHING """, (r["id"], tag_id), ) if cur.rowcount > 0: new_links += 1 # Heurística: si el tag se ha creado (no hay forma directa aquí), # lo aproximamos contando que el RETURNING vino de un insert o un update. # Para no complicar: cuenta enlaces nuevos, y deja 'new_tags' como métrica opcional. except Exception: # No abortar el lote por un único fallo en un valor raro. logging.exception("Fallo insertando tag/relación") conn.commit() logging.info(f"NER lote OK. Nuevos enlaces: {new_links}.") except Exception as e: logging.exception(f"Error en NER loop: {e}") time.sleep(5) if __name__ == "__main__": main()