import os import re import logging from logging.handlers import RotatingFileHandler from collections import Counter from datetime import datetime from pymongo import MongoClient from tqdm import tqdm from multiprocessing import Pool import string import nltk from nltk.corpus import stopwords # Descargar stopwords la primera vez nltk.download('stopwords') stop_words = set(stopwords.words('spanish')) # Parámetros de configuración SIMILARITY_THRESHOLD = 4.0 LOG_FILE = "pipeline_mongolo.log" NUM_PROCESOS = 2 # Número de procesos para el Pool BATCH_SIZE = 50 # Tamaño del lote para archivos # Configuración de logging con rotación de archivos logger = logging.getLogger() logger.setLevel(logging.INFO) handler = RotatingFileHandler(LOG_FILE, maxBytes=10*1024*1024, backupCount=5) # 10 MB por archivo, 5 archivos de respaldo formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') handler.setFormatter(formatter) logger.addHandler(handler) # Función para iniciar un cliente MongoDB def init_mongo_client(): """Inicializa un nuevo cliente MongoDB para cada proceso.""" return MongoClient('localhost', 27017) # Función para preprocesar el texto def preprocesar_texto(texto): # Eliminar signos de puntuación texto = texto.translate(str.maketrans('', '', string.punctuation)) # Convertir a minúsculas texto = texto.lower() # Eliminar stop words palabras = texto.split() palabras = [word for word in palabras if word not in stop_words] return ' '.join(palabras) # Función para asignar tema y subtema basado en el contenido del texto def asignar_tema_y_subtema(texto): tematicas = { 'inteligencia y seguridad': ['inteligencia', 'ciberseguridad', 'espionaje', 'seguridad nacional', 'contraterrorismo'], 'cambio climático': ['cambio climático', 'desastres naturales', 'conservación', 'energía renovable', 'escasez de agua'], 'guerra global': ['conflictos internacionales', 'guerras civiles', 'terrorismo', 'armas', 'alianzas militares'], 'demografía y sociedad': ['sobrepoblación', 'enfermedades', 'migraciones', 'urbanización', 'despoblación rural'], 'economía y corporaciones': ['economía global', 'corporaciones multinacionales', 'comercio internacional', 'organismos financieros', 'desigualdad económica'] } texto_lower = texto.lower() for tema, palabras_clave in tematicas.items(): for palabra_clave in palabras_clave: if palabra_clave in texto_lower: return tema, palabra_clave return 'otros', 'general' # Función para extraer la fecha del nombre del archivo def extraer_fecha_de_nombre(nombre_archivo): try: fecha_str = re.search(r'\d{4}-\d{2}-\d{2}', nombre_archivo).group(0) return datetime.strptime(fecha_str, '%Y-%m-%d') except: return None # Función para contar palabras en un archivo tokenizado def contar_palabras(nombre_archivo): try: with open(nombre_archivo, 'r', encoding='utf-8') as f: palabras = f.read().split() return Counter(palabras) except Exception as e: logging.error(f"Error al contar palabras en {nombre_archivo}: {e}") return Counter() # Función para comparar dos archivos y calcular el porcentaje de similitud def comparar_archivos(archivo1, archivo2): try: conteo1 = contar_palabras(archivo1) conteo2 = contar_palabras(archivo2) palabras_comunes = set(conteo1.keys()) & set(conteo2.keys()) num_palabras_comunes = sum(min(conteo1[p], conteo2[p]) for p in palabras_comunes) num_palabras_totales = sum(conteo1.values()) + sum(conteo2.values()) if num_palabras_totales == 0: return 0 porcentaje_similitud = (num_palabras_comunes / num_palabras_totales) * 100 return porcentaje_similitud except Exception as e: logging.error(f"Error al comparar archivos {archivo1} y {archivo2}: {e}", exc_info=True) return 0 # Retornar 0 en caso de error # Función paralelizada para manejar las comparaciones def manejar_comparacion_multiproceso(pair): archivo1, archivo2 = pair try: # Inicia MongoDB dentro del proceso client = init_mongo_client() db = client['FLUJOS_DATOS'] comparaciones_collection = db['comparaciones'] nombre_archivo1 = os.path.basename(archivo1) nombre_archivo2 = os.path.basename(archivo2) porcentaje = comparar_archivos(archivo1, archivo2) comparacion = { 'noticia1': nombre_archivo1, 'noticia2': nombre_archivo2, 'porcentaje_similitud': porcentaje } comparaciones_collection.insert_one(comparacion) logging.info(f"Guardada comparación entre {nombre_archivo1} y {nombre_archivo2} con {porcentaje:.2f}% de similitud.") except Exception as e: logging.error(f"Error al manejar comparación {archivo1} vs {archivo2}: {e}", exc_info=True) finally: client.close() # Función para verificar si el documento ya fue procesado def documento_ya_subido(nombre_archivo, collection): return collection.find_one({"archivo": nombre_archivo}) is not None # Función para subir documentos a MongoDB def subir_documento_a_mongodb(nombre_archivo, tema, subtema, fecha, texto, collection): documento = { 'archivo': nombre_archivo, 'tema': tema, 'subtema': subtema, 'fecha': fecha, 'texto': texto # Subimos el texto original } try: collection.insert_one(documento) logging.info(f"Subido documento: {nombre_archivo} con tema: {tema}, subtema: {subtema}.") except Exception as e: logging.error(f"Error al subir documento {nombre_archivo}: {e}") # Función para procesar documentos y subirlos a MongoDB def procesar_documentos(directorio, collection): archivos = [os.path.join(directorio, f) for f in os.listdir(directorio)] print(f"Procesando {len(archivos)} archivos en el directorio: {directorio}") # Subir documentos a MongoDB with tqdm(total=len(archivos), desc=f"Subiendo documentos a MongoDB ({collection.name})", ncols=100) as pbar: for archivo in archivos: nombre_archivo = os.path.basename(archivo) if documento_ya_subido(nombre_archivo, collection): logging.info(f"Saltando {nombre_archivo}, ya está subido.") pbar.update(1) continue fecha = extraer_fecha_de_nombre(nombre_archivo) try: with open(archivo, 'r', encoding='utf-8') as f: texto_original = f.read() texto_preprocesado = preprocesar_texto(texto_original) tema, subtema = asignar_tema_y_subtema(texto_preprocesado) subir_documento_a_mongodb(nombre_archivo, tema, subtema, fecha, texto_original, collection) except Exception as e: logging.error(f"Error al procesar archivo {archivo}: {e}") pbar.update(1) # Función mejorada para manejar las comparaciones con menor uso de RAM def manejar_comparaciones_multiproceso(directorios_tokenized): try: archivos_tokenizados = [] for directorio in directorios_tokenized: archivos_directorio = [os.path.join(directorio, f) for f in os.listdir(directorio)] archivos_tokenizados.append(archivos_directorio) total_pairs = 0 # Procesar combinaciones para reducir el uso de memoria for i in range(len(archivos_tokenizados)): for j in range(i+1, len(archivos_tokenizados)): archivos1 = archivos_tokenizados[i] archivos2 = archivos_tokenizados[j] num_archivos1 = len(archivos1) num_archivos2 = len(archivos2) logging.info(f"Comparando {num_archivos1} archivos de {directorios_tokenized[i]} con {num_archivos2} archivos de {directorios_tokenized[j]}") # Procesar por bloques for idx1 in range(0, num_archivos1, BATCH_SIZE): batch_archivos1 = archivos1[idx1:idx1+BATCH_SIZE] for idx2 in range(0, num_archivos2, BATCH_SIZE): batch_archivos2 = archivos2[idx2:idx2+BATCH_SIZE] # Usar un generador para los pares def pair_generator(): for a1 in batch_archivos1: for a2 in batch_archivos2: yield (a1, a2) total_pairs_in_batch = len(batch_archivos1) * len(batch_archivos2) total_pairs += total_pairs_in_batch logging.info(f"Procesando batch de {total_pairs_in_batch} pares") with Pool(processes=NUM_PROCESOS) as pool: list(tqdm( pool.imap_unordered(manejar_comparacion_multiproceso, pair_generator()), total=total_pairs_in_batch, desc=f"Comparando archivos", ncols=100 )) logging.info(f"Total de pares comparados: {total_pairs}") except Exception as e: logging.error(f"Error en manejar_comparaciones_multiproceso: {e}", exc_info=True) def main(): try: # Rutas absolutas de los archivos en formato txt carpeta_noticias_txt = '/var/www/theflows.net/flujos/FLUJOS_DATOS/NOTICIAS/articulos' carpeta_wikipedia_txt = '/var/www/theflows.net/flujos/FLUJOS_DATOS/WIKIPEDIA/articulos_wikipedia' carpeta_torrents_txt = '/var/www/theflows.net/flujos/FLUJOS_DATOS/TORRENTS/TORRENTS_WIKILEAKS_COMPLETO/txt' # Carpetas con los archivos tokenizados para hacer las comparaciones carpeta_noticias_tokenized = '/var/www/theflows.net/flujos/FLUJOS_DATOS/NOTICIAS/tokenized' carpeta_wikipedia_tokenized = '/var/www/theflows.net/flujos/FLUJOS_DATOS/WIKIPEDIA/articulos_tokenizados' carpeta_torrents_tokenized = '/var/www/theflows.net/flujos/FLUJOS_DATOS/TORRENTS/TORRENTS_WIKILEAKS_COMPLETO/tokenized' # Iniciar cliente MongoDB client = init_mongo_client() # Subir las noticias, artículos de Wikipedia y TORRENTS a MongoDB en formato txt # print("Subiendo archivos originales de Noticias a MongoDB...") # logging.info("Iniciando subida de noticias a MongoDB") # procesar_documentos(carpeta_noticias_txt, client['FLUJOS_DATOS']['noticias']) # logging.info("Finalizada subida de noticias a MongoDB") #print("Subiendo archivos originales de Wikipedia a MongoDB...") #logging.info("Iniciando subida de Wikipedia a MongoDB") #procesar_documentos(carpeta_wikipedia_txt, client['FLUJOS_DATOS']['wikipedia']) #logging.info("Finalizada subida de Wikipedia a MongoDB") print("Subiendo archivos originales de TORRENTS a MongoDB...") logging.info("Iniciando subida de TORRENTS a MongoDB") procesar_documentos(carpeta_torrents_txt, client['FLUJOS_DATOS']['torrents']) logging.info("Finalizada subida de TORRENTS a MongoDB") # ==> Sólo comparaciones Wikipedia vs Wikipedia <== print("Comparando textos tokenizados de Wikipedia entre sí...") logging.info("Iniciando comparaciones sólo de Wikipedia") manejar_comparaciones_multiproceso([ carpeta_wikipedia_tokenized, carpeta_wikipedia_tokenized ]) logging.info("Finalizadas comparaciones de Wikipedia") # print("Comparando textos tokenizados entre Noticias, Wikipedia y TORRENTS...") # logging.info("Iniciando comparaciones entre textos tokenizados") # manejar_comparaciones_multiproceso([carpeta_noticias_tokenized, carpeta_wikipedia_tokenized, carpeta_torrents_tokenized]) # logging.info("Finalizadas comparaciones entre textos tokenizados") except Exception as e: logging.error(f"Error en la ejecución del script: {e}", exc_info=True) if __name__ == "__main__": main()