Skip to content

Fundamentos de RDD en Spark

🚀 Introducción: ¿Por Qué Aprender RDDs?

Section titled “🚀 Introducción: ¿Por Qué Aprender RDDs?”

Nota Importante: Aunque en Spark 3.x el 95% del tiempo trabajarás con DataFrames, los DataFrames son solo una capa de abstracción sobre los RDDs. Entender los RDDs es entender cómo piensa Spark.

Sin entender RDDs: Escribirás código que funciona, pero que es lento o ineficiente.
Entendiendo RDDs: Sabrás exactamente qué hace Spark internamente y cómo optimizar tus jobs.


RDD significa Resilient Distributed Dataset. Desglosemos cada palabra:

  1. Resilient (Resiliente): Tolerante a fallos. Si un nodo falla, Spark reconstruye los datos automáticamente.
  2. Distributed (Distribuido): Los datos están repartidos por la memoria RAM de múltiples ordenadores.
  3. Dataset: Es una colección de datos inmutable (no se puede modificar).

Una vez creado un RDD, NO puedes cambiar sus datos. Cada transformación crea un nuevo RDD.

# Crear un RDD original
rdd_original = sc.parallelize([1, 2, 3, 4, 5])
# Aplicar una transformación (crea un NUEVO RDD)
rdd_multiplicado = rdd_original.map(lambda x: x * 2)
# Verificar que el original NO cambió
print("RDD original:", rdd_original.collect()) # [1, 2, 3, 4, 5]
print("RDD multiplicado:", rdd_multiplicado.collect()) # [2, 4, 6, 8, 10]

Ejemplo 2: Reasignar variable NO modifica el RDD original

Section titled “Ejemplo 2: Reasignar variable NO modifica el RDD original”
# Crear RDD
rdd = sc.parallelize([1, 2, 3, 4, 5])
# Reasignar la variable 'rdd' a un nuevo RDD
# Esto NO modifica los datos del RDD anterior,
# solo hace que la variable apunte a uno nuevo.
rdd = rdd.map(lambda x: x * 10)
print(rdd.collect()) # [10, 20, 30, 40, 50]

Ejemplo 3: Intentar “modificar” un RDD (NO funciona como esperas)

Section titled “Ejemplo 3: Intentar “modificar” un RDD (NO funciona como esperas)”
# Crear RDD
numeros = sc.parallelize([1, 2, 3, 4, 5])
# Esto NO modifica 'numeros', crea un nuevo RDD que se descarta si no lo asignas
numeros.map(lambda x: x * 2) # ⚠️ El resultado se pierde
print(numeros.collect()) # [1, 2, 3, 4, 5] - ¡Sin cambios!
# Correcto: Asignar a una variable
numeros_dobles = numeros.map(lambda x: x * 2)

¿Por qué es importante la inmutabilidad?

  • Tolerancia a fallos: Spark puede reconstruir datos perdidos re-ejecutando transformaciones
  • Paralelismo seguro: Múltiples tareas pueden leer el mismo RDD sin conflictos
  • Optimización: Spark mantiene un historial (DAG) para optimizar la ejecución

Vamos a crear tu primer RDD paso a paso:

from pyspark.sql import SparkSession
# Paso 1: Crear la sesión de Spark
spark = SparkSession.builder.appName("MiPrimerRDD").getOrCreate()
sc = spark.sparkContext
# Paso 2: Crear un RDD desde una lista
datos = [1, 2, 3, 4, 5]
mi_primer_rdd = sc.parallelize(datos)
# Paso 3: Verificar que se creó correctamente
print("Número de elementos:", mi_primer_rdd.count()) # 5
print("Primeros 3 elementos:", mi_primer_rdd.take(3)) # [1, 2, 3]
print("Todos los elementos:", mi_primer_rdd.collect()) # [1, 2, 3, 4, 5]

