Fundamentos de RDD en Spark
Aunque en Spark 3.x el 95% del tiempo trabajarás con DataFrames (tablas estructuradas), los DataFrames son solo una capa de abstracción sobre los RDDs. Entender los RDDs es entender cómo piensa Spark.
Si no entiendes RDDs, escribirás código que funciona, pero que es lento o ineficiente.
1. ¿Qué es realmente un RDD?
Section titled “1. ¿Qué es realmente un RDD?”RDD significa Resilient Distributed Dataset. Desglosemos el acrónimo con una arquitectura real:
- Resilient (Resiliente): Tolerante a fallos. Si un nodo se rompe, Spark reconstruye los datos perdidos automáticamente usando el Linaje (ver más abajo).
- Distributed (Distribuido): Los datos no están en un solo ordenador. Están troceados y repartidos por la memoria RAM de un clúster.
- Dataset: Es la colección de datos inmutable (no se puede modificar).
¿Qué significa inmutable?
Una vez creado un RDD, NO puedes cambiar sus datos. Cada transformación (map, filter) crea un nuevo RDD, el original permanece intacto.
# Ejemplo de inmutabilidadrdd_original = sc.parallelize([1, 2, 3, 4, 5])
# Aplicamos una transformaciónrdd_multiplicado = rdd_original.map(lambda x: x * 2)
print("RDD original:", rdd_original.collect()) # [1, 2, 3, 4, 5]print("RDD multiplicado:", rdd_multiplicado.collect()) # [2, 4, 6, 8, 10]
# El RDD original NO ha cambiado ✅# Se creó un NUEVO RDD con los datos transformados¿Por qué es importante la inmutabilidad?
- Tolerancia a fallos: Spark puede reconstruir datos perdidos re-ejecutando las transformaciones
- Paralelismo seguro: Múltiples tareas pueden leer el mismo RDD sin conflictos
- Linaje (DAG): Spark mantiene un historial de transformaciones para optimización
Arquitectura Driver-Executor
Section titled “Arquitectura Driver-Executor”En Spark, tu aplicación no corre en un solo lugar. Se divide físicamente en dos componentes que pueden estar en ordenadores distintos:
- Driver (El Cerebro): Es el proceso donde corre tu script Python (
main).- Mantiene el
SparkContext. - Convierte tu código en un plan de ejecución (DAG).
- Envía las tareas a los Executors.
- Mantiene el
- Executors (El Músculo): Son procesos JVM que corren en los nodos del clúster.
- Reciben código serializado del Driver.
- Ejecutan las tareas (
map,filter, etc.) sobre sus trozos de datos (particiones). - Almacenan los datos en su memoria RAM.
graph TB Driver["🧠 Driver<br/>(Tu Script Python)<br/><br/>- SparkContext<br/>- Planifica tareas<br/>- Coordina ejecución"]
Driver -->|"1. Envía código<br/>serializado"| E1["⚙️ Executor 1<br/>(Nodo A)<br/><br/>Ejecuta tareas<br/>Guarda en RAM"] Driver -->|"1. Envía código<br/>serializado"| E2["⚙️ Executor 2<br/>(Nodo B)<br/><br/>Ejecuta tareas<br/>Guarda en RAM"] Driver -->|"1. Envía código<br/>serializado"| E3["⚙️ Executor 3<br/>(Nodo C)<br/><br/>Ejecuta tareas<br/>Guarda en RAM"]
E1 -.->|"2. Reporta<br/>resultados"| Driver E2 -.->|"2. Reporta<br/>resultados"| Driver E3 -.->|"2. Reporta<br/>resultados"| Driver
style Driver fill:#e3f2fd,stroke:#1976d2,stroke-width:3px style E1 fill:#c8e6c9,stroke:#388e3c,stroke-width:2px style E2 fill:#c8e6c9,stroke:#388e3c,stroke-width:2px style E3 fill:#c8e6c9,stroke:#388e3c,stroke-width:2px🔍 Demostración con Código: La mejor forma de entender esto es preguntándole al sistema “¿Quién soy?”.
import socket
# --- CÓDIGO DEL DRIVER ---# Esto se ejecuta 1 sola vez en tu máquina localprint(f"🏠 Soy el DRIVER. Estoy en: {socket.gethostname()}")
def procesar_dato(x): # --- CÓDIGO DEL EXECUTOR --- # Esta función se serializa y se envía a los nodos. # Se ejecutará millones de veces en paralelo. maquina = socket.gethostname() return f"Dato {x} procesado en {maquina}"
rdd = sc.parallelize([1, 2, 3, 4], 2)resultado = rdd.map(procesar_dato).collect()
# Al imprimir el resultado, verás nombres de máquinas diferentesfor linea in resultado: print(linea)Nota importante: Si pones un
print()dentro deprocesar_dato, NO lo verás en tu pantalla. Ese print ocurrirá en la consola de la máquina remota (Executor). Esta es la prueba definitiva de que el código se ha ido de tu ordenador.
2. Creación y Particionamiento
Section titled “2. Creación y Particionamiento”Todo empieza con el SparkContext (sc).
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("FundamentosRDD").getOrCreate()sc = spark.sparkContext
# Datos de ejemplodata = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
# IMPORTANTE: El segundo argumento (2) indica el número de particionesrdd_numeros = sc.parallelize(data, 2)
print(f"Número de particiones: {rdd_numeros.getNumPartitions()}")¿Qué está pasando físicamente?
Section titled “¿Qué está pasando físicamente?”Si tienes 2 particiones, los datos se cortan físicamente.
graph TD A["Datos: [1,2,3,4,5,6,7,8,9,10]"] --> B["Partición 0: [1,2,3,4,5]"] A --> C["Partición 1: [6,7,8,9,10]"] B --> D["Executor 1 (Nodo A)"] C --> E["Executor 2 (Nodo B)"]
style A fill:#fff3cd style B fill:#d1ecf1 style C fill:#d1ecf1 style D fill:#d4edda style E fill:#d4eddaRegla de oro: El paralelismo máximo es igual al número de particiones. Si tienes 50 CPUs pero solo 2 particiones, 48 CPUs estarán ociosas.
¿Cómo elegir el número óptimo de particiones?
Section titled “¿Cómo elegir el número óptimo de particiones?”Elegir mal el número de particiones puede arruinar el rendimiento. Aquí tienes las reglas:
Fórmula general:
Particiones = [2..4] × número de CPUs totales del clústerEjemplos prácticos:
| Escenario | CPUs | Particiones Recomendadas | Razón |
|---|---|---|---|
| Laptop local | 4 cores | 8-16 | Permite paralelismo mientras otras tareas corren |
| Clúster pequeño | 20 cores | 40-80 | Balance entre overhead y paralelismo |
| Clúster grande | 200 cores | 400-800 | Maximiza uso de recursos |
| Datos muy pequeños (<1GB) | Cualquiera | 4-8 | Demasiadas particiones = overhead innecesario |
| Datos enormes (>100GB) | 100 cores | 1000+ | Cada partición debe ser ~128MB idealmente |
⚠️ Problemas comunes:
- Muy pocas particiones: CPUs ociosas, tareas lentas, riesgo de Out of Memory.
- Demasiadas particiones: Overhead de gestión (cada partición es una tarea), más tiempo en scheduling.
Ejemplo: Ajustando particiones manualmente
# Leer archivo (Spark decide automáticamente según el tamaño)rdd = sc.textFile("datos.txt")print(f"Particiones automáticas: {rdd.getNumPartitions()}")
# Si es muy bajo, aumentar con repartition (CUIDADO: causa Shuffle)rdd_reparticionado = rdd.repartition(100)
# Si es muy alto, reducir con coalesce (SIN Shuffle, más eficiente)rdd_reducido = rdd.coalesce(10)💡 Diferencia clave:
repartition(n): Puede aumentar o reducir particiones. Causa Shuffle (lento).coalesce(n): Solo reduce particiones. Sin Shuffle (rápido). Usa esto cuando termines un filtro que eliminó el 90% de los datos.
3. Lazy Evaluation y el Linaje (DAG)
Section titled “3. Lazy Evaluation y el Linaje (DAG)”[!WARNING] Concepto crítico: Spark es “perezoso” (Lazy). Las transformaciones NO se ejecutan inmediatamente. Solo se ejecutan cuando llamas a una acción como
collect(),count(), osaveAsTextFile(). Esto es fundamental para entender cómo debuggear código Spark.
Spark es “perezoso” (Lazy). Cuando ejecutas transformaciones (map, filter), Spark NO procesa nada. Solo toma notas en una libreta llamada DAG (Grafo Acíclico Dirigido).
# Transformación 1 (No hace nada)rdd1 = sc.parallelize([1, 2, 3, 4])# Transformación 2 (No hace nada)rdd2 = rdd1.map(lambda x: x * 2)# Transformación 3 (No hace nada)rdd3 = rdd2.filter(lambda x: x > 5)
# ACCIÓN (¡BOOM! Se ejecuta todo)rdd3.collect()Visualización del DAG
Section titled “Visualización del DAG”Así es como Spark construye internamente el plan de ejecución:
graph LR A["RDD1: parallelize([1,2,3,4])"] -->|"map(x*2)"| B["RDD2: [2,4,6,8]"] B -->|"filter(x>5)"| C["RDD3: [6,8]"] C -->|"collect()"| D["🚀 ACCIÓN: Se ejecuta todo el DAG"]
style A fill:#e1f5ff style B fill:#e1f5ff style C fill:#e1f5ff style D fill:#ffcccc💡 Concepto clave: Las transformaciones (azul) solo construyen el plan. La acción (rojo) dispara la ejecución real.
El Linaje (La Receta)
Section titled “El Linaje (La Receta)”Si el Executor 2 se muere y pierde sus datos, Spark no llora. Mira su “receta” (Linaje):
- “Tenía que cargar datos…”
- “Multiplicar por 2…”
- “Filtrar mayores de 5…”
Spark simplemente busca otro ordenador y re-ejecuta los pasos solo para los datos perdidos. Spark no guarda los datos, guarda la receta para crearlos.
4. Transformaciones: Narrow vs Wide (La Clave del Rendimiento)
Section titled “4. Transformaciones: Narrow vs Wide (La Clave del Rendimiento)”El rendimiento de un trabajo en Spark depende drásticamente de cuánto se mueven los datos por la red. Podemos clasificar las transformaciones en dos categorías según si los datos pueden procesarse localmente o si requieren ser redistribuidos entre los nodos (Shuffle):
Comparación Visual: Narrow vs Wide
Section titled “Comparación Visual: Narrow vs Wide”graph TB subgraph "WIDE: Requiere Shuffle" W1["Partición 1<br/>(A:1, B:2)"] -.->|"Shuffle<br/>Red + Disco"| W3["Partición 1<br/>(A: [1,3])"] W2["Partición 2<br/>(A:3, B:4)"] -.->|"Shuffle<br/>Red + Disco"| W4["Partición 2<br/>(B: [2,4])"] W5["⚠️ Los datos viajan<br/>entre nodos"] end
subgraph "NARROW: Sin movimiento de datos" N1["Partición 1<br/>[1, 2, 3]"] -->|"map(x*2)"| N2["Partición 1<br/>[2, 4, 6]"] N3["Partición 2<br/>[4, 5, 6]"] -->|"map(x*2)"| N4["Partición 2<br/>[8, 10, 12]"] N5["✅ Todo en el mismo nodo<br/>Sin red, sin disco"] end
style N1 fill:#d4edda style N2 fill:#d4edda style N3 fill:#d4edda style N4 fill:#d4edda style N5 fill:#c3e6cb style W1 fill:#f8d7da style W2 fill:#f8d7da style W3 fill:#f8d7da style W4 fill:#f8d7da style W5 fill:#f5c6cbA. Narrow Transformations (Dependencias Estrechas) - “Las Rápidas”
Section titled “A. Narrow Transformations (Dependencias Estrechas) - “Las Rápidas””Ocurren cuando cada partición de salida depende solo de una partición de entrada.
- Sin tráfico de red: El dato se procesa en el mismo ordenador donde está.
- Pipelining: Spark es muy listo. Si encadenas
map().filter().map(), Spark fusiona las 3 operaciones en una sola pasada. Lee el dato una vez y le aplica las 3 funciones de golpe.
Ejemplos: map, filter, flatMap, union.
graph TB subgraph P1["PARTICIÓN 1 (Nodo 1)"] D1["📦 Dato: 1"] -->|"map(x*2)"| R1["Resultado: 2"] R1 -->|"filter(>5)"| F1["❌ Descartado<br/>(2 no es > 5)"]
D2["📦 Dato: 3"] -->|"map(x*2)"| R2["Resultado: 6"] R2 -->|"filter(>5)"| F2["✅ Pasa: 6"] end
subgraph P2["PARTICIÓN 2 (Nodo 2)"] D3["📦 Dato: 10"] -->|"map(x*2)"| R3["Resultado: 20"] R3 -->|"filter(>5)"| F3["✅ Pasa: 20"]
D4["📦 Dato: 15"] -->|"map(x*2)"| R4["Resultado: 30"] R4 -->|"filter(>5)"| F4["✅ Pasa: 30"] end
F2 --> Final["Resultado final:<br/>[6, 20, 30]"] F3 --> Final F4 --> Final
Note["💡 Pipeline Optimization<br/><br/>Spark fusiona map() y filter()<br/>en UNA SOLA pasada<br/><br/>Cada partición se procesa<br/>independientemente en su nodo<br/><br/>Sin red, sin disco<br/>Máxima velocidad"]
style D1 fill:#e3f2fd,stroke:#1976d2 style D2 fill:#e3f2fd,stroke:#1976d2 style D3 fill:#e3f2fd,stroke:#1976d2 style D4 fill:#e3f2fd,stroke:#1976d2 style R1 fill:#fff9c4,stroke:#f57c00 style R2 fill:#fff9c4,stroke:#f57c00 style R3 fill:#fff9c4,stroke:#f57c00 style R4 fill:#fff9c4,stroke:#f57c00 style F1 fill:#ffcdd2,stroke:#c62828 style F2 fill:#c8e6c9,stroke:#388e3c,stroke-width:2px style F3 fill:#c8e6c9,stroke:#388e3c,stroke-width:2px style F4 fill:#c8e6c9,stroke:#388e3c,stroke-width:2px style Final fill:#a5d6a7,stroke:#2e7d32,stroke-width:3px style Note fill:#fff9c4,stroke:#f57c00,stroke-width:2px style P1 fill:#f5f5f5,stroke:#757575 style P2 fill:#f5f5f5,stroke:#757575Código Python que corresponde al diagrama:
# Datos: 2 valores por partición (4 valores totales)datos = [1, 3, 10, 15]# Creamos explícitamente 2 particiones (como en el diagrama)rdd = sc.parallelize(datos, 2)
print(f"Número de particiones: {rdd.getNumPartitions()}") # Salida: 2
# Pipeline: map + filter (igual que en el diagrama)resultado = rdd.map(lambda x: x * 2) \ .filter(lambda x: x > 5)
print("Datos originales:", datos)print("Resultado final:", resultado.collect())
# Veamos el flujo paso a paso (como en el diagrama):## PARTICIÓN 1 (Nodo 1):# 📦 Dato: 1# → map(x*2) → 2# → filter(>5) → 2 > 5? NO ❌ Descartado## 📦 Dato: 3# → map(x*2) → 6# → filter(>5) → 6 > 5? SÍ ✅ Resultado: 6## PARTICIÓN 2 (Nodo 1):# 📦 Dato: 10# → map(x*2) → 20# → filter(>5) → 20 > 5? SÍ ✅ Resultado: 20## 📦 Dato: 15# → map(x*2) → 30# → filter(>5) → 30 > 5? SÍ ✅ Resultado: 30
# Salida:# Número de particiones: 2# Datos originales: [1, 3, 10, 15]# Resultado final: [6, 20, 30]## ✨ Spark fusiona map() y filter() en UNA SOLA pasada# Cada partición se procesa independientemente en su nodo# Todo ocurre en la CPU local, sin red, sin discoB. Wide Transformations (Dependencias Anchas) - “El Shuffle”
Section titled “B. Wide Transformations (Dependencias Anchas) - “El Shuffle””Ocurren cuando para calcular una fila del resultado, necesito leer datos de muchas particiones que podrían estar en otros ordenadores.
- Requiere Shuffle (Barajado): Los datos deben viajar por la red para agruparse.
- Rompe el Pipeline: Spark no puede seguir procesando linealmente. Debe detenerse, escribir datos intermedios en disco, enviarlos por la red y esperar a que lleguen todos. Esto divide el trabajo en Stages (Etapas).
Ejemplos: reduceByKey, groupByKey, sortByKey, distinct, join.
Visualización del Shuffle
Section titled “Visualización del Shuffle”El Shuffle es el proceso más costoso en Spark. Aquí puedes ver qué ocurre internamente:
graph TB subgraph "STAGE 1: Map Side (Nodos Originales)" A1["Nodo A<br/>Partición 0<br/>(X:1, Y:2, X:3)"] A2["Nodo B<br/>Partición 1<br/>(Y:4, X:5, Z:6)"] end
subgraph "Shuffle (Red + Disco)" A1 -->|"Escribir a disco<br/>Serializar"| S1["Shuffle Write"] A2 -->|"Escribir a disco<br/>Serializar"| S2["Shuffle Write"] S1 -->|"Red: Enviar datos<br/>por clave"| S3["Shuffle Read"] S2 -->|"Red: Enviar datos<br/>por clave"| S3 end
subgraph "STAGE 2: Reduce Side (Reagrupación)" S3 --> B1["Nodo C<br/>Clave X: [1,3,5]"] S3 --> B2["Nodo D<br/>Clave Y: [2,4]"] S3 --> B3["Nodo E<br/>Clave Z: [6]"] end
style A1 fill:#d1ecf1 style A2 fill:#d1ecf1 style S1 fill:#fff3cd style S2 fill:#fff3cd style S3 fill:#f8d7da style B1 fill:#d4edda style B2 fill:#d4edda style B3 fill:#d4edda[!CAUTION] El Shuffle es la operación MÁS COSTOSA en Spark
Costes del Shuffle:
- Red: Latencia y ancho de banda (los datos viajan entre máquinas)
- Disco: Los datos intermedios se escriben en disco (por seguridad ante fallos)
- Serialización: Coste de CPU para empaquetar/desempaquetar objetos
- Barrera de sincronización: El Stage 2 NO puede empezar hasta que el Stage 1 termine completamente
Regla de oro: Minimiza las operaciones Wide. Usa
reduceByKeyen lugar degroupByKeysiempre que sea posible.
Resumen Visual
Section titled “Resumen Visual”| Tipo | ¿Mueve datos? | ¿Coste? | Ejemplos |
|---|---|---|---|
| Narrow | ❌ No | Bajo (CPU) | map, filter, sample |
| Wide | ✅ Sí (Shuffle) | Alto (Red + Disco I/O) | reduceByKey, distinct, repartition |
🧪 Experimento: Midiendo la diferencia real
Section titled “🧪 Experimento: Midiendo la diferencia real”Vamos a demostrar esto con código. Ejecuta este bloque en tu Notebook. Crearemos una lista grande y compararemos tiempos.
import time
# 1. Crear un RDD "pesado" (5 millones de números)# 5 particiones para obligar a distribuir el trabajordd_grande = sc.parallelize(range(5000000), 5)
# --- PRUEBA 1: Narrow Transformation (Map + Filter) ---start_time = time.time()
# Multiplicamos y filtramos. Todo ocurre en memoria local.# Spark fusiona esto en una sola pasada (Pipeline).count_narrow = rdd_grande.map(lambda x: x * 2).filter(lambda x: x < 500).count()
end_time = time.time()print(f"⏱️ Tiempo Narrow (Local): {end_time - start_time:.4f} segundos")
# --- PRUEBA 2: Wide Transformation (GroupBy / Shuffle) ---start_time = time.time()
# Agrupamos por el último dígito (0-9).# ¡Esto obliga a mover los 5 millones de números por la red para ordenarlos!count_wide = rdd_grande.groupBy(lambda x: x % 10).count()
end_time = time.time()print(f"⏱️ Tiempo Wide (Shuffle): {end_time - start_time:.4f} segundos")Resultados típicos: Verás que la operación Wide es entre 5x y 10x más lenta, incluso en tu ordenador local. En un clúster real, la diferencia es aún mayor por la latencia de red.
¿Qué ver en la Spark UI (localhost:4040)?
- Ve a la pestaña “Stages”.
- Narrow: Verás 1 solo Stage (todas las tareas se hicieron de golpe).
- Wide: Verás 2 Stages. El primero prepara los datos y el segundo espera a que lleguen los datos del Shuffle (
Shuffle Read).
5. Operaciones Clave-Valor y Optimización
Section titled “5. Operaciones Clave-Valor y Optimización”El peligro de groupByKey vs reduceByKey vs aggregateByKey
Section titled “El peligro de groupByKey vs reduceByKey vs aggregateByKey”Estas tres funciones agrupan datos por clave, pero su rendimiento y casos de uso son radicalmente distintos.
Comparación Visual del Tráfico de Red
Section titled “Comparación Visual del Tráfico de Red”graph LR subgraph G["groupByKey - INEFICIENTE ❌"] direction LR G1["Nodo 1: (A,1), (A,1), (A,1)"] -->|"Envía 3 valores"| GR["Reducer<br/>Recibe: A → [1,1,1]"] G2["Nodo 2: (A,1), (A,1)"] -->|"Envía 2 valores"| GR end
style G1 fill:#f8d7da style G2 fill:#f8d7da style GR fill:#f5c6cbgraph LR
subgraph R["reduceByKey - EFICIENTE ✅"] direction LR R1["Nodo 1: (A,1), (A,1), (A,1)"] -->|"Pre-agrega → Envía 1 valor: (A,3)"| RR["Reducer<br/>Recibe: A → 3, 2"] R2["Nodo 2: (A,1), (A,1)"] -->|"Pre-agrega → Envía 1 valor: (A,2)"| RR end style R1 fill:#d4edda style R2 fill:#d4edda style RR fill:#c3e6cbTabla Comparativa Completa
Section titled “Tabla Comparativa Completa”| Característica | groupByKey() | reduceByKey(func) | aggregateByKey(zeroValue)(seqFunc, combFunc) |
|---|---|---|---|
| Pre-agregación | ❌ No | ✅ Sí (Combiner) | ✅ Sí (Combiner) |
| Tráfico de red | 🔴 Alto (todos los valores) | 🟢 Bajo (valores agregados) | 🟢 Bajo (valores agregados) |
| Riesgo OOM | 🔴 Alto | 🟢 Bajo | 🟢 Bajo |
| Tipo de salida | Mismo que entrada | Mismo que entrada | Puede cambiar |
| Cuándo usar | Casi nunca | Agregaciones simples (suma, max) | Agregaciones complejas (listas, sets, promedios) |
| Ejemplo | Agrupar logs por IP | Contar palabras | Calcular promedio, crear histogramas |
Ejemplos Prácticos
Section titled “Ejemplos Prácticos”Escenario: Contar palabras
palabras = [("hola", 1), ("mundo", 1), ("hola", 1), ("spark", 1), ("hola", 1)]rdd = sc.parallelize(palabras, 2)
# ❌ MALO: groupByKey (envía [1, 1, 1] por la red)conteo_malo = rdd.groupByKey().mapValues(sum)# Resultado: [('hola', 3), ('mundo', 1), ('spark', 1)]
# ✅ BUENO: reduceByKey (envía solo el total pre-agregado)conteo_bueno = rdd.reduceByKey(lambda a, b: a + b)# Resultado: [('hola', 3), ('mundo', 1), ('spark', 1)]Escenario Avanzado: Calcular promedio (requiere aggregateByKey)
# Datos: (nombre, nota)notas = [("Ana", 8), ("Juan", 6), ("Ana", 9), ("Juan", 7), ("Ana", 10)]rdd = sc.parallelize(notas)
# aggregateByKey permite cambiar el tipo de valor# Valor inicial: (suma, contador)promedio = rdd.aggregateByKey( (0, 0), # zeroValue: (suma_inicial, contador_inicial)
# seqFunc: Cómo combinar un valor nuevo con el acumulador en la misma partición lambda acc, valor: (acc[0] + valor, acc[1] + 1),
# combFunc: Cómo combinar dos acumuladores de diferentes particiones lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])).mapValues(lambda x: x[0] / x[1])
# Resultado: [('Ana', 9.0), ('Juan', 6.5)][!TIP] Regla de decisión para agregaciones:
groupByKey: Solo si realmente necesitas TODOS los valores agrupados (raro). Ejemplo: generar listas completas.reduceByKey: Para agregaciones donde el tipo de entrada = tipo de salida (suma, max, min, concatenación).aggregateByKey: Para agregaciones donde el tipo cambia (calcular promedio, crear sets, histogramas).
⚠️ Advertencia de rendimiento: En un dataset de 1TB con
groupByKey, podrías enviar 1TB por la red. ConreduceByKey, quizás solo 1GB. ¡1000x de diferencia!
6. Persistencia (Caching)
Section titled “6. Persistencia (Caching)”Por defecto, cada vez que ejecutas una acción sobre un RDD, Spark recalcula todo el linaje desde el principio.
Si vas a usar el mismo RDD varias veces, guárdalo en memoria.
rdd = sc.textFile("log_gigante.txt").map(...).filter(...)
# Marcamos para guardar en RAM la primera vez que se computerdd.cache()
print(rdd.count()) # Tarda 10 minutos (Lee archivo + map + filter + guarda en RAM)print(rdd.count()) # Tarda 0 segundos (Lee directo de RAM)Niveles de Almacenamiento
Section titled “Niveles de Almacenamiento”Spark ofrece diferentes estrategias de persistencia según tus necesidades:
| Nivel | Ubicación | Serializado | Replicado | Uso de Memoria | Cuándo usarlo |
|---|---|---|---|---|---|
MEMORY_ONLY | RAM | ❌ No | ❌ No | Alto | Por defecto. Datos pequeños que caben en RAM. Máximo rendimiento. |
MEMORY_ONLY_SER | RAM | ✅ Sí | ❌ No | Medio | Datos grandes. Ahorra RAM (5-10x) pero más CPU para deserializar. |
MEMORY_AND_DISK | RAM + Disco | ❌ No | ❌ No | Variable | Recomendado. Si no cabe en RAM, va a disco automáticamente. |
MEMORY_AND_DISK_SER | RAM + Disco | ✅ Sí | ❌ No | Bajo | Datos muy grandes. Combina ahorro de RAM + seguridad de disco. |
DISK_ONLY | Disco | ✅ Sí | ❌ No | Muy Bajo | Datos enormes que nunca cabrán en RAM. Evita recalcular. |
MEMORY_ONLY_2 | RAM | ❌ No | ✅ Sí (2x) | Muy Alto | Datos críticos. Tolerancia a fallos mejorada (2 copias). |
OFF_HEAP | Memoria externa | ✅ Sí | ❌ No | Bajo | Datos muy grandes. Evita el Garbage Collector de Java. |
💡 Regla práctica:
- Iteraciones de Machine Learning: Usa
MEMORY_AND_DISK(los datos se reutilizan muchas veces).- ETL de una sola pasada: No uses cache (es un desperdicio de memoria).
- Datos críticos en producción: Usa niveles replicados (
_2) para evitar recalcular si un nodo falla.
Ejemplo Práctico: Comparando niveles
Section titled “Ejemplo Práctico: Comparando niveles”from pyspark import StorageLevel
# Crear un RDD granderdd_grande = sc.parallelize(range(10000000), 10)
# Opción 1: Cache por defecto (MEMORY_ONLY)rdd_grande.cache()
# Opción 2: Especificar nivel manualmenterdd_grande.persist(StorageLevel.MEMORY_AND_DISK_SER)
# Ver qué está en cacheprint(rdd_grande.getStorageLevel())
# IMPORTANTE: Liberar memoria cuando ya no lo necesitesrdd_grande.unpersist()[!WARNING] Advertencia sobre cache():
cache()es solo un atajo parapersist(StorageLevel.MEMORY_ONLY). Si tus datos no caben en RAM, Spark descartará particiones y las recalculará cada vez (muy ineficiente).Recomendación: Usa
persist(StorageLevel.MEMORY_AND_DISK)para datos grandes que se reutilizan múltiples veces.
7. Variables Compartidas: Accumulators y Broadcast
Section titled “7. Variables Compartidas: Accumulators y Broadcast”Cuando trabajas en un clúster distribuido, hay dos problemas comunes:
- ¿Cómo contar eventos globales? (ej: líneas erróneas procesadas)
- ¿Cómo compartir datos de solo lectura eficientemente? (ej: tabla de lookup de 100MB)
Spark ofrece dos soluciones: Accumulators y Broadcast Variables.
A. Accumulators (Contadores Distribuidos)
Section titled “A. Accumulators (Contadores Distribuidos)”Los accumulators permiten sumar valores desde los Executors de forma segura. Solo el Driver puede leer el valor final.
Caso de uso típico: Contar errores, registros procesados, métricas de calidad de datos.
# Crear un acumuladorerrores = sc.accumulator(0)lineas_procesadas = sc.accumulator(0)
def procesar_linea(linea): global errores, lineas_procesadas lineas_procesadas.add(1)
try: # Intentar parsear campos = linea.split(",") if len(campos) != 3: errores.add(1) return None return (campos[0], int(campos[1])) except: errores.add(1) return None
# Procesar datosrdd = sc.textFile("datos_sucios.csv")rdd_limpio = rdd.map(procesar_linea).filter(lambda x: x is not None)
# IMPORTANTE: Forzar la acción para que se ejecuteresultado = rdd_limpio.collect()
# Ahora podemos leer los contadores en el Driverprint(f"✅ Líneas procesadas: {lineas_procesadas.value}")print(f"❌ Líneas con errores: {errores.value}")print(f"📊 Tasa de error: {errores.value / lineas_procesadas.value * 100:.2f}%")⚠️ Advertencias importantes:
- Los accumulators solo se actualizan cuando se ejecuta una acción.
- Si una tarea falla y se reintenta, el accumulator puede contar dos veces (no son transaccionales).
- Solo usa accumulators para debugging y métricas, no para lógica de negocio crítica.
B. Broadcast Variables (Datos de Solo Lectura Compartidos)
Section titled “B. Broadcast Variables (Datos de Solo Lectura Compartidos)”Las broadcast variables permiten enviar una sola copia de datos grandes a cada nodo, en lugar de enviarlos con cada tarea.
Sin Broadcast (MALO):
# Tabla de códigos postales (100MB)tabla_codigos = {"28001": "Madrid Centro", "08001": "Barcelona Centro", ...} # 100,000 entradas
# ❌ PROBLEMA: Esta tabla se serializa y envía con CADA TAREA# Si tienes 1000 tareas, envías 100GB por la red!rdd.map(lambda cp: tabla_codigos.get(cp, "Desconocido"))Con Broadcast (BUENO):
# ✅ SOLUCIÓN: Broadcast envía la tabla UNA SOLA VEZ a cada nodotabla_broadcast = sc.broadcast(tabla_codigos)
# Ahora cada Executor tiene una copia local en memoriardd.map(lambda cp: tabla_broadcast.value.get(cp, "Desconocido"))
# Liberar memoria cuando terminestabla_broadcast.unpersist()Comparación Visual
Section titled “Comparación Visual”graph TB subgraph "SIN Broadcast - Ineficiente ❌" D1["Driver<br/>Tabla: 100MB"] -->|"100MB"| T1["Tarea 1"] D1 -->|"100MB"| T2["Tarea 2"] D1 -->|"100MB"| T3["Tarea 3"] D1 -->|"100MB"| T4["Tarea ...1000"] T5["Total red: 100GB 😱"] end
subgraph "CON Broadcast - Eficiente ✅" D2["Driver<br/>Tabla: 100MB"] -->|"100MB"| N1["Nodo 1"] D2 -->|"100MB"| N2["Nodo 2"] N1 --> T6["Tareas 1-500<br/>usan copia local"] N2 --> T7["Tareas 501-1000<br/>usan copia local"] T8["Total red: 200MB 🎉"] end
style D1 fill:#f8d7da style T1 fill:#f8d7da style T2 fill:#f8d7da style T3 fill:#f8d7da style T4 fill:#f8d7da style T5 fill:#f5c6cb style D2 fill:#d4edda style N1 fill:#d4edda style N2 fill:#d4edda style T6 fill:#c3e6cb style T7 fill:#c3e6cb style T8 fill:#c3e6cbEjemplo Práctico Completo: Enriquecimiento de Datos
Section titled “Ejemplo Práctico Completo: Enriquecimiento de Datos”# Escenario: Tienes 1 billón de transacciones y una tabla de 10,000 productosproductos = { "P001": {"nombre": "Laptop", "categoria": "Electrónica"}, "P002": {"nombre": "Ratón", "categoria": "Accesorios"}, # ... 10,000 productos}
# Broadcast la tabla de productosproductos_bc = sc.broadcast(productos)
# Acumulador para contar productos no encontradosproductos_no_encontrados = sc.accumulator(0)
def enriquecer_transaccion(linea): # linea = "2024-01-01,P001,2,100.50" fecha, producto_id, cantidad, precio = linea.split(",")
# Buscar en la tabla broadcast info_producto = productos_bc.value.get(producto_id)
if info_producto is None: productos_no_encontrados.add(1) return None
return { "fecha": fecha, "producto": info_producto["nombre"], "categoria": info_producto["categoria"], "cantidad": int(cantidad), "total": float(precio) * int(cantidad) }
# Procesartransacciones = sc.textFile("transacciones.csv")transacciones_enriquecidas = transacciones.map(enriquecer_transaccion).filter(lambda x: x is not None)
# Ejecutarresultado = transacciones_enriquecidas.take(10)
print(f"Productos no encontrados: {productos_no_encontrados.value}")💡 Cuándo usar Broadcast:
- Tablas de lookup pequeñas/medianas (<2GB recomendado)
- Configuraciones, reglas de negocio, modelos de ML
- Cualquier dato de solo lectura que se usa en muchas tareas
⚠️ No uses Broadcast si:
- Los datos cambian frecuentemente (broadcast es inmutable)
- Los datos son enormes (>2GB puede causar problemas de memoria)
- Solo se usan en pocas tareas (no vale la pena el overhead)
8. Resumen de Operaciones Comunes
Section titled “8. Resumen de Operaciones Comunes”Acciones (Disparan el Job)
Section titled “Acciones (Disparan el Job)”| Acción | Descripción |
|---|---|
collect() | Trae todos los datos al Driver (¡Cuidado con la memoria!) |
count() | Cuenta elementos |
first() / take(n) | Devuelve los primeros N elementos |
saveAsTextFile(path) | Guarda a disco |
foreach(func) | Aplica función a cada elemento (útil para guardar en BBDD) |
Transformaciones
Section titled “Transformaciones”| Transf. | Tipo | Descripción |
|---|---|---|
map | Narrow | 1 entrada -> 1 salida exacta |
filter | Narrow | 1 entrada -> 0 o 1 salida |
flatMap | Narrow | 1 entrada -> 0, 1 o N salidas (aplana listas) |
distinct | Wide | Elimina duplicados (requiere Shuffle) |
union | Narrow | Une dos RDDs |
intersection | Wide | Intersección de conjuntos (requiere Shuffle) |
9. Errores Comunes y Debugging
Section titled “9. Errores Comunes y Debugging”A. Errores Típicos y Cómo Solucionarlos
Section titled “A. Errores Típicos y Cómo Solucionarlos”1. OutOfMemoryError en el Driver
java.lang.OutOfMemoryError: Java heap space (Driver)Causa: Llamaste a collect() sobre un RDD enorme y no cabe en la memoria del Driver.
Solución:
# ❌ MALO: Traer 100GB al Driverresultado = rdd_gigante.collect()
# ✅ BUENO: Opciones alternativas# Opción 1: Tomar solo una muestramuestra = rdd_gigante.take(100)
# Opción 2: Guardar a discordd_gigante.saveAsTextFile("resultado/")
# Opción 3: Contar o agregartotal = rdd_gigante.count()suma = rdd_gigante.reduce(lambda a, b: a + b)2. OutOfMemoryError en los Executors
java.lang.OutOfMemoryError: Java heap space (Executor)Causa: Particiones demasiado grandes o uso de groupByKey con muchos valores por clave.
Solución:
# Aumentar el número de particionesrdd_reparticionado = rdd.repartition(200)
# O usar reduceByKey en lugar de groupByKey# ❌ MALOrdd.groupByKey().mapValues(sum)
# ✅ BUENOrdd.reduceByKey(lambda a, b: a + b)3. Serialization Error
org.apache.spark.SparkException: Task not serializableCausa: Intentas enviar un objeto no serializable (ej: conexión a base de datos) a los Executors.
Solución:
# ❌ MALO: La conexión no es serializableconexion_db = crear_conexion()rdd.map(lambda x: conexion_db.query(x))
# ✅ BUENO: Crear la conexión dentro de la funcióndef procesar_con_db(x): # Cada Executor crea su propia conexión conexion = crear_conexion() resultado = conexion.query(x) conexion.close() return resultado
rdd.map(procesar_con_db)4. Shuffle Fetch Failed
org.apache.spark.shuffle.FetchFailedExceptionCausa: Un Executor murió durante el Shuffle o problemas de red.
Solución:
# Aumentar la configuración de reintentosspark.conf.set("spark.shuffle.io.maxRetries", "10")spark.conf.set("spark.shuffle.io.retryWait", "60s")
# Persistir antes de operaciones Widerdd.persist(StorageLevel.MEMORY_AND_DISK)resultado = rdd.reduceByKey(lambda a, b: a + b)5. Código que “no hace nada”
rdd = sc.textFile("archivo.txt")rdd_filtrado = rdd.filter(lambda x: "ERROR" in x)# No pasa nada... ¿Por qué?Causa: Lazy Evaluation. No has ejecutado ninguna acción.
Solución:
# Añadir una accióncount = rdd_filtrado.count() # Ahora sí se ejecutaB. Debugging con Spark UI (localhost:4040)
Section titled “B. Debugging con Spark UI (localhost:4040)”La Spark UI es tu mejor amiga para entender qué está pasando. Aquí está lo que debes mirar:
1. Pestaña “Jobs”
- Qué ver: Tiempo total de cada Job
- Busca: Jobs que tardan mucho más de lo esperado
- Acción: Haz clic en el Job para ver los Stages
2. Pestaña “Stages”
- Qué ver:
- Número de tareas (tasks)
- Tiempo de cada Stage
- Shuffle Read/Write
- Señales de alerta:
- Shuffle Write/Read muy alto: Optimiza con
reduceByKeyen lugar degroupByKey - Tareas desbalanceadas: Algunas tareas tardan 10x más que otras → Problema de data skew
- Muchos Stages: Demasiadas operaciones Wide → Intenta reducirlas
- Shuffle Write/Read muy alto: Optimiza con
3. Pestaña “Storage”
- Qué ver: RDDs en cache y cuánta memoria usan
- Acción: Si un RDD no cabe en memoria, cambia a
MEMORY_AND_DISK
4. Pestaña “Executors”
- Qué ver:
- Memoria usada por Executor
- Tareas completadas/fallidas
- Shuffle Read/Write por Executor
- Señales de alerta:
- Un Executor con muchas tareas fallidas: Ese nodo tiene problemas
- Memoria al 100%: Aumenta
spark.executor.memoryo reduce particiones
C. Checklist de Optimización
Section titled “C. Checklist de Optimización”Antes de quejarte de que “Spark es lento”, verifica:
- ¿Usas
reduceByKeyen lugar degroupByKey? - ¿El número de particiones es 2-4x el número de CPUs?
- ¿Cacheas RDDs que usas múltiples veces?
- ¿Evitas
collect()en datasets grandes? - ¿Usas broadcast para tablas de lookup?
- ¿Filtras datos lo antes posible en el pipeline?
- ¿Evitas operaciones Wide innecesarias?
💡 Regla de oro del debugging:
- Mira la Spark UI (localhost:4040)
- Identifica el Stage más lento
- Busca Shuffle Read/Write alto
- Optimiza esa operación específica
10. Ejercicios Cortos (Básicos)
Section titled “10. Ejercicios Cortos (Básicos)”Ejercicio 1: Creación y Particionamiento
Section titled “Ejercicio 1: Creación y Particionamiento”Objetivo: Entender parallelize y las particiones. Crea un RDD con una lista de números del 1 al 50. Configura el RDD para que tenga 5 particiones.
- Imprime por pantalla el número de particiones para verificarlo.
- Ejecuta una acción para contar cuántos elementos hay en total.
Ejercicio 2: Pipeline de Transformaciones (Filter + Map)
Section titled “Ejercicio 2: Pipeline de Transformaciones (Filter + Map)”Objetivo: Encadenar transformaciones filter y map.
Tienes una lista de precios en dólares: [10, 25, 40, 5, 100, 15].
- Filtra para quedarte solo con los precios mayores o iguales a 20.
- Convierte esos precios filtrados a euros (multiplica por 0.9).
- Muestra el resultado final usando collect().
Ejercicio 3: Aplanando listas con flatMap
Section titled “Ejercicio 3: Aplanando listas con flatMap”Objetivo: Diferenciar map de flatMap.
Tienes un RDD donde cada elemento es una frase de una lista de la compra:
['Leche,Huevos,Pan', 'Manzanas,Peras', 'Leche,Chocolate']
Obtén una lista única con todos los productos individuales.
Ejercicio 4: Teoría de Conjuntos
Section titled “Ejercicio 4: Teoría de Conjuntos”Objetivo: Usar intersection y distinct. Tenemos los correos electrónicos de los usuarios que visitaron la web ayer y los que la han visitado hoy.
- Ayer:
["a@test.com", "b@test.com", "c@test.com"] - Hoy:
["c@test.com", "d@test.com", "e@test.com"]Encuentra los correos de los usuarios que han entrado ambos días.
Ejercicio 5: Agregación por Clave (Word Count simplificado)
Section titled “Ejercicio 5: Agregación por Clave (Word Count simplificado)”Objetivo: Usar reduceByKey para sumar valores agrupados.
Tienes una lista de ventas de frutas donde se indica la fruta y la cantidad vendida:
[("Manzana", 2), ("Pera", 3), ("Manzana", 5), ("Plátano", 10), ("Pera", 2)].
Calcula el total de unidades vendidas por cada tipo de fruta.
Ejercicio 6: Parseo de CSV sucio
Section titled “Ejercicio 6: Parseo de CSV sucio”Objetivo: Combinar lectura, limpieza, transformación y ordenación.
Simula que lees un archivo CSV con datos de empleados (Nombre, Departamento, Salario).
Datos crudos: ["Juan,IT,2000", "Ana,Ventas,3000", "Maria,IT,2500", "Pedro,Ventas,1800"]
- Convierte cada línea en una tupla: (Departamento, Salario_float).
- Calcula el salario total que gasta la empresa por cada departamento.
- Ordena el resultado de mayor gasto a menor gasto.
9. Ejercicios Nuevos (Conceptos Avanzados)
Section titled “9. Ejercicios Nuevos (Conceptos Avanzados)”Ejercicio 1: Pipeline y Shuffle
Section titled “Ejercicio 1: Pipeline y Shuffle”Dada una lista de ventas: ("Madrid", 100), ("Barcelona", 200), ("Madrid", 50), ("Valencia", 50)
- Filtra las ventas menores de 60.
- Suma las ventas por ciudad.
- Ordena el resultado por facturación (de mayor a menor). Pista: filter -> reduceByKey -> sortBy
Ejercicio 2: Diferencia Map vs FlatMap
Section titled “Ejercicio 2: Diferencia Map vs FlatMap”Tienes un RDD con frases: ["Hola mundo", "Apache Spark es guay"].
- Aplica
mapconsplit(" "). ¿Qué estructura obtienes? - Aplica
flatMapconsplit(" "). ¿Qué diferencia ves en elcollect()?
Ejercicio 3: Análisis de Logs (Proyecto Mini)
Section titled “Ejercicio 3: Análisis de Logs (Proyecto Mini)”Archivo simulado: ip,hora,metodo,url
192.168.1.1,10:00,GET,/index192.168.1.2,10:01,POST,/login192.168.1.1,10:02,GET,/imagen.jpg- Carga el archivo.
- Cuenta cuántas peticiones ha hecho cada IP (
mapa par clave-valor +reduceByKey). - Filtra las IPs que han hecho más de 1 petición.
- Guarda el resultado en un archivo de texto.
10. Proyecto Práctico: Análisis de Calificaciones 🎓
Section titled “10. Proyecto Práctico: Análisis de Calificaciones 🎓”Vamos a aplicar todo lo aprendido (Parseo, Tuplas, ReduceByKey, Operaciones de Conjuntos) en un escenario realista.
Escenario: Eres el administrador de un instituto. Tienes las notas de tres asignaturas en archivos de texto separados y sucios. Necesitas unificar los datos y generar un informe estadístico usando RDDs.
Estructura de Datos de Entrada
Section titled “Estructura de Datos de Entrada”Tres archivos con formato: nombre,nota
notas_mates.txt:Angel,6Maria,2Ramon,4.5
notas_fisica.txt:Angel,9Maria,3
notas_ingles.txt:Angel,4Maria,6Ejercicios
Section titled “Ejercicios”Intenta resolver estos problemas usando las transformaciones adecuadas:
-
- Nota mas baja por alumno
-
- Nota media por alumno
-
- Notables o sobresalientes por alumno (nota >= 7)
-
- Alumnos que no se presentaron a Ingles
-
- Numero de asignaturas por alumno
-
- RDD con cada alumno y todas sus notas
RDD vs DataFrame: ¿Cuándo usar RDD?
Section titled “RDD vs DataFrame: ¿Cuándo usar RDD?”Aunque el DataFrame es superior en rendimiento (gracias al optimizador Catalyst), usaremos RDDs cuando:
- Los datos no tienen estructura (texto libre, audio, logs binarios).
- Necesitas transformaciones matemáticas complejas a bajo nivel que SQL no permite fácilmente.
- La estructura de los datos es muy irregular.
Para todo lo demás (CSV, JSON, Parquet, Tablas SQL), usa DataFrames.