FLUJOS/FLUJOS_DATOS/COMPARACIONES/pipeline_mongolo.py
CAPITANSITO a40b946163 Initial commit - FLUJOS codebase (production branch)
Includes: FLUJOS app (Node/Flask/Python), FLUJOS_DATOS scripts (scrapers, Keras, Django)
Excludes: MongoDB, scraped data, Wikipedia/WikiLeaks dumps, Python venv, node_modules
2026-03-31 14:10:02 +02:00

276 lines
12 KiB
Python
Executable file

import os
import re
import logging
from logging.handlers import RotatingFileHandler
from collections import Counter
from datetime import datetime
from pymongo import MongoClient
from tqdm import tqdm
from multiprocessing import Pool
import string
import nltk
from nltk.corpus import stopwords
# Descargar stopwords la primera vez
nltk.download('stopwords')
stop_words = set(stopwords.words('spanish'))
# Parámetros de configuración
SIMILARITY_THRESHOLD = 4.0
LOG_FILE = "pipeline_mongolo.log"
NUM_PROCESOS = 2 # Número de procesos para el Pool
BATCH_SIZE = 50 # Tamaño del lote para archivos
# Configuración de logging con rotación de archivos
logger = logging.getLogger()
logger.setLevel(logging.INFO)
handler = RotatingFileHandler(LOG_FILE, maxBytes=10*1024*1024, backupCount=5) # 10 MB por archivo, 5 archivos de respaldo
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
# Función para iniciar un cliente MongoDB
def init_mongo_client():
"""Inicializa un nuevo cliente MongoDB para cada proceso."""
return MongoClient('localhost', 27017)
# Función para preprocesar el texto
def preprocesar_texto(texto):
# Eliminar signos de puntuación
texto = texto.translate(str.maketrans('', '', string.punctuation))
# Convertir a minúsculas
texto = texto.lower()
# Eliminar stop words
palabras = texto.split()
palabras = [word for word in palabras if word not in stop_words]
return ' '.join(palabras)
# Función para asignar tema y subtema basado en el contenido del texto
def asignar_tema_y_subtema(texto):
tematicas = {
'inteligencia y seguridad': ['inteligencia', 'ciberseguridad', 'espionaje', 'seguridad nacional', 'contraterrorismo'],
'cambio climático': ['cambio climático', 'desastres naturales', 'conservación', 'energía renovable', 'escasez de agua'],
'guerra global': ['conflictos internacionales', 'guerras civiles', 'terrorismo', 'armas', 'alianzas militares'],
'demografía y sociedad': ['sobrepoblación', 'enfermedades', 'migraciones', 'urbanización', 'despoblación rural'],
'economía y corporaciones': ['economía global', 'corporaciones multinacionales', 'comercio internacional', 'organismos financieros', 'desigualdad económica']
}
texto_lower = texto.lower()
for tema, palabras_clave in tematicas.items():
for palabra_clave in palabras_clave:
if palabra_clave in texto_lower:
return tema, palabra_clave
return 'otros', 'general'
# Función para extraer la fecha del nombre del archivo
def extraer_fecha_de_nombre(nombre_archivo):
try:
fecha_str = re.search(r'\d{4}-\d{2}-\d{2}', nombre_archivo).group(0)
return datetime.strptime(fecha_str, '%Y-%m-%d')
except:
return None
# Función para contar palabras en un archivo tokenizado
def contar_palabras(nombre_archivo):
try:
with open(nombre_archivo, 'r', encoding='utf-8') as f:
palabras = f.read().split()
return Counter(palabras)
except Exception as e:
logging.error(f"Error al contar palabras en {nombre_archivo}: {e}")
return Counter()
# Función para comparar dos archivos y calcular el porcentaje de similitud
def comparar_archivos(archivo1, archivo2):
try:
conteo1 = contar_palabras(archivo1)
conteo2 = contar_palabras(archivo2)
palabras_comunes = set(conteo1.keys()) & set(conteo2.keys())
num_palabras_comunes = sum(min(conteo1[p], conteo2[p]) for p in palabras_comunes)
num_palabras_totales = sum(conteo1.values()) + sum(conteo2.values())
if num_palabras_totales == 0:
return 0
porcentaje_similitud = (num_palabras_comunes / num_palabras_totales) * 100
return porcentaje_similitud
except Exception as e:
logging.error(f"Error al comparar archivos {archivo1} y {archivo2}: {e}", exc_info=True)
return 0 # Retornar 0 en caso de error
# Función paralelizada para manejar las comparaciones
def manejar_comparacion_multiproceso(pair):
archivo1, archivo2 = pair
try:
# Inicia MongoDB dentro del proceso
client = init_mongo_client()
db = client['FLUJOS_DATOS']
comparaciones_collection = db['comparaciones']
nombre_archivo1 = os.path.basename(archivo1)
nombre_archivo2 = os.path.basename(archivo2)
porcentaje = comparar_archivos(archivo1, archivo2)
comparacion = {
'noticia1': nombre_archivo1,
'noticia2': nombre_archivo2,
'porcentaje_similitud': porcentaje
}
comparaciones_collection.insert_one(comparacion)
logging.info(f"Guardada comparación entre {nombre_archivo1} y {nombre_archivo2} con {porcentaje:.2f}% de similitud.")
except Exception as e:
logging.error(f"Error al manejar comparación {archivo1} vs {archivo2}: {e}", exc_info=True)
finally:
client.close()
# Función para verificar si el documento ya fue procesado
def documento_ya_subido(nombre_archivo, collection):
return collection.find_one({"archivo": nombre_archivo}) is not None
# Función para subir documentos a MongoDB
def subir_documento_a_mongodb(nombre_archivo, tema, subtema, fecha, texto, collection):
documento = {
'archivo': nombre_archivo,
'tema': tema,
'subtema': subtema,
'fecha': fecha,
'texto': texto # Subimos el texto original
}
try:
collection.insert_one(documento)
logging.info(f"Subido documento: {nombre_archivo} con tema: {tema}, subtema: {subtema}.")
except Exception as e:
logging.error(f"Error al subir documento {nombre_archivo}: {e}")
# Función para procesar documentos y subirlos a MongoDB
def procesar_documentos(directorio, collection):
archivos = [os.path.join(directorio, f) for f in os.listdir(directorio)]
print(f"Procesando {len(archivos)} archivos en el directorio: {directorio}")
# Subir documentos a MongoDB
with tqdm(total=len(archivos), desc=f"Subiendo documentos a MongoDB ({collection.name})", ncols=100) as pbar:
for archivo in archivos:
nombre_archivo = os.path.basename(archivo)
if documento_ya_subido(nombre_archivo, collection):
logging.info(f"Saltando {nombre_archivo}, ya está subido.")
pbar.update(1)
continue
fecha = extraer_fecha_de_nombre(nombre_archivo)
try:
with open(archivo, 'r', encoding='utf-8') as f:
texto_original = f.read()
texto_preprocesado = preprocesar_texto(texto_original)
tema, subtema = asignar_tema_y_subtema(texto_preprocesado)
subir_documento_a_mongodb(nombre_archivo, tema, subtema, fecha, texto_original, collection)
except Exception as e:
logging.error(f"Error al procesar archivo {archivo}: {e}")
pbar.update(1)
# Función mejorada para manejar las comparaciones con menor uso de RAM
def manejar_comparaciones_multiproceso(directorios_tokenized):
try:
archivos_tokenizados = []
for directorio in directorios_tokenized:
archivos_directorio = [os.path.join(directorio, f) for f in os.listdir(directorio)]
archivos_tokenizados.append(archivos_directorio)
total_pairs = 0
# Procesar combinaciones para reducir el uso de memoria
for i in range(len(archivos_tokenizados)):
for j in range(i+1, len(archivos_tokenizados)):
archivos1 = archivos_tokenizados[i]
archivos2 = archivos_tokenizados[j]
num_archivos1 = len(archivos1)
num_archivos2 = len(archivos2)
logging.info(f"Comparando {num_archivos1} archivos de {directorios_tokenized[i]} con {num_archivos2} archivos de {directorios_tokenized[j]}")
# Procesar por bloques
for idx1 in range(0, num_archivos1, BATCH_SIZE):
batch_archivos1 = archivos1[idx1:idx1+BATCH_SIZE]
for idx2 in range(0, num_archivos2, BATCH_SIZE):
batch_archivos2 = archivos2[idx2:idx2+BATCH_SIZE]
# Usar un generador para los pares
def pair_generator():
for a1 in batch_archivos1:
for a2 in batch_archivos2:
yield (a1, a2)
total_pairs_in_batch = len(batch_archivos1) * len(batch_archivos2)
total_pairs += total_pairs_in_batch
logging.info(f"Procesando batch de {total_pairs_in_batch} pares")
with Pool(processes=NUM_PROCESOS) as pool:
list(tqdm(
pool.imap_unordered(manejar_comparacion_multiproceso, pair_generator()),
total=total_pairs_in_batch,
desc=f"Comparando archivos",
ncols=100
))
logging.info(f"Total de pares comparados: {total_pairs}")
except Exception as e:
logging.error(f"Error en manejar_comparaciones_multiproceso: {e}", exc_info=True)
def main():
try:
# Rutas absolutas de los archivos en formato txt
carpeta_noticias_txt = '/var/www/theflows.net/flujos/FLUJOS_DATOS/NOTICIAS/articulos'
carpeta_wikipedia_txt = '/var/www/theflows.net/flujos/FLUJOS_DATOS/WIKIPEDIA/articulos_wikipedia'
carpeta_torrents_txt = '/var/www/theflows.net/flujos/FLUJOS_DATOS/TORRENTS/TORRENTS_WIKILEAKS_COMPLETO/txt'
# Carpetas con los archivos tokenizados para hacer las comparaciones
carpeta_noticias_tokenized = '/var/www/theflows.net/flujos/FLUJOS_DATOS/NOTICIAS/tokenized'
carpeta_wikipedia_tokenized = '/var/www/theflows.net/flujos/FLUJOS_DATOS/WIKIPEDIA/articulos_tokenizados'
carpeta_torrents_tokenized = '/var/www/theflows.net/flujos/FLUJOS_DATOS/TORRENTS/TORRENTS_WIKILEAKS_COMPLETO/tokenized'
# Iniciar cliente MongoDB
client = init_mongo_client()
# Subir las noticias, artículos de Wikipedia y TORRENTS a MongoDB en formato txt
# print("Subiendo archivos originales de Noticias a MongoDB...")
# logging.info("Iniciando subida de noticias a MongoDB")
# procesar_documentos(carpeta_noticias_txt, client['FLUJOS_DATOS']['noticias'])
# logging.info("Finalizada subida de noticias a MongoDB")
#print("Subiendo archivos originales de Wikipedia a MongoDB...")
#logging.info("Iniciando subida de Wikipedia a MongoDB")
#procesar_documentos(carpeta_wikipedia_txt, client['FLUJOS_DATOS']['wikipedia'])
#logging.info("Finalizada subida de Wikipedia a MongoDB")
print("Subiendo archivos originales de TORRENTS a MongoDB...")
logging.info("Iniciando subida de TORRENTS a MongoDB")
procesar_documentos(carpeta_torrents_txt, client['FLUJOS_DATOS']['torrents'])
logging.info("Finalizada subida de TORRENTS a MongoDB")
# ==> Sólo comparaciones Wikipedia vs Wikipedia <==
print("Comparando textos tokenizados de Wikipedia entre sí...")
logging.info("Iniciando comparaciones sólo de Wikipedia")
manejar_comparaciones_multiproceso([
carpeta_wikipedia_tokenized,
carpeta_wikipedia_tokenized
])
logging.info("Finalizadas comparaciones de Wikipedia")
# print("Comparando textos tokenizados entre Noticias, Wikipedia y TORRENTS...")
# logging.info("Iniciando comparaciones entre textos tokenizados")
# manejar_comparaciones_multiproceso([carpeta_noticias_tokenized, carpeta_wikipedia_tokenized, carpeta_torrents_tokenized])
# logging.info("Finalizadas comparaciones entre textos tokenizados")
except Exception as e:
logging.error(f"Error en la ejecución del script: {e}", exc_info=True)
if __name__ == "__main__":
main()