🎓 Explicación:

  • parallelize(): Convierte una lista de Python en un RDD distribuido
  • count(): Acción que cuenta los elementos
  • take(n): Acción que devuelve los primeros n elementos
  • collect(): Acción que trae todos los datos al Driver

Antes de continuar, asegúrate de que puedes:

  • Explicar qué significa “inmutable” con tus propias palabras
  • Crear un RDD desde una lista de Python
  • Usar count() y collect() para ver los datos

Mini-Quiz:

  1. ¿Qué devuelve rdd.map(lambda x: x*2)?

    • a) Una lista de Python
    • b) Un nuevo RDD ✅
    • c) Modifica el RDD original
  2. ¿Cuál de estas es una ACCIÓN?

    • a) map()
    • b) filter()
    • c) count()
Ver explicación

Respuesta 1: b) Un nuevo RDD. Las transformaciones siempre devuelven un nuevo RDD.

Respuesta 2: c) count() es una acción porque dispara la ejecución y devuelve un resultado al Driver.


Esta es la distinción MÁS IMPORTANTE en Spark:

Tipo¿Qué hace?¿Cuándo se ejecuta?Ejemplos
TransformaciónCrea un nuevo RDD❌ NO se ejecuta (lazy)map, filter, flatMap
AcciónDevuelve un resultado✅ Ejecuta TODO el pipelinecollect, count, take

Ejemplo Visual:

# Ninguna de estas líneas ejecuta nada
rdd1 = sc.parallelize([1, 2, 3, 4])
rdd2 = rdd1.map(lambda x: x * 2) # Solo toma nota
rdd3 = rdd2.filter(lambda x: x > 5) # Solo toma nota
# ESTA línea ejecuta todo el pipeline
resultado = rdd3.collect() # ¡Ahora sí se ejecuta!
print(resultado) # [6, 8]

map() aplica una función a cada elemento del RDD y devuelve un nuevo RDD.

Regla: 1 elemento de entrada → 1 elemento de salida

# Crear RDD
numeros = sc.parallelize([1, 2, 3, 4, 5])
# Aplicar map (NO se ejecuta aún)
dobles = numeros.map(lambda x: x * 2)
# Ejecutar con acción
print(dobles.collect()) # [2, 4, 6, 8, 10]
# Crear RDD de palabras
palabras = sc.parallelize(["hola", "mundo", "spark"])
# Aplicar map
mayusculas = palabras.map(lambda palabra: palabra.upper())
# Ejecutar
print(mayusculas.collect()) # ['HOLA', 'MUNDO', 'SPARK']
# Crear RDD
palabras = sc.parallelize(["hola", "mundo", "spark"])
# Aplicar map para obtener longitudes
longitudes = palabras.map(lambda palabra: len(palabra))
# Ejecutar
print(longitudes.collect()) # [4, 5, 5]
# Crear RDD de tuplas (nombre, edad)
personas = sc.parallelize([("Ana", 25), ("Juan", 30), ("María", 28)])
# Extraer solo nombres
nombres = personas.map(lambda persona: persona[0])
print(nombres.collect()) # ['Ana', 'Juan', 'María']
# Incrementar edad en 1
personas_mayor = personas.map(lambda p: (p[0], p[1] + 1))
print(personas_mayor.collect()) # [('Ana', 26), ('Juan', 31), ('María', 29)]

filter() selecciona solo los elementos que cumplen una condición.

Regla: 1 elemento de entrada → 0 o 1 elemento de salida

# Crear RDD
numeros = sc.parallelize([1, 2, 3, 4, 5, 6])
# Filtrar pares
pares = numeros.filter(lambda x: x % 2 == 0)
# Ejecutar
print(pares.collect()) # [2, 4, 6]
# Crear RDD
palabras = sc.parallelize(["hola", "mundo", "spark", "python"])
# Filtrar palabras con más de 4 letras
largas = palabras.filter(lambda palabra: len(palabra) > 4)
# Ejecutar
print(largas.collect()) # ['mundo', 'spark', 'python']

