Includes: FLUJOS app (Node/Flask/Python), FLUJOS_DATOS scripts (scrapers, Keras, Django) Excludes: MongoDB, scraped data, Wikipedia/WikiLeaks dumps, Python venv, node_modules
254 lines
11 KiB
Python
Executable file
254 lines
11 KiB
Python
Executable file
import os
|
|
import re
|
|
import logging
|
|
from logging.handlers import RotatingFileHandler
|
|
from collections import Counter
|
|
from datetime import datetime
|
|
from pymongo import MongoClient
|
|
from tqdm import tqdm
|
|
from multiprocessing import Pool, cpu_count
|
|
import psutil
|
|
import string
|
|
import nltk
|
|
from nltk.corpus import stopwords
|
|
|
|
# Descargar stopwords la primera vez
|
|
nltk.download('stopwords')
|
|
stop_words = set(stopwords.words('spanish'))
|
|
|
|
# Parámetros de configuración
|
|
SIMILARITY_THRESHOLD = 4.0
|
|
LOG_FILE = "pipeline_mongolo.log"
|
|
NUM_PROCESOS = 4 # Limitar a 4 procesos
|
|
|
|
# Configuración de logging con rotación de archivos
|
|
logger = logging.getLogger()
|
|
logger.setLevel(logging.INFO)
|
|
|
|
handler = RotatingFileHandler(LOG_FILE, maxBytes=10*1024*1024, backupCount=5) # 10 MB por archivo, 5 archivos de respaldo
|
|
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
|
|
handler.setFormatter(formatter)
|
|
|
|
logger.addHandler(handler)
|
|
|
|
# Función para iniciar un cliente MongoDB
|
|
def init_mongo_client():
|
|
"""Inicializa un nuevo cliente MongoDB para cada proceso."""
|
|
return MongoClient('localhost', 27017)
|
|
|
|
# Función para preprocesar el texto
|
|
def preprocesar_texto(texto):
|
|
# Eliminar signos de puntuación
|
|
texto = texto.translate(str.maketrans('', '', string.punctuation))
|
|
# Convertir a minúsculas
|
|
texto = texto.lower()
|
|
# Eliminar stop words
|
|
palabras = texto.split()
|
|
palabras = [word for word in palabras if word not in stop_words]
|
|
return ' '.join(palabras)
|
|
|
|
# Función para asignar tema y subtema basado en el contenido del texto
|
|
def asignar_tema_y_subtema(texto):
|
|
tematicas = {
|
|
'inteligencia y seguridad': ['inteligencia', 'ciberseguridad', 'espionaje', 'seguridad nacional', 'contraterrorismo'],
|
|
'cambio climático': ['cambio climático', 'desastres naturales', 'conservación', 'energía renovable', 'escasez de agua'],
|
|
'guerra global': ['conflictos internacionales', 'guerras civiles', 'terrorismo', 'armas', 'alianzas militares'],
|
|
'demografía y sociedad': ['sobrepoblación', 'enfermedades', 'migraciones', 'urbanización', 'despoblación rural'],
|
|
'economía y corporaciones': ['economía global', 'corporaciones multinacionales', 'comercio internacional', 'organismos financieros', 'desigualdad económica']
|
|
}
|
|
|
|
texto_lower = texto.lower()
|
|
for tema, palabras_clave in tematicas.items():
|
|
for palabra_clave in palabras_clave:
|
|
if palabra_clave in texto_lower:
|
|
return tema, palabra_clave
|
|
return 'otros', 'general'
|
|
|
|
# Función para extraer la fecha del nombre del archivo
|
|
def extraer_fecha_de_nombre(nombre_archivo):
|
|
try:
|
|
fecha_str = re.search(r'\d{4}-\d{2}-\d{2}', nombre_archivo).group(0)
|
|
return datetime.strptime(fecha_str, '%Y-%m-%d')
|
|
except:
|
|
return None
|
|
|
|
# Función para contar palabras en un archivo tokenizado
|
|
def contar_palabras(nombre_archivo):
|
|
try:
|
|
with open(nombre_archivo, 'r', encoding='utf-8') as f:
|
|
palabras = f.read().split()
|
|
return Counter(palabras)
|
|
except Exception as e:
|
|
logging.error(f"Error al contar palabras en {nombre_archivo}: {e}")
|
|
return Counter()
|
|
|
|
# Función para comparar dos archivos y calcular el porcentaje de similitud
|
|
def comparar_archivos(archivo1, archivo2):
|
|
try:
|
|
conteo1 = contar_palabras(archivo1)
|
|
conteo2 = contar_palabras(archivo2)
|
|
|
|
palabras_comunes = set(conteo1.keys()) & set(conteo2.keys())
|
|
num_palabras_comunes = sum(min(conteo1[p], conteo2[p]) for p in palabras_comunes)
|
|
num_palabras_totales = sum(conteo1.values()) + sum(conteo2.values())
|
|
|
|
if num_palabras_totales == 0:
|
|
return 0
|
|
|
|
porcentaje_similitud = (num_palabras_comunes / num_palabras_totales) * 100
|
|
return porcentaje_similitud
|
|
except Exception as e:
|
|
logging.error(f"Error al comparar archivos {archivo1} y {archivo2}: {e}", exc_info=True)
|
|
return 0 # Retornar 0 en caso de error
|
|
|
|
# Función paralelizada para manejar las comparaciones
|
|
def manejar_comparacion_multiproceso(pair):
|
|
archivo1, archivo2 = pair
|
|
try:
|
|
# Inicia MongoDB dentro del proceso
|
|
client = init_mongo_client()
|
|
db = client['FLUJOS_DATOS']
|
|
comparaciones_collection = db['comparaciones']
|
|
|
|
nombre_archivo1 = os.path.basename(archivo1)
|
|
nombre_archivo2 = os.path.basename(archivo2)
|
|
|
|
porcentaje = comparar_archivos(archivo1, archivo2)
|
|
comparacion = {
|
|
'noticia1': nombre_archivo1,
|
|
'noticia2': nombre_archivo2,
|
|
'porcentaje_similitud': porcentaje
|
|
}
|
|
|
|
comparaciones_collection.insert_one(comparacion)
|
|
logging.info(f"Guardada comparación entre {nombre_archivo1} y {nombre_archivo2} con {porcentaje:.2f}% de similitud.")
|
|
except Exception as e:
|
|
logging.error(f"Error al manejar comparación {archivo1} vs {archivo2}: {e}", exc_info=True)
|
|
finally:
|
|
client.close()
|
|
|
|
# Función para verificar si el documento ya fue procesado
|
|
def documento_ya_subido(nombre_archivo, collection):
|
|
return collection.find_one({"archivo": nombre_archivo}) is not None
|
|
|
|
# Función para subir documentos a MongoDB
|
|
def subir_documento_a_mongodb(nombre_archivo, tema, subtema, fecha, texto, collection):
|
|
documento = {
|
|
'archivo': nombre_archivo,
|
|
'tema': tema,
|
|
'subtema': subtema,
|
|
'fecha': fecha,
|
|
'texto': texto # Subimos el texto original
|
|
}
|
|
try:
|
|
collection.insert_one(documento)
|
|
logging.info(f"Subido documento: {nombre_archivo} con tema: {tema}, subtema: {subtema}.")
|
|
except Exception as e:
|
|
logging.error(f"Error al subir documento {nombre_archivo}: {e}")
|
|
|
|
# Función para procesar documentos y subirlos a MongoDB
|
|
def procesar_documentos(directorio, collection):
|
|
archivos = [os.path.join(directorio, f) for f in os.listdir(directorio)]
|
|
print(f"Procesando {len(archivos)} archivos en el directorio: {directorio}")
|
|
|
|
# Subir documentos a MongoDB
|
|
with tqdm(total=len(archivos), desc=f"Subiendo documentos a MongoDB ({collection.name})", ncols=100) as pbar:
|
|
for archivo in archivos:
|
|
nombre_archivo = os.path.basename(archivo)
|
|
if documento_ya_subido(nombre_archivo, collection):
|
|
logging.info(f"Saltando {nombre_archivo}, ya está subido.")
|
|
pbar.update(1)
|
|
continue
|
|
|
|
fecha = extraer_fecha_de_nombre(nombre_archivo)
|
|
|
|
try:
|
|
with open(archivo, 'r', encoding='utf-8') as f:
|
|
texto_original = f.read()
|
|
texto_preprocesado = preprocesar_texto(texto_original)
|
|
tema, subtema = asignar_tema_y_subtema(texto_preprocesado)
|
|
subir_documento_a_mongodb(nombre_archivo, tema, subtema, fecha, texto_original, collection)
|
|
except Exception as e:
|
|
logging.error(f"Error al procesar archivo {archivo}: {e}")
|
|
|
|
pbar.update(1)
|
|
|
|
def manejar_comparaciones_multiproceso(directorios_tokenized):
|
|
try:
|
|
archivos_tokenizados = []
|
|
for directorio in directorios_tokenized:
|
|
archivos_directorio = [os.path.join(directorio, f) for f in os.listdir(directorio)]
|
|
archivos_tokenizados.append(archivos_directorio)
|
|
|
|
# Generar pares de archivos para comparar (combinaciones entre directorios)
|
|
pairs = []
|
|
for i in range(len(archivos_tokenizados)):
|
|
for j in range(i+1, len(archivos_tokenizados)):
|
|
for archivo1 in archivos_tokenizados[i]:
|
|
for archivo2 in archivos_tokenizados[j]:
|
|
pairs.append((archivo1, archivo2))
|
|
|
|
total_pairs = len(pairs)
|
|
logging.info(f"Total de pares a comparar: {total_pairs}")
|
|
|
|
# Procesar las comparaciones en lotes más pequeños para reducir el uso de memoria
|
|
batch_size = 1000 # Tamaño del lote
|
|
num_batches = (total_pairs // batch_size) + 1
|
|
|
|
logging.info(f"Procesando en {num_batches} lotes de {batch_size} pares cada uno.")
|
|
|
|
for batch_num in range(num_batches):
|
|
batch_start = batch_num * batch_size
|
|
batch_end = min(batch_start + batch_size, total_pairs)
|
|
batch_pairs = pairs[batch_start:batch_end]
|
|
|
|
logging.info(f"Procesando lote {batch_num + 1}/{num_batches}")
|
|
|
|
with Pool(processes=NUM_PROCESOS) as pool:
|
|
list(tqdm(pool.imap_unordered(manejar_comparacion_multiproceso, batch_pairs), total=len(batch_pairs), desc=f"Comparando archivos (Lote {batch_num + 1})", ncols=100))
|
|
except Exception as e:
|
|
logging.error(f"Error en manejar_comparaciones_multiproceso: {e}", exc_info=True)
|
|
|
|
def main():
|
|
try:
|
|
# Rutas absolutas de los archivos en formato txt
|
|
carpeta_noticias_txt = '/var/www/theflows.net/flujos/FLUJOS_DATOS/NOTICIAS/articulos'
|
|
carpeta_wikipedia_txt = '/var/www/theflows.net/flujos/FLUJOS_DATOS/WIKIPEDIA/articulos_wikipedia'
|
|
carpeta_torrents_txt = '/var/www/theflows.net/flujos/FLUJOS_DATOS/TORRENTS/TORRENTS_WIKILEAKS_COMPLETO/txt'
|
|
|
|
# Carpetas con los archivos tokenizados para hacer las comparaciones
|
|
carpeta_noticias_tokenized = '/var/www/theflows.net/flujos/FLUJOS_DATOS/NOTICIAS/tokenized'
|
|
carpeta_wikipedia_tokenized = '/var/www/theflows.net/flujos/FLUJOS_DATOS/WIKIPEDIA/articulos_tokenizados'
|
|
carpeta_torrents_tokenized = '/var/www/theflows.net/flujos/FLUJOS_DATOS/TORRENTS/TORRENTS_WIKILEAKS_COMPLETO/tokenized'
|
|
|
|
# Iniciar cliente MongoDB
|
|
client = init_mongo_client()
|
|
|
|
# Omitir la subida de documentos
|
|
print("Subiendo archivos originales de Noticias a MongoDB...")
|
|
logging.info("Iniciando subida de noticias a MongoDB")
|
|
procesar_documentos(carpeta_noticias_txt, client['FLUJOS_DATOS']['noticias'])
|
|
logging.info("Finalizada subida de noticias a MongoDB")
|
|
|
|
# print("Subiendo archivos originales de Wikipedia a MongoDB...")
|
|
logging.info("Iniciando subida de Wikipedia a MongoDB")
|
|
procesar_documentos(carpeta_wikipedia_txt, client['FLUJOS_DATOS']['wikipedia'])
|
|
logging.info("Finalizada subida de Wikipedia a MongoDB")
|
|
|
|
# print("Subiendo archivos originales de TORRENTS a MongoDB...")
|
|
logging.info("Iniciando subida de TORRENTS a MongoDB")
|
|
procesar_documentos(carpeta_torrents_txt, client['FLUJOS_DATOS']['torrents'])
|
|
logging.info("Finalizada subida de TORRENTS a MongoDB")
|
|
|
|
# Comparar archivos tokenizados y guardar comparaciones en MongoDB
|
|
print("Comparando textos tokenizados entre Noticias, Wikipedia y TORRENTS...")
|
|
logging.info("Iniciando comparaciones entre textos tokenizados")
|
|
manejar_comparaciones_multiproceso([carpeta_noticias_tokenized, carpeta_wikipedia_tokenized, carpeta_torrents_tokenized])
|
|
logging.info("Finalizadas comparaciones entre textos tokenizados")
|
|
|
|
except Exception as e:
|
|
logging.error(f"Error en la ejecución del script: {e}", exc_info=True)
|
|
|
|
if __name__ == "__main__":
|
|
main()
|
|
|