import os import re import logging from logging.handlers import RotatingFileHandler from collections import Counter from datetime import datetime from pymongo import MongoClient from tqdm import tqdm from multiprocessing import Pool, cpu_count import psutil import string import nltk from nltk.corpus import stopwords # Descargar stopwords la primera vez nltk.download('stopwords') stop_words = set(stopwords.words('spanish')) # Parámetros de configuración SIMILARITY_THRESHOLD = 4.0 LOG_FILE = "pipeline_mongolo.log" NUM_PROCESOS = 4 # Limitar a 4 procesos # Configuración de logging con rotación de archivos logger = logging.getLogger() logger.setLevel(logging.INFO) handler = RotatingFileHandler(LOG_FILE, maxBytes=10*1024*1024, backupCount=5) # 10 MB por archivo, 5 archivos de respaldo formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') handler.setFormatter(formatter) logger.addHandler(handler) # Función para iniciar un cliente MongoDB def init_mongo_client(): """Inicializa un nuevo cliente MongoDB para cada proceso.""" return MongoClient('localhost', 27017) # Función para preprocesar el texto def preprocesar_texto(texto): # Eliminar signos de puntuación texto = texto.translate(str.maketrans('', '', string.punctuation)) # Convertir a minúsculas texto = texto.lower() # Eliminar stop words palabras = texto.split() palabras = [word for word in palabras if word not in stop_words] return ' '.join(palabras) # Función para asignar tema y subtema basado en el contenido del texto def asignar_tema_y_subtema(texto): tematicas = { 'inteligencia y seguridad': ['inteligencia', 'ciberseguridad', 'espionaje', 'seguridad nacional', 'contraterrorismo'], 'cambio climático': ['cambio climático', 'desastres naturales', 'conservación', 'energía renovable', 'escasez de agua'], 'guerra global': ['conflictos internacionales', 'guerras civiles', 'terrorismo', 'armas', 'alianzas militares'], 'demografía y sociedad': ['sobrepoblación', 'enfermedades', 'migraciones', 'urbanización', 'despoblación rural'], 'economía y corporaciones': ['economía global', 'corporaciones multinacionales', 'comercio internacional', 'organismos financieros', 'desigualdad económica'] } texto_lower = texto.lower() for tema, palabras_clave in tematicas.items(): for palabra_clave in palabras_clave: if palabra_clave in texto_lower: return tema, palabra_clave return 'otros', 'general' # Función para extraer la fecha del nombre del archivo def extraer_fecha_de_nombre(nombre_archivo): try: fecha_str = re.search(r'\d{4}-\d{2}-\d{2}', nombre_archivo).group(0) return datetime.strptime(fecha_str, '%Y-%m-%d') except: return None # Función para contar palabras en un archivo tokenizado def contar_palabras(nombre_archivo): try: with open(nombre_archivo, 'r', encoding='utf-8') as f: palabras = f.read().split() return Counter(palabras) except Exception as e: logging.error(f"Error al contar palabras en {nombre_archivo}: {e}") return Counter() # Función para comparar dos archivos y calcular el porcentaje de similitud def comparar_archivos(archivo1, archivo2): try: conteo1 = contar_palabras(archivo1) conteo2 = contar_palabras(archivo2) palabras_comunes = set(conteo1.keys()) & set(conteo2.keys()) num_palabras_comunes = sum(min(conteo1[p], conteo2[p]) for p in palabras_comunes) num_palabras_totales = sum(conteo1.values()) + sum(conteo2.values()) if num_palabras_totales == 0: return 0 porcentaje_similitud = (num_palabras_comunes / num_palabras_totales) * 100 return porcentaje_similitud except Exception as e: logging.error(f"Error al comparar archivos {archivo1} y {archivo2}: {e}", exc_info=True) return 0 # Retornar 0 en caso de error # Función paralelizada para manejar las comparaciones def manejar_comparacion_multiproceso(pair): archivo1, archivo2 = pair try: # Inicia MongoDB dentro del proceso client = init_mongo_client() db = client['FLUJOS_DATOS'] comparaciones_collection = db['comparaciones'] nombre_archivo1 = os.path.basename(archivo1) nombre_archivo2 = os.path.basename(archivo2) porcentaje = comparar_archivos(archivo1, archivo2) comparacion = { 'noticia1': nombre_archivo1, 'noticia2': nombre_archivo2, 'porcentaje_similitud': porcentaje } comparaciones_collection.insert_one(comparacion) logging.info(f"Guardada comparación entre {nombre_archivo1} y {nombre_archivo2} con {porcentaje:.2f}% de similitud.") except Exception as e: logging.error(f"Error al manejar comparación {archivo1} vs {archivo2}: {e}", exc_info=True) finally: client.close() # Función para verificar si el documento ya fue procesado def documento_ya_subido(nombre_archivo, collection): return collection.find_one({"archivo": nombre_archivo}) is not None # Función para subir documentos a MongoDB def subir_documento_a_mongodb(nombre_archivo, tema, subtema, fecha, texto, collection): documento = { 'archivo': nombre_archivo, 'tema': tema, 'subtema': subtema, 'fecha': fecha, 'texto': texto # Subimos el texto original } try: collection.insert_one(documento) logging.info(f"Subido documento: {nombre_archivo} con tema: {tema}, subtema: {subtema}.") except Exception as e: logging.error(f"Error al subir documento {nombre_archivo}: {e}") # Función para procesar documentos y subirlos a MongoDB def procesar_documentos(directorio, collection): archivos = [os.path.join(directorio, f) for f in os.listdir(directorio)] print(f"Procesando {len(archivos)} archivos en el directorio: {directorio}") # Subir documentos a MongoDB with tqdm(total=len(archivos), desc=f"Subiendo documentos a MongoDB ({collection.name})", ncols=100) as pbar: for archivo in archivos: nombre_archivo = os.path.basename(archivo) if documento_ya_subido(nombre_archivo, collection): logging.info(f"Saltando {nombre_archivo}, ya está subido.") pbar.update(1) continue fecha = extraer_fecha_de_nombre(nombre_archivo) try: with open(archivo, 'r', encoding='utf-8') as f: texto_original = f.read() texto_preprocesado = preprocesar_texto(texto_original) tema, subtema = asignar_tema_y_subtema(texto_preprocesado) subir_documento_a_mongodb(nombre_archivo, tema, subtema, fecha, texto_original, collection) except Exception as e: logging.error(f"Error al procesar archivo {archivo}: {e}") pbar.update(1) def manejar_comparaciones_multiproceso(directorios_tokenized): try: archivos_tokenizados = [] for directorio in directorios_tokenized: archivos_directorio = [os.path.join(directorio, f) for f in os.listdir(directorio)] archivos_tokenizados.append(archivos_directorio) # Generar pares de archivos para comparar (combinaciones entre directorios) pairs = [] for i in range(len(archivos_tokenizados)): for j in range(i+1, len(archivos_tokenizados)): for archivo1 in archivos_tokenizados[i]: for archivo2 in archivos_tokenizados[j]: pairs.append((archivo1, archivo2)) total_pairs = len(pairs) logging.info(f"Total de pares a comparar: {total_pairs}") # Procesar las comparaciones en lotes más pequeños para reducir el uso de memoria batch_size = 1000 # Tamaño del lote num_batches = (total_pairs // batch_size) + 1 logging.info(f"Procesando en {num_batches} lotes de {batch_size} pares cada uno.") for batch_num in range(num_batches): batch_start = batch_num * batch_size batch_end = min(batch_start + batch_size, total_pairs) batch_pairs = pairs[batch_start:batch_end] logging.info(f"Procesando lote {batch_num + 1}/{num_batches}") with Pool(processes=NUM_PROCESOS) as pool: list(tqdm(pool.imap_unordered(manejar_comparacion_multiproceso, batch_pairs), total=len(batch_pairs), desc=f"Comparando archivos (Lote {batch_num + 1})", ncols=100)) except Exception as e: logging.error(f"Error en manejar_comparaciones_multiproceso: {e}", exc_info=True) def main(): try: # Rutas absolutas de los archivos en formato txt carpeta_noticias_txt = '/var/www/theflows.net/flujos/FLUJOS_DATOS/NOTICIAS/articulos' carpeta_wikipedia_txt = '/var/www/theflows.net/flujos/FLUJOS_DATOS/WIKIPEDIA/articulos_wikipedia' carpeta_torrents_txt = '/var/www/theflows.net/flujos/FLUJOS_DATOS/TORRENTS/TORRENTS_WIKILEAKS_COMPLETO/txt' # Carpetas con los archivos tokenizados para hacer las comparaciones carpeta_noticias_tokenized = '/var/www/theflows.net/flujos/FLUJOS_DATOS/NOTICIAS/tokenized' carpeta_wikipedia_tokenized = '/var/www/theflows.net/flujos/FLUJOS_DATOS/WIKIPEDIA/articulos_tokenizados' carpeta_torrents_tokenized = '/var/www/theflows.net/flujos/FLUJOS_DATOS/TORRENTS/TORRENTS_WIKILEAKS_COMPLETO/tokenized' # Iniciar cliente MongoDB client = init_mongo_client() # Omitir la subida de documentos print("Subiendo archivos originales de Noticias a MongoDB...") logging.info("Iniciando subida de noticias a MongoDB") procesar_documentos(carpeta_noticias_txt, client['FLUJOS_DATOS']['noticias']) logging.info("Finalizada subida de noticias a MongoDB") # print("Subiendo archivos originales de Wikipedia a MongoDB...") logging.info("Iniciando subida de Wikipedia a MongoDB") procesar_documentos(carpeta_wikipedia_txt, client['FLUJOS_DATOS']['wikipedia']) logging.info("Finalizada subida de Wikipedia a MongoDB") # print("Subiendo archivos originales de TORRENTS a MongoDB...") logging.info("Iniciando subida de TORRENTS a MongoDB") procesar_documentos(carpeta_torrents_txt, client['FLUJOS_DATOS']['torrents']) logging.info("Finalizada subida de TORRENTS a MongoDB") # Comparar archivos tokenizados y guardar comparaciones en MongoDB print("Comparando textos tokenizados entre Noticias, Wikipedia y TORRENTS...") logging.info("Iniciando comparaciones entre textos tokenizados") manejar_comparaciones_multiproceso([carpeta_noticias_tokenized, carpeta_wikipedia_tokenized, carpeta_torrents_tokenized]) logging.info("Finalizadas comparaciones entre textos tokenizados") except Exception as e: logging.error(f"Error en la ejecución del script: {e}", exc_info=True) if __name__ == "__main__": main()