Ejemplo 3: Filtrar por condición compleja

Section titled “Ejemplo 3: Filtrar por condición compleja”
# Crear RDD de tuplas (producto, precio)
productos = sc.parallelize([
("Laptop", 1000),
("Mouse", 25),
("Teclado", 75),
("Monitor", 300)
])
# Filtrar productos caros (precio > 100)
caros = productos.filter(lambda p: p[1] > 100)
# Ejecutar
print(caros.collect()) # [('Laptop', 1000), ('Monitor', 300)]
# Crear RDD
numeros = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
# Pipeline: multiplicar por 2, luego filtrar mayores a 10
resultado = numeros.map(lambda x: x * 2) \
.filter(lambda x: x > 10)
# Ejecutar
print(resultado.collect()) # [12, 14, 16, 18, 20]

flatMap() es como map(), pero “aplana” listas anidadas.

Regla: 1 elemento de entrada → 0, 1 o N elementos de salida

# Crear RDD de frases
frases = sc.parallelize(["Hola mundo", "Apache Spark es genial"])
# Con map() obtienes listas anidadas
con_map = frases.map(lambda frase: frase.split(" "))
print(con_map.collect())
# [['Hola', 'mundo'], ['Apache', 'Spark', 'es', 'genial']]
# Con flatMap() obtienes una lista plana
con_flatmap = frases.flatMap(lambda frase: frase.split(" "))
print(con_flatmap.collect())
# ['Hola', 'mundo', 'Apache', 'Spark', 'es', 'genial']
# Crear RDD
numeros = sc.parallelize([1, 2, 3])
# flatMap para generar rangos
rangos = numeros.flatMap(lambda x: range(1, x + 1))
# Ejecutar
print(rangos.collect()) # [1, 1, 2, 1, 2, 3]
# Crear RDD de líneas CSV
csv_lines = sc.parallelize([
"Leche,Huevos,Pan",
"Manzanas,Peras",
"Leche,Chocolate"
])
# Dividir y aplanar
productos = csv_lines.flatMap(lambda linea: linea.split(","))
# Ejecutar
print(productos.collect())
# ['Leche', 'Huevos', 'Pan', 'Manzanas', 'Peras', 'Leche', 'Chocolate']

Elimina elementos duplicados.

⚠️ Nota: distinct() es una transformación Wide (causa Shuffle).

# Crear RDD con duplicados
numeros = sc.parallelize([1, 2, 2, 3, 3, 3, 4, 5, 5])
# Eliminar duplicados
unicos = numeros.distinct()
# Ejecutar
print(unicos.collect()) # [1, 2, 3, 4, 5] (orden puede variar)

Une dos RDDs (permite duplicados).

# Crear dos RDDs
rdd1 = sc.parallelize([1, 2, 3])
rdd2 = sc.parallelize([3, 4, 5])
# Unir (permite duplicados)
union = rdd1.union(rdd2)
# Ejecutar
print(union.collect()) # [1, 2, 3, 3, 4, 5]
# Si quieres sin duplicados
union_sin_duplicados = rdd1.union(rdd2).distinct()
print(union_sin_duplicados.collect()) # [1, 2, 3, 4, 5]

Devuelve solo elementos que están en AMBOS RDDs.

⚠️ Nota: intersection() es una transformación Wide (causa Shuffle).

# Usuarios que visitaron la web ayer y hoy
ayer = sc.parallelize(["a@test.com", "b@test.com", "c@test.com"])
hoy = sc.parallelize(["c@test.com", "d@test.com", "e@test.com"])
# Usuarios que visitaron ambos días
ambos_dias = ayer.intersection(hoy)
# Ejecutar
print(ambos_dias.collect()) # ['c@test.com']

Trae todos los datos del RDD al Driver.

⚠️ Peligro: Si el RDD es muy grande, puede causar OutOfMemoryError.

