Initial commit - FLUJOS codebase (production branch)
Includes: FLUJOS app (Node/Flask/Python), FLUJOS_DATOS scripts (scrapers, Keras, Django) Excludes: MongoDB, scraped data, Wikipedia/WikiLeaks dumps, Python venv, node_modules
This commit is contained in:
commit
a40b946163
158 changed files with 196645 additions and 0 deletions
254
FLUJOS_DATOS/COMPARACIONES/pipeline_completo.py
Executable file
254
FLUJOS_DATOS/COMPARACIONES/pipeline_completo.py
Executable file
|
|
@ -0,0 +1,254 @@
|
|||
import os
|
||||
import re
|
||||
import logging
|
||||
from logging.handlers import RotatingFileHandler
|
||||
from collections import Counter
|
||||
from datetime import datetime
|
||||
from pymongo import MongoClient
|
||||
from tqdm import tqdm
|
||||
from multiprocessing import Pool, cpu_count
|
||||
import psutil
|
||||
import string
|
||||
import nltk
|
||||
from nltk.corpus import stopwords
|
||||
|
||||
# Descargar stopwords la primera vez
|
||||
nltk.download('stopwords')
|
||||
stop_words = set(stopwords.words('spanish'))
|
||||
|
||||
# Parámetros de configuración
|
||||
SIMILARITY_THRESHOLD = 4.0
|
||||
LOG_FILE = "pipeline_mongolo.log"
|
||||
NUM_PROCESOS = 4 # Limitar a 4 procesos
|
||||
|
||||
# Configuración de logging con rotación de archivos
|
||||
logger = logging.getLogger()
|
||||
logger.setLevel(logging.INFO)
|
||||
|
||||
handler = RotatingFileHandler(LOG_FILE, maxBytes=10*1024*1024, backupCount=5) # 10 MB por archivo, 5 archivos de respaldo
|
||||
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
|
||||
handler.setFormatter(formatter)
|
||||
|
||||
logger.addHandler(handler)
|
||||
|
||||
# Función para iniciar un cliente MongoDB
|
||||
def init_mongo_client():
|
||||
"""Inicializa un nuevo cliente MongoDB para cada proceso."""
|
||||
return MongoClient('localhost', 27017)
|
||||
|
||||
# Función para preprocesar el texto
|
||||
def preprocesar_texto(texto):
|
||||
# Eliminar signos de puntuación
|
||||
texto = texto.translate(str.maketrans('', '', string.punctuation))
|
||||
# Convertir a minúsculas
|
||||
texto = texto.lower()
|
||||
# Eliminar stop words
|
||||
palabras = texto.split()
|
||||
palabras = [word for word in palabras if word not in stop_words]
|
||||
return ' '.join(palabras)
|
||||
|
||||
# Función para asignar tema y subtema basado en el contenido del texto
|
||||
def asignar_tema_y_subtema(texto):
|
||||
tematicas = {
|
||||
'inteligencia y seguridad': ['inteligencia', 'ciberseguridad', 'espionaje', 'seguridad nacional', 'contraterrorismo'],
|
||||
'cambio climático': ['cambio climático', 'desastres naturales', 'conservación', 'energía renovable', 'escasez de agua'],
|
||||
'guerra global': ['conflictos internacionales', 'guerras civiles', 'terrorismo', 'armas', 'alianzas militares'],
|
||||
'demografía y sociedad': ['sobrepoblación', 'enfermedades', 'migraciones', 'urbanización', 'despoblación rural'],
|
||||
'economía y corporaciones': ['economía global', 'corporaciones multinacionales', 'comercio internacional', 'organismos financieros', 'desigualdad económica']
|
||||
}
|
||||
|
||||
texto_lower = texto.lower()
|
||||
for tema, palabras_clave in tematicas.items():
|
||||
for palabra_clave in palabras_clave:
|
||||
if palabra_clave in texto_lower:
|
||||
return tema, palabra_clave
|
||||
return 'otros', 'general'
|
||||
|
||||
# Función para extraer la fecha del nombre del archivo
|
||||
def extraer_fecha_de_nombre(nombre_archivo):
|
||||
try:
|
||||
fecha_str = re.search(r'\d{4}-\d{2}-\d{2}', nombre_archivo).group(0)
|
||||
return datetime.strptime(fecha_str, '%Y-%m-%d')
|
||||
except:
|
||||
return None
|
||||
|
||||
# Función para contar palabras en un archivo tokenizado
|
||||
def contar_palabras(nombre_archivo):
|
||||
try:
|
||||
with open(nombre_archivo, 'r', encoding='utf-8') as f:
|
||||
palabras = f.read().split()
|
||||
return Counter(palabras)
|
||||
except Exception as e:
|
||||
logging.error(f"Error al contar palabras en {nombre_archivo}: {e}")
|
||||
return Counter()
|
||||
|
||||
# Función para comparar dos archivos y calcular el porcentaje de similitud
|
||||
def comparar_archivos(archivo1, archivo2):
|
||||
try:
|
||||
conteo1 = contar_palabras(archivo1)
|
||||
conteo2 = contar_palabras(archivo2)
|
||||
|
||||
palabras_comunes = set(conteo1.keys()) & set(conteo2.keys())
|
||||
num_palabras_comunes = sum(min(conteo1[p], conteo2[p]) for p in palabras_comunes)
|
||||
num_palabras_totales = sum(conteo1.values()) + sum(conteo2.values())
|
||||
|
||||
if num_palabras_totales == 0:
|
||||
return 0
|
||||
|
||||
porcentaje_similitud = (num_palabras_comunes / num_palabras_totales) * 100
|
||||
return porcentaje_similitud
|
||||
except Exception as e:
|
||||
logging.error(f"Error al comparar archivos {archivo1} y {archivo2}: {e}", exc_info=True)
|
||||
return 0 # Retornar 0 en caso de error
|
||||
|
||||
# Función paralelizada para manejar las comparaciones
|
||||
def manejar_comparacion_multiproceso(pair):
|
||||
archivo1, archivo2 = pair
|
||||
try:
|
||||
# Inicia MongoDB dentro del proceso
|
||||
client = init_mongo_client()
|
||||
db = client['FLUJOS_DATOS']
|
||||
comparaciones_collection = db['comparaciones']
|
||||
|
||||
nombre_archivo1 = os.path.basename(archivo1)
|
||||
nombre_archivo2 = os.path.basename(archivo2)
|
||||
|
||||
porcentaje = comparar_archivos(archivo1, archivo2)
|
||||
comparacion = {
|
||||
'noticia1': nombre_archivo1,
|
||||
'noticia2': nombre_archivo2,
|
||||
'porcentaje_similitud': porcentaje
|
||||
}
|
||||
|
||||
comparaciones_collection.insert_one(comparacion)
|
||||
logging.info(f"Guardada comparación entre {nombre_archivo1} y {nombre_archivo2} con {porcentaje:.2f}% de similitud.")
|
||||
except Exception as e:
|
||||
logging.error(f"Error al manejar comparación {archivo1} vs {archivo2}: {e}", exc_info=True)
|
||||
finally:
|
||||
client.close()
|
||||
|
||||
# Función para verificar si el documento ya fue procesado
|
||||
def documento_ya_subido(nombre_archivo, collection):
|
||||
return collection.find_one({"archivo": nombre_archivo}) is not None
|
||||
|
||||
# Función para subir documentos a MongoDB
|
||||
def subir_documento_a_mongodb(nombre_archivo, tema, subtema, fecha, texto, collection):
|
||||
documento = {
|
||||
'archivo': nombre_archivo,
|
||||
'tema': tema,
|
||||
'subtema': subtema,
|
||||
'fecha': fecha,
|
||||
'texto': texto # Subimos el texto original
|
||||
}
|
||||
try:
|
||||
collection.insert_one(documento)
|
||||
logging.info(f"Subido documento: {nombre_archivo} con tema: {tema}, subtema: {subtema}.")
|
||||
except Exception as e:
|
||||
logging.error(f"Error al subir documento {nombre_archivo}: {e}")
|
||||
|
||||
# Función para procesar documentos y subirlos a MongoDB
|
||||
def procesar_documentos(directorio, collection):
|
||||
archivos = [os.path.join(directorio, f) for f in os.listdir(directorio)]
|
||||
print(f"Procesando {len(archivos)} archivos en el directorio: {directorio}")
|
||||
|
||||
# Subir documentos a MongoDB
|
||||
with tqdm(total=len(archivos), desc=f"Subiendo documentos a MongoDB ({collection.name})", ncols=100) as pbar:
|
||||
for archivo in archivos:
|
||||
nombre_archivo = os.path.basename(archivo)
|
||||
if documento_ya_subido(nombre_archivo, collection):
|
||||
logging.info(f"Saltando {nombre_archivo}, ya está subido.")
|
||||
pbar.update(1)
|
||||
continue
|
||||
|
||||
fecha = extraer_fecha_de_nombre(nombre_archivo)
|
||||
|
||||
try:
|
||||
with open(archivo, 'r', encoding='utf-8') as f:
|
||||
texto_original = f.read()
|
||||
texto_preprocesado = preprocesar_texto(texto_original)
|
||||
tema, subtema = asignar_tema_y_subtema(texto_preprocesado)
|
||||
subir_documento_a_mongodb(nombre_archivo, tema, subtema, fecha, texto_original, collection)
|
||||
except Exception as e:
|
||||
logging.error(f"Error al procesar archivo {archivo}: {e}")
|
||||
|
||||
pbar.update(1)
|
||||
|
||||
def manejar_comparaciones_multiproceso(directorios_tokenized):
|
||||
try:
|
||||
archivos_tokenizados = []
|
||||
for directorio in directorios_tokenized:
|
||||
archivos_directorio = [os.path.join(directorio, f) for f in os.listdir(directorio)]
|
||||
archivos_tokenizados.append(archivos_directorio)
|
||||
|
||||
# Generar pares de archivos para comparar (combinaciones entre directorios)
|
||||
pairs = []
|
||||
for i in range(len(archivos_tokenizados)):
|
||||
for j in range(i+1, len(archivos_tokenizados)):
|
||||
for archivo1 in archivos_tokenizados[i]:
|
||||
for archivo2 in archivos_tokenizados[j]:
|
||||
pairs.append((archivo1, archivo2))
|
||||
|
||||
total_pairs = len(pairs)
|
||||
logging.info(f"Total de pares a comparar: {total_pairs}")
|
||||
|
||||
# Procesar las comparaciones en lotes más pequeños para reducir el uso de memoria
|
||||
batch_size = 1000 # Tamaño del lote
|
||||
num_batches = (total_pairs // batch_size) + 1
|
||||
|
||||
logging.info(f"Procesando en {num_batches} lotes de {batch_size} pares cada uno.")
|
||||
|
||||
for batch_num in range(num_batches):
|
||||
batch_start = batch_num * batch_size
|
||||
batch_end = min(batch_start + batch_size, total_pairs)
|
||||
batch_pairs = pairs[batch_start:batch_end]
|
||||
|
||||
logging.info(f"Procesando lote {batch_num + 1}/{num_batches}")
|
||||
|
||||
with Pool(processes=NUM_PROCESOS) as pool:
|
||||
list(tqdm(pool.imap_unordered(manejar_comparacion_multiproceso, batch_pairs), total=len(batch_pairs), desc=f"Comparando archivos (Lote {batch_num + 1})", ncols=100))
|
||||
except Exception as e:
|
||||
logging.error(f"Error en manejar_comparaciones_multiproceso: {e}", exc_info=True)
|
||||
|
||||
def main():
|
||||
try:
|
||||
# Rutas absolutas de los archivos en formato txt
|
||||
carpeta_noticias_txt = '/var/www/theflows.net/flujos/FLUJOS_DATOS/NOTICIAS/articulos'
|
||||
carpeta_wikipedia_txt = '/var/www/theflows.net/flujos/FLUJOS_DATOS/WIKIPEDIA/articulos_wikipedia'
|
||||
carpeta_torrents_txt = '/var/www/theflows.net/flujos/FLUJOS_DATOS/TORRENTS/TORRENTS_WIKILEAKS_COMPLETO/txt'
|
||||
|
||||
# Carpetas con los archivos tokenizados para hacer las comparaciones
|
||||
carpeta_noticias_tokenized = '/var/www/theflows.net/flujos/FLUJOS_DATOS/NOTICIAS/tokenized'
|
||||
carpeta_wikipedia_tokenized = '/var/www/theflows.net/flujos/FLUJOS_DATOS/WIKIPEDIA/articulos_tokenizados'
|
||||
carpeta_torrents_tokenized = '/var/www/theflows.net/flujos/FLUJOS_DATOS/TORRENTS/TORRENTS_WIKILEAKS_COMPLETO/tokenized'
|
||||
|
||||
# Iniciar cliente MongoDB
|
||||
client = init_mongo_client()
|
||||
|
||||
# Omitir la subida de documentos
|
||||
print("Subiendo archivos originales de Noticias a MongoDB...")
|
||||
logging.info("Iniciando subida de noticias a MongoDB")
|
||||
procesar_documentos(carpeta_noticias_txt, client['FLUJOS_DATOS']['noticias'])
|
||||
logging.info("Finalizada subida de noticias a MongoDB")
|
||||
|
||||
# print("Subiendo archivos originales de Wikipedia a MongoDB...")
|
||||
logging.info("Iniciando subida de Wikipedia a MongoDB")
|
||||
procesar_documentos(carpeta_wikipedia_txt, client['FLUJOS_DATOS']['wikipedia'])
|
||||
logging.info("Finalizada subida de Wikipedia a MongoDB")
|
||||
|
||||
# print("Subiendo archivos originales de TORRENTS a MongoDB...")
|
||||
logging.info("Iniciando subida de TORRENTS a MongoDB")
|
||||
procesar_documentos(carpeta_torrents_txt, client['FLUJOS_DATOS']['torrents'])
|
||||
logging.info("Finalizada subida de TORRENTS a MongoDB")
|
||||
|
||||
# Comparar archivos tokenizados y guardar comparaciones en MongoDB
|
||||
print("Comparando textos tokenizados entre Noticias, Wikipedia y TORRENTS...")
|
||||
logging.info("Iniciando comparaciones entre textos tokenizados")
|
||||
manejar_comparaciones_multiproceso([carpeta_noticias_tokenized, carpeta_wikipedia_tokenized, carpeta_torrents_tokenized])
|
||||
logging.info("Finalizadas comparaciones entre textos tokenizados")
|
||||
|
||||
except Exception as e:
|
||||
logging.error(f"Error en la ejecución del script: {e}", exc_info=True)
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
Loading…
Add table
Add a link
Reference in a new issue