Apache Spark - Fundamentos y Entornos
🚀 Presentación: Spark
Apache Spark es un motor de procesamiento de datos distribuido diseñado para velocidad, facilidad de uso y análisis sofisticado. A diferencia de Hadoop MapReduce que escribe resultados intermedios en disco, Spark mantiene los datos en memoria (RAM), lo que lo hace hasta 100x más rápido para ciertos workloads.
¿Por qué aprender Spark?
- Velocidad: Procesamiento en memoria (in-memory computing)
- Versatilidad: Batch processing, streaming, ML, grafos, SQL
- Escalabilidad: Desde tu laptop hasta miles de nodos
- Industria: Usado por Netflix, Uber, Airbnb, Amazon, etc.
1. Arquitectura de Spark
Section titled “1. Arquitectura de Spark”Antes de empezar a programar, es crucial entender cómo funciona Spark internamente.
Componentes Principales
Section titled “Componentes Principales”graph TB subgraph "Driver Program" D["🧠 Driver<br/>(Tu código Python/Scala)<br/><br/>- SparkContext/SparkSession<br/>- Planifica tareas<br/>- Coordina ejecución"] end
subgraph "Cluster Manager" CM["📋 Cluster Manager<br/>(YARN / Standalone / Mesos)<br/><br/>- Asigna recursos<br/>- Gestiona nodos"] end
subgraph "Worker Nodes" W1["⚙️ Worker 1<br/><br/>Executor<br/>- Ejecuta tareas<br/>- Almacena datos"] W2["⚙️ Worker 2<br/><br/>Executor<br/>- Ejecuta tareas<br/>- Almacena datos"] W3["⚙️ Worker 3<br/><br/>Executor<br/>- Ejecuta tareas<br/>- Almacena datos"] end
D -->|"1. Solicita recursos"| CM CM -->|"2. Asigna Executors"| W1 CM -->|"2. Asigna Executors"| W2 CM -->|"2. Asigna Executors"| W3 D -->|"3. Envía código<br/>y tareas"| W1 D -->|"3. Envía código<br/>y tareas"| W2 D -->|"3. Envía código<br/>y tareas"| W3 W1 -.->|"4. Reporta<br/>resultados"| D W2 -.->|"4. Reporta<br/>resultados"| D W3 -.->|"4. Reporta<br/>resultados"| D
style D fill:#e3f2fd,stroke:#1976d2,stroke-width:3px style CM fill:#fff9c4,stroke:#f57c00,stroke-width:2px style W1 fill:#c8e6c9,stroke:#388e3c,stroke-width:2px style W2 fill:#c8e6c9,stroke:#388e3c,stroke-width:2px style W3 fill:#c8e6c9,stroke:#388e3c,stroke-width:2px- Driver: Tu programa Python/Scala que contiene la lógica de negocio
- Cluster Manager: Asigna recursos (CPU, RAM) a tu aplicación
- Executors: Procesos JVM que ejecutan tu código en paralelo
- Comunicación: El Driver envía tareas, los Executors las ejecutan y reportan resultados
Modos de Ejecución
Section titled “Modos de Ejecución”| Modo | Descripción | Cuándo usar |
|---|---|---|
| Local | Todo corre en una sola máquina | Desarrollo, pruebas, aprendizaje |
| Standalone | Cluster Spark sin dependencias | Clusters dedicados a Spark |
| YARN | Integrado con Hadoop | Cuando ya tienes Hadoop instalado |
| Kubernetes | Contenedores orquestados | Entornos cloud-native |
| Mesos | Gestor de recursos genérico | Clusters compartidos con otras apps |
2. Entornos de Desarrollo
Section titled “2. Entornos de Desarrollo”Opción 1: BDA Lab (Recomendado para el curso)
Section titled “Opción 1: BDA Lab (Recomendado para el curso)”Entorno completo con Hadoop, HDFS y Spark integrados.
# Clonar el repositorio del laboratoriogit clone https://github.com/josepgarcia/BigDataAplicadoLab-2526
cd BigDataAplicadoLab-2526/modulo2
# Dar permisos de ejecución al menúchmod u+x menu.sh
# Ejecutar el menú interactivo./menu.shVentajas:
- ✅ Hadoop + HDFS + Spark preconfigurados
- ✅ Jupyter Notebook integrado
- ✅ Datasets de ejemplo incluidos
- ✅ Spark UI accesible en
http://localhost:4040
Opción 2: Docker con Jupyter (Rápido para empezar)
Section titled “Opción 2: Docker con Jupyter (Rápido para empezar)”Sin integración con Hadoop/HDFS, ideal para aprender Spark rápidamente.
# Crear carpeta de trabajomkdir spark-localcd spark-local
# Ejecutar contenedor con Jupyter + PySparkdocker run -it -p 8888:8888 -p 4040:4040 \ -v $(pwd):/home/jovyan/work/projects/ \ jupyter/pyspark-notebook
# Acceder a Jupyter: http://localhost:8888# Spark UI: http://localhost:4040 (cuando ejecutes código Spark)Ventajas:
- ✅ Instalación en 1 comando
- ✅ Jupyter Notebook listo para usar
- ✅ No requiere configuración
Desventajas:
- ❌ Sin HDFS (solo archivos locales)
- ❌ Sin integración con Hadoop
Opción 3: Instalación Local (Python + PySpark)
Section titled “Opción 3: Instalación Local (Python + PySpark)”Para quienes prefieren trabajar sin Docker.
# Opción 1: Con pippip install pyspark
# Opción 2: Con Conda (recomendado)conda install -c conda-forge pyspark
# Verificar instalaciónpython -c "import pyspark; print(pyspark.__version__)"Requisitos previos:
- Python 3.8+
- Java 8 o 11 (Spark corre sobre la JVM)
# Verificar Javajava -version
# Si no tienes Java, instalar:# macOS: brew install openjdk@11# Ubuntu: sudo apt install openjdk-11-jdk3. SparkSession vs SparkContext
Section titled “3. SparkSession vs SparkContext”Para trabajar con Spark, necesitas un punto de entrada. Históricamente ha habido confusión porque Spark ha evolucionado:
Evolución Histórica
Section titled “Evolución Histórica”graph LR V1["Spark 1.x<br/>(2014-2016)<br/><br/>SparkContext<br/>SQLContext<br/>HiveContext"] -->|"Unificación"| V2["Spark 2.0+<br/>(2016-presente)<br/><br/>✅ SparkSession<br/>(Engloba todo)"]
style V1 fill:#f8d7da,stroke:#c62828 style V2 fill:#c8e6c9,stroke:#388e3c,stroke-width:3pxComparación Detallada
Section titled “Comparación Detallada”| Aspecto | SparkContext (sc) | SparkSession (spark) |
|---|---|---|
| Versión | Spark 1.x (legacy) | Spark 2.0+ (actual) |
| Propósito | Crear RDDs | API unificada (RDDs + DataFrames + SQL) |
| Uso recomendado | ❌ Solo código legacy | ✅ Usar siempre |
| Acceso a RDDs | Directo | A través de spark.sparkContext |
| SQL | Requiere SQLContext separado | Integrado |
| Hive | Requiere HiveContext separado | Integrado |
Crear una SparkSession
Section titled “Crear una SparkSession”Versión Básica
Section titled “Versión Básica”from pyspark.sql import SparkSession
# Crear sesión (usa configuración por defecto)spark = SparkSession.builder.getOrCreate()
# Verificar que funcionasparkSalida esperada:
SparkSession - in-memory SparkContext Version: v3.5.0 Master: local[*] AppName: pyspark-shellExplicación:
local[*]: Modo local usando todos los cores disponiblespyspark-shell: Nombre por defecto de la aplicación
Versión con Configuración Personalizada
Section titled “Versión con Configuración Personalizada”spark = (SparkSession.builder .appName("MiAnalisisDatos") # Nombre que aparece en Spark UI .config("spark.executor.memory", "2g") # RAM por executor .config("spark.driver.memory", "1g") # RAM para el driver .config("spark.sql.shuffle.partitions", "10") # Particiones para shuffle .getOrCreate())Parámetros importantes:
| Configuración | Descripción | Valor recomendado |
|---|---|---|
spark.executor.memory | RAM por executor | 2-4g para desarrollo, más en producción |
spark.driver.memory | RAM para el driver | 1-2g normalmente |
spark.sql.shuffle.partitions | Particiones para operaciones shuffle | 10-20 para datos pequeños, 200 (default) para producción |
spark.executor.cores | CPUs por executor | 2-4 cores |
4. Primer Programa: “Hola Spark”
Section titled “4. Primer Programa: “Hola Spark””Vamos a verificar que todo funciona con un ejemplo simple: sumar los números del 0 al 100.
Código Completo
Section titled “Código Completo”from pyspark.sql import SparkSession
# 1. Crear SparkSessionspark = SparkSession.builder.appName("HolaSpark").getOrCreate()
# 2. Obtener el SparkContext (para funciones de bajo nivel)sc = spark.sparkContext
# 3. Crear un RDD (Resilient Distributed Dataset)# parallelize() distribuye los datos en particionesnumeros = sc.parallelize(range(101))
# 4. Realizar una acción: sumar todos los númerostotal = numeros.sum()
print(f"La suma de 0 a 100 es: {total}")# Salida: La suma de 0 a 100 es: 5050
# 5. Verificar cuántas particiones se crearonprint(f"Número de particiones: {numeros.getNumPartitions()}")¿Qué está pasando internamente?
Section titled “¿Qué está pasando internamente?”graph TB subgraph "1. Creación del RDD" D1["range(101)<br/>[0,1,2,...,100]"] end
subgraph "2. Particionamiento" P1["Partición 0<br/>[0-25]"] P2["Partición 1<br/>[26-50]"] P3["Partición 2<br/>[51-75]"] P4["Partición 3<br/>[76-100]"] end
subgraph "3. Procesamiento Paralelo" E1["Executor 1<br/>suma = 325"] E2["Executor 2<br/>suma = 975"] E3["Executor 3<br/>suma = 1625"] E4["Executor 4<br/>suma = 2125"] end
subgraph "4. Reducción Final" R["Driver<br/>325+975+1625+2125<br/>= 5050"] end
D1 --> P1 D1 --> P2 D1 --> P3 D1 --> P4
P1 --> E1 P2 --> E2 P3 --> E3 P4 --> E4
E1 --> R E2 --> R E3 --> R E4 --> R
style D1 fill:#e3f2fd,stroke:#1976d2 style P1 fill:#fff9c4,stroke:#f57c00 style P2 fill:#fff9c4,stroke:#f57c00 style P3 fill:#fff9c4,stroke:#f57c00 style P4 fill:#fff9c4,stroke:#f57c00 style E1 fill:#c8e6c9,stroke:#388e3c style E2 fill:#c8e6c9,stroke:#388e3c style E3 fill:#c8e6c9,stroke:#388e3c style E4 fill:#c8e6c9,stroke:#388e3c style R fill:#a5d6a7,stroke:#2e7d32,stroke-width:3pxPasos del procesamiento:
- Particionamiento: Spark divide los 101 números en 4 particiones (por defecto en local)
- Distribución: Cada partición se envía a un executor (proceso de ejecución)
- Procesamiento paralelo: Cada executor suma sus números localmente
- Reducción: El driver combina los resultados parciales
5. Spark UI - Monitorización
Section titled “5. Spark UI - Monitorización”Cada vez que ejecutas código Spark, se levanta automáticamente una interfaz web de monitorización.
Acceso: http://localhost:4040
Pestañas Principales
Section titled “Pestañas Principales”| Pestaña | Qué muestra | Para qué sirve |
|---|---|---|
| Jobs | Trabajos ejecutados | Ver cuánto tardó cada job |
| Stages | Etapas de cada job | Identificar cuellos de botella |
| Storage | Datos cacheados | Verificar qué está en memoria |
| Environment | Configuración | Ver parámetros de Spark |
| Executors | Estado de executors | Monitorizar uso de CPU/RAM |
| SQL | Queries SQL ejecutadas | Ver plan de ejecución optimizado |
Ejemplo de Análisis
Section titled “Ejemplo de Análisis”# Crear un RDD granderdd_grande = sc.parallelize(range(1000000), 10)
# Transformación + Acciónresultado = rdd_grande.filter(lambda x: x % 2 == 0).count()
# Ahora ve a http://localhost:4040# Verás:# - 1 Job (count)# - 1 Stage (filter + count)# - 10 Tasks (una por partición)6. Lectura de Datos desde Archivos
Section titled “6. Lectura de Datos desde Archivos”Lectura de CSV con RDDs
Section titled “Lectura de CSV con RDDs”Problema: Rutas de Archivos Locales
Section titled “Problema: Rutas de Archivos Locales”Cuando trabajas en local, Spark necesita saber que el archivo está en tu sistema de archivos (no en HDFS).
import osfrom pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()sc = spark.sparkContext
# ❌ INCORRECTO: Spark buscará en HDFSrdd = sc.textFile("./DATOS/archivo.csv")
# ✅ CORRECTO: Especificar que es un archivo localpath_relativo = "./DATOS/archivo.csv"path_absoluto = os.path.abspath(path_relativo)path_final = "file://" + path_absoluto
print(f"Leyendo desde: {path_final}")rdd = sc.textFile(path_final)Ejemplo Completo: Análisis de Ventas
Section titled “Ejemplo Completo: Análisis de Ventas”Vamos a trabajar con el dataset pdi_sales_small.csv que contiene ventas internacionales.
Paso 1: Descargar el Dataset
Section titled “Paso 1: Descargar el Dataset”# Crear carpeta y descargar datos!mkdir -p DATOS!wget "https://github.com/josepgarcia/datos/raw/refs/heads/main/pdi_sales_small.csv" -P ./DATOS
# Ver las primeras líneas!head ./DATOS/pdi_sales_small.csvEstructura del archivo:
ProductID;Date;Zip;Units;Revenue;Country725;1/15/1999;41540;1;115.5;Germany787;6/6/2002;41540;1;314.9;Germany...Paso 2: Cargar y Procesar
Section titled “Paso 2: Cargar y Procesar”import osfrom pyspark.sql import SparkSession
spark = SparkSession.builder.appName("AnalisisVentas").getOrCreate()sc = spark.sparkContext
# Construir ruta absolutapath_relativo = "./DATOS/pdi_sales_small.csv"path_absoluto = os.path.abspath(path_relativo)path_final = "file://" + path_absoluto
print(f"Leyendo desde: {path_final}")
# Leer archivo (cada línea es un string)rdd = sc.textFile(path_final)
# Ver las primeras 3 líneasprint(rdd.take(3))# Salida:# ['ProductID;Date;Zip;Units;Revenue;Country',# '725;1/15/1999;41540 ;1;115.5;Germany',# '787;6/6/2002;41540 ;1;314.9;Germany']Paso 3: Extraer País y Unidades
Section titled “Paso 3: Extraer País y Unidades”# Extraer (País, Unidades) de cada línea# Formato: ProductID;Date;Zip;Units;Revenue;Country# 0 1 2 3 4 5
paises_unidades = rdd.map(lambda linea: ( linea.split(";")[-1].strip(), # País (última columna) linea.split(";")[3] # Unidades (columna 3)))
paises_unidades.take(5)# Salida:# [('Country', 'Units'),# ('Germany', '1'),# ('Germany', '1'),# ...]Paso 4: Optimización - Split Una Sola Vez
Section titled “Paso 4: Optimización - Split Una Sola Vez”# ✅ MEJOR: Función que hace split una sola vezdef parsear_linea(linea): columnas = linea.split(";") pais = columnas[-1].strip() unidades = columnas[3] return (pais, unidades)
paises_unidades = rdd.map(parsear_linea)paises_unidades.take(5)Paso 5: Eliminar Encabezado
Section titled “Paso 5: Eliminar Encabezado”# Obtener la primera línea (encabezado)header = paises_unidades.first()print(f"Encabezado: {header}")
# Filtrar todas las líneas que NO sean el encabezadopaises_unidades_sin_header = paises_unidades.filter(lambda linea: linea != header)
paises_unidades_sin_header.take(5)# Salida:# [('Germany', '1'),# ('Germany', '1'),# ('Germany', '1'),# ...]Paso 6: Convertir a Enteros y Sumar por País
Section titled “Paso 6: Convertir a Enteros y Sumar por País”# Convertir unidades a enterospaises_unidades_int = paises_unidades_sin_header.map( lambda x: (x[0], int(x[1])))
# Sumar unidades por país usando reduceByKeypaises_total_unidades = paises_unidades_int.reduceByKey(lambda a, b: a + b)
# Obtener resultadosresultados = paises_total_unidades.collect()
# Mostrar top 10 países por unidades vendidastop10 = sorted(resultados, key=lambda x: x[1], reverse=True)[:10]for pais, unidades in top10: print(f"{pais}: {unidades:,} unidades")Salida esperada:
Germany: 15,234 unidadesUSA: 12,891 unidadesFrance: 9,456 unidades...7. Conexión con HDFS
Section titled “7. Conexión con HDFS”Cuando trabajas con el entorno BDA Lab (Hadoop + Spark), puedes leer/escribir directamente en HDFS.
Diferencia de Rutas
Section titled “Diferencia de Rutas”# ❌ Archivo LOCAL (tu máquina)rdd_local = sc.textFile("file:///ruta/absoluta/archivo.csv")
# ✅ Archivo en HDFS (sistema distribuido)rdd_hdfs = sc.textFile("/user/hadoop/datos/archivo.csv")# O explícitamente:rdd_hdfs = sc.textFile("hdfs://localhost:9000/user/hadoop/datos/archivo.csv")Ejemplo Completo con HDFS
Section titled “Ejemplo Completo con HDFS”# 1. Subir archivo a HDFS (desde terminal)# hdfs dfs -put ./DATOS/pdi_sales_small.csv /user/hadoop/datos/
# 2. Leer desde HDFS en Sparkrdd = sc.textFile("/user/hadoop/datos/pdi_sales_small.csv")
# 3. Procesar (igual que antes)paises_unidades = rdd.map(lambda linea: ( linea.split(";")[-1].strip(), linea.split(";")[3]))
# 4. Guardar resultados en HDFSpaises_unidades.saveAsTextFile("/user/hadoop/resultados/ventas_por_pais")
# 5. Ver resultados (desde terminal)# hdfs dfs -cat /user/hadoop/resultados/ventas_por_pais/part-000008. Finalizar Sesión
Section titled “8. Finalizar Sesión”Al terminar de trabajar, libera los recursos:
# Detener SparkSession (libera memoria y CPU)spark.stop()Errores Comunes y Soluciones
Section titled “Errores Comunes y Soluciones”Error 1: JAVA_HOME is not set
Section titled “Error 1: JAVA_HOME is not set”Problema: Spark requiere Java pero no encuentra la instalación.
Solución:
# macOSexport JAVA_HOME=$(/usr/libexec/java_home)
# Linuxexport JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
# Verificarecho $JAVA_HOMEjava -versionError 2: FileNotFoundException
Section titled “Error 2: FileNotFoundException”Problema: Spark no encuentra el archivo.
Solución:
# ✅ Usar ruta absoluta con prefijo file://import ospath = "file://" + os.path.abspath("./DATOS/archivo.csv")rdd = sc.textFile(path)Error 3: Port 4040 is already in use
Section titled “Error 3: Port 4040 is already in use”Problema: Ya hay una sesión Spark corriendo.
Solución:
# Detener sesión anteriorspark.stop()
# O cambiar puerto en la configuraciónspark = SparkSession.builder \ .config("spark.ui.port", "4041") \ .getOrCreate()Error 4: Py4JJavaError al hacer collect()
Section titled “Error 4: Py4JJavaError al hacer collect()”Problema: Intentas traer demasiados datos al driver.
Solución:
# ❌ MALO: collect() en dataset granderesultado = rdd_gigante.collect() # OutOfMemoryError!
# ✅ BUENO: Usar take() para muestramuestra = rdd_gigante.take(100)
# ✅ BUENO: Guardar a discordd_gigante.saveAsTextFile("/ruta/resultados")Ejercicios Prácticos
Section titled “Ejercicios Prácticos”Ejercicio 1: Configurar SparkSession Personalizada
Section titled “Ejercicio 1: Configurar SparkSession Personalizada”Crea una SparkSession con configuración específica para un análisis de datos.
Tareas:
- Crea una SparkSession con el nombre “MiPrimerAnalisis”
- Configura 1GB de memoria para el executor
- Configura 8 particiones para shuffle
- Verifica que la sesión se creó correctamente mostrando su información
Resultado esperado:
- SparkSession creada con el nombre especificado
- Configuración aplicada correctamente
- Información de la sesión mostrada
Ejercicio 2: Operaciones Básicas con RDDs
Section titled “Ejercicio 2: Operaciones Básicas con RDDs”Crea un RDD y realiza operaciones básicas.
Datos de entrada:
numeros = list(range(1, 21)) # [1, 2, 3, ..., 20]Tareas:
- Crea un RDD a partir de la lista de números
- Cuenta cuántos elementos tiene el RDD
- Muestra los primeros 5 elementos
- Calcula la suma de todos los números
- Encuentra el número máximo y mínimo
Resultado esperado:
- Total de elementos: 20
- Primeros 5: [1, 2, 3, 4, 5]
- Suma: 210
- Máximo: 20, Mínimo: 1
Ejercicio 3: Transformaciones map y filter
Section titled “Ejercicio 3: Transformaciones map y filter”Aplica transformaciones a un RDD de números.
Datos de entrada:
numeros = sc.parallelize(range(1, 51)) # 1 a 50Tareas:
- Multiplica cada número por 3
- Filtra solo los números mayores a 50
- Cuenta cuántos números cumplen la condición
- Muestra los primeros 10 resultados
Resultado esperado:
- Números después de multiplicar por 3: [3, 6, 9, …, 150]
- Números mayores a 50: [51, 54, 57, …, 150]
- Total de números mayores a 50: 34
- Primeros 10: [51, 54, 57, 60, 63, 66, 69, 72, 75, 78]
Ejercicio 4: Trabajar con Pares Clave-Valor
Section titled “Ejercicio 4: Trabajar con Pares Clave-Valor”Crea un RDD de pares (clave, valor) y realiza agregaciones.
Datos de entrada:
ventas = [ ("Laptop", 1200), ("Mouse", 25), ("Laptop", 1100), ("Teclado", 75), ("Mouse", 30), ("Laptop", 1300)]Tareas:
- Crea un RDD a partir de la lista de ventas
- Calcula el total de ventas por producto usando
reduceByKey - Cuenta cuántas ventas hay de cada producto
- Encuentra el producto con mayor total de ventas
Resultado esperado:
- Total por producto: Laptop: 3600, Mouse: 55, Teclado: 75
- Número de ventas: Laptop: 3, Mouse: 2, Teclado: 1
- Producto con mayor total: Laptop (3600)
Ejercicio 5: Lectura y Procesamiento de Archivo CSV
Section titled “Ejercicio 5: Lectura y Procesamiento de Archivo CSV”Lee un archivo CSV y procesa sus datos.
Dataset: persons.csv de https://github.com/josepgarcia/datos
Estructura del archivo:
name,age,cityAna,28,MadridJuan,35,BarcelonaMaría,42,ValenciaTareas:
- Descarga el archivo
persons.csv - Cárgalo usando
sc.textFile()con la ruta correcta - Elimina el encabezado
- Extrae solo los nombres y edades
- Filtra personas mayores de 30 años
- Cuenta cuántas personas cumplen la condición
Resultado esperado:
- Archivo cargado correctamente
- Encabezado eliminado
- Personas mayores de 30: Juan (35), María (42)
- Total: 2 personas
Ejercicio 6: Optimización - Evitar Múltiples split()
Section titled “Ejercicio 6: Optimización - Evitar Múltiples split()”Optimiza el siguiente código que hace split múltiples veces.
Código ineficiente:
rdd = sc.textFile("file:///ruta/datos.csv")
# ❌ INEFICIENTE: split() se ejecuta 3 veces por líneanombres = rdd.map(lambda x: x.split(",")[0])edades = rdd.map(lambda x: x.split(",")[1])ciudades = rdd.map(lambda x: x.split(",")[2])Tareas:
- Identifica el problema de eficiencia
- Reescribe el código para hacer split() una sola vez
- Extrae nombre, edad y ciudad en una sola pasada
- Explica por qué tu solución es más eficiente
Resultado esperado:
- Código optimizado que hace split() una sola vez
- Explicación clara de la mejora de rendimiento
Ejercicio 7: Debugging - Encuentra y Corrige Errores
Section titled “Ejercicio 7: Debugging - Encuentra y Corrige Errores”El siguiente código tiene errores. Encuéntralos y corrígelos.
Código con errores:
from pyspark.sql import SparkSession
# Error 1: Falta configuraciónspark = SparkSession.builder.getOrCreate()
# Error 2: Ruta incorrecta para archivo localrdd = sc.textFile("./datos.csv")
# Error 3: Uso de collect() en dataset grandedatos = rdd.collect()for linea in datos: print(linea)
# Error 4: No se cierra la sesiónTareas:
- Identifica los 4 errores
- Explica por qué cada uno es un error
- Proporciona el código corregido
Resultado esperado:
- Error 1: Falta obtener SparkContext
- Error 2: Falta prefijo
file://y ruta absoluta - Error 3:
collect()es peligroso, usartake()oforeach() - Error 4: Falta
spark.stop()
Proyecto: Análisis de Logs de Servidor Web
Section titled “Proyecto: Análisis de Logs de Servidor Web”Objetivo
Section titled “Objetivo”Analizar logs de un servidor web para extraer estadísticas de acceso, identificar páginas más visitadas, detectar errores y analizar patrones de tráfico. Este proyecto integra todos los conceptos vistos: SparkSession, RDDs, transformaciones, acciones y lectura de archivos.
Dataset
Section titled “Dataset”Para este proyecto utilizaremos el dataset stocks.txt disponible en:
https://github.com/josepgarcia/datos
Descripción del dataset:
- Archivo de texto con datos de acciones bursátiles
- Formato:
Símbolo,Fecha,Precio,Volumen - Tamaño aproximado: ~1000 registros
- Ejemplo de línea:
AAPL,2020-01-02,300.35,50000000
Descarga:
# Descargar el dataset!mkdir -p DATOS!wget "https://github.com/josepgarcia/datos/raw/refs/heads/main/stocks.txt" -P ./DATOS!head ./DATOS/stocks.txtPaso 1: Configuración Inicial y Carga de Datos
Section titled “Paso 1: Configuración Inicial y Carga de Datos”Objetivo: Configurar Spark y cargar el archivo de logs correctamente.
Instrucciones:
- Crea una SparkSession con el nombre “AnalisisStocks”
- Configura 512MB de memoria para el executor
- Obtén el SparkContext
- Construye la ruta absoluta del archivo con el prefijo
file:// - Carga el archivo usando
sc.textFile() - Verifica la carga mostrando las primeras 5 líneas
Código guía:
from pyspark.sql import SparkSessionimport os
# TODO: Crear SparkSession con configuraciónspark = SparkSession.builder \ .appName("...") \ .config("spark.executor.memory", "...") \ .getOrCreate()
# TODO: Obtener SparkContextsc = ...
# TODO: Construir ruta absolutapath_relativo = "./DATOS/stocks.txt"path_absoluto = ...path_final = ...
# TODO: Cargar archivordd = ...
# TODO: Verificar cargaprint(rdd.take(5))Pista: Usa os.path.abspath() para obtener la ruta absoluta
Paso 2: Limpieza y Parseo de Datos
Section titled “Paso 2: Limpieza y Parseo de Datos”Objetivo: Limpiar los datos y convertirlos a un formato estructurado.
Instrucciones:
- Crea una función
parsear_linea(linea)que:- Divida la línea por comas
- Extraiga: símbolo, fecha, precio (float), volumen (int)
- Devuelva una tupla:
(símbolo, fecha, precio, volumen)
- Aplica la función a todo el RDD
- Filtra líneas que no se puedan parsear (manejo de errores)
- Cuenta cuántas líneas válidas tienes
Código guía:
def parsear_linea(linea): try: # TODO: Dividir por comas partes = ...
# TODO: Extraer campos simbolo = ... fecha = ... precio = float(...) volumen = int(...)
return (simbolo, fecha, precio, volumen) except: return None
# TODO: Aplicar función y filtrar nulosrdd_parseado = rdd.map(parsear_linea).filter(lambda x: x is not None)
# TODO: Contar líneas válidasprint(f"Líneas válidas: {rdd_parseado.count()}")Pista: Usa try-except para manejar errores de conversión
Paso 3: Análisis por Símbolo - Estadísticas Básicas
Section titled “Paso 3: Análisis por Símbolo - Estadísticas Básicas”Objetivo: Calcular estadísticas de precio por cada símbolo de acción.
Instrucciones:
- Crea pares
(símbolo, precio)desde el RDD parseado - Calcula para cada símbolo:
- Precio máximo
- Precio mínimo
- Precio promedio (necesitarás
aggregateByKey)
- Muestra los resultados ordenados por símbolo
Código guía:
# TODO: Crear pares (símbolo, precio)simbolo_precio = rdd_parseado.map(lambda x: (..., ...))
# TODO: Calcular máximo por símboloprecio_max = simbolo_precio.reduceByKey(...)
# TODO: Calcular mínimo por símboloprecio_min = simbolo_precio.reduceByKey(...)
# TODO: Calcular promedio (suma, contador)# aggregateByKey(valor_inicial, seqFunc, combFunc)precio_promedio = simbolo_precio.aggregateByKey( (0.0, 0), # (suma, contador) lambda acc, valor: ..., # Combinar valor con acumulador lambda acc1, acc2: ... # Combinar dos acumuladores).mapValues(lambda x: x[0] / x[1])
# TODO: Mostrar resultadosprint("Precio máximo por símbolo:")for simbolo, precio in sorted(precio_max.collect()): print(f"{simbolo}: ${precio:.2f}")Pista: Para el promedio, acumula (suma_precios, cantidad_registros) y luego divide
Paso 4: Análisis de Volumen - Top Días de Mayor Actividad
Section titled “Paso 4: Análisis de Volumen - Top Días de Mayor Actividad”Objetivo: Identificar los días con mayor volumen de transacciones.
Instrucciones:
- Crea pares
(fecha, volumen)desde el RDD parseado - Suma el volumen total por fecha usando
reduceByKey - Ordena por volumen descendente
- Muestra los top 10 días con mayor volumen
Código guía:
# TODO: Crear pares (fecha, volumen)fecha_volumen = rdd_parseado.map(lambda x: (..., ...))
# TODO: Sumar volumen por fechavolumen_por_fecha = fecha_volumen.reduceByKey(...)
# TODO: Ordenar por volumen descendente# Pista: sortBy(lambda x: x[1], ascending=False)top_fechas = volumen_por_fecha.sortBy(..., ascending=False)
# TODO: Mostrar top 10print("Top 10 días con mayor volumen:")for fecha, volumen in top_fechas.take(10): print(f"{fecha}: {volumen:,} acciones")Pista: Usa sortBy() para ordenar por el segundo elemento de la tupla (volumen)
Paso 5: Análisis Avanzado - Volatilidad por Símbolo
Section titled “Paso 5: Análisis Avanzado - Volatilidad por Símbolo”Objetivo: Calcular la volatilidad (diferencia entre máximo y mínimo) de cada símbolo.
Instrucciones:
- Para cada símbolo, calcula el precio máximo y mínimo
- Calcula la volatilidad:
(precio_max - precio_min) / precio_min * 100 - Identifica los 5 símbolos más volátiles
- Identifica los 5 símbolos más estables (menor volatilidad)
Código guía:
# TODO: Calcular max y min por símbolo en una sola pasada# Usa aggregateByKey con valor inicial (precio_max, precio_min)stats_por_simbolo = simbolo_precio.aggregateByKey( (float('-inf'), float('inf')), # (max, min) lambda acc, precio: ..., # Actualizar max y min lambda acc1, acc2: ... # Combinar dos acumuladores)
# TODO: Calcular volatilidadvolatilidad = stats_por_simbolo.mapValues( lambda x: ((x[0] - x[1]) / x[1]) * 100)
# TODO: Ordenar por volatilidadmas_volatiles = volatilidad.sortBy(lambda x: x[1], ascending=False).take(5)mas_estables = volatilidad.sortBy(lambda x: x[1], ascending=True).take(5)
# TODO: Mostrar resultadosprint("Símbolos más volátiles:")for simbolo, vol in mas_volatiles: print(f"{simbolo}: {vol:.2f}%")Pista: Para actualizar max y min: (max(acc[0], precio), min(acc[1], precio))
Paso 6: Guardar Resultados y Análisis Final
Section titled “Paso 6: Guardar Resultados y Análisis Final”Objetivo: Guardar los resultados del análisis y generar un resumen ejecutivo.
Instrucciones:
- Guarda los resultados de volumen por fecha en un archivo de texto
- Guarda las estadísticas por símbolo en otro archivo
- Crea un resumen ejecutivo con:
- Total de registros procesados
- Número de símbolos únicos
- Símbolo con mayor precio promedio
- Día con mayor volumen total
- Símbolo más volátil
- Cierra la sesión de Spark correctamente
Código guía:
# TODO: Guardar resultados# Para archivos locales, usa file:// en la rutaoutput_path_volumen = "file://" + os.path.abspath("./RESULTADOS/volumen_por_fecha")volumen_por_fecha.saveAsTextFile(output_path_volumen)
output_path_stats = "file://" + os.path.abspath("./RESULTADOS/stats_por_simbolo")stats_por_simbolo.saveAsTextFile(output_path_stats)
# TODO: Calcular métricas para resumentotal_registros = rdd_parseado.count()simbolos_unicos = rdd_parseado.map(lambda x: x[0]).distinct().count()simbolo_mayor_precio = precio_promedio.sortBy(lambda x: x[1], ascending=False).first()dia_mayor_volumen = volumen_por_fecha.sortBy(lambda x: x[1], ascending=False).first()simbolo_mas_volatil = volatilidad.sortBy(lambda x: x[1], ascending=False).first()
# TODO: Mostrar resumen ejecutivoprint("\n" + "="*60)print("RESUMEN EJECUTIVO - ANÁLISIS DE STOCKS")print("="*60)print(f"\nTotal de registros procesados: {total_registros:,}")print(f"Número de símbolos únicos: {simbolos_unicos}")print(f"\nSímbolo con mayor precio promedio: {simbolo_mayor_precio[0]} (${simbolo_mayor_precio[1]:.2f})")print(f"Día con mayor volumen: {dia_mayor_volumen[0]} ({dia_mayor_volumen[1]:,} acciones)")print(f"Símbolo más volátil: {simbolo_mas_volatil[0]} ({simbolo_mas_volatil[1]:.2f}%)")
# TODO: Cerrar sesiónspark.stop()print("\n✅ Análisis completado. Sesión cerrada.")Pista: Crea la carpeta RESULTADOS antes de guardar: !mkdir -p RESULTADOS
Preguntas de Reflexión
Section titled “Preguntas de Reflexión”-
Eficiencia: ¿Por qué es importante hacer
split()una sola vez por línea? ¿Cuánto tiempo ahorrarías con 1 millón de líneas? -
Particionamiento: ¿Cuántas particiones tiene tu RDD? ¿Cómo afecta esto al rendimiento? ¿Qué pasaría si tuvieras solo 1 partición?
-
Lazy Evaluation: ¿En qué momento se ejecutaron realmente las transformaciones? ¿Qué ventajas tiene este modelo?
-
Memoria: ¿Qué pasaría si intentaras hacer
collect()en un RDD de 10GB? ¿Cómo evitarías este problema? -
Optimización: Si tuvieras que procesar 100x más datos, ¿qué cambiarías en tu código? ¿Usarías más particiones? ¿Cachearías algún RDD?
-
HDFS vs Local: ¿Cuáles son las ventajas de usar HDFS en lugar de archivos locales para este análisis?
Solución Completa
Section titled “Solución Completa”La solución completa del proyecto está disponible en index_PROYECTO_SOL.txt