# Crear RDD pequeño
numeros = sc.parallelize([1, 2, 3, 4, 5])
# Traer todos los datos
datos = numeros.collect()
# 'datos' es una lista de Python
print(type(datos)) # <class 'list'>
print(datos) # [1, 2, 3, 4, 5]

Cuenta el número de elementos.

# Crear RDD
palabras = sc.parallelize(["hola", "mundo", "spark", "python"])
# Contar elementos
total = palabras.count()
# Ejecutar
print(f"Total de palabras: {total}") # Total de palabras: 4

Devuelve el primer elemento.

# Crear RDD
numeros = sc.parallelize([10, 20, 30, 40, 50])
# Obtener primer elemento
primero = numeros.first()
# Ejecutar
print(f"Primer elemento: {primero}") # Primer elemento: 10

Devuelve los primeros N elementos.

# Crear RDD
numeros = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
# Tomar primeros 5
primeros_5 = numeros.take(5)
# Ejecutar
print(primeros_5) # [1, 2, 3, 4, 5]

Agrega todos los elementos usando una función.

# Crear RDD
numeros = sc.parallelize([1, 2, 3, 4, 5])
# Sumar todos los elementos
suma = numeros.reduce(lambda a, b: a + b)
# Ejecutar
print(f"Suma total: {suma}") # Suma total: 15
# Crear RDD
numeros = sc.parallelize([10, 5, 20, 15, 8])
# Encontrar máximo
maximo = numeros.reduce(lambda a, b: a if a > b else b)
# Ejecutar
print(f"Máximo: {maximo}") # Máximo: 20

Aplica una función a cada elemento (útil para efectos secundarios como guardar en BD).

# Crear RDD
numeros = sc.parallelize([1, 2, 3, 4, 5])
# Imprimir cada elemento (se ejecuta en los Executors)
numeros.foreach(lambda x: print(f"Procesando: {x}"))
# Nota: Los prints aparecen en los logs de los Executors, no en tu consola

Guarda el RDD en archivos de texto.

# Crear RDD
palabras = sc.parallelize(["hola", "mundo", "spark"])
# Guardar a disco
palabras.saveAsTextFile("output/palabras")
# Esto crea una carpeta 'output/palabras' con archivos:
# part-00000, part-00001, etc.

✅ Checkpoint 3: Practica las Transformaciones y Acciones

Section titled “✅ Checkpoint 3: Practica las Transformaciones y Acciones”

Ejercicio 1: Completa el código

# Tienes una lista de precios en dólares
precios_usd = sc.parallelize([10, 25, 40, 5, 100, 15])
# TODO: Filtra precios >= 20
precios_altos = precios_usd.filter(lambda x: ______)
# TODO: Convierte a euros (multiplica por 0.9)
precios_eur = precios_altos.map(lambda x: ______)
print(precios_eur.collect()) # Debería dar: [22.5, 36.0, 90.0, 13.5]

Ejercicio 2: map vs flatMap

# Tienes una lista de la compra
compras = sc.parallelize(['Leche,Huevos,Pan', 'Manzanas,Peras'])
# TODO: Usa flatMap para obtener una lista de productos individuales
productos = compras.flatMap(lambda linea: ______)
print(productos.collect())
# Debería dar: ['Leche', 'Huevos', 'Pan', 'Manzanas', 'Peras']

Devuelve solo elementos que están en AMBOS RDDs.

⚠️ Nota: intersection() es una transformación Wide (causa Shuffle).

# Usuarios que visitaron la web ayer y hoy
ayer = sc.parallelize(["a@test.com", "b@test.com", "c@test.com"])
hoy = sc.parallelize(["c@test.com", "d@test.com", "e@test.com"])
# Usuarios que visitaron ambos días
ambos_dias = ayer.intersection(hoy)
# Ejecutar
print(ambos_dias.collect()) # ['c@test.com']

