Skip to content

DataFrames en Spark

Mientras que los RDDs te obligan a decir CÓMO hacer las cosas (control manual), los DataFrames te permiten decir QUÉ quieres (declarativo) y Spark decide la mejor forma de hacerlo.

graph TB
subgraph RDD["RDD: Control Manual (Lento)"]
R1["Tu código"] --> R2["Serialización Java"] --> R3["Ejecución literal"]
end
style RDD fill:#ffcdd2,stroke:#c62828
graph TB
subgraph DF["DataFrame: Optimizado (Rápido)"]
D1["Tu código"] --> D2["Catalyst Optimizer"] --> D3["Tungsten Binary"] --> D4["Ejecución C++ style"]
end
style DF fill:#c8e6c9,stroke:#388e3c

El Catalyst Optimizer es la razón clave para usar DataFrames, reescribe las consultas internamente para que sean lo más rápida posible.

Optimizaciones automáticas:

  1. Predicate Pushdown: Mueve los filter() lo más cerca posible de la fuente de datos (para leer menos).
  2. Column Pruning: Si solo seleccionas 2 columnas, Spark solo leerá esas 2 columnas del disco, ignorando el resto.
  3. Join Reordering: Si haces múltiples joins, Spark decide el orden óptimo para minimizar el movimiento de datos (Shuffle).

Es común usar RDDs para limpiar datos muy sucios y luego pasarlos a DataFrames.

# Crear un RDD de tuplas
rdd = sc.parallelize([("Angel", 9), ("Maria", 3), ("Ramon", 7)])
# Convertir a DataFrame (Lazy)
df = rdd.toDF(["nombre", "nota"])
# Acción (Eager, carga los datos)
df.show()
# +------+----+
# |nombre|nota|
# +------+----+
# | Angel| 9|
# | Maria| 3|
# | Ramon| 7|
# +------+----+
data = [("Laptop", 1200), ("Mouse", 25)]
df = spark.createDataFrame(data, ["producto", "precio"])
df.printSchema()
# root
# |-- producto: string (nullable = true)
# |-- precio: long (nullable = true)
# Lectura optimizada (Lazy)
df = spark.read.csv(
"ventas.csv",
header=True, # La primera línea son los nombres de columna
inferSchema=True, # Spark adivina los tipos de datos (int, string...)
sep=";" # Separador personalizado
)
# Acción (Eager)
df.show(5)

df.show() # Primeras 20 filas
df.show(5) # Primeras 5 filas
df.printSchema() # Estructura (esquema)
df.head(3) # Array de primeras 3 filas

Estas operaciones son LAZY (no se ejecutan hasta llamar a una acción como show() o collect()).

# Seleccionar columnas específicas
df_sel = df.select("producto", "precio")
# Seleccionar y transformar al vuelo
df_calc = df.select(
df["producto"],
(df["precio"] * 1.21).alias("precio_iva")
)
df_calc.show() # Acción
# ❌ INCORRECTO: SyntaxError
# df.filter(df.precio > 100 and df.stock > 0)
# ✅ CORRECTO: Paréntesis obligatorios
filtrado = df.filter((df.precio > 100) & (df.stock > 0))
# Filtro con OR
ofertazo = df.filter((df.descuento > 50) | (df.precio < 10))
# Filtro con NOT (~)
stock_activo = df.filter(~(df.stock == 0))
filtrado.show() # Acción

withColumn() - Añadir o Modificar Columnas

Section titled “withColumn() - Añadir o Modificar Columnas”
from pyspark.sql.functions import col, lit
# Añadir una columna constante (lit = literal)
df1 = df.withColumn("pais", lit("España"))
# Crear columna basada en otra
df2 = df1.withColumn("precio_doble", col("precio") * 2)
# Modificar columna existente (casting de tipo)
df3 = df2.withColumn("precio", col("precio").cast("double"))
df3.printSchema() # Acción (Metadata)
# Eliminar una
df_limpio = df.drop("columna_innecesaria")
# Eliminar varias
df_super_limpio = df.drop("aux1", "aux2", "temp")
df.select("Country").distinct().show()
df.dropDuplicates(["Country"]).show()
df.sort("nota").show() # Ascendente
df.sort(df.nota.desc()).show() # Descendente
df.orderBy("nota", ascending=False).show() # Descendente

✅ Checkpoint 1: Contesta las siguientes preguntas

Section titled “✅ Checkpoint 1: Contesta las siguientes preguntas”
  1. Crea un DataFrame con datos de empleados: [("Ana", 25000, "Venta"), ("Luis", 32000, "IT")]
  2. Selecciona el nombre y crea una nueva columna salario_future aumentado un 10%.
  3. Filtra solo los empleados de ‘Venta’ que ganen menos de 30000.
