334 lines
9.9 KiB
Python
334 lines
9.9 KiB
Python
"""
|
|
Worker de Qdrant
|
|
Vectoriza noticias traducidas y las sube a Qdrant para búsquedas semánticas.
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import time
|
|
import uuid
|
|
from datetime import datetime
|
|
from typing import List, Dict, Any
|
|
|
|
# Añadir el directorio raíz al path
|
|
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
|
|
from db import get_read_conn, get_write_conn
|
|
|
|
try:
|
|
from qdrant_client import QdrantClient
|
|
from qdrant_client.models import Distance, VectorParams, PointStruct, Filter, FieldCondition, MatchValue
|
|
except ImportError:
|
|
print("❌ Error: qdrant-client no instalado. Ejecuta: pip install qdrant-client")
|
|
sys.exit(1)
|
|
|
|
try:
|
|
from sentence_transformers import SentenceTransformer
|
|
except ImportError:
|
|
print("❌ Error: sentence-transformers no instalado")
|
|
sys.exit(1)
|
|
|
|
# Configuration, overridable through environment variables.
QDRANT_HOST = os.getenv("QDRANT_HOST", "localhost")
QDRANT_PORT = int(os.getenv("QDRANT_PORT", "6333"))
QDRANT_COLLECTION = os.getenv("QDRANT_COLLECTION_NAME", "news_vectors")

EMB_MODEL = os.getenv(
    "EMB_MODEL",
    "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2",
)
EMB_DEVICE = os.getenv("EMB_DEVICE", "cuda")
BATCH_SIZE = int(os.getenv("QDRANT_BATCH_SIZE", "100"))
SLEEP_IDLE = int(os.getenv("QDRANT_SLEEP_IDLE", "30"))

# Module-wide handles; they stay None until init_qdrant_client() and
# init_embedding_model() assign them.
qdrant_client = None
embedding_model = None
|
|
|
|
|
|
def _resolve_vector_size() -> int:
    """Return the vector dimension to use when creating the collection."""
    # Prefer the dimension reported by the embedding model when it has
    # already been loaded, so the collection always matches EMB_MODEL.
    if embedding_model is not None:
        model_dim = embedding_model.get_sentence_embedding_dimension()
        if model_dim:
            return int(model_dim)
    # Otherwise allow an explicit override; 384 is the dimension of the
    # default model (paraphrase-multilingual-MiniLM-L12-v2).
    return int(os.environ.get("QDRANT_VECTOR_SIZE", "384"))


def init_qdrant_client():
    """
    Initialize the global Qdrant client and create the collection if missing.

    Connects to ``QDRANT_HOST:QDRANT_PORT`` and stores the client in the
    module-level ``qdrant_client``. When ``QDRANT_COLLECTION`` is not among
    the existing collections it is created with cosine distance and a vector
    size taken from the loaded embedding model, or from the
    ``QDRANT_VECTOR_SIZE`` environment variable (default 384) when the model
    has not been loaded yet. Finally prints the collection's point count.
    """
    global qdrant_client

    print(f"🔌 Conectando a Qdrant en {QDRANT_HOST}:{QDRANT_PORT}...")
    qdrant_client = QdrantClient(host=QDRANT_HOST, port=QDRANT_PORT)

    # Check whether the target collection already exists.
    existing_names = {c.name for c in qdrant_client.get_collections().collections}

    if QDRANT_COLLECTION not in existing_names:
        print(f"📦 Creando colección '{QDRANT_COLLECTION}'...")

        vector_size = _resolve_vector_size()

        qdrant_client.create_collection(
            collection_name=QDRANT_COLLECTION,
            vectors_config=VectorParams(
                size=vector_size,
                distance=Distance.COSINE,
            ),
        )
        print(f"✅ Colección '{QDRANT_COLLECTION}' creada (dimensión: {vector_size})")
    else:
        print(f"✅ Colección '{QDRANT_COLLECTION}' ya existe")

    # Report how many points the collection currently holds.
    collection_info = qdrant_client.get_collection(QDRANT_COLLECTION)
    print(f"📊 Puntos en colección: {collection_info.points_count}")
|
|
|
|
|
|
def init_embedding_model():
    """
    Load the sentence-transformers model into the module-level handle.

    Builds a ``SentenceTransformer`` for ``EMB_MODEL`` on ``EMB_DEVICE`` and
    stores it in the global ``embedding_model``, printing progress messages
    before and after loading.
    """
    global embedding_model

    for announcement in (
        f"🤖 Cargando modelo de embeddings: {EMB_MODEL}",
        f"🖥️ Dispositivo: {EMB_DEVICE}",
    ):
        print(announcement)

    loaded_model = SentenceTransformer(EMB_MODEL, device=EMB_DEVICE)
    embedding_model = loaded_model

    print("✅ Modelo cargado correctamente")
|
|
|
|
|
|
def get_pending_news(limit: int = BATCH_SIZE) -> List[Dict[str, Any]]:
    """
    Fetch translated news rows that have not been vectorized yet.

    Selects rows from ``traducciones`` joined with ``noticias`` where
    ``vectorized = FALSE`` and ``status = 'done'``, oldest ``created_at``
    first.

    Args:
        limit: Maximum number of rows to fetch.

    Returns:
        A list of dicts keyed by the selected column aliases
        (``traduccion_id``, ``noticia_id``, ``lang``, ``titulo``, ``resumen``,
        ``url``, ``fecha``, ``fuente_nombre``, ``categoria_id``, ``pais_id``).
    """
    pending_query = """
                SELECT
                    t.id as traduccion_id,
                    t.noticia_id,
                    t.lang_to as lang,
                    t.titulo_trad as titulo,
                    t.resumen_trad as resumen,
                    n.url,
                    n.fecha,
                    n.fuente_nombre,
                    n.categoria_id,
                    n.pais_id
                FROM traducciones t
                INNER JOIN noticias n ON t.noticia_id = n.id
                WHERE t.vectorized = FALSE
                  AND t.status = 'done'
                ORDER BY t.created_at ASC
                LIMIT %s
            """

    with get_read_conn() as conn, conn.cursor() as cur:
        cur.execute(pending_query, (limit,))

        # Column names come from the cursor metadata, in SELECT order.
        field_names = [column[0] for column in cur.description]
        rows_as_dicts = [dict(zip(field_names, record)) for record in cur.fetchall()]

    return rows_as_dicts
|
|
|
|
|
|
def generate_embeddings(texts: List[str]) -> List[List[float]]:
    """
    Encode a list of texts with the global embedding model.

    Args:
        texts: Texts to embed.

    Returns:
        One embedding vector per input text, converted from the model's
        NumPy output to nested Python lists.
    """
    encode_options = {
        "batch_size": 32,
        "show_progress_bar": False,
        "convert_to_numpy": True,
    }
    vectors = embedding_model.encode(texts, **encode_options)
    return vectors.tolist()
|
|
|
|
|
|
def _point_id_for(traduccion_id) -> str:
    """Return a stable UUIDv5 string derived from a translation row id."""
    return str(uuid.uuid5(uuid.NAMESPACE_OID, f"traduccion:{traduccion_id}"))


def _embedding_text(news: Dict[str, Any]) -> str:
    """Join title and summary into one text, skipping missing or empty parts."""
    return " ".join(str(part) for part in (news['titulo'], news['resumen']) if part)


def _build_payload(news: Dict[str, Any]) -> Dict[str, Any]:
    """Build the metadata payload stored alongside the vector in Qdrant."""
    return {
        "news_id": news['noticia_id'],
        "traduccion_id": news['traduccion_id'],
        "titulo": news['titulo'],
        "resumen": news['resumen'],
        "url": news['url'],
        "fecha": news['fecha'].isoformat() if news['fecha'] else None,
        "fuente_nombre": news['fuente_nombre'],
        "categoria_id": news['categoria_id'],
        "pais_id": news['pais_id'],
        "lang": news['lang'],
    }


def upload_to_qdrant(news_batch: List[Dict[str, Any]]):
    """
    Embed a batch of news items, upsert them into Qdrant and flag them in the DB.

    For every item the title and summary are embedded together, a point with
    the item's metadata is upserted into ``QDRANT_COLLECTION``, and the
    matching ``traducciones`` row is updated with ``vectorized = TRUE``,
    ``vectorization_date = NOW()`` and the point id.

    Point ids are derived deterministically from ``traduccion_id`` rather
    than generated randomly: if the database update fails after the upsert,
    the rows stay pending and are retried, and a stable id makes that retry
    target the same point instead of inserting a duplicate.

    Side effect: each dict in ``news_batch`` gains a ``qdrant_point_id`` key.

    Args:
        news_batch: News rows with the keys ``traduccion_id``, ``noticia_id``,
            ``titulo``, ``resumen``, ``url``, ``fecha``, ``fuente_nombre``,
            ``categoria_id``, ``pais_id`` and ``lang``. An empty batch is a
            no-op.
    """
    if not news_batch:
        return

    # A missing title or summary is skipped so that the literal string "None"
    # never ends up inside the embedded text.
    texts = [_embedding_text(news) for news in news_batch]

    print(f" 🧮 Generando embeddings para {len(texts)} noticias...")
    embeddings = generate_embeddings(texts)

    points = []
    for news, embedding in zip(news_batch, embeddings):
        point_id = _point_id_for(news['traduccion_id'])
        # Remember the id so the database update below can store it.
        news['qdrant_point_id'] = point_id
        points.append(
            PointStruct(
                id=point_id,
                vector=embedding,
                payload=_build_payload(news),
            )
        )

    print(f" ⬆️ Subiendo {len(points)} puntos a Qdrant...")
    qdrant_client.upsert(
        collection_name=QDRANT_COLLECTION,
        points=points,
    )

    print(f" 💾 Actualizando estado en PostgreSQL...")
    update_params = [
        (news['qdrant_point_id'], news['traduccion_id'])
        for news in news_batch
    ]
    with get_write_conn() as conn:
        with conn.cursor() as cur:
            cur.executemany("""
                UPDATE traducciones
                SET
                    vectorized = TRUE,
                    vectorization_date = NOW(),
                    qdrant_point_id = %s
                WHERE id = %s
            """, update_params)
        conn.commit()

    print(f" ✅ Lote subido correctamente")
|
|
|
|
|
|
def process_batch():
    """
    Fetch one batch of pending translated news and upload it to Qdrant.

    Any exception raised during the upload is printed and swallowed.

    Returns:
        The number of news items processed; 0 when nothing was pending or
        when the upload failed.
    """
    pending = get_pending_news()
    if not pending:
        return 0

    print(f"\n📋 Procesando {len(pending)} noticias traducidas...")

    try:
        upload_to_qdrant(pending)
    except Exception as exc:
        print(f"❌ Error procesando lote: {exc}")
        return 0
    return len(pending)
|
|
|
|
|
|
def get_stats():
    """
    Return vectorization counters for the ``traducciones`` table.

    Returns:
        A dict with ``total`` (all rows), ``vectorizadas`` (rows with
        ``vectorized = TRUE``) and ``pendientes`` (rows not vectorized whose
        status is ``'done'``).
    """
    # NOTE(review): only rows with lang_to = 'es' are counted here, whereas
    # get_pending_news() applies no language filter - confirm this is intended.
    stats_query = """
                SELECT
                    COUNT(*) as total,
                    COUNT(*) FILTER (WHERE vectorized = TRUE) as vectorizadas,
                    COUNT(*) FILTER (WHERE vectorized = FALSE AND status = 'done') as pendientes
                FROM traducciones
                WHERE lang_to = 'es'
            """
    stat_keys = ('total', 'vectorizadas', 'pendientes')

    with get_read_conn() as conn, conn.cursor() as cur:
        cur.execute(stats_query)
        counters = cur.fetchone()
        return dict(zip(stat_keys, counters))
|
|
|
|
|
|
def main():
    """
    Run the worker: print the configuration, initialize, then loop forever.

    Initializes the Qdrant client and the embedding model (returning early
    if either fails), then repeatedly processes batches. After a non-empty
    batch it prints session and global statistics; otherwise it sleeps
    ``SLEEP_IDLE`` seconds. Ctrl-C stops the loop; any other exception is
    printed and followed by a ``SLEEP_IDLE`` pause before retrying.
    """
    separator = "=" * 80
    banner_lines = (
        separator,
        "🚀 Qdrant Vectorization Worker (Direct Translation)",
        separator,
        f"Qdrant: {QDRANT_HOST}:{QDRANT_PORT}",
        f"Colección: {QDRANT_COLLECTION}",
        f"Modelo: {EMB_MODEL}",
        f"Dispositivo: {EMB_DEVICE}",
        f"Tamaño de lote: {BATCH_SIZE}",
        separator,
    )
    for banner_line in banner_lines:
        print(banner_line)

    # Bring up the Qdrant connection first.
    try:
        init_qdrant_client()
    except Exception as exc:
        print(f"❌ Error inicializando Qdrant: {exc}")
        print("⚠️ Asegúrate de que Qdrant esté corriendo")
        return

    # Then load the embedding model.
    try:
        init_embedding_model()
    except Exception as exc:
        print(f"❌ Error cargando modelo de embeddings: {exc}")
        return

    print("\n🔄 Iniciando loop de procesamiento...\n")

    session_total = 0

    while True:
        try:
            batch_count = process_batch()
            session_total += batch_count

            if batch_count <= 0:
                # Nothing was processed this round: back off before polling again.
                print(f"💤 No hay noticias pendientes. Esperando {SLEEP_IDLE}s...")
                time.sleep(SLEEP_IDLE)
                continue

            print(f"\n✅ Lote completado: {batch_count} noticias vectorizadas")
            print(f"📊 Total procesado en esta sesión: {session_total}")

            # Show the global counters after each successful batch.
            stats = get_stats()
            print("📈 Estadísticas globales:")
            print(f" Total traducciones: {stats['total']}")
            print(f" Vectorizadas: {stats['vectorizadas']}")
            print(f" Pendientes: {stats['pendientes']}")

        except KeyboardInterrupt:
            print("\n\n⏹️ Worker detenido por el usuario")
            break
        except Exception as exc:
            print(f"\n❌ Error en loop principal: {exc}")
            print(f"⏳ Esperando {SLEEP_IDLE}s antes de reintentar...")
            time.sleep(SLEEP_IDLE)
|
|
|
|
|
|
# Start the worker loop only when this file is executed as a script.
if __name__ == "__main__":
    main()
|