En Spark, tu aplicación se divide en dos componentes que pueden estar en ordenadores distintos:

graph TB
Driver["🧠 DRIVER<br/><br/>Tu script Python<br/>Planifica tareas<br/>Coordina ejecución"]
Driver -->|"Envía código"| E1["⚙️ EXECUTOR 1<br/><br/>Ejecuta tareas<br/>Guarda en RAM"]
Driver -->|"Envía código"| E2["⚙️ EXECUTOR 2<br/><br/>Ejecuta tareas<br/>Guarda en RAM"]
Driver -->|"Envía código"| E3["⚙️ EXECUTOR 3<br/><br/>Ejecuta tareas<br/>Guarda en RAM"]
E1 -.->|"Reporta resultados"| Driver
E2 -.->|"Reporta resultados"| Driver
E3 -.->|"Reporta resultados"| Driver
style Driver fill:#e3f2fd,stroke:#1976d2,stroke-width:3px
style E1 fill:#c8e6c9,stroke:#388e3c,stroke-width:2px
style E2 fill:#c8e6c9,stroke:#388e3c,stroke-width:2px
style E3 fill:#c8e6c9,stroke:#388e3c,stroke-width:2px

Driver (El Cerebro):

  • Corre tu script Python
  • Convierte tu código en un plan de ejecución
  • Envía tareas a los Executors

Executors (El Músculo):

  • Ejecutan las tareas (map, filter, etc.)
  • Almacenan datos en su memoria RAM
  • Reportan resultados al Driver

3.2 Particionamiento: La Clave del Paralelismo

Section titled “3.2 Particionamiento: La Clave del Paralelismo”

Los datos de un RDD están divididos en particiones. Cada partición se procesa en paralelo.

Regla de Oro: El paralelismo máximo = número de particiones

datos = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
# Crear RDD con 2 particiones
rdd = sc.parallelize(datos, 2)
print(f"Número de particiones: {rdd.getNumPartitions()}") # 2

Visualización:

Datos: [1,2,3,4,5,6,7,8,9,10]
Partición 0: [1,2,3,4,5] → Executor 1
Partición 1: [6,7,8,9,10] → Executor 2

Fórmula general:

Particiones = 2-4 × número de CPUs

Ejemplos:

  • Laptop (4 cores): 8-16 particiones
  • Clúster pequeño (20 cores): 40-80 particiones
  • Clúster grande (200 cores): 400-800 particiones

Quiz Rápido:

  1. ¿Dónde se ejecuta rdd.map(lambda x: x*2)?

    • a) En el Driver
    • b) En los Executors ✅
    • c) En ambos
  2. Si tienes 50 CPUs pero solo 2 particiones, ¿cuántas CPUs trabajarán?

    • a) 50
    • b) 2 ✅
    • c) 25
Ver explicación

Respuesta 1: b) En los Executors. El Driver solo envía el código, los Executors lo ejecutan.

Respuesta 2: b) Solo 2 CPUs trabajarán (una por partición). Las otras 48 estarán ociosas.


Concepto Crítico: Spark es “perezoso”. Las transformaciones NO se ejecutan inmediatamente.

Spark solo toma notas de lo que quieres hacer. Solo ejecuta cuando llamas a una acción.

Ejemplo:

# Ninguna de estas líneas ejecuta nada
rdd1 = sc.parallelize([1, 2, 3, 4])
rdd2 = rdd1.map(lambda x: x * 2) # Solo toma nota
rdd3 = rdd2.filter(lambda x: x > 5) # Solo toma nota
# ESTA línea ejecuta todo el pipeline
resultado = rdd3.collect() # ¡Ahora sí se ejecuta!

Visualización del DAG (Plan de Ejecución):

graph LR
A["RDD1: [1,2,3,4]"] -->|"map(x*2)"| B["RDD2: [2,4,6,8]"]
B -->|"filter(x>5)"| C["RDD3: [6,8]"]
C -->|"collect()"| D["🚀 ACCIÓN<br/>Ejecuta todo"]
style A fill:#e1f5ff
style B fill:#e1f5ff
style C fill:#e1f5ff
style D fill:#ffcccc