Pista Usa `withColumn` o `select` con multiplicación, y `filter` con condición `&`.

Operaciones que reducen múltiples filas a una sola (similar a SQL).

from pyspark.sql.functions import count, avg, max, sum
# Agrupar por departamento y contar
df.groupBy("departamento").count().show()
# Agregaciones múltiples
res = df.groupBy("departamento").agg(
avg("salario").alias("salario_medio"),
max("salario").alias("salario_maximo"),
sum("plazas").alias("total_plazas")
)
res.show()

Objetivo: Agrupar datos. Usando el DataFrame anterior:

  1. Agrupa por departamento.
  2. Calcula el salario medio y el número total de empleados por departamento.
Pista Usa `groupBy("col").agg(avg(...), count(...))`.

Las funciones de ventana permiten hacer cálculos sobre grupos de filas sin colapsarlas (a diferencia de groupBy). Ideal para rankings, acumulados o comparaciones con filas anteriores.

Imagina que quieres el “Ranking de ventas por empleado”.

  • groupBy te daría solo 1 fila por empleado (se pierde el detalle de cada venta).
  • Window mantiene TODAS las ventas y añade una columna “Ranking” al lado.
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, col
# Definir la ventana: Particionar por Dept y Ordenar por Salario
ventana = Window.partitionBy("departamento").orderBy(col("salario").desc())
# Aplicar (Lazy)
df_rank = df.withColumn("ranking_salario", rank().over(ventana))
# Acción
df_rank.show()
# Resultado:
# | Nombre | Dept | Salario | Ranking |
# | Maria | IT | 5000 | 1 |
# | Juan | IT | 3000 | 2 |

