DataFrames en Spark
1. ¿Por qué DataFrames?
Section titled “1. ¿Por qué DataFrames?”Mientras que los RDDs te obligan a decir CÓMO hacer las cosas (control manual), los DataFrames te permiten decir QUÉ quieres (declarativo) y Spark decide la mejor forma de hacerlo.
Comparación: RDD vs DataFrame
Section titled “Comparación: RDD vs DataFrame”graph TB subgraph RDD["RDD: Control Manual (Lento)"] R1["Tu código"] --> R2["Serialización Java"] --> R3["Ejecución literal"] end
style RDD fill:#ffcdd2,stroke:#c62828graph TB subgraph DF["DataFrame: Optimizado (Rápido)"] D1["Tu código"] --> D2["Catalyst Optimizer"] --> D3["Tungsten Binary"] --> D4["Ejecución C++ style"] end
style DF fill:#c8e6c9,stroke:#388e3c1.1 Catalyst Optimizer - El Cerebro
Section titled “1.1 Catalyst Optimizer - El Cerebro”El Catalyst Optimizer es la razón clave para usar DataFrames, reescribe las consultas internamente para que sean lo más rápida posible.
Optimizaciones automáticas:
- Predicate Pushdown: Mueve los
filter()lo más cerca posible de la fuente de datos (para leer menos). - Column Pruning: Si solo seleccionas 2 columnas, Spark solo leerá esas 2 columnas del disco, ignorando el resto.
- Join Reordering: Si haces múltiples joins, Spark decide el orden óptimo para minimizar el movimiento de datos (Shuffle).
2. Creación de DataFrames
Section titled “2. Creación de DataFrames”Desde RDD (Transición)
Section titled “Desde RDD (Transición)”Es común usar RDDs para limpiar datos muy sucios y luego pasarlos a DataFrames.
# Crear un RDD de tuplasrdd = sc.parallelize([("Angel", 9), ("Maria", 3), ("Ramon", 7)])
# Convertir a DataFrame (Lazy)df = rdd.toDF(["nombre", "nota"])
# Acción (Eager, carga los datos)df.show()# +------+----+# |nombre|nota|# +------+----+# | Angel| 9|# | Maria| 3|# | Ramon| 7|# +------+----+Desde Listas (para testing)
Section titled “Desde Listas (para testing)”data = [("Laptop", 1200), ("Mouse", 25)]df = spark.createDataFrame(data, ["producto", "precio"])
df.printSchema()# root# |-- producto: string (nullable = true)# |-- precio: long (nullable = true)Desde Archivos (Producción)
Section titled “Desde Archivos (Producción)”# Lectura optimizada (Lazy)df = spark.read.csv( "ventas.csv", header=True, # La primera línea son los nombres de columna inferSchema=True, # Spark adivina los tipos de datos (int, string...) sep=";" # Separador personalizado)
# Acción (Eager)df.show(5)3. Operaciones Básicas
Section titled “3. Operaciones Básicas”3.1. Mostrar datos
Section titled “3.1. Mostrar datos”df.show() # Primeras 20 filasdf.show(5) # Primeras 5 filasdf.printSchema() # Estructura (esquema)df.head(3) # Array de primeras 3 filas3.2. Transformaciones
Section titled “3.2. Transformaciones”Estas operaciones son LAZY (no se ejecutan hasta llamar a una acción como show() o collect()).
select() - Elegir Columnas
Section titled “select() - Elegir Columnas”# Seleccionar columnas específicasdf_sel = df.select("producto", "precio")
# Seleccionar y transformar al vuelodf_calc = df.select( df["producto"], (df["precio"] * 1.21).alias("precio_iva"))
df_calc.show() # Acciónfilter() / where() - Filtrar Filas
Section titled “filter() / where() - Filtrar Filas”# ❌ INCORRECTO: SyntaxError# df.filter(df.precio > 100 and df.stock > 0)
# ✅ CORRECTO: Paréntesis obligatoriosfiltrado = df.filter((df.precio > 100) & (df.stock > 0))
# Filtro con ORofertazo = df.filter((df.descuento > 50) | (df.precio < 10))
# Filtro con NOT (~)stock_activo = df.filter(~(df.stock == 0))
filtrado.show() # AcciónwithColumn() - Añadir o Modificar Columnas
Section titled “withColumn() - Añadir o Modificar Columnas”from pyspark.sql.functions import col, lit
# Añadir una columna constante (lit = literal)df1 = df.withColumn("pais", lit("España"))
# Crear columna basada en otradf2 = df1.withColumn("precio_doble", col("precio") * 2)
# Modificar columna existente (casting de tipo)df3 = df2.withColumn("precio", col("precio").cast("double"))
df3.printSchema() # Acción (Metadata)drop() - Eliminar Columnas
Section titled “drop() - Eliminar Columnas”# Eliminar unadf_limpio = df.drop("columna_innecesaria")
# Eliminar variasdf_super_limpio = df.drop("aux1", "aux2", "temp")Eliminar duplicados
Section titled “Eliminar duplicados”df.select("Country").distinct().show()df.dropDuplicates(["Country"]).show()Ordenar
Section titled “Ordenar”df.sort("nota").show() # Ascendentedf.sort(df.nota.desc()).show() # Descendentedf.orderBy("nota", ascending=False).show() # Descendente✅ Checkpoint 1: Contesta las siguientes preguntas
Section titled “✅ Checkpoint 1: Contesta las siguientes preguntas”- Crea un DataFrame con datos de empleados:
[("Ana", 25000, "Venta"), ("Luis", 32000, "IT")] - Selecciona el nombre y crea una nueva columna
salario_futureaumentado un 10%. - Filtra solo los empleados de ‘Venta’ que ganen menos de 30000.
Pista
Usa `withColumn` o `select` con multiplicación, y `filter` con condición `&`.4. Agregaciones (groupBy)
Section titled “4. Agregaciones (groupBy)”Operaciones que reducen múltiples filas a una sola (similar a SQL).
from pyspark.sql.functions import count, avg, max, sum
# Agrupar por departamento y contardf.groupBy("departamento").count().show()
# Agregaciones múltiplesres = df.groupBy("departamento").agg( avg("salario").alias("salario_medio"), max("salario").alias("salario_maximo"), sum("plazas").alias("total_plazas"))
res.show()✅ Checkpoint 2: Agregaciones
Section titled “✅ Checkpoint 2: Agregaciones”Objetivo: Agrupar datos. Usando el DataFrame anterior:
- Agrupa por
departamento. - Calcula el salario medio y el número total de empleados por departamento.
Pista
Usa `groupBy("col").agg(avg(...), count(...))`.5. Window Functions (Análisis Avanzado)
Section titled “5. Window Functions (Análisis Avanzado)”Las funciones de ventana permiten hacer cálculos sobre grupos de filas sin colapsarlas (a diferencia de groupBy). Ideal para rankings, acumulados o comparaciones con filas anteriores.
Visualización
Section titled “Visualización”Imagina que quieres el “Ranking de ventas por empleado”.
groupByte daría solo 1 fila por empleado (se pierde el detalle de cada venta).Windowmantiene TODAS las ventas y añade una columna “Ranking” al lado.
Ejemplo: Ranking
Section titled “Ejemplo: Ranking”from pyspark.sql.window import Windowfrom pyspark.sql.functions import rank, col
# Definir la ventana: Particionar por Dept y Ordenar por Salarioventana = Window.partitionBy("departamento").orderBy(col("salario").desc())
# Aplicar (Lazy)df_rank = df.withColumn("ranking_salario", rank().over(ventana))
# Accióndf_rank.show()# Resultado:# | Nombre | Dept | Salario | Ranking |# | Maria | IT | 5000 | 1 |# | Juan | IT | 3000 | 2 |✅ Checkpoint 3: Window Functions
Section titled “✅ Checkpoint 3: Window Functions”Objetivo: Entender la partición de ventanas. Imagina que quieres clasificar a los empleados por salario, pero dentro de cada departamento (el que más gana de IT es el #1 de IT, el que más gana de Ventas es el #1 de Ventas). ¿Cómo definirías la ventana?
Pista
`Window.partitionBy("departamento").orderBy(...)`6. Joins (Combinar DataFrames)
Section titled “6. Joins (Combinar DataFrames)”Tipos de Join
Section titled “Tipos de Join”Un Join combina filas de dos DataFrames basándose en una columna común (clave). Es fundamental entender la teoría de conjuntos para saber qué datos obtendrás.
Visualización de Conjuntos
Section titled “Visualización de Conjuntos”graph TB subgraph "Dataframe derecho" DF2["Salarios<br/>id | salario<br/>2 | 3000<br/>3 | 2500<br/>4 | 2800"] end
subgraph "Dataframe izquierdo" DF1["Empleados<br/>id | nombre<br/>1 | Ana<br/>2 | Juan<br/>3 | María"]
endgraph TB subgraph "RIGHT JOIN" R["TODOS los registros del DF derecho<br/>+ coincidencias del izquierdo<br/><br/>Resultado:<br/>2 | Juan | 3000<br/>3 | María | 2500<br/>4 | null | 2800"] end
subgraph "LEFT JOIN" L["TODOS los registros del DF izquierdo<br/>+ coincidencias del derecho<br/><br/>Resultado:<br/>1 | Ana | null<br/>2 | Juan | 3000<br/>3 | María | 2500"] end
style L fill:#fff9c4,stroke:#f57c00 style R fill:#fff9c4,stroke:#f57c00graph TB subgraph "INNER JOIN" I["Solo registros que coinciden<br/>en AMBOS DataFrames<br/><br/>Resultado:<br/>2 | Juan | 3000<br/>3 | María | 2500"] end
subgraph "OUTER JOIN" O["TODOS los registros de AMBOS<br/><br/>Resultado:<br/>1 | Ana | null<br/>2 | Juan | 3000<br/>3 | María | 2500<br/>4 | null | 2800"] end
style I fill:#c8e6c9,stroke:#388e3c style O fill:#e1bee7,stroke:#8e24aaEjemplos prácticos:
# Datos de ejemploempleados = spark.createDataFrame([ (1, "Ana"), (2, "Juan"), (3, "María")], ["id", "nombre"])
salarios = spark.createDataFrame([ (2, 3000), (3, 2500), (4, 2800)], ["id", "salario"])
# INNER JOIN - Solo coincidencias (por defecto)empleados.join(salarios, "id").show()# Resultado: Juan y María (tienen salario)
# LEFT JOIN - Todos los empleados, con o sin salarioempleados.join(salarios, "id", "left").show()# Resultado: Ana (null), Juan (3000), María (2500)
# RIGHT JOIN - Todos los salarios, con o sin empleadoempleados.join(salarios, "id", "right").show()# Resultado: Juan (3000), María (2500), null (2800)
# OUTER JOIN - Todos los registros de ambosempleados.join(salarios, "id", "outer").show()# Resultado: Ana (null), Juan (3000), María (2500), null (2800)
# Join con múltiples columnasdf1.join(df2, ["columna1", "columna2"], "inner").show()
# Join con condiciones complejasdf1.join(df2, (df1.id == df2.emp_id) & (df1.dept == df2.dept)).show()Optimización: Broadcast Join
Section titled “Optimización: Broadcast Join”Si una tabla es pequeña (<10MB), usa broadcast(). Evita el Shuffle (movimiento de datos en red) enviando una copia de la tabla pequeña a todos los nodos.
from pyspark.sql.functions import broadcast
# df_grande join df_pequeñoresultado = df_grande.join(broadcast(df_pequeño), "clave")✅ Checkpoint 4: Joins
Section titled “✅ Checkpoint 4: Joins”Objetivo: Combinar datos.
Tienes empleados (id, nombre) y proyectos (id_empleado, proyecto).
¿Qué tipo de JOIN usarías para ver todos los empleados, tengan o no proyecto asignado?
Pista
Necesitas mantener todas las filas de la tabla izquierda (`empleados`).7. SQL Interoperabilidad
Section titled “7. SQL Interoperabilidad”Puedes tratar los DataFrames como tablas SQL temporales y escribir SQL puro.
# 1. Registrar DataFrame como vista temporaldf.createOrReplaceTempView("empleados")
# 2. Escribir SQL (retorna un DataFrame)df_sql = spark.sql(""" SELECT departamento, avg(salario) as media FROM empleados WHERE edad > 25 GROUP BY departamento""")
# 3. Accióndf_sql.show()8. Errores Comunes
Section titled “8. Errores Comunes”❌ Error 1: collect() en Big Data
Nunca hagas df.collect() si el DataFrame tiene millones de filas. Traerás todo a la RAM de tu ordenador (Driver) y explotará (OutOfMemoryError).
- ✅ Usa
df.show(20)para ver muestras. - ✅ Usa
df.limit(10).collect()si necesitas una lista Python pequeña.
❌ Error 2: Tipos de datos incorrectos Si sumas strings, Spark puede concatenar o fallar.
- ✅ Verifica con
df.printSchema(). - ✅ Corrige con
df.withColumn("precio", col("precio").cast("double")).
9. Debugging con Spark UI
Section titled “9. Debugging con Spark UI”- Accede a Spark UI:
http://localhost:4040(mientras tu aplicación corre) - Revisa:
- Jobs: Cuánto tarda cada job
- Stages: Identifica stages lentos
- Storage: Verifica qué DataFrames están cacheados
- SQL: Ve el plan de ejecución optimizado
# Ver el plan de ejecución físicodf.explain()
# == Physical Plan ==# *(1) Project [nombre#1, (salario#2L * 1.1) AS nuevo_salario#5]# +- *(1) Filter (isnotnull(salario#2L) AND (salario#2L > 30000))# +- *(1) Scan ExistingRDD[nombre#1,salario#2L]
# Ver el plan de ejecución completo (lógico + físico)df.explain(True)Ejemplo: Job Lento (Simulación)
Section titled “Ejemplo: Job Lento (Simulación)”Este script simula un “trabajo pesado” para que puedas verlo en la pestaña Jobs de Spark UI.
import timefrom pyspark.sql.functions import udffrom pyspark.sql.types import IntegerType
# 1. Definir una función lenta (duerme 1 segundo por fila)def proceso_lento(x): time.sleep(1) return x * x
# 2. Registrar como UDF (User Defined Function)spark.udf.register("proceso_lento", proceso_lento, IntegerType())slow_udf = udf(proceso_lento, IntegerType())
# 3. Crear DataFrame con 10 particiones (para ver paralelismo)# Range(0, 20) = 20 filas. Repartidas en 10 particiones.df = spark.range(0, 20, 1, 10)
# 4. Aplicar transformación (Lazy - no hace nada todavía)df_lento = df.withColumn("cuadrado", slow_udf("id"))
print("Iniciando Job lento...")start = time.time()
# 5. Acción (Eager) - Dispara el Job# Si tienes 2 cores, tardará aprox 10 seg (20 tareas / 2 paralelo * 1s)df_lento.count()
print(f"Job terminado en {time.time() - start:.2f} segundos")10. Ejercicios Prácticos
Section titled “10. Ejercicios Prácticos”Ejercicio 1: Creación y Exploración
Section titled “Ejercicio 1: Creación y Exploración”Dada la lista:
[("Ana", 28, "Ventas", 35000), ("Juan", 35, "IT", 45000), ("María", 42, "IT", 55000), ("Pedro", 31, "Ventas", 38000), ("Laura", 29, "Marketing", 40000)]
- Crea un DataFrame con columnas: nombre, edad, departamento, salario.
- Muestra su esquema y las primeras 3 filas.
- Cuenta el total de empleados.
Ejercicio 2: Filtrado
Section titled “Ejercicio 2: Filtrado”Usa operadores lógicos para:
- Empleados de “IT” con salario > 40000.
- Empleados de “Ventas” O “Marketing”.
- Empleados que NO sean de “IT”.
- Empleados entre 30 y 40 años.
Ejercicio 3: Agregaciones
Section titled “Ejercicio 3: Agregaciones”- Calcula salario promedio por departamento.
- Obtén salario máximo y mínimo por departamento.
- Cuenta empleados por departamento.
- Calcula el gasto total en salarios por departamento y ordénalo de mayor a menor.
Ejercicio 4: Window Functions
Section titled “Ejercicio 4: Window Functions”Dadas ventas mensuales: [("Enero", "ProdA", 100), ("Enero", "ProdB", 150), ("Febrero", "ProdA", 120)].
Crea un ranking de productos por ventas para cada mes.