¿Por qué es útil?

  • ✅ Spark puede optimizar el plan completo antes de ejecutar
  • ✅ Puede fusionar múltiples transformaciones en una sola pasada
  • ✅ Evita cálculos innecesarios

Definición: Cada partición de salida depende solo de UNA partición de entrada.

Características:

  • ✅ Sin tráfico de red
  • ✅ Procesamiento local
  • ✅ Spark fusiona múltiples operaciones (pipeline)

Ejemplos: map, filter, flatMap

Visualización:

PARTICIÓN 1 (Nodo 1) PARTICIÓN 1 (Nodo 1)
[1, 2, 3] [2, 4, 6]
↓ map(x*2) ↓
Sin movimiento de datos
Todo en el mismo nodo

5.2 Wide Transformations (Lentas - Shuffle)

Section titled “5.2 Wide Transformations (Lentas - Shuffle)”

Definición: Para calcular una partición de salida, necesitas datos de MUCHAS particiones de entrada.

Características:

  • ❌ Requiere Shuffle (mover datos por la red)
  • ❌ Escribe datos intermedios en disco
  • ❌ Rompe el pipeline (crea Stages separados)

Ejemplos: reduceByKey, groupByKey, sortByKey, distinct

Visualización del Shuffle:

graph TB
subgraph "ANTES del Shuffle"
A1["Nodo A<br/>(X:1, Y:2, X:3)"]
A2["Nodo B<br/>(Y:4, X:5, Z:6)"]
end
subgraph "Shuffle (Red + Disco)"
S["⚠️ Datos viajan<br/>por la red"]
end
subgraph "DESPUÉS del Shuffle"
B1["Nodo C<br/>X: [1,3,5]"]
B2["Nodo D<br/>Y: [2,4]"]
B3["Nodo E<br/>Z: [6]"]
end
A1 --> S
A2 --> S
S --> B1
S --> B2
S --> B3
style A1 fill:#d1ecf1
style A2 fill:#d1ecf1
style S fill:#f8d7da
style B1 fill:#d4edda
style B2 fill:#d4edda
style B3 fill:#d4edda

⚠️ El Shuffle es la operación MÁS COSTOSA en Spark

Costes:

  1. Red: Latencia y ancho de banda
  2. Disco: Datos intermedios se escriben
  3. CPU: Serialización/deserialización
  4. Sincronización: Espera a que termine todo

Clasifica estas operaciones:

  1. rdd.map(lambda x: x * 2) → Narrow ✅ / Wide ❌
  2. rdd.filter(lambda x: x > 10) → Narrow ✅ / Wide ❌
  3. rdd.reduceByKey(lambda a,b: a+b) → Narrow ❌ / Wide ✅
  4. rdd.distinct() → Narrow ❌ / Wide ✅

Regla Práctica:

  • Si cada elemento se procesa independientemente → Narrow
  • Si necesitas agrupar/ordenar/comparar elementos → Wide

📖 Parte 7: Operaciones Clave-Valor (Pair RDDs)

Section titled “📖 Parte 7: Operaciones Clave-Valor (Pair RDDs)”

Estas transformaciones solo funcionan en RDDs de tuplas (clave, valor).

Agrupa los valores de cada clave usando una función de agregación (suma, multiplicación, etc.).

⚠️ Nota: reduceByKey() es una transformación Wide (causa Shuffle), pero se ejecuta parcialmente en el Mapper (Combiner) para reducir tráfico de red.