Objetivo: Entender la partición de ventanas. Imagina que quieres clasificar a los empleados por salario, pero dentro de cada departamento (el que más gana de IT es el #1 de IT, el que más gana de Ventas es el #1 de Ventas). ¿Cómo definirías la ventana?

Pista `Window.partitionBy("departamento").orderBy(...)`

Un Join combina filas de dos DataFrames basándose en una columna común (clave). Es fundamental entender la teoría de conjuntos para saber qué datos obtendrás.

graph TB
subgraph "Dataframe derecho"
DF2["Salarios<br/>id | salario<br/>2 | 3000<br/>3 | 2500<br/>4 | 2800"]
end
subgraph "Dataframe izquierdo"
DF1["Empleados<br/>id | nombre<br/>1 | Ana<br/>2 | Juan<br/>3 | María"]
end
graph TB
subgraph "RIGHT JOIN"
R["TODOS los registros del DF derecho<br/>+ coincidencias del izquierdo<br/><br/>Resultado:<br/>2 | Juan | 3000<br/>3 | María | 2500<br/>4 | null | 2800"]
end
subgraph "LEFT JOIN"
L["TODOS los registros del DF izquierdo<br/>+ coincidencias del derecho<br/><br/>Resultado:<br/>1 | Ana | null<br/>2 | Juan | 3000<br/>3 | María | 2500"]
end
style L fill:#fff9c4,stroke:#f57c00
style R fill:#fff9c4,stroke:#f57c00
graph TB
subgraph "INNER JOIN"
I["Solo registros que coinciden<br/>en AMBOS DataFrames<br/><br/>Resultado:<br/>2 | Juan | 3000<br/>3 | María | 2500"]
end
subgraph "OUTER JOIN"
O["TODOS los registros de AMBOS<br/><br/>Resultado:<br/>1 | Ana | null<br/>2 | Juan | 3000<br/>3 | María | 2500<br/>4 | null | 2800"]
end
style I fill:#c8e6c9,stroke:#388e3c
style O fill:#e1bee7,stroke:#8e24aa

Ejemplos prácticos:

# Datos de ejemplo
empleados = spark.createDataFrame([
(1, "Ana"),
(2, "Juan"),
(3, "María")
], ["id", "nombre"])
salarios = spark.createDataFrame([
(2, 3000),
(3, 2500),
(4, 2800)
], ["id", "salario"])
# INNER JOIN - Solo coincidencias (por defecto)
empleados.join(salarios, "id").show()
# Resultado: Juan y María (tienen salario)
# LEFT JOIN - Todos los empleados, con o sin salario
empleados.join(salarios, "id", "left").show()
# Resultado: Ana (null), Juan (3000), María (2500)
# RIGHT JOIN - Todos los salarios, con o sin empleado
empleados.join(salarios, "id", "right").show()
# Resultado: Juan (3000), María (2500), null (2800)
# OUTER JOIN - Todos los registros de ambos
empleados.join(salarios, "id", "outer").show()
# Resultado: Ana (null), Juan (3000), María (2500), null (2800)
# Join con múltiples columnas
df1.join(df2, ["columna1", "columna2"], "inner").show()
# Join con condiciones complejas
df1.join(df2, (df1.id == df2.emp_id) & (df1.dept == df2.dept)).show()

Si una tabla es pequeña (<10MB), usa broadcast(). Evita el Shuffle (movimiento de datos en red) enviando una copia de la tabla pequeña a todos los nodos.

from pyspark.sql.functions import broadcast
# df_grande join df_pequeño
resultado = df_grande.join(broadcast(df_pequeño), "clave")

Objetivo: Combinar datos. Tienes empleados (id, nombre) y proyectos (id_empleado, proyecto). ¿Qué tipo de JOIN usarías para ver todos los empleados, tengan o no proyecto asignado?

Pista Necesitas mantener todas las filas de la tabla izquierda (`empleados`).

Puedes tratar los DataFrames como tablas SQL temporales y escribir SQL puro.

# 1. Registrar DataFrame como vista temporal
df.createOrReplaceTempView("empleados")
# 2. Escribir SQL (retorna un DataFrame)
df_sql = spark.sql("""
SELECT departamento, avg(salario) as media
FROM empleados
WHERE edad > 25
GROUP BY departamento
""")
# 3. Acción
df_sql.show()

❌ Error 1: collect() en Big Data Nunca hagas df.collect() si el DataFrame tiene millones de filas. Traerás todo a la RAM de tu ordenador (Driver) y explotará (OutOfMemoryError).

  • ✅ Usa df.show(20) para ver muestras.
  • ✅ Usa df.limit(10).collect() si necesitas una lista Python pequeña.

❌ Error 2: Tipos de datos incorrectos Si sumas strings, Spark puede concatenar o fallar.

  • ✅ Verifica con df.printSchema().
  • ✅ Corrige con df.withColumn("precio", col("precio").cast("double")).

  1. Accede a Spark UI: http://localhost:4040 (mientras tu aplicación corre)
  2. Revisa:
    • Jobs: Cuánto tarda cada job
    • Stages: Identifica stages lentos
    • Storage: Verifica qué DataFrames están cacheados
    • SQL: Ve el plan de ejecución optimizado
# Ver el plan de ejecución físico
df.explain()
# == Physical Plan ==
# *(1) Project [nombre#1, (salario#2L * 1.1) AS nuevo_salario#5]
# +- *(1) Filter (isnotnull(salario#2L) AND (salario#2L > 30000))
# +- *(1) Scan ExistingRDD[nombre#1,salario#2L]
# Ver el plan de ejecución completo (lógico + físico)
df.explain(True)

Este script simula un “trabajo pesado” para que puedas verlo en la pestaña Jobs de Spark UI.

import time
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
# 1. Definir una función lenta (duerme 1 segundo por fila)
def proceso_lento(x):
time.sleep(1)
return x * x
# 2. Registrar como UDF (User Defined Function)
spark.udf.register("proceso_lento", proceso_lento, IntegerType())
slow_udf = udf(proceso_lento, IntegerType())
# 3. Crear DataFrame con 10 particiones (para ver paralelismo)
# Range(0, 20) = 20 filas. Repartidas en 10 particiones.
df = spark.range(0, 20, 1, 10)
# 4. Aplicar transformación (Lazy - no hace nada todavía)
df_lento = df.withColumn("cuadrado", slow_udf("id"))
print("Iniciando Job lento...")
start = time.time()
# 5. Acción (Eager) - Dispara el Job
# Si tienes 2 cores, tardará aprox 10 seg (20 tareas / 2 paralelo * 1s)
df_lento.count()
print(f"Job terminado en {time.time() - start:.2f} segundos")

Dada la lista: [("Ana", 28, "Ventas", 35000), ("Juan", 35, "IT", 45000), ("María", 42, "IT", 55000), ("Pedro", 31, "Ventas", 38000), ("Laura", 29, "Marketing", 40000)]

  1. Crea un DataFrame con columnas: nombre, edad, departamento, salario.
  2. Muestra su esquema y las primeras 3 filas.
  3. Cuenta el total de empleados.

Usa operadores lógicos para:

  1. Empleados de “IT” con salario > 40000.
  2. Empleados de “Ventas” O “Marketing”.
  3. Empleados que NO sean de “IT”.
  4. Empleados entre 30 y 40 años.
  1. Calcula salario promedio por departamento.
  2. Obtén salario máximo y mínimo por departamento.
  3. Cuenta empleados por departamento.
  4. Calcula el gasto total en salarios por departamento y ordénalo de mayor a menor.

Dadas ventas mensuales: [("Enero", "ProdA", 100), ("Enero", "ProdB", 150), ("Febrero", "ProdA", 120)]. Crea un ranking de productos por ventas para cada mes.