Skip to content

Fundamentos de RDD en Spark

Aunque en Spark 3.x el 95% del tiempo trabajarás con DataFrames (tablas estructuradas), los DataFrames son solo una capa de abstracción sobre los RDDs. Entender los RDDs es entender cómo piensa Spark.

Si no entiendes RDDs, escribirás código que funciona, pero que es lento o ineficiente.

RDD significa Resilient Distributed Dataset. Desglosemos el acrónimo con una arquitectura real:

  1. Resilient (Resiliente): Tolerante a fallos. Si un nodo se rompe, Spark reconstruye los datos perdidos automáticamente usando el Linaje (ver más abajo).
  2. Distributed (Distribuido): Los datos no están en un solo ordenador. Están troceados y repartidos por la memoria RAM de un clúster.
  3. Dataset: Es la colección de datos inmutable (no se puede modificar).

¿Qué significa inmutable?

Una vez creado un RDD, NO puedes cambiar sus datos. Cada transformación (map, filter) crea un nuevo RDD, el original permanece intacto.

# Ejemplo de inmutabilidad
rdd_original = sc.parallelize([1, 2, 3, 4, 5])
# Aplicamos una transformación
rdd_multiplicado = rdd_original.map(lambda x: x * 2)
print("RDD original:", rdd_original.collect()) # [1, 2, 3, 4, 5]
print("RDD multiplicado:", rdd_multiplicado.collect()) # [2, 4, 6, 8, 10]
# El RDD original NO ha cambiado ✅
# Se creó un NUEVO RDD con los datos transformados

¿Por qué es importante la inmutabilidad?

  • Tolerancia a fallos: Spark puede reconstruir datos perdidos re-ejecutando las transformaciones
  • Paralelismo seguro: Múltiples tareas pueden leer el mismo RDD sin conflictos
  • Linaje (DAG): Spark mantiene un historial de transformaciones para optimización

En Spark, tu aplicación no corre en un solo lugar. Se divide físicamente en dos componentes que pueden estar en ordenadores distintos:

  1. Driver (El Cerebro): Es el proceso donde corre tu script Python (main).
    • Mantiene el SparkContext.
    • Convierte tu código en un plan de ejecución (DAG).
    • Envía las tareas a los Executors.
  2. Executors (El Músculo): Son procesos JVM que corren en los nodos del clúster.
    • Reciben código serializado del Driver.
    • Ejecutan las tareas (map, filter, etc.) sobre sus trozos de datos (particiones).
    • Almacenan los datos en su memoria RAM.
graph TB
Driver["🧠 Driver<br/>(Tu Script Python)<br/><br/>- SparkContext<br/>- Planifica tareas<br/>- Coordina ejecución"]
Driver -->|"1. Envía código<br/>serializado"| E1["⚙️ Executor 1<br/>(Nodo A)<br/><br/>Ejecuta tareas<br/>Guarda en RAM"]
Driver -->|"1. Envía código<br/>serializado"| E2["⚙️ Executor 2<br/>(Nodo B)<br/><br/>Ejecuta tareas<br/>Guarda en RAM"]
Driver -->|"1. Envía código<br/>serializado"| E3["⚙️ Executor 3<br/>(Nodo C)<br/><br/>Ejecuta tareas<br/>Guarda en RAM"]
E1 -.->|"2. Reporta<br/>resultados"| Driver
E2 -.->|"2. Reporta<br/>resultados"| Driver
E3 -.->|"2. Reporta<br/>resultados"| Driver
style Driver fill:#e3f2fd,stroke:#1976d2,stroke-width:3px
style E1 fill:#c8e6c9,stroke:#388e3c,stroke-width:2px
style E2 fill:#c8e6c9,stroke:#388e3c,stroke-width:2px
style E3 fill:#c8e6c9,stroke:#388e3c,stroke-width:2px

🔍 Demostración con Código: La mejor forma de entender esto es preguntándole al sistema “¿Quién soy?”.

import socket
# --- CÓDIGO DEL DRIVER ---
# Esto se ejecuta 1 sola vez en tu máquina local
print(f"🏠 Soy el DRIVER. Estoy en: {socket.gethostname()}")
def procesar_dato(x):
# --- CÓDIGO DEL EXECUTOR ---
# Esta función se serializa y se envía a los nodos.
# Se ejecutará millones de veces en paralelo.
maquina = socket.gethostname()
return f"Dato {x} procesado en {maquina}"
rdd = sc.parallelize([1, 2, 3, 4], 2)
resultado = rdd.map(procesar_dato).collect()
# Al imprimir el resultado, verás nombres de máquinas diferentes
for linea in resultado:
print(linea)

Nota importante: Si pones un print() dentro de procesar_dato, NO lo verás en tu pantalla. Ese print ocurrirá en la consola de la máquina remota (Executor). Esta es la prueba definitiva de que el código se ha ido de tu ordenador.

Todo empieza con el SparkContext (sc).

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("FundamentosRDD").getOrCreate()
sc = spark.sparkContext
# Datos de ejemplo
data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
# IMPORTANTE: El segundo argumento (2) indica el número de particiones
rdd_numeros = sc.parallelize(data, 2)
print(f"Número de particiones: {rdd_numeros.getNumPartitions()}")

Si tienes 2 particiones, los datos se cortan físicamente.

graph TD
A["Datos: [1,2,3,4,5,6,7,8,9,10]"] --> B["Partición 0: [1,2,3,4,5]"]
A --> C["Partición 1: [6,7,8,9,10]"]
B --> D["Executor 1 (Nodo A)"]
C --> E["Executor 2 (Nodo B)"]
style A fill:#fff3cd
style B fill:#d1ecf1
style C fill:#d1ecf1
style D fill:#d4edda
style E fill:#d4edda