# Ventas por ciudad: (Ciudad, Precio)
ventas = sc.parallelize([
("Madrid", 100),
("Barcelona", 200),
("Madrid", 50),
("Barcelona", 50),
("Sevilla", 300)
])
# Sumar ventas por ciudad
# a: acumulador, b: nuevo valor
total_ventas = ventas.reduceByKey(lambda a, b: a + b)
# Ejecutar
print(total_ventas.collect())
# [('Madrid', 150), ('Barcelona', 250), ('Sevilla', 300)]

Agrupa todos los valores de cada clave en una secuencia (iterable).

⚠️ Nota: groupByKey() es una transformación Wide (causa Shuffle) y muy costosa. Si vas a agregar datos (sumar, contar), NO la uses. Usa reduceByKey.

# Estudiantes por clase
clases = sc.parallelize([
("Clase A", "Juan"),
("Clase B", "Maria"),
("Clase A", "Pedro"),
("Clase B", "Luis")
])
# Agrupar estudiantes
agrupados = clases.groupByKey()
# Ejecutar y formatear resultado
resultado = agrupados.mapValues(list).collect()
print(resultado)
# [('Clase A', ['Juan', 'Pedro']), ('Clase B', ['Maria', 'Luis'])]

Ordena el RDD por su clave (ascending=True por defecto).

⚠️ Nota: sortByKey() es una transformación Wide (causa Shuffle).

# Puntuaciones: (Puntos, Nombre)
puntos = sc.parallelize([
(50, "Jugador C"),
(100, "Jugador A"),
(10, "Jugador B")
])
# Ordenar por puntos (clave) de menor a mayor
ranking = puntos.sortByKey()
# Ordenar de mayor a menor
ranking_desc = puntos.sortByKey(ascending=False)
print(ranking.collect()) # [(10, 'Jugador B'), (50, 'Jugador C'), (100, 'Jugador A')]
print(ranking_desc.collect()) # [(100, 'Jugador A'), (50, 'Jugador C'), (10, 'Jugador B')]

7.4 Optimización: reduceByKey vs groupByKey

Section titled “7.4 Optimización: reduceByKey vs groupByKey”

Problema: groupByKey envía TODOS los valores por la red sin pre-agregar.

Ejemplo Ineficiente (❌ MALO):

palabras = [("hola", 1), ("mundo", 1), ("hola", 1), ("hola", 1)]
rdd = sc.parallelize(palabras, 2)
# groupByKey envía [1, 1, 1] por la red
conteo_malo = rdd.groupByKey().mapValues(sum)

Solución Eficiente (✅ BUENO):

# reduceByKey pre-agrega localmente antes de enviar
conteo_bueno = rdd.reduceByKey(lambda a, b: a + b)

Comparación Visual:

groupByKey (MALO):
Nodo 1: (hola,1), (hola,1), (hola,1) → Envía 3 valores
Nodo 2: (hola,1), (hola,1) → Envía 2 valores
Total red: 5 valores
reduceByKey (BUENO):
Nodo 1: (hola,1), (hola,1), (hola,1) → Pre-agrega → Envía (hola,3)
Nodo 2: (hola,1), (hola,1) → Pre-agrega → Envía (hola,2)
Total red: 2 valores

OperaciónCuándo UsarEjemplo
reduceByKeyAgregación simple (suma, max, min)Contar palabras
aggregateByKeyAgregación compleja (cambio de tipo)Calcular promedio
groupByKeyCasi nunca (solo si necesitas TODOS los valores)Generar listas completas
sortByKeyOrdenar por claveRankings
mapValuesTransformar solo valores (sin cambiar clave)Convertir USD a EUR

Por defecto, Spark recalcula el RDD cada vez que ejecutas una acción.

Sin cache:

rdd = sc.textFile("archivo_grande.txt").map(...).filter(...)
print(rdd.count()) # Tarda 10 minutos
print(rdd.count()) # Tarda OTROS 10 minutos (recalcula todo)

Con cache:

rdd = sc.textFile("archivo_grande.txt").map(...).filter(...)
rdd.cache() # Marca para guardar en RAM
print(rdd.count()) # Tarda 10 minutos (calcula y guarda)
print(rdd.count()) # Tarda 0 segundos (lee de RAM)

