Fundamentos de RDD en Spark
🚀 Introducción: ¿Por Qué Aprender RDDs?
Section titled “🚀 Introducción: ¿Por Qué Aprender RDDs?”Nota Importante: Aunque en Spark 3.x el 95% del tiempo trabajarás con DataFrames, los DataFrames son solo una capa de abstracción sobre los RDDs. Entender los RDDs es entender cómo piensa Spark.
Sin entender RDDs: Escribirás código que funciona, pero que es lento o ineficiente.
Entendiendo RDDs: Sabrás exactamente qué hace Spark internamente y cómo optimizar tus jobs.
📖 Parte 1: Conceptos Fundamentales
Section titled “📖 Parte 1: Conceptos Fundamentales”1.1 ¿Qué es un RDD?
Section titled “1.1 ¿Qué es un RDD?”RDD significa Resilient Distributed Dataset. Desglosemos cada palabra:
- Resilient (Resiliente): Tolerante a fallos. Si un nodo falla, Spark reconstruye los datos automáticamente.
- Distributed (Distribuido): Los datos están repartidos por la memoria RAM de múltiples ordenadores.
- Dataset: Es una colección de datos inmutable (no se puede modificar).
1.2 Concepto Clave: Inmutabilidad
Section titled “1.2 Concepto Clave: Inmutabilidad”Una vez creado un RDD, NO puedes cambiar sus datos. Cada transformación crea un nuevo RDD.
Ejemplo 1: Transformación crea nuevo RDD
Section titled “Ejemplo 1: Transformación crea nuevo RDD”# Crear un RDD originalrdd_original = sc.parallelize([1, 2, 3, 4, 5])
# Aplicar una transformación (crea un NUEVO RDD)rdd_multiplicado = rdd_original.map(lambda x: x * 2)
# Verificar que el original NO cambióprint("RDD original:", rdd_original.collect()) # [1, 2, 3, 4, 5]print("RDD multiplicado:", rdd_multiplicado.collect()) # [2, 4, 6, 8, 10]Ejemplo 2: Reasignar variable NO modifica el RDD original
Section titled “Ejemplo 2: Reasignar variable NO modifica el RDD original”# Crear RDDrdd = sc.parallelize([1, 2, 3, 4, 5])
# Reasignar la variable 'rdd' a un nuevo RDD# Esto NO modifica los datos del RDD anterior,# solo hace que la variable apunte a uno nuevo.rdd = rdd.map(lambda x: x * 10)
print(rdd.collect()) # [10, 20, 30, 40, 50]Ejemplo 3: Intentar “modificar” un RDD (NO funciona como esperas)
Section titled “Ejemplo 3: Intentar “modificar” un RDD (NO funciona como esperas)”# Crear RDDnumeros = sc.parallelize([1, 2, 3, 4, 5])
# Esto NO modifica 'numeros', crea un nuevo RDD que se descarta si no lo asignasnumeros.map(lambda x: x * 2) # ⚠️ El resultado se pierde
print(numeros.collect()) # [1, 2, 3, 4, 5] - ¡Sin cambios!
# Correcto: Asignar a una variablenumeros_dobles = numeros.map(lambda x: x * 2)¿Por qué es importante la inmutabilidad?
- ✅ Tolerancia a fallos: Spark puede reconstruir datos perdidos re-ejecutando transformaciones
- ✅ Paralelismo seguro: Múltiples tareas pueden leer el mismo RDD sin conflictos
- ✅ Optimización: Spark mantiene un historial (DAG) para optimizar la ejecución
1.2 Tu Primer RDD
Section titled “1.2 Tu Primer RDD”Vamos a crear tu primer RDD paso a paso:
from pyspark.sql import SparkSession
# Paso 1: Crear la sesión de Sparkspark = SparkSession.builder.appName("MiPrimerRDD").getOrCreate()sc = spark.sparkContext
# Paso 2: Crear un RDD desde una listadatos = [1, 2, 3, 4, 5]mi_primer_rdd = sc.parallelize(datos)
# Paso 3: Verificar que se creó correctamenteprint("Número de elementos:", mi_primer_rdd.count()) # 5print("Primeros 3 elementos:", mi_primer_rdd.take(3)) # [1, 2, 3]print("Todos los elementos:", mi_primer_rdd.collect()) # [1, 2, 3, 4, 5]🎓 Explicación:
parallelize(): Convierte una lista de Python en un RDD distribuidocount(): Acción que cuenta los elementostake(n): Acción que devuelve los primeros n elementoscollect(): Acción que trae todos los datos al Driver
✅ Checkpoint 1: ¿Lo has entendido?
Section titled “✅ Checkpoint 1: ¿Lo has entendido?”Antes de continuar, asegúrate de que puedes:
- Explicar qué significa “inmutable” con tus propias palabras
- Crear un RDD desde una lista de Python
- Usar
count()ycollect()para ver los datos
Mini-Quiz:
-
¿Qué devuelve
rdd.map(lambda x: x*2)?- a) Una lista de Python
- b) Un nuevo RDD ✅
- c) Modifica el RDD original
-
¿Cuál de estas es una ACCIÓN?
- a)
map() - b)
filter() - c)
count()✅
- a)
Ver explicación
Respuesta 1: b) Un nuevo RDD. Las transformaciones siempre devuelven un nuevo RDD.
Respuesta 2: c) count() es una acción porque dispara la ejecución y devuelve un resultado al Driver.
📖 Parte 2: Transformaciones Básicas
Section titled “📖 Parte 2: Transformaciones Básicas”2.1 Transformaciones vs Acciones
Section titled “2.1 Transformaciones vs Acciones”Esta es la distinción MÁS IMPORTANTE en Spark:
| Tipo | ¿Qué hace? | ¿Cuándo se ejecuta? | Ejemplos |
|---|---|---|---|
| Transformación | Crea un nuevo RDD | ❌ NO se ejecuta (lazy) | map, filter, flatMap |
| Acción | Devuelve un resultado | ✅ Ejecuta TODO el pipeline | collect, count, take |
Ejemplo Visual:
# Ninguna de estas líneas ejecuta nadardd1 = sc.parallelize([1, 2, 3, 4])rdd2 = rdd1.map(lambda x: x * 2) # Solo toma notardd3 = rdd2.filter(lambda x: x > 5) # Solo toma nota
# ESTA línea ejecuta todo el pipelineresultado = rdd3.collect() # ¡Ahora sí se ejecuta!print(resultado) # [6, 8]2.2 Transformación: map()
Section titled “2.2 Transformación: map()”map() aplica una función a cada elemento del RDD y devuelve un nuevo RDD.
Regla: 1 elemento de entrada → 1 elemento de salida
Ejemplo 1: Multiplicar por 2
Section titled “Ejemplo 1: Multiplicar por 2”# Crear RDDnumeros = sc.parallelize([1, 2, 3, 4, 5])
# Aplicar map (NO se ejecuta aún)dobles = numeros.map(lambda x: x * 2)
# Ejecutar con acciónprint(dobles.collect()) # [2, 4, 6, 8, 10]Ejemplo 2: Convertir a mayúsculas
Section titled “Ejemplo 2: Convertir a mayúsculas”# Crear RDD de palabraspalabras = sc.parallelize(["hola", "mundo", "spark"])
# Aplicar mapmayusculas = palabras.map(lambda palabra: palabra.upper())
# Ejecutarprint(mayusculas.collect()) # ['HOLA', 'MUNDO', 'SPARK']Ejemplo 3: Extraer longitud
Section titled “Ejemplo 3: Extraer longitud”# Crear RDDpalabras = sc.parallelize(["hola", "mundo", "spark"])
# Aplicar map para obtener longitudeslongitudes = palabras.map(lambda palabra: len(palabra))
# Ejecutarprint(longitudes.collect()) # [4, 5, 5]Ejemplo 4: Transformar tuplas
Section titled “Ejemplo 4: Transformar tuplas”# Crear RDD de tuplas (nombre, edad)personas = sc.parallelize([("Ana", 25), ("Juan", 30), ("María", 28)])
# Extraer solo nombresnombres = personas.map(lambda persona: persona[0])print(nombres.collect()) # ['Ana', 'Juan', 'María']
# Incrementar edad en 1personas_mayor = personas.map(lambda p: (p[0], p[1] + 1))print(personas_mayor.collect()) # [('Ana', 26), ('Juan', 31), ('María', 29)]2.3 Transformación: filter()
Section titled “2.3 Transformación: filter()”filter() selecciona solo los elementos que cumplen una condición.
Regla: 1 elemento de entrada → 0 o 1 elemento de salida
Ejemplo 1: Números pares
Section titled “Ejemplo 1: Números pares”# Crear RDDnumeros = sc.parallelize([1, 2, 3, 4, 5, 6])
# Filtrar parespares = numeros.filter(lambda x: x % 2 == 0)
# Ejecutarprint(pares.collect()) # [2, 4, 6]Ejemplo 2: Palabras largas
Section titled “Ejemplo 2: Palabras largas”# Crear RDDpalabras = sc.parallelize(["hola", "mundo", "spark", "python"])
# Filtrar palabras con más de 4 letraslargas = palabras.filter(lambda palabra: len(palabra) > 4)
# Ejecutarprint(largas.collect()) # ['mundo', 'spark', 'python']Ejemplo 3: Filtrar por condición compleja
Section titled “Ejemplo 3: Filtrar por condición compleja”# Crear RDD de tuplas (producto, precio)productos = sc.parallelize([ ("Laptop", 1000), ("Mouse", 25), ("Teclado", 75), ("Monitor", 300)])
# Filtrar productos caros (precio > 100)caros = productos.filter(lambda p: p[1] > 100)
# Ejecutarprint(caros.collect()) # [('Laptop', 1000), ('Monitor', 300)]Ejemplo 4: Combinar map y filter
Section titled “Ejemplo 4: Combinar map y filter”# Crear RDDnumeros = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
# Pipeline: multiplicar por 2, luego filtrar mayores a 10resultado = numeros.map(lambda x: x * 2) \ .filter(lambda x: x > 10)
# Ejecutarprint(resultado.collect()) # [12, 14, 16, 18, 20]2.4 Transformación: flatMap()
Section titled “2.4 Transformación: flatMap()”flatMap() es como map(), pero “aplana” listas anidadas.
Regla: 1 elemento de entrada → 0, 1 o N elementos de salida
Ejemplo 1: Dividir frases en palabras
Section titled “Ejemplo 1: Dividir frases en palabras”# Crear RDD de frasesfrases = sc.parallelize(["Hola mundo", "Apache Spark es genial"])
# Con map() obtienes listas anidadascon_map = frases.map(lambda frase: frase.split(" "))print(con_map.collect())# [['Hola', 'mundo'], ['Apache', 'Spark', 'es', 'genial']]
# Con flatMap() obtienes una lista planacon_flatmap = frases.flatMap(lambda frase: frase.split(" "))print(con_flatmap.collect())# ['Hola', 'mundo', 'Apache', 'Spark', 'es', 'genial']Ejemplo 2: Generar rangos
Section titled “Ejemplo 2: Generar rangos”# Crear RDDnumeros = sc.parallelize([1, 2, 3])
# flatMap para generar rangosrangos = numeros.flatMap(lambda x: range(1, x + 1))
# Ejecutarprint(rangos.collect()) # [1, 1, 2, 1, 2, 3]Ejemplo 3: Procesar CSV
Section titled “Ejemplo 3: Procesar CSV”# Crear RDD de líneas CSVcsv_lines = sc.parallelize([ "Leche,Huevos,Pan", "Manzanas,Peras", "Leche,Chocolate"])
# Dividir y aplanarproductos = csv_lines.flatMap(lambda linea: linea.split(","))
# Ejecutarprint(productos.collect())# ['Leche', 'Huevos', 'Pan', 'Manzanas', 'Peras', 'Leche', 'Chocolate']2.5 Transformación: distinct()
Section titled “2.5 Transformación: distinct()”Elimina elementos duplicados.
⚠️ Nota: distinct() es una transformación Wide (causa Shuffle).
Ejemplo Completo:
Section titled “Ejemplo Completo:”# Crear RDD con duplicadosnumeros = sc.parallelize([1, 2, 2, 3, 3, 3, 4, 5, 5])
# Eliminar duplicadosunicos = numeros.distinct()
# Ejecutarprint(unicos.collect()) # [1, 2, 3, 4, 5] (orden puede variar)2.6 Transformación: union()
Section titled “2.6 Transformación: union()”Une dos RDDs (permite duplicados).
Ejemplo Completo:
Section titled “Ejemplo Completo:”# Crear dos RDDsrdd1 = sc.parallelize([1, 2, 3])rdd2 = sc.parallelize([3, 4, 5])
# Unir (permite duplicados)union = rdd1.union(rdd2)
# Ejecutarprint(union.collect()) # [1, 2, 3, 3, 4, 5]
# Si quieres sin duplicadosunion_sin_duplicados = rdd1.union(rdd2).distinct()print(union_sin_duplicados.collect()) # [1, 2, 3, 4, 5]2.7 Transformación: intersection()
Section titled “2.7 Transformación: intersection()”Devuelve solo elementos que están en AMBOS RDDs.
⚠️ Nota: intersection() es una transformación Wide (causa Shuffle).
Ejemplo Completo:
Section titled “Ejemplo Completo:”# Usuarios que visitaron la web ayer y hoyayer = sc.parallelize(["a@test.com", "b@test.com", "c@test.com"])hoy = sc.parallelize(["c@test.com", "d@test.com", "e@test.com"])
# Usuarios que visitaron ambos díasambos_dias = ayer.intersection(hoy)
# Ejecutarprint(ambos_dias.collect()) # ['c@test.com']📖 Parte 3: Acciones Principales
Section titled “📖 Parte 3: Acciones Principales”3.1 Acción: collect()
Section titled “3.1 Acción: collect()”Trae todos los datos del RDD al Driver.
⚠️ Peligro: Si el RDD es muy grande, puede causar OutOfMemoryError.
Ejemplo Completo:
Section titled “Ejemplo Completo:”# Crear RDD pequeñonumeros = sc.parallelize([1, 2, 3, 4, 5])
# Traer todos los datosdatos = numeros.collect()
# 'datos' es una lista de Pythonprint(type(datos)) # <class 'list'>print(datos) # [1, 2, 3, 4, 5]3.2 Acción: count()
Section titled “3.2 Acción: count()”Cuenta el número de elementos.
Ejemplo Completo:
Section titled “Ejemplo Completo:”# Crear RDDpalabras = sc.parallelize(["hola", "mundo", "spark", "python"])
# Contar elementostotal = palabras.count()
# Ejecutarprint(f"Total de palabras: {total}") # Total de palabras: 43.3 Acción: first()
Section titled “3.3 Acción: first()”Devuelve el primer elemento.
Ejemplo Completo:
Section titled “Ejemplo Completo:”# Crear RDDnumeros = sc.parallelize([10, 20, 30, 40, 50])
# Obtener primer elementoprimero = numeros.first()
# Ejecutarprint(f"Primer elemento: {primero}") # Primer elemento: 103.4 Acción: take(n)
Section titled “3.4 Acción: take(n)”Devuelve los primeros N elementos.
Ejemplo Completo:
Section titled “Ejemplo Completo:”# Crear RDDnumeros = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
# Tomar primeros 5primeros_5 = numeros.take(5)
# Ejecutarprint(primeros_5) # [1, 2, 3, 4, 5]3.5 Acción: reduce()
Section titled “3.5 Acción: reduce()”Agrega todos los elementos usando una función.
Ejemplo 1: Suma total
Section titled “Ejemplo 1: Suma total”# Crear RDDnumeros = sc.parallelize([1, 2, 3, 4, 5])
# Sumar todos los elementossuma = numeros.reduce(lambda a, b: a + b)
# Ejecutarprint(f"Suma total: {suma}") # Suma total: 15Ejemplo 2: Máximo
Section titled “Ejemplo 2: Máximo”# Crear RDDnumeros = sc.parallelize([10, 5, 20, 15, 8])
# Encontrar máximomaximo = numeros.reduce(lambda a, b: a if a > b else b)
# Ejecutarprint(f"Máximo: {maximo}") # Máximo: 203.6 Acción: foreach()
Section titled “3.6 Acción: foreach()”Aplica una función a cada elemento (útil para efectos secundarios como guardar en BD).
Ejemplo Completo:
Section titled “Ejemplo Completo:”# Crear RDDnumeros = sc.parallelize([1, 2, 3, 4, 5])
# Imprimir cada elemento (se ejecuta en los Executors)numeros.foreach(lambda x: print(f"Procesando: {x}"))
# Nota: Los prints aparecen en los logs de los Executors, no en tu consola3.7 Acción: saveAsTextFile()
Section titled “3.7 Acción: saveAsTextFile()”Guarda el RDD en archivos de texto.
Ejemplo Completo:
Section titled “Ejemplo Completo:”# Crear RDDpalabras = sc.parallelize(["hola", "mundo", "spark"])
# Guardar a discopalabras.saveAsTextFile("output/palabras")
# Esto crea una carpeta 'output/palabras' con archivos:# part-00000, part-00001, etc.✅ Checkpoint 3: Practica las Transformaciones y Acciones
Section titled “✅ Checkpoint 3: Practica las Transformaciones y Acciones”Ejercicio 1: Completa el código
# Tienes una lista de precios en dólaresprecios_usd = sc.parallelize([10, 25, 40, 5, 100, 15])
# TODO: Filtra precios >= 20precios_altos = precios_usd.filter(lambda x: ______)
# TODO: Convierte a euros (multiplica por 0.9)precios_eur = precios_altos.map(lambda x: ______)
print(precios_eur.collect()) # Debería dar: [22.5, 36.0, 90.0, 13.5]Ejercicio 2: map vs flatMap
# Tienes una lista de la compracompras = sc.parallelize(['Leche,Huevos,Pan', 'Manzanas,Peras'])
# TODO: Usa flatMap para obtener una lista de productos individualesproductos = compras.flatMap(lambda linea: ______)
print(productos.collect())# Debería dar: ['Leche', 'Huevos', 'Pan', 'Manzanas', 'Peras']3.7 Transformación: intersection()
Section titled “3.7 Transformación: intersection()”Devuelve solo elementos que están en AMBOS RDDs.
⚠️ Nota: intersection() es una transformación Wide (causa Shuffle).
Ejemplo Completo:
Section titled “Ejemplo Completo:”# Usuarios que visitaron la web ayer y hoyayer = sc.parallelize(["a@test.com", "b@test.com", "c@test.com"])hoy = sc.parallelize(["c@test.com", "d@test.com", "e@test.com"])
# Usuarios que visitaron ambos díasambos_dias = ayer.intersection(hoy)
# Ejecutarprint(ambos_dias.collect()) # ['c@test.com']📖 Parte 4: Arquitectura Distribuida
Section titled “📖 Parte 4: Arquitectura Distribuida”4.1 Driver vs Executor
Section titled “4.1 Driver vs Executor”En Spark, tu aplicación se divide en dos componentes que pueden estar en ordenadores distintos:
graph TB Driver["🧠 DRIVER<br/><br/>Tu script Python<br/>Planifica tareas<br/>Coordina ejecución"]
Driver -->|"Envía código"| E1["⚙️ EXECUTOR 1<br/><br/>Ejecuta tareas<br/>Guarda en RAM"] Driver -->|"Envía código"| E2["⚙️ EXECUTOR 2<br/><br/>Ejecuta tareas<br/>Guarda en RAM"] Driver -->|"Envía código"| E3["⚙️ EXECUTOR 3<br/><br/>Ejecuta tareas<br/>Guarda en RAM"]
E1 -.->|"Reporta resultados"| Driver E2 -.->|"Reporta resultados"| Driver E3 -.->|"Reporta resultados"| Driver
style Driver fill:#e3f2fd,stroke:#1976d2,stroke-width:3px style E1 fill:#c8e6c9,stroke:#388e3c,stroke-width:2px style E2 fill:#c8e6c9,stroke:#388e3c,stroke-width:2px style E3 fill:#c8e6c9,stroke:#388e3c,stroke-width:2pxDriver (El Cerebro):
- Corre tu script Python
- Convierte tu código en un plan de ejecución
- Envía tareas a los Executors
Executors (El Músculo):
- Ejecutan las tareas (
map,filter, etc.) - Almacenan datos en su memoria RAM
- Reportan resultados al Driver
3.2 Particionamiento: La Clave del Paralelismo
Section titled “3.2 Particionamiento: La Clave del Paralelismo”Los datos de un RDD están divididos en particiones. Cada partición se procesa en paralelo.
Regla de Oro: El paralelismo máximo = número de particiones
datos = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
# Crear RDD con 2 particionesrdd = sc.parallelize(datos, 2)
print(f"Número de particiones: {rdd.getNumPartitions()}") # 2Visualización:
Datos: [1,2,3,4,5,6,7,8,9,10] ↓Partición 0: [1,2,3,4,5] → Executor 1Partición 1: [6,7,8,9,10] → Executor 2¿Cuántas particiones usar?
Section titled “¿Cuántas particiones usar?”Fórmula general:
Particiones = 2-4 × número de CPUsEjemplos:
- Laptop (4 cores): 8-16 particiones
- Clúster pequeño (20 cores): 40-80 particiones
- Clúster grande (200 cores): 400-800 particiones
✅ Checkpoint 3: Arquitectura
Section titled “✅ Checkpoint 3: Arquitectura”Quiz Rápido:
-
¿Dónde se ejecuta
rdd.map(lambda x: x*2)?- a) En el Driver
- b) En los Executors ✅
- c) En ambos
-
Si tienes 50 CPUs pero solo 2 particiones, ¿cuántas CPUs trabajarán?
- a) 50
- b) 2 ✅
- c) 25
Ver explicación
Respuesta 1: b) En los Executors. El Driver solo envía el código, los Executors lo ejecutan.
Respuesta 2: b) Solo 2 CPUs trabajarán (una por partición). Las otras 48 estarán ociosas.
📖 Parte 5: Lazy Evaluation y el DAG
Section titled “📖 Parte 5: Lazy Evaluation y el DAG”5.1 ¿Qué es Lazy Evaluation?
Section titled “5.1 ¿Qué es Lazy Evaluation?”Concepto Crítico: Spark es “perezoso”. Las transformaciones NO se ejecutan inmediatamente.
Spark solo toma notas de lo que quieres hacer. Solo ejecuta cuando llamas a una acción.
Ejemplo:
# Ninguna de estas líneas ejecuta nadardd1 = sc.parallelize([1, 2, 3, 4])rdd2 = rdd1.map(lambda x: x * 2) # Solo toma notardd3 = rdd2.filter(lambda x: x > 5) # Solo toma nota
# ESTA línea ejecuta todo el pipelineresultado = rdd3.collect() # ¡Ahora sí se ejecuta!Visualización del DAG (Plan de Ejecución):
graph LR A["RDD1: [1,2,3,4]"] -->|"map(x*2)"| B["RDD2: [2,4,6,8]"] B -->|"filter(x>5)"| C["RDD3: [6,8]"] C -->|"collect()"| D["🚀 ACCIÓN<br/>Ejecuta todo"]
style A fill:#e1f5ff style B fill:#e1f5ff style C fill:#e1f5ff style D fill:#ffcccc¿Por qué es útil?
- ✅ Spark puede optimizar el plan completo antes de ejecutar
- ✅ Puede fusionar múltiples transformaciones en una sola pasada
- ✅ Evita cálculos innecesarios
📖 Parte 6: Narrow vs Wide
Section titled “📖 Parte 6: Narrow vs Wide”6.1 Narrow Transformations (Rápidas)
Section titled “6.1 Narrow Transformations (Rápidas)”Definición: Cada partición de salida depende solo de UNA partición de entrada.
Características:
- ✅ Sin tráfico de red
- ✅ Procesamiento local
- ✅ Spark fusiona múltiples operaciones (pipeline)
Ejemplos: map, filter, flatMap
Visualización:
PARTICIÓN 1 (Nodo 1) PARTICIÓN 1 (Nodo 1)[1, 2, 3] [2, 4, 6] ↓ map(x*2) ↓Sin movimiento de datosTodo en el mismo nodo5.2 Wide Transformations (Lentas - Shuffle)
Section titled “5.2 Wide Transformations (Lentas - Shuffle)”Definición: Para calcular una partición de salida, necesitas datos de MUCHAS particiones de entrada.
Características:
- ❌ Requiere Shuffle (mover datos por la red)
- ❌ Escribe datos intermedios en disco
- ❌ Rompe el pipeline (crea Stages separados)
Ejemplos: reduceByKey, groupByKey, sortByKey, distinct
Visualización del Shuffle:
graph TB subgraph "ANTES del Shuffle" A1["Nodo A<br/>(X:1, Y:2, X:3)"] A2["Nodo B<br/>(Y:4, X:5, Z:6)"] end
subgraph "Shuffle (Red + Disco)" S["⚠️ Datos viajan<br/>por la red"] end
subgraph "DESPUÉS del Shuffle" B1["Nodo C<br/>X: [1,3,5]"] B2["Nodo D<br/>Y: [2,4]"] B3["Nodo E<br/>Z: [6]"] end
A1 --> S A2 --> S S --> B1 S --> B2 S --> B3
style A1 fill:#d1ecf1 style A2 fill:#d1ecf1 style S fill:#f8d7da style B1 fill:#d4edda style B2 fill:#d4edda style B3 fill:#d4edda⚠️ El Shuffle es la operación MÁS COSTOSA en Spark
Costes:
- Red: Latencia y ancho de banda
- Disco: Datos intermedios se escriben
- CPU: Serialización/deserialización
- Sincronización: Espera a que termine todo
✅ Checkpoint 4: Narrow vs Wide
Section titled “✅ Checkpoint 4: Narrow vs Wide”Clasifica estas operaciones:
rdd.map(lambda x: x * 2)→ Narrow ✅ / Wide ❌rdd.filter(lambda x: x > 10)→ Narrow ✅ / Wide ❌rdd.reduceByKey(lambda a,b: a+b)→ Narrow ❌ / Wide ✅rdd.distinct()→ Narrow ❌ / Wide ✅
Regla Práctica:
- Si cada elemento se procesa independientemente → Narrow
- Si necesitas agrupar/ordenar/comparar elementos → Wide
📖 Parte 7: Operaciones Clave-Valor (Pair RDDs)
Section titled “📖 Parte 7: Operaciones Clave-Valor (Pair RDDs)”Estas transformaciones solo funcionan en RDDs de tuplas (clave, valor).
7.1 Transformación: reduceByKey()
Section titled “7.1 Transformación: reduceByKey()”Agrupa los valores de cada clave usando una función de agregación (suma, multiplicación, etc.).
⚠️ Nota: reduceByKey() es una transformación Wide (causa Shuffle), pero se ejecuta parcialmente en el Mapper (Combiner) para reducir tráfico de red.
Ejemplo Completo:
Section titled “Ejemplo Completo:”# Ventas por ciudad: (Ciudad, Precio)ventas = sc.parallelize([ ("Madrid", 100), ("Barcelona", 200), ("Madrid", 50), ("Barcelona", 50), ("Sevilla", 300)])
# Sumar ventas por ciudad# a: acumulador, b: nuevo valortotal_ventas = ventas.reduceByKey(lambda a, b: a + b)
# Ejecutarprint(total_ventas.collect())# [('Madrid', 150), ('Barcelona', 250), ('Sevilla', 300)]7.2 Transformación: groupByKey()
Section titled “7.2 Transformación: groupByKey()”Agrupa todos los valores de cada clave en una secuencia (iterable).
⚠️ Nota: groupByKey() es una transformación Wide (causa Shuffle) y muy costosa. Si vas a agregar datos (sumar, contar), NO la uses. Usa reduceByKey.
Ejemplo Completo:
Section titled “Ejemplo Completo:”# Estudiantes por claseclases = sc.parallelize([ ("Clase A", "Juan"), ("Clase B", "Maria"), ("Clase A", "Pedro"), ("Clase B", "Luis")])
# Agrupar estudiantesagrupados = clases.groupByKey()
# Ejecutar y formatear resultadoresultado = agrupados.mapValues(list).collect()print(resultado)# [('Clase A', ['Juan', 'Pedro']), ('Clase B', ['Maria', 'Luis'])]7.3 Transformación: sortByKey()
Section titled “7.3 Transformación: sortByKey()”Ordena el RDD por su clave (ascending=True por defecto).
⚠️ Nota: sortByKey() es una transformación Wide (causa Shuffle).
Ejemplo Completo:
Section titled “Ejemplo Completo:”# Puntuaciones: (Puntos, Nombre)puntos = sc.parallelize([ (50, "Jugador C"), (100, "Jugador A"), (10, "Jugador B")])
# Ordenar por puntos (clave) de menor a mayorranking = puntos.sortByKey()
# Ordenar de mayor a menorranking_desc = puntos.sortByKey(ascending=False)
print(ranking.collect()) # [(10, 'Jugador B'), (50, 'Jugador C'), (100, 'Jugador A')]print(ranking_desc.collect()) # [(100, 'Jugador A'), (50, 'Jugador C'), (10, 'Jugador B')]7.4 Optimización: reduceByKey vs groupByKey
Section titled “7.4 Optimización: reduceByKey vs groupByKey”Problema: groupByKey envía TODOS los valores por la red sin pre-agregar.
Ejemplo Ineficiente (❌ MALO):
palabras = [("hola", 1), ("mundo", 1), ("hola", 1), ("hola", 1)]rdd = sc.parallelize(palabras, 2)
# groupByKey envía [1, 1, 1] por la redconteo_malo = rdd.groupByKey().mapValues(sum)Solución Eficiente (✅ BUENO):
# reduceByKey pre-agrega localmente antes de enviarconteo_bueno = rdd.reduceByKey(lambda a, b: a + b)Comparación Visual:
groupByKey (MALO):Nodo 1: (hola,1), (hola,1), (hola,1) → Envía 3 valoresNodo 2: (hola,1), (hola,1) → Envía 2 valoresTotal red: 5 valores
reduceByKey (BUENO):Nodo 1: (hola,1), (hola,1), (hola,1) → Pre-agrega → Envía (hola,3)Nodo 2: (hola,1), (hola,1) → Pre-agrega → Envía (hola,2)Total red: 2 valores7.5 Resumen: Cuándo Usar Cada Operación
Section titled “7.5 Resumen: Cuándo Usar Cada Operación”| Operación | Cuándo Usar | Ejemplo |
|---|---|---|
reduceByKey | Agregación simple (suma, max, min) | Contar palabras |
aggregateByKey | Agregación compleja (cambio de tipo) | Calcular promedio |
groupByKey | Casi nunca (solo si necesitas TODOS los valores) | Generar listas completas |
sortByKey | Ordenar por clave | Rankings |
mapValues | Transformar solo valores (sin cambiar clave) | Convertir USD a EUR |
📖 Parte 8: Persistencia (Caching)
Section titled “📖 Parte 8: Persistencia (Caching)”8.1 ¿Por Qué Cachear?
Section titled “8.1 ¿Por Qué Cachear?”Por defecto, Spark recalcula el RDD cada vez que ejecutas una acción.
Sin cache:
rdd = sc.textFile("archivo_grande.txt").map(...).filter(...)
print(rdd.count()) # Tarda 10 minutosprint(rdd.count()) # Tarda OTROS 10 minutos (recalcula todo)Con cache:
rdd = sc.textFile("archivo_grande.txt").map(...).filter(...)rdd.cache() # Marca para guardar en RAM
print(rdd.count()) # Tarda 10 minutos (calcula y guarda)print(rdd.count()) # Tarda 0 segundos (lee de RAM)7.2 Niveles de Almacenamiento
Section titled “7.2 Niveles de Almacenamiento”| Nivel | Ubicación | Cuándo Usar |
|---|---|---|
MEMORY_ONLY | RAM | Datos pequeños, máximo rendimiento |
MEMORY_AND_DISK | RAM + Disco | Recomendado - Seguro y eficiente |
MEMORY_ONLY_SER | RAM (serializado) | Datos grandes, ahorra RAM |
DISK_ONLY | Disco | Datos enormes que no caben en RAM |
Ejemplo:
from pyspark import StorageLevel
rdd.persist(StorageLevel.MEMORY_AND_DISK)📖 Parte 9: Variables Compartidas
Section titled “📖 Parte 9: Variables Compartidas”9.1 Accumulators (Contadores)
Section titled “9.1 Accumulators (Contadores)”Permiten sumar valores desde los Executors de forma segura.
Ejemplo: Contar errores
errores = sc.accumulator(0)
def procesar_linea(linea): try: # Procesar... return resultado except: errores.add(1) return None
rdd.map(procesar_linea).collect()
print(f"Errores encontrados: {errores.value}")8.2 Broadcast Variables (Datos Compartidos)
Section titled “8.2 Broadcast Variables (Datos Compartidos)”Envía UNA SOLA copia de datos grandes a cada nodo.
Sin Broadcast (❌ MALO):
tabla_grande = {...} # 100MBrdd.map(lambda x: tabla_grande.get(x)) # Envía 100MB con CADA tareaCon Broadcast (✅ BUENO):
tabla_bc = sc.broadcast(tabla_grande)rdd.map(lambda x: tabla_bc.value.get(x)) # Envía 100MB UNA VEZ por nodo🐛 Errores Comunes
Section titled “🐛 Errores Comunes”Error 1: OutOfMemoryError en Driver
Section titled “Error 1: OutOfMemoryError en Driver”Causa: collect() sobre un RDD enorme
Solución:
# ❌ MALOresultado = rdd_gigante.collect()
# ✅ BUENOmuestra = rdd_gigante.take(100)# O guardar a discordd_gigante.saveAsTextFile("resultado/")Error 2: Task Not Serializable
Section titled “Error 2: Task Not Serializable”Causa: Intentas enviar un objeto no serializable
Solución:
# ❌ MALOconexion_db = crear_conexion()rdd.map(lambda x: conexion_db.query(x))
# ✅ BUENOdef procesar(x): conexion = crear_conexion() # Crear dentro de la función resultado = conexion.query(x) conexion.close() return resultado
rdd.map(procesar)✅ Resumen Final
Section titled “✅ Resumen Final”Conceptos Clave Aprendidos
Section titled “Conceptos Clave Aprendidos”- RDDs son inmutables - Cada transformación crea un nuevo RDD
- Lazy Evaluation - Las transformaciones no se ejecutan hasta una acción
- Narrow vs Wide - Narrow es rápido, Wide causa Shuffle
- Particionamiento - Más particiones = más paralelismo
- Cache - Reutiliza RDDs para evitar recalcular
- reduceByKey > groupByKey - Pre-agrega localmente
Checklist de Optimización
Section titled “Checklist de Optimización”Antes de quejarte de que “Spark es lento”:
- ¿Usas
reduceByKeyen lugar degroupByKey? - ¿El número de particiones es 2-4x el número de CPUs?
- ¿Cacheas RDDs que usas múltiples veces?
- ¿Evitas
collect()en datasets grandes? - ¿Filtras datos lo antes posible?
🎯 Ejercicios Prácticos
Section titled “🎯 Ejercicios Prácticos”Ver soluciones paso a paso en: 02RDD_new_SOL.txt
Ejercicio 1: Word Count Básico
Section titled “Ejercicio 1: Word Count Básico”Cuenta cuántas veces aparece cada palabra en un texto.
texto = ["hola mundo", "hola spark", "mundo spark"]
Ejercicio 2: Análisis de Ventas
Section titled “Ejercicio 2: Análisis de Ventas”Datos: [("Madrid", 100), ("Barcelona", 200), ("Madrid", 50)]
- Suma las ventas por ciudad
- Filtra ciudades con ventas > 100
- Ordena de mayor a menor
Ejercicio 3: Filtrado y Transformación
Section titled “Ejercicio 3: Filtrado y Transformación”Teniendo precios en USD: [10, 25, 40, 5, 100, 15], filtra los >= 20 y conviértelos a EUR (x0.9).
Ejercicio 4: Procesamiento de CSV
Section titled “Ejercicio 4: Procesamiento de CSV”Divide, aplana y cuenta productos únicos de: ["Leche,Huevos,Pan", "Manzanas,Peras", "Leche,Chocolate"]
Ejercicio 5: Análisis de Logs
Section titled “Ejercicio 5: Análisis de Logs”Teniendo logs ip,hora,metodo,url, cuenta peticiones por IP y filtra las que tengan > 1 petición.
Ejercicio 6: Optimización
Section titled “Ejercicio 6: Optimización”Tienes un RDD que usas 5 veces en diferentes cálculos. ¿Qué debes hacer para optimizar?
Última actualización: 2026-02-02