Regla de oro: El paralelismo máximo es igual al número de particiones. Si tienes 50 CPUs pero solo 2 particiones, 48 CPUs estarán ociosas.

¿Cómo elegir el número óptimo de particiones?

Section titled “¿Cómo elegir el número óptimo de particiones?”

Elegir mal el número de particiones puede arruinar el rendimiento. Aquí tienes las reglas:

Fórmula general:

Particiones = [2..4] × número de CPUs totales del clúster

Ejemplos prácticos:

EscenarioCPUsParticiones RecomendadasRazón
Laptop local4 cores8-16Permite paralelismo mientras otras tareas corren
Clúster pequeño20 cores40-80Balance entre overhead y paralelismo
Clúster grande200 cores400-800Maximiza uso de recursos
Datos muy pequeños (<1GB)Cualquiera4-8Demasiadas particiones = overhead innecesario
Datos enormes (>100GB)100 cores1000+Cada partición debe ser ~128MB idealmente

⚠️ Problemas comunes:

  • Muy pocas particiones: CPUs ociosas, tareas lentas, riesgo de Out of Memory.
  • Demasiadas particiones: Overhead de gestión (cada partición es una tarea), más tiempo en scheduling.

Ejemplo: Ajustando particiones manualmente

# Leer archivo (Spark decide automáticamente según el tamaño)
rdd = sc.textFile("datos.txt")
print(f"Particiones automáticas: {rdd.getNumPartitions()}")
# Si es muy bajo, aumentar con repartition (CUIDADO: causa Shuffle)
rdd_reparticionado = rdd.repartition(100)
# Si es muy alto, reducir con coalesce (SIN Shuffle, más eficiente)
rdd_reducido = rdd.coalesce(10)

💡 Diferencia clave:

  • repartition(n): Puede aumentar o reducir particiones. Causa Shuffle (lento).
  • coalesce(n): Solo reduce particiones. Sin Shuffle (rápido). Usa esto cuando termines un filtro que eliminó el 90% de los datos.

[!WARNING] Concepto crítico: Spark es “perezoso” (Lazy). Las transformaciones NO se ejecutan inmediatamente. Solo se ejecutan cuando llamas a una acción como collect(), count(), o saveAsTextFile(). Esto es fundamental para entender cómo debuggear código Spark.

Spark es “perezoso” (Lazy). Cuando ejecutas transformaciones (map, filter), Spark NO procesa nada. Solo toma notas en una libreta llamada DAG (Grafo Acíclico Dirigido).

# Transformación 1 (No hace nada)
rdd1 = sc.parallelize([1, 2, 3, 4])
# Transformación 2 (No hace nada)
rdd2 = rdd1.map(lambda x: x * 2)
# Transformación 3 (No hace nada)
rdd3 = rdd2.filter(lambda x: x > 5)
# ACCIÓN (¡BOOM! Se ejecuta todo)
rdd3.collect()

Así es como Spark construye internamente el plan de ejecución:

graph LR
A["RDD1: parallelize([1,2,3,4])"] -->|"map(x*2)"| B["RDD2: [2,4,6,8]"]
B -->|"filter(x>5)"| C["RDD3: [6,8]"]
C -->|"collect()"| D["🚀 ACCIÓN: Se ejecuta todo el DAG"]
style A fill:#e1f5ff
style B fill:#e1f5ff
style C fill:#e1f5ff
style D fill:#ffcccc

💡 Concepto clave: Las transformaciones (azul) solo construyen el plan. La acción (rojo) dispara la ejecución real.

Si el Executor 2 se muere y pierde sus datos, Spark no llora. Mira su “receta” (Linaje):

  1. “Tenía que cargar datos…”
  2. “Multiplicar por 2…”
  3. “Filtrar mayores de 5…”

Spark simplemente busca otro ordenador y re-ejecuta los pasos solo para los datos perdidos. Spark no guarda los datos, guarda la receta para crearlos.

4. Transformaciones: Narrow vs Wide (La Clave del Rendimiento)

Section titled “4. Transformaciones: Narrow vs Wide (La Clave del Rendimiento)”

El rendimiento de un trabajo en Spark depende drásticamente de cuánto se mueven los datos por la red. Podemos clasificar las transformaciones en dos categorías según si los datos pueden procesarse localmente o si requieren ser redistribuidos entre los nodos (Shuffle):

graph TB
subgraph "WIDE: Requiere Shuffle"
W1["Partición 1<br/>(A:1, B:2)"] -.->|"Shuffle<br/>Red + Disco"| W3["Partición 1<br/>(A: [1,3])"]
W2["Partición 2<br/>(A:3, B:4)"] -.->|"Shuffle<br/>Red + Disco"| W4["Partición 2<br/>(B: [2,4])"]
W5["⚠️ Los datos viajan<br/>entre nodos"]
end
subgraph "NARROW: Sin movimiento de datos"
N1["Partición 1<br/>[1, 2, 3]"] -->|"map(x*2)"| N2["Partición 1<br/>[2, 4, 6]"]
N3["Partición 2<br/>[4, 5, 6]"] -->|"map(x*2)"| N4["Partición 2<br/>[8, 10, 12]"]
N5["✅ Todo en el mismo nodo<br/>Sin red, sin disco"]
end
style N1 fill:#d4edda
style N2 fill:#d4edda
style N3 fill:#d4edda
style N4 fill:#d4edda
style N5 fill:#c3e6cb
style W1 fill:#f8d7da
style W2 fill:#f8d7da
style W3 fill:#f8d7da
style W4 fill:#f8d7da
style W5 fill:#f5c6cb