NivelUbicaciónCuándo Usar
MEMORY_ONLYRAMDatos pequeños, máximo rendimiento
MEMORY_AND_DISKRAM + DiscoRecomendado - Seguro y eficiente
MEMORY_ONLY_SERRAM (serializado)Datos grandes, ahorra RAM
DISK_ONLYDiscoDatos enormes que no caben en RAM

Ejemplo:

from pyspark import StorageLevel
rdd.persist(StorageLevel.MEMORY_AND_DISK)

Permiten sumar valores desde los Executors de forma segura.

Ejemplo: Contar errores

errores = sc.accumulator(0)
def procesar_linea(linea):
try:
# Procesar...
return resultado
except:
errores.add(1)
return None
rdd.map(procesar_linea).collect()
print(f"Errores encontrados: {errores.value}")

8.2 Broadcast Variables (Datos Compartidos)

Section titled “8.2 Broadcast Variables (Datos Compartidos)”

Envía UNA SOLA copia de datos grandes a cada nodo.

Sin Broadcast (❌ MALO):

tabla_grande = {...} # 100MB
rdd.map(lambda x: tabla_grande.get(x)) # Envía 100MB con CADA tarea

Con Broadcast (✅ BUENO):

tabla_bc = sc.broadcast(tabla_grande)
rdd.map(lambda x: tabla_bc.value.get(x)) # Envía 100MB UNA VEZ por nodo

Causa: collect() sobre un RDD enorme

Solución:

# ❌ MALO
resultado = rdd_gigante.collect()
# ✅ BUENO
muestra = rdd_gigante.take(100)
# O guardar a disco
rdd_gigante.saveAsTextFile("resultado/")

Causa: Intentas enviar un objeto no serializable

Solución:

# ❌ MALO
conexion_db = crear_conexion()
rdd.map(lambda x: conexion_db.query(x))
# ✅ BUENO
def procesar(x):
conexion = crear_conexion() # Crear dentro de la función
resultado = conexion.query(x)
conexion.close()
return resultado
rdd.map(procesar)

  1. RDDs son inmutables - Cada transformación crea un nuevo RDD
  2. Lazy Evaluation - Las transformaciones no se ejecutan hasta una acción
  3. Narrow vs Wide - Narrow es rápido, Wide causa Shuffle
  4. Particionamiento - Más particiones = más paralelismo
  5. Cache - Reutiliza RDDs para evitar recalcular
  6. reduceByKey > groupByKey - Pre-agrega localmente

Antes de quejarte de que “Spark es lento”:

  • ¿Usas reduceByKey en lugar de groupByKey?
  • ¿El número de particiones es 2-4x el número de CPUs?
  • ¿Cacheas RDDs que usas múltiples veces?
  • ¿Evitas collect() en datasets grandes?
  • ¿Filtras datos lo antes posible?

Ver soluciones paso a paso en: 02RDD_new_SOL.txt

Cuenta cuántas veces aparece cada palabra en un texto. texto = ["hola mundo", "hola spark", "mundo spark"]

Datos: [("Madrid", 100), ("Barcelona", 200), ("Madrid", 50)]

  1. Suma las ventas por ciudad
  2. Filtra ciudades con ventas > 100
  3. Ordena de mayor a menor

Teniendo precios en USD: [10, 25, 40, 5, 100, 15], filtra los >= 20 y conviértelos a EUR (x0.9).

Divide, aplana y cuenta productos únicos de: ["Leche,Huevos,Pan", "Manzanas,Peras", "Leche,Chocolate"]

Teniendo logs ip,hora,metodo,url, cuenta peticiones por IP y filtra las que tengan > 1 petición.

Tienes un RDD que usas 5 veces en diferentes cálculos. ¿Qué debes hacer para optimizar?


Última actualización: 2026-02-02