A. Narrow Transformations (Dependencias Estrechas) - “Las Rápidas”

Section titled “A. Narrow Transformations (Dependencias Estrechas) - “Las Rápidas””

Ocurren cuando cada partición de salida depende solo de una partición de entrada.

  • Sin tráfico de red: El dato se procesa en el mismo ordenador donde está.
  • Pipelining: Spark es muy listo. Si encadenas map().filter().map(), Spark fusiona las 3 operaciones en una sola pasada. Lee el dato una vez y le aplica las 3 funciones de golpe.

Ejemplos: map, filter, flatMap, union.

graph TB
subgraph P1["PARTICIÓN 1 (Nodo 1)"]
D1["📦 Dato: 1"] -->|"map(x*2)"| R1["Resultado: 2"]
R1 -->|"filter(>5)"| F1["❌ Descartado<br/>(2 no es > 5)"]
D2["📦 Dato: 3"] -->|"map(x*2)"| R2["Resultado: 6"]
R2 -->|"filter(>5)"| F2["✅ Pasa: 6"]
end
subgraph P2["PARTICIÓN 2 (Nodo 2)"]
D3["📦 Dato: 10"] -->|"map(x*2)"| R3["Resultado: 20"]
R3 -->|"filter(>5)"| F3["✅ Pasa: 20"]
D4["📦 Dato: 15"] -->|"map(x*2)"| R4["Resultado: 30"]
R4 -->|"filter(>5)"| F4["✅ Pasa: 30"]
end
F2 --> Final["Resultado final:<br/>[6, 20, 30]"]
F3 --> Final
F4 --> Final
Note["💡 Pipeline Optimization<br/><br/>Spark fusiona map() y filter()<br/>en UNA SOLA pasada<br/><br/>Cada partición se procesa<br/>independientemente en su nodo<br/><br/>Sin red, sin disco<br/>Máxima velocidad"]
style D1 fill:#e3f2fd,stroke:#1976d2
style D2 fill:#e3f2fd,stroke:#1976d2
style D3 fill:#e3f2fd,stroke:#1976d2
style D4 fill:#e3f2fd,stroke:#1976d2
style R1 fill:#fff9c4,stroke:#f57c00
style R2 fill:#fff9c4,stroke:#f57c00
style R3 fill:#fff9c4,stroke:#f57c00
style R4 fill:#fff9c4,stroke:#f57c00
style F1 fill:#ffcdd2,stroke:#c62828
style F2 fill:#c8e6c9,stroke:#388e3c,stroke-width:2px
style F3 fill:#c8e6c9,stroke:#388e3c,stroke-width:2px
style F4 fill:#c8e6c9,stroke:#388e3c,stroke-width:2px
style Final fill:#a5d6a7,stroke:#2e7d32,stroke-width:3px
style Note fill:#fff9c4,stroke:#f57c00,stroke-width:2px
style P1 fill:#f5f5f5,stroke:#757575
style P2 fill:#f5f5f5,stroke:#757575

Código Python que corresponde al diagrama:

# Datos: 2 valores por partición (4 valores totales)
datos = [1, 3, 10, 15]
# Creamos explícitamente 2 particiones (como en el diagrama)
rdd = sc.parallelize(datos, 2)
print(f"Número de particiones: {rdd.getNumPartitions()}") # Salida: 2
# Pipeline: map + filter (igual que en el diagrama)
resultado = rdd.map(lambda x: x * 2) \
.filter(lambda x: x > 5)
print("Datos originales:", datos)
print("Resultado final:", resultado.collect())
# Veamos el flujo paso a paso (como en el diagrama):
#
# PARTICIÓN 1 (Nodo 1):
# 📦 Dato: 1
# → map(x*2) → 2
# → filter(>5) → 2 > 5? NO ❌ Descartado
#
# 📦 Dato: 3
# → map(x*2) → 6
# → filter(>5) → 6 > 5? SÍ ✅ Resultado: 6
#
# PARTICIÓN 2 (Nodo 1):
# 📦 Dato: 10
# → map(x*2) → 20
# → filter(>5) → 20 > 5? SÍ ✅ Resultado: 20
#
# 📦 Dato: 15
# → map(x*2) → 30
# → filter(>5) → 30 > 5? SÍ ✅ Resultado: 30
# Salida:
# Número de particiones: 2
# Datos originales: [1, 3, 10, 15]
# Resultado final: [6, 20, 30]
#
# ✨ Spark fusiona map() y filter() en UNA SOLA pasada
# Cada partición se procesa independientemente en su nodo
# Todo ocurre en la CPU local, sin red, sin disco

B. Wide Transformations (Dependencias Anchas) - “El Shuffle”

Section titled “B. Wide Transformations (Dependencias Anchas) - “El Shuffle””

Ocurren cuando para calcular una fila del resultado, necesito leer datos de muchas particiones que podrían estar en otros ordenadores.

  • Requiere Shuffle (Barajado): Los datos deben viajar por la red para agruparse.
  • Rompe el Pipeline: Spark no puede seguir procesando linealmente. Debe detenerse, escribir datos intermedios en disco, enviarlos por la red y esperar a que lleguen todos. Esto divide el trabajo en Stages (Etapas).

Ejemplos: reduceByKey, groupByKey, sortByKey, distinct, join.

El Shuffle es el proceso más costoso en Spark. Aquí puedes ver qué ocurre internamente:

graph TB
subgraph "STAGE 1: Map Side (Nodos Originales)"
A1["Nodo A<br/>Partición 0<br/>(X:1, Y:2, X:3)"]
A2["Nodo B<br/>Partición 1<br/>(Y:4, X:5, Z:6)"]
end
subgraph "Shuffle (Red + Disco)"
A1 -->|"Escribir a disco<br/>Serializar"| S1["Shuffle Write"]
A2 -->|"Escribir a disco<br/>Serializar"| S2["Shuffle Write"]
S1 -->|"Red: Enviar datos<br/>por clave"| S3["Shuffle Read"]
S2 -->|"Red: Enviar datos<br/>por clave"| S3
end
subgraph "STAGE 2: Reduce Side (Reagrupación)"
S3 --> B1["Nodo C<br/>Clave X: [1,3,5]"]
S3 --> B2["Nodo D<br/>Clave Y: [2,4]"]
S3 --> B3["Nodo E<br/>Clave Z: [6]"]
end
style A1 fill:#d1ecf1
style A2 fill:#d1ecf1
style S1 fill:#fff3cd
style S2 fill:#fff3cd
style S3 fill:#f8d7da
style B1 fill:#d4edda
style B2 fill:#d4edda
style B3 fill:#d4edda

[!CAUTION] El Shuffle es la operación MÁS COSTOSA en Spark

Costes del Shuffle:

  1. Red: Latencia y ancho de banda (los datos viajan entre máquinas)
  2. Disco: Los datos intermedios se escriben en disco (por seguridad ante fallos)
  3. Serialización: Coste de CPU para empaquetar/desempaquetar objetos
  4. Barrera de sincronización: El Stage 2 NO puede empezar hasta que el Stage 1 termine completamente

Regla de oro: Minimiza las operaciones Wide. Usa reduceByKey en lugar de groupByKey siempre que sea posible.

Tipo¿Mueve datos?¿Coste?Ejemplos
Narrow❌ NoBajo (CPU)map, filter, sample
Wide✅ Sí (Shuffle)Alto (Red + Disco I/O)reduceByKey, distinct, repartition

🧪 Experimento: Midiendo la diferencia real

Section titled “🧪 Experimento: Midiendo la diferencia real”

Vamos a demostrar esto con código. Ejecuta este bloque en tu Notebook. Crearemos una lista grande y compararemos tiempos.

import time
# 1. Crear un RDD "pesado" (5 millones de números)
# 5 particiones para obligar a distribuir el trabajo
rdd_grande = sc.parallelize(range(5000000), 5)
# --- PRUEBA 1: Narrow Transformation (Map + Filter) ---
start_time = time.time()
# Multiplicamos y filtramos. Todo ocurre en memoria local.
# Spark fusiona esto en una sola pasada (Pipeline).
count_narrow = rdd_grande.map(lambda x: x * 2).filter(lambda x: x < 500).count()
end_time = time.time()
print(f"⏱️ Tiempo Narrow (Local): {end_time - start_time:.4f} segundos")
# --- PRUEBA 2: Wide Transformation (GroupBy / Shuffle) ---
start_time = time.time()
# Agrupamos por el último dígito (0-9).
# ¡Esto obliga a mover los 5 millones de números por la red para ordenarlos!
count_wide = rdd_grande.groupBy(lambda x: x % 10).count()
end_time = time.time()
print(f"⏱️ Tiempo Wide (Shuffle): {end_time - start_time:.4f} segundos")

Resultados típicos: Verás que la operación Wide es entre 5x y 10x más lenta, incluso en tu ordenador local. En un clúster real, la diferencia es aún mayor por la latencia de red.

¿Qué ver en la Spark UI (localhost:4040)?

  • Ve a la pestaña “Stages”.
  • Narrow: Verás 1 solo Stage (todas las tareas se hicieron de golpe).
  • Wide: Verás 2 Stages. El primero prepara los datos y el segundo espera a que lleguen los datos del Shuffle (Shuffle Read).

5. Operaciones Clave-Valor y Optimización

Section titled “5. Operaciones Clave-Valor y Optimización”

El peligro de groupByKey vs reduceByKey vs aggregateByKey

Section titled “El peligro de groupByKey vs reduceByKey vs aggregateByKey”

Estas tres funciones agrupan datos por clave, pero su rendimiento y casos de uso son radicalmente distintos.

graph LR
subgraph G["groupByKey - INEFICIENTE ❌"]
direction LR
G1["Nodo 1: (A,1), (A,1), (A,1)"] -->|"Envía 3 valores"| GR["Reducer<br/>Recibe: A → [1,1,1]"]
G2["Nodo 2: (A,1), (A,1)"] -->|"Envía 2 valores"| GR
end
style G1 fill:#f8d7da
style G2 fill:#f8d7da
style GR fill:#f5c6cb
graph LR
subgraph R["reduceByKey - EFICIENTE ✅"]
direction LR
R1["Nodo 1: (A,1), (A,1), (A,1)"] -->|"Pre-agrega → Envía 1 valor: (A,3)"| RR["Reducer<br/>Recibe: A → 3, 2"]
R2["Nodo 2: (A,1), (A,1)"] -->|"Pre-agrega → Envía 1 valor: (A,2)"| RR
end
style R1 fill:#d4edda
style R2 fill:#d4edda
style RR fill:#c3e6cb
CaracterísticagroupByKey()reduceByKey(func)aggregateByKey(zeroValue)(seqFunc, combFunc)
Pre-agregación❌ No✅ Sí (Combiner)✅ Sí (Combiner)
Tráfico de red🔴 Alto (todos los valores)🟢 Bajo (valores agregados)🟢 Bajo (valores agregados)
Riesgo OOM🔴 Alto🟢 Bajo🟢 Bajo
Tipo de salidaMismo que entradaMismo que entradaPuede cambiar
Cuándo usarCasi nuncaAgregaciones simples (suma, max)Agregaciones complejas (listas, sets, promedios)
EjemploAgrupar logs por IPContar palabrasCalcular promedio, crear histogramas

Escenario: Contar palabras

palabras = [("hola", 1), ("mundo", 1), ("hola", 1), ("spark", 1), ("hola", 1)]
rdd = sc.parallelize(palabras, 2)
# ❌ MALO: groupByKey (envía [1, 1, 1] por la red)
conteo_malo = rdd.groupByKey().mapValues(sum)
# Resultado: [('hola', 3), ('mundo', 1), ('spark', 1)]
# ✅ BUENO: reduceByKey (envía solo el total pre-agregado)
conteo_bueno = rdd.reduceByKey(lambda a, b: a + b)
# Resultado: [('hola', 3), ('mundo', 1), ('spark', 1)]

Escenario Avanzado: Calcular promedio (requiere aggregateByKey)

# Datos: (nombre, nota)
notas = [("Ana", 8), ("Juan", 6), ("Ana", 9), ("Juan", 7), ("Ana", 10)]
rdd = sc.parallelize(notas)
# aggregateByKey permite cambiar el tipo de valor
# Valor inicial: (suma, contador)
promedio = rdd.aggregateByKey(
(0, 0), # zeroValue: (suma_inicial, contador_inicial)
# seqFunc: Cómo combinar un valor nuevo con el acumulador en la misma partición
lambda acc, valor: (acc[0] + valor, acc[1] + 1),
# combFunc: Cómo combinar dos acumuladores de diferentes particiones
lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])
).mapValues(lambda x: x[0] / x[1])
# Resultado: [('Ana', 9.0), ('Juan', 6.5)]

[!TIP] Regla de decisión para agregaciones:

  • groupByKey: Solo si realmente necesitas TODOS los valores agrupados (raro). Ejemplo: generar listas completas.
  • reduceByKey: Para agregaciones donde el tipo de entrada = tipo de salida (suma, max, min, concatenación).
  • aggregateByKey: Para agregaciones donde el tipo cambia (calcular promedio, crear sets, histogramas).

⚠️ Advertencia de rendimiento: En un dataset de 1TB con groupByKey, podrías enviar 1TB por la red. Con reduceByKey, quizás solo 1GB. ¡1000x de diferencia!

Por defecto, cada vez que ejecutas una acción sobre un RDD, Spark recalcula todo el linaje desde el principio.

Si vas a usar el mismo RDD varias veces, guárdalo en memoria.

rdd = sc.textFile("log_gigante.txt").map(...).filter(...)
# Marcamos para guardar en RAM la primera vez que se compute
rdd.cache()
print(rdd.count()) # Tarda 10 minutos (Lee archivo + map + filter + guarda en RAM)
print(rdd.count()) # Tarda 0 segundos (Lee directo de RAM)

Spark ofrece diferentes estrategias de persistencia según tus necesidades:

NivelUbicaciónSerializadoReplicadoUso de MemoriaCuándo usarlo
MEMORY_ONLYRAM❌ No❌ NoAltoPor defecto. Datos pequeños que caben en RAM. Máximo rendimiento.
MEMORY_ONLY_SERRAM✅ Sí❌ NoMedioDatos grandes. Ahorra RAM (5-10x) pero más CPU para deserializar.
MEMORY_AND_DISKRAM + Disco❌ No❌ NoVariableRecomendado. Si no cabe en RAM, va a disco automáticamente.
MEMORY_AND_DISK_SERRAM + Disco✅ Sí❌ NoBajoDatos muy grandes. Combina ahorro de RAM + seguridad de disco.
DISK_ONLYDisco✅ Sí❌ NoMuy BajoDatos enormes que nunca cabrán en RAM. Evita recalcular.
MEMORY_ONLY_2RAM❌ No✅ Sí (2x)Muy AltoDatos críticos. Tolerancia a fallos mejorada (2 copias).
OFF_HEAPMemoria externa✅ Sí❌ NoBajoDatos muy grandes. Evita el Garbage Collector de Java.

💡 Regla práctica:

  • Iteraciones de Machine Learning: Usa MEMORY_AND_DISK (los datos se reutilizan muchas veces).
  • ETL de una sola pasada: No uses cache (es un desperdicio de memoria).
  • Datos críticos en producción: Usa niveles replicados (_2) para evitar recalcular si un nodo falla.
from pyspark import StorageLevel
# Crear un RDD grande
rdd_grande = sc.parallelize(range(10000000), 10)
# Opción 1: Cache por defecto (MEMORY_ONLY)
rdd_grande.cache()
# Opción 2: Especificar nivel manualmente
rdd_grande.persist(StorageLevel.MEMORY_AND_DISK_SER)
# Ver qué está en cache
print(rdd_grande.getStorageLevel())
# IMPORTANTE: Liberar memoria cuando ya no lo necesites
rdd_grande.unpersist()

[!WARNING] Advertencia sobre cache():

cache() es solo un atajo para persist(StorageLevel.MEMORY_ONLY). Si tus datos no caben en RAM, Spark descartará particiones y las recalculará cada vez (muy ineficiente).

Recomendación: Usa persist(StorageLevel.MEMORY_AND_DISK) para datos grandes que se reutilizan múltiples veces.


7. Variables Compartidas: Accumulators y Broadcast

Section titled “7. Variables Compartidas: Accumulators y Broadcast”

Cuando trabajas en un clúster distribuido, hay dos problemas comunes:

  1. ¿Cómo contar eventos globales? (ej: líneas erróneas procesadas)
  2. ¿Cómo compartir datos de solo lectura eficientemente? (ej: tabla de lookup de 100MB)

Spark ofrece dos soluciones: Accumulators y Broadcast Variables.

Los accumulators permiten sumar valores desde los Executors de forma segura. Solo el Driver puede leer el valor final.

Caso de uso típico: Contar errores, registros procesados, métricas de calidad de datos.

# Crear un acumulador
errores = sc.accumulator(0)
lineas_procesadas = sc.accumulator(0)
def procesar_linea(linea):
global errores, lineas_procesadas
lineas_procesadas.add(1)
try:
# Intentar parsear
campos = linea.split(",")
if len(campos) != 3:
errores.add(1)
return None
return (campos[0], int(campos[1]))
except:
errores.add(1)
return None
# Procesar datos
rdd = sc.textFile("datos_sucios.csv")
rdd_limpio = rdd.map(procesar_linea).filter(lambda x: x is not None)
# IMPORTANTE: Forzar la acción para que se ejecute
resultado = rdd_limpio.collect()
# Ahora podemos leer los contadores en el Driver
print(f"✅ Líneas procesadas: {lineas_procesadas.value}")
print(f"❌ Líneas con errores: {errores.value}")
print(f"📊 Tasa de error: {errores.value / lineas_procesadas.value * 100:.2f}%")

⚠️ Advertencias importantes:

  • Los accumulators solo se actualizan cuando se ejecuta una acción.
  • Si una tarea falla y se reintenta, el accumulator puede contar dos veces (no son transaccionales).
  • Solo usa accumulators para debugging y métricas, no para lógica de negocio crítica.

B. Broadcast Variables (Datos de Solo Lectura Compartidos)

Section titled “B. Broadcast Variables (Datos de Solo Lectura Compartidos)”

Las broadcast variables permiten enviar una sola copia de datos grandes a cada nodo, en lugar de enviarlos con cada tarea.

Sin Broadcast (MALO):

# Tabla de códigos postales (100MB)
tabla_codigos = {"28001": "Madrid Centro", "08001": "Barcelona Centro", ...} # 100,000 entradas
# ❌ PROBLEMA: Esta tabla se serializa y envía con CADA TAREA
# Si tienes 1000 tareas, envías 100GB por la red!
rdd.map(lambda cp: tabla_codigos.get(cp, "Desconocido"))

Con Broadcast (BUENO):

# ✅ SOLUCIÓN: Broadcast envía la tabla UNA SOLA VEZ a cada nodo
tabla_broadcast = sc.broadcast(tabla_codigos)
# Ahora cada Executor tiene una copia local en memoria
rdd.map(lambda cp: tabla_broadcast.value.get(cp, "Desconocido"))
# Liberar memoria cuando termines
tabla_broadcast.unpersist()
graph TB
subgraph "SIN Broadcast - Ineficiente ❌"
D1["Driver<br/>Tabla: 100MB"] -->|"100MB"| T1["Tarea 1"]
D1 -->|"100MB"| T2["Tarea 2"]
D1 -->|"100MB"| T3["Tarea 3"]
D1 -->|"100MB"| T4["Tarea ...1000"]
T5["Total red: 100GB 😱"]
end
subgraph "CON Broadcast - Eficiente ✅"
D2["Driver<br/>Tabla: 100MB"] -->|"100MB"| N1["Nodo 1"]
D2 -->|"100MB"| N2["Nodo 2"]
N1 --> T6["Tareas 1-500<br/>usan copia local"]
N2 --> T7["Tareas 501-1000<br/>usan copia local"]
T8["Total red: 200MB 🎉"]
end
style D1 fill:#f8d7da
style T1 fill:#f8d7da
style T2 fill:#f8d7da
style T3 fill:#f8d7da
style T4 fill:#f8d7da
style T5 fill:#f5c6cb
style D2 fill:#d4edda
style N1 fill:#d4edda
style N2 fill:#d4edda
style T6 fill:#c3e6cb
style T7 fill:#c3e6cb
style T8 fill:#c3e6cb

Ejemplo Práctico Completo: Enriquecimiento de Datos

Section titled “Ejemplo Práctico Completo: Enriquecimiento de Datos”
# Escenario: Tienes 1 billón de transacciones y una tabla de 10,000 productos
productos = {
"P001": {"nombre": "Laptop", "categoria": "Electrónica"},
"P002": {"nombre": "Ratón", "categoria": "Accesorios"},
# ... 10,000 productos
}
# Broadcast la tabla de productos
productos_bc = sc.broadcast(productos)
# Acumulador para contar productos no encontrados
productos_no_encontrados = sc.accumulator(0)
def enriquecer_transaccion(linea):
# linea = "2024-01-01,P001,2,100.50"
fecha, producto_id, cantidad, precio = linea.split(",")
# Buscar en la tabla broadcast
info_producto = productos_bc.value.get(producto_id)
if info_producto is None:
productos_no_encontrados.add(1)
return None
return {
"fecha": fecha,
"producto": info_producto["nombre"],
"categoria": info_producto["categoria"],
"cantidad": int(cantidad),
"total": float(precio) * int(cantidad)
}
# Procesar
transacciones = sc.textFile("transacciones.csv")
transacciones_enriquecidas = transacciones.map(enriquecer_transaccion).filter(lambda x: x is not None)
# Ejecutar
resultado = transacciones_enriquecidas.take(10)
print(f"Productos no encontrados: {productos_no_encontrados.value}")

💡 Cuándo usar Broadcast:

  • Tablas de lookup pequeñas/medianas (<2GB recomendado)
  • Configuraciones, reglas de negocio, modelos de ML
  • Cualquier dato de solo lectura que se usa en muchas tareas

⚠️ No uses Broadcast si:

  • Los datos cambian frecuentemente (broadcast es inmutable)
  • Los datos son enormes (>2GB puede causar problemas de memoria)
  • Solo se usan en pocas tareas (no vale la pena el overhead)

AcciónDescripción
collect()Trae todos los datos al Driver (¡Cuidado con la memoria!)
count()Cuenta elementos
first() / take(n)Devuelve los primeros N elementos
saveAsTextFile(path)Guarda a disco
foreach(func)Aplica función a cada elemento (útil para guardar en BBDD)
Transf.TipoDescripción
mapNarrow1 entrada -> 1 salida exacta
filterNarrow1 entrada -> 0 o 1 salida
flatMapNarrow1 entrada -> 0, 1 o N salidas (aplana listas)
distinctWideElimina duplicados (requiere Shuffle)
unionNarrowUne dos RDDs
intersectionWideIntersección de conjuntos (requiere Shuffle)

1. OutOfMemoryError en el Driver

java.lang.OutOfMemoryError: Java heap space (Driver)

Causa: Llamaste a collect() sobre un RDD enorme y no cabe en la memoria del Driver.

Solución:

# ❌ MALO: Traer 100GB al Driver
resultado = rdd_gigante.collect()
# ✅ BUENO: Opciones alternativas
# Opción 1: Tomar solo una muestra
muestra = rdd_gigante.take(100)
# Opción 2: Guardar a disco
rdd_gigante.saveAsTextFile("resultado/")
# Opción 3: Contar o agregar
total = rdd_gigante.count()
suma = rdd_gigante.reduce(lambda a, b: a + b)

2. OutOfMemoryError en los Executors

java.lang.OutOfMemoryError: Java heap space (Executor)

Causa: Particiones demasiado grandes o uso de groupByKey con muchos valores por clave.

Solución:

# Aumentar el número de particiones
rdd_reparticionado = rdd.repartition(200)
# O usar reduceByKey en lugar de groupByKey
# ❌ MALO
rdd.groupByKey().mapValues(sum)
# ✅ BUENO
rdd.reduceByKey(lambda a, b: a + b)

3. Serialization Error

org.apache.spark.SparkException: Task not serializable

Causa: Intentas enviar un objeto no serializable (ej: conexión a base de datos) a los Executors.

Solución:

# ❌ MALO: La conexión no es serializable
conexion_db = crear_conexion()
rdd.map(lambda x: conexion_db.query(x))
# ✅ BUENO: Crear la conexión dentro de la función
def procesar_con_db(x):
# Cada Executor crea su propia conexión
conexion = crear_conexion()
resultado = conexion.query(x)
conexion.close()
return resultado
rdd.map(procesar_con_db)

4. Shuffle Fetch Failed

org.apache.spark.shuffle.FetchFailedException

Causa: Un Executor murió durante el Shuffle o problemas de red.

Solución:

# Aumentar la configuración de reintentos
spark.conf.set("spark.shuffle.io.maxRetries", "10")
spark.conf.set("spark.shuffle.io.retryWait", "60s")
# Persistir antes de operaciones Wide
rdd.persist(StorageLevel.MEMORY_AND_DISK)
resultado = rdd.reduceByKey(lambda a, b: a + b)

5. Código que “no hace nada”

rdd = sc.textFile("archivo.txt")
rdd_filtrado = rdd.filter(lambda x: "ERROR" in x)
# No pasa nada... ¿Por qué?

Causa: Lazy Evaluation. No has ejecutado ninguna acción.

Solución:

# Añadir una acción
count = rdd_filtrado.count() # Ahora sí se ejecuta

B. Debugging con Spark UI (localhost:4040)

Section titled “B. Debugging con Spark UI (localhost:4040)”

La Spark UI es tu mejor amiga para entender qué está pasando. Aquí está lo que debes mirar:

1. Pestaña “Jobs”

  • Qué ver: Tiempo total de cada Job
  • Busca: Jobs que tardan mucho más de lo esperado
  • Acción: Haz clic en el Job para ver los Stages

2. Pestaña “Stages”

  • Qué ver:
    • Número de tareas (tasks)
    • Tiempo de cada Stage
    • Shuffle Read/Write
  • Señales de alerta:
    • Shuffle Write/Read muy alto: Optimiza con reduceByKey en lugar de groupByKey
    • Tareas desbalanceadas: Algunas tareas tardan 10x más que otras → Problema de data skew
    • Muchos Stages: Demasiadas operaciones Wide → Intenta reducirlas

3. Pestaña “Storage”

  • Qué ver: RDDs en cache y cuánta memoria usan
  • Acción: Si un RDD no cabe en memoria, cambia a MEMORY_AND_DISK

4. Pestaña “Executors”

  • Qué ver:
    • Memoria usada por Executor
    • Tareas completadas/fallidas
    • Shuffle Read/Write por Executor
  • Señales de alerta:
    • Un Executor con muchas tareas fallidas: Ese nodo tiene problemas
    • Memoria al 100%: Aumenta spark.executor.memory o reduce particiones

Antes de quejarte de que “Spark es lento”, verifica:

  • ¿Usas reduceByKey en lugar de groupByKey?
  • ¿El número de particiones es 2-4x el número de CPUs?
  • ¿Cacheas RDDs que usas múltiples veces?
  • ¿Evitas collect() en datasets grandes?
  • ¿Usas broadcast para tablas de lookup?
  • ¿Filtras datos lo antes posible en el pipeline?
  • ¿Evitas operaciones Wide innecesarias?

💡 Regla de oro del debugging:

  1. Mira la Spark UI (localhost:4040)
  2. Identifica el Stage más lento
  3. Busca Shuffle Read/Write alto
  4. Optimiza esa operación específica

Objetivo: Entender parallelize y las particiones. Crea un RDD con una lista de números del 1 al 50. Configura el RDD para que tenga 5 particiones.

  • Imprime por pantalla el número de particiones para verificarlo.
  • Ejecuta una acción para contar cuántos elementos hay en total.

Ejercicio 2: Pipeline de Transformaciones (Filter + Map)

Section titled “Ejercicio 2: Pipeline de Transformaciones (Filter + Map)”

Objetivo: Encadenar transformaciones filter y map. Tienes una lista de precios en dólares: [10, 25, 40, 5, 100, 15].

  • Filtra para quedarte solo con los precios mayores o iguales a 20.
  • Convierte esos precios filtrados a euros (multiplica por 0.9).
  • Muestra el resultado final usando collect().

Objetivo: Diferenciar map de flatMap. Tienes un RDD donde cada elemento es una frase de una lista de la compra: ['Leche,Huevos,Pan', 'Manzanas,Peras', 'Leche,Chocolate'] Obtén una lista única con todos los productos individuales.

Objetivo: Usar intersection y distinct. Tenemos los correos electrónicos de los usuarios que visitaron la web ayer y los que la han visitado hoy.

  • Ayer: ["a@test.com", "b@test.com", "c@test.com"]
  • Hoy: ["c@test.com", "d@test.com", "e@test.com"] Encuentra los correos de los usuarios que han entrado ambos días.

Ejercicio 5: Agregación por Clave (Word Count simplificado)

Section titled “Ejercicio 5: Agregación por Clave (Word Count simplificado)”

Objetivo: Usar reduceByKey para sumar valores agrupados. Tienes una lista de ventas de frutas donde se indica la fruta y la cantidad vendida: [("Manzana", 2), ("Pera", 3), ("Manzana", 5), ("Plátano", 10), ("Pera", 2)]. Calcula el total de unidades vendidas por cada tipo de fruta.

Objetivo: Combinar lectura, limpieza, transformación y ordenación. Simula que lees un archivo CSV con datos de empleados (Nombre, Departamento, Salario). Datos crudos: ["Juan,IT,2000", "Ana,Ventas,3000", "Maria,IT,2500", "Pedro,Ventas,1800"]

  • Convierte cada línea en una tupla: (Departamento, Salario_float).
  • Calcula el salario total que gasta la empresa por cada departamento.
  • Ordena el resultado de mayor gasto a menor gasto.

9. Ejercicios Nuevos (Conceptos Avanzados)

Section titled “9. Ejercicios Nuevos (Conceptos Avanzados)”

Dada una lista de ventas: ("Madrid", 100), ("Barcelona", 200), ("Madrid", 50), ("Valencia", 50)

  1. Filtra las ventas menores de 60.
  2. Suma las ventas por ciudad.
  3. Ordena el resultado por facturación (de mayor a menor). Pista: filter -> reduceByKey -> sortBy

Tienes un RDD con frases: ["Hola mundo", "Apache Spark es guay"].

  1. Aplica map con split(" "). ¿Qué estructura obtienes?
  2. Aplica flatMap con split(" "). ¿Qué diferencia ves en el collect()?

Ejercicio 3: Análisis de Logs (Proyecto Mini)

Section titled “Ejercicio 3: Análisis de Logs (Proyecto Mini)”

Archivo simulado: ip,hora,metodo,url

192.168.1.1,10:00,GET,/index
192.168.1.2,10:01,POST,/login
192.168.1.1,10:02,GET,/imagen.jpg
  1. Carga el archivo.
  2. Cuenta cuántas peticiones ha hecho cada IP (map a par clave-valor + reduceByKey).
  3. Filtra las IPs que han hecho más de 1 petición.
  4. Guarda el resultado en un archivo de texto.

10. Proyecto Práctico: Análisis de Calificaciones 🎓

Section titled “10. Proyecto Práctico: Análisis de Calificaciones 🎓”

Vamos a aplicar todo lo aprendido (Parseo, Tuplas, ReduceByKey, Operaciones de Conjuntos) en un escenario realista.

Escenario: Eres el administrador de un instituto. Tienes las notas de tres asignaturas en archivos de texto separados y sucios. Necesitas unificar los datos y generar un informe estadístico usando RDDs.

Tres archivos con formato: nombre,nota

notas_mates.txt:
Angel,6
Maria,2
Ramon,4.5
notas_fisica.txt:
Angel,9
Maria,3
notas_ingles.txt:
Angel,4
Maria,6

Intenta resolver estos problemas usando las transformaciones adecuadas:

    1. Nota mas baja por alumno
    1. Nota media por alumno
    1. Notables o sobresalientes por alumno (nota >= 7)
    1. Alumnos que no se presentaron a Ingles
    1. Numero de asignaturas por alumno
    1. RDD con cada alumno y todas sus notas

Aunque el DataFrame es superior en rendimiento (gracias al optimizador Catalyst), usaremos RDDs cuando:

  1. Los datos no tienen estructura (texto libre, audio, logs binarios).
  2. Necesitas transformaciones matemáticas complejas a bajo nivel que SQL no permite fácilmente.
  3. La estructura de los datos es muy irregular.

Para todo lo demás (CSV, JSON, Parquet, Tablas SQL), usa DataFrames.