Skip to content

DataFrames en Spark

Objetivo: Transición hacia la API moderna de Spark para obtener mejor rendimiento y productividad.

Mientras que los RDDs son instrucciones de “cómo hacer” (bajo nivel), los DataFrames son instrucciones de “qué quiero” (alto nivel).

graph TB
subgraph RDD["RDD - Control Manual"]
R1["📝 Tu código:<br/>map(), filter(), reduce()"]
R2["🔧 Tú decides:<br/>- Cómo particionar<br/>- Cuándo hacer shuffle<br/>- Qué optimizar"]
R3["⚙️ Spark ejecuta<br/>exactamente lo que escribiste"]
R1 --> R2 --> R3
end
style R1 fill:#ffcdd2,stroke:#c62828
style R2 fill:#ffcdd2,stroke:#c62828
style R3 fill:#ffcdd2,stroke:#c62828
graph TB
subgraph DF["DataFrame - Optimización Automática"]
D1["📝 Tu código:<br/>select(), where(), groupBy()"]
D2["🤖 Catalyst Optimizer:<br/>- Analiza tu query<br/>- Reordena operaciones<br/>- Elimina pasos innecesarios"]
D3["⚡ Tungsten Engine:<br/>- Genera código optimizado<br/>- Gestión eficiente de memoria"]
D4["🚀 Ejecución optimizada<br/>(hasta 10x más rápido)"]
D1 --> D2 --> D3 --> D4
end
style D1 fill:#c8e6c9,stroke:#388e3c
style D2 fill:#c8e6c9,stroke:#388e3c
style D3 fill:#c8e6c9,stroke:#388e3c
style D4 fill:#a5d6a7,stroke:#2e7d32,stroke-width:3px
AspectoRDDDataFrame
Nivel de AbstracciónBajo (cómo)Alto (qué)
EstructuraSin esquemaCon esquema (columnas con tipos)
OptimizaciónManualAutomática (Catalyst Optimizer)
RendimientoMás lentoHasta 10x más rápido
SQL Nativo❌ No✅ Sí
Gestión de MemoriaJVM HeapTungsten (off-heap, más eficiente)
SerializaciónJava/KryoTungsten Binary Format (más rápido)
Caso de UsoControl total, datos no estructuradosAnálisis, agregaciones, joins
Cuándo usarTransformaciones muy personalizadasEl 95% de los casos

1.5. Catalyst Optimizer - El Cerebro de DataFrames

Section titled “1.5. Catalyst Optimizer - El Cerebro de DataFrames”

El Catalyst Optimizer es el motor de optimización que hace que DataFrames sean tan rápidos. Tú escribes QUÉ quieres, Catalyst decide CÓMO hacerlo de la forma más eficiente.

graph TB
Q["📝 Tu Query<br/>(SQL o DataFrame API)"]
LP["🔍 Logical Plan<br/>(Representación abstracta)"]
OLP["⚡ Optimized Logical Plan<br/>- Predicate Pushdown<br/>- Column Pruning<br/>- Constant Folding<br/>- Join Reordering"]
PP["🎯 Physical Plans<br/>(Múltiples estrategias)"]
BP["✅ Best Physical Plan<br/>(Basado en costes)"]
CODE["🚀 Generated Code<br/>(JVM bytecode optimizado)"]
Q --> LP
LP --> OLP
OLP --> PP
PP --> BP
BP --> CODE
style Q fill:#e3f2fd,stroke:#1976d2
style LP fill:#fff9c4,stroke:#f57c00
style OLP fill:#c8e6c9,stroke:#388e3c,stroke-width:2px
style PP fill:#fff9c4,stroke:#f57c00
style BP fill:#a5d6a7,stroke:#2e7d32,stroke-width:3px
style CODE fill:#a5d6a7,stroke:#2e7d32,stroke-width:3px

Explicación del flujo paso a paso:

1. 📝 Tu Query (SQL o DataFrame API)

  • Escribes tu código usando la API de DataFrames o SQL
  • Ejemplo: df.filter(df.edad > 18).select("nombre")
  • Catalyst recibe esta query como entrada

2. 🔍 Logical Plan (Plan Lógico)

  • Catalyst convierte tu query en un árbol de operaciones abstractas
  • Representa QUÉ quieres hacer, no CÓMO hacerlo
  • Todavía no hay decisiones sobre ejecución física

3. ⚡ Optimized Logical Plan (Plan Lógico Optimizado)

  • Aquí ocurre la magia - Catalyst aplica reglas de optimización:
    • Predicate Pushdown: Mueve filtros lo antes posible
    • Column Pruning: Elimina columnas que no se usan
    • Constant Folding: Precalcula expresiones constantes
    • Join Reordering: Reordena joins para minimizar datos intermedios
  • El plan sigue siendo abstracto, pero mucho más eficiente

4. 🎯 Physical Plans (Planes Físicos)

  • Catalyst genera múltiples estrategias de ejecución física
  • Cada plan representa una forma diferente de ejecutar la query
  • Ejemplo: ¿Usar hash join o sort-merge join?

5. ✅ Best Physical Plan (Mejor Plan Físico)

  • Catalyst usa un modelo de costes para elegir el mejor plan
  • Considera: tamaño de datos, memoria disponible, particiones, etc.
  • Selecciona la estrategia más eficiente

6. 🚀 Generated Code (Código Generado)

  • Tungsten Code Generation: Genera bytecode JVM optimizado
  • Elimina overhead de interpretación
  • El código resultante es tan rápido como si lo hubieras escrito a mano en Java

1. Predicate Pushdown - Filtrar lo antes posible

# Ejemplo de código
df = spark.read.parquet("ventas.parquet")
resultado = df.filter(df.pais == "España").select("producto", "precio")
# Lo que Catalyst hace automáticamente:
# 1. Mueve el filtro ANTES de leer todos los datos
# 2. Solo lee las columnas "producto", "precio", "pais" (Column Pruning)
# 3. Aplica el filtro a nivel de archivo Parquet (más eficiente)
# Ver el plan de ejecución
resultado.explain()
# Verás que el filtro (Filter) aparece ANTES de la lectura completa

2. Column Pruning - Solo leer columnas necesarias

# Ejemplo de código
df.select("nombre", "edad").show()
# Catalyst automáticamente:
# - Solo lee las columnas "nombre" y "edad" del archivo
# - Ignora todas las demás columnas
# - Ahorra memoria y tiempo de I/O

3. Constant Folding - Precalcular constantes

# Tu código
df.filter(df.precio > 100 * 2)
# Catalyst automáticamente:
# - Calcula 100 * 2 = 200 UNA VEZ antes de ejecutar
# - Convierte a: df.filter(df.precio > 200)
# - No recalcula en cada fila
# Crear un DataFrame de ejemplo
df = spark.read.csv("ventas.csv", header=True, inferSchema=True)
# Query compleja
resultado = df.filter(df.cantidad > 10) \
.select("producto", "precio") \
.groupBy("producto") \
.agg({"precio": "avg"}) \
.filter(col("avg(precio)") > 100)
# Ver el plan de ejecución COMPLETO
resultado.explain(True)
# Salida (simplificada):
# == Parsed Logical Plan == (Tu query original)
# == Analyzed Logical Plan == (Con tipos de datos resueltos)
# == Optimized Logical Plan == (Después de optimizaciones de Catalyst)
# == Physical Plan == (Plan de ejecución real)
# Ver solo el plan físico (más conciso)
resultado.explain()

En la vida real, usaremos RDDs para limpiar datos muy sucios y luego los convertiremos inmediatamente a DataFrames para aprovechar la optimización automática.

# Convertimos nuestro RDD de tuplas a un DataFrame con nombre de columnas
df_ventas = rdd_ventas.toDF(["Vehiculo", "Precio"])
df_ventas.show()
df_ventas.printSchema()
# Opcion 1: Inferir esquema automaticamente
datos = [("Angel", 9), ("Maria", 3), ("Ramon", 7)]
df = spark.createDataFrame(datos, ["nombre", "nota"])
df.show()
df = spark.read.csv("ventas.csv",
header=True,
inferSchema=True,
sep=";")
df.show()
rdd_pares = sc.parallelize([("Angel", 9), ("Maria", 3)])
df = rdd_pares.toDF(["nombre", "nota"])
df.show()
df.show() # Primeras 20 filas
df.show(5) # Primeras 5 filas
df.printSchema() # Estructura (esquema)
df.head(3) # Array de primeras 3 filas
df.select("nombre").show()
df.select(["nombre", "nota"]).show()
df.select(df["nombre"], df["nota"] * 2).show()
# ❌ INCORRECTO - Esto dará error
df.filter(df.nota > 5 and df.nota < 9).show() # SyntaxError!
# ✅ CORRECTO - Usa & con paréntesis
df.filter((df.nota > 5) & (df.nota < 9)).show()
# Ejemplos prácticos
# Filtro simple
df.where(df.nota > 7).show()
# Salida esperada: Solo filas donde nota > 7
# Filtro con AND - ambas condiciones deben cumplirse
df.filter((df.Country == "Germany") & (df.Units > 20)).show()
# Salida: Solo productos de Alemania CON más de 20 unidades
# Filtro con OR - al menos una condición debe cumplirse
df.filter((df.ProductID == 2314) | (df.ProductID == 1322)).show()
# Salida: Productos con ID 2314 O 1322
# Filtro con NOT - negar una condición
df.filter(~(df.Country == "Germany")).show()
# Salida: Todos los países EXCEPTO Alemania
# Filtros combinados complejos
df.filter(
((df.Country == "Germany") | (df.Country == "France")) &
(df.Revenue > 1000)
).show()
# Salida: Ventas de Alemania o Francia con ingresos > 1000
df.select("Country").distinct().show()
df.dropDuplicates(["Country"]).show()
df.sort("nota").show() # Ascendente
df.sort(df.nota.desc()).show() # Descendente
df.orderBy("nota", ascending=False).show() # Descendente
from pyspark.sql.functions import avg, count, sum, max, min
# Media de notas
df.groupBy().avg("nota").show()
# Nota media por cada pais
df.groupBy("Country").avg("Revenue").show()
# Multiples agregaciones
df.groupBy("Country").agg(
count("ProductID").alias("total_ventas"),
avg("Revenue").alias("revenue_promedio"),
sum("Units").alias("unidades_totales")
).show()

2.5. Window Functions - Análisis Avanzado

Section titled “2.5. Window Functions - Análisis Avanzado”

Las Window Functions (funciones de ventana) son una de las características más potentes de DataFrames. Permiten realizar cálculos sobre un conjunto de filas relacionadas sin colapsar el resultado como hace groupBy.

graph TB
subgraph "DataFrame Original"
D1["producto | fecha | ventas<br/>A | 2024-01 | 100<br/>A | 2024-02 | 150<br/>A | 2024-03 | 120<br/>B | 2024-01 | 200<br/>B | 2024-02 | 180"]
end
subgraph "Window: partitionBy('producto')"
P1["PARTICIÓN A<br/>A | 2024-01 | 100<br/>A | 2024-02 | 150<br/>A | 2024-03 | 120"]
P2["PARTICIÓN B<br/>B | 2024-01 | 200<br/>B | 2024-02 | 180"]
end
subgraph "Resultado con rank()"
R["producto | fecha | ventas | rank<br/>A | 2024-02 | 150 | 1<br/>A | 2024-03 | 120 | 2<br/>A | 2024-01 | 100 | 3<br/>B | 2024-01 | 200 | 1<br/>B | 2024-02 | 180 | 2"]
end
D1 --> P1
D1 --> P2
P1 --> R
P2 --> R
style D1 fill:#e3f2fd,stroke:#1976d2
style P1 fill:#fff9c4,stroke:#f57c00
style P2 fill:#fff9c4,stroke:#f57c00
style R fill:#c8e6c9,stroke:#388e3c,stroke-width:2px

1. Ranking - Clasificar productos por ventas

from pyspark.sql.window import Window
from pyspark.sql.functions import rank, dense_rank, row_number, col
# Datos de ejemplo
ventas = spark.createDataFrame([
("Electrónica", "Laptop", 1200),
("Electrónica", "Mouse", 25),
("Electrónica", "Teclado", 75),
("Hogar", "Sofá", 800),
("Hogar", "Mesa", 300)
], ["categoria", "producto", "precio"])
# Definir ventana: particionar por categoría, ordenar por precio descendente
window_spec = Window.partitionBy("categoria").orderBy(col("precio").desc())
# Aplicar funciones de ranking
resultado = ventas.withColumn("rank", rank().over(window_spec)) \
.withColumn("dense_rank", dense_rank().over(window_spec)) \
.withColumn("row_number", row_number().over(window_spec))
resultado.show()
# Resultado:
# categoria | producto | precio | rank | dense_rank | row_number
# Electrónica | Laptop | 1200 | 1 | 1 | 1
# Electrónica | Teclado | 75 | 2 | 2 | 2
# Electrónica | Mouse | 25 | 3 | 3 | 3
# Hogar | Sofá | 800 | 1 | 1 | 1
# Hogar | Mesa | 300 | 2 | 2 | 2

2. Lag y Lead - Comparar con filas anteriores/siguientes

from pyspark.sql.functions import lag, lead
# Datos de ventas mensuales
ventas_mes = spark.createDataFrame([
("2024-01", 1000),
("2024-02", 1200),
("2024-03", 1100),
("2024-04", 1300)
], ["mes", "ventas"])
# Ventana ordenada por mes
window_spec = Window.orderBy("mes")
# Comparar con mes anterior y siguiente
resultado = ventas_mes.withColumn("ventas_mes_anterior", lag("ventas", 1).over(window_spec)) \
.withColumn("ventas_mes_siguiente", lead("ventas", 1).over(window_spec)) \
.withColumn("diferencia_vs_anterior",
col("ventas") - lag("ventas", 1).over(window_spec))
resultado.show()
# Resultado:
# mes | ventas | ventas_mes_anterior | ventas_mes_siguiente | diferencia_vs_anterior
# 2024-01 | 1000 | null | 1200 | null
# 2024-02 | 1200 | 1000 | 1100 | 200
# 2024-03 | 1100 | 1200 | 1300 | -100
# 2024-04 | 1300 | 1100 | null | 200

3. Moving Average - Media móvil

from pyspark.sql.functions import avg
# Datos de ventas diarias
ventas_diarias = spark.createDataFrame([
("2024-01-01", 100),
("2024-01-02", 120),
("2024-01-03", 90),
("2024-01-04", 110),
("2024-01-05", 130)
], ["fecha", "ventas"])
# Ventana: últimas 3 filas (incluyendo la actual)
window_spec = Window.orderBy("fecha").rowsBetween(-2, 0)
# Calcular media móvil de 3 días
resultado = ventas_diarias.withColumn("media_movil_3dias",
avg("ventas").over(window_spec))
resultado.show()
# Resultado:
# fecha | ventas | media_movil_3dias
# 2024-01-01 | 100 | 100.0 (solo 1 día)
# 2024-01-02 | 120 | 110.0 (promedio de 100, 120)
# 2024-01-03 | 90 | 103.3 (promedio de 100, 120, 90)
# 2024-01-04 | 110 | 106.7 (promedio de 120, 90, 110)
# 2024-01-05 | 130 | 110.0 (promedio de 90, 110, 130)

4. Cumulative Sum - Suma acumulada

from pyspark.sql.functions import sum as _sum
# Ventana: desde el inicio hasta la fila actual
window_spec = Window.orderBy("fecha").rowsBetween(Window.unboundedPreceding, 0)
resultado = ventas_diarias.withColumn("ventas_acumuladas",
_sum("ventas").over(window_spec))
resultado.show()
# Resultado:
# fecha | ventas | ventas_acumuladas
# 2024-01-01 | 100 | 100
# 2024-01-02 | 120 | 220
# 2024-01-03 | 90 | 310
# 2024-01-04 | 110 | 420
# 2024-01-05 | 130 | 550

Los joins son una de las operaciones más potentes en DataFrames. Permiten combinar datos de múltiples tablas basándose en una clave común.

graph TB
subgraph "Datos de Ejemplo"
DF1["DataFrame 1: Empleados<br/>id | nombre<br/>1 | Ana<br/>2 | Juan<br/>3 | María"]
DF2["DataFrame 2: Salarios<br/>id | salario<br/>2 | 3000<br/>3 | 2500<br/>4 | 2800"]
end
subgraph "INNER JOIN"
I["Solo registros que coinciden<br/>en AMBOS DataFrames<br/><br/>Resultado:<br/>2 | Juan | 3000<br/>3 | María | 2500"]
end
subgraph "LEFT JOIN"
L["TODOS los registros del DF izquierdo<br/>+ coincidencias del derecho<br/><br/>Resultado:<br/>1 | Ana | null<br/>2 | Juan | 3000<br/>3 | María | 2500"]
end
subgraph "RIGHT JOIN"
R["TODOS los registros del DF derecho<br/>+ coincidencias del izquierdo<br/><br/>Resultado:<br/>2 | Juan | 3000<br/>3 | María | 2500<br/>4 | null | 2800"]
end
subgraph "OUTER JOIN"
O["TODOS los registros de AMBOS<br/><br/>Resultado:<br/>1 | Ana | null<br/>2 | Juan | 3000<br/>3 | María | 2500<br/>4 | null | 2800"]
end
style I fill:#c8e6c9,stroke:#388e3c
style L fill:#fff9c4,stroke:#f57c00
style R fill:#fff9c4,stroke:#f57c00
style O fill:#e1bee7,stroke:#8e24aa
# Datos de ejemplo
empleados = spark.createDataFrame([
(1, "Ana"),
(2, "Juan"),
(3, "María")
], ["id", "nombre"])
salarios = spark.createDataFrame([
(2, 3000),
(3, 2500),
(4, 2800)
], ["id", "salario"])
# INNER JOIN - Solo coincidencias (por defecto)
empleados.join(salarios, "id").show()
# Resultado: Juan y María (tienen salario)
# LEFT JOIN - Todos los empleados, con o sin salario
empleados.join(salarios, "id", "left").show()
# Resultado: Ana (null), Juan (3000), María (2500)
# RIGHT JOIN - Todos los salarios, con o sin empleado
empleados.join(salarios, "id", "right").show()
# Resultado: Juan (3000), María (2500), null (2800)
# OUTER JOIN - Todos los registros de ambos
empleados.join(salarios, "id", "outer").show()
# Resultado: Ana (null), Juan (3000), María (2500), null (2800)
# Join con múltiples columnas
df1.join(df2, ["columna1", "columna2"], "inner").show()
# Join con condiciones complejas
df1.join(df2, (df1.id == df2.emp_id) & (df1.dept == df2.dept)).show()
:::tip
**Optimización de Joins**: Para DataFrames pequeños (<10MB), usa `broadcast()` para evitar shuffle. Para grandes volúmenes, considera particionar por la clave de join.
:::
```python
from pyspark.sql.functions import broadcast
# Broadcast join para DataFrames pequeños (MUY rápido)
df_grande.join(broadcast(df_pequeño), "id").show()

Usar DataFrames cuando:

  • Tienes datos estructurados (filas, columnas)
  • Necesitas rendimiento optimo
  • Quieres escribir codigo SQL
  • Trabajas con JSON, CSV, Parquet

Usar RDDs cuando:

  • Tienes datos no estructurados
  • Necesitas control total a bajo nivel
  • Trabajas con objetos complejos o heterogeneos
  • Haces transformaciones muy personalizadas
# Paso 1: Crear RDD
rdd = sc.parallelize([("Angel", 9), ("Maria", 3)])
# Paso 2: Convertir a DataFrame
df = rdd.toDF(["nombre", "nota"])
# Paso 3: Operar con API DataFrame (mas simple)
df.filter(df.nota > 5).show()
# Paso 4: Volver a RDD si es necesario
rdd_filtrado = df.rdd
print(rdd_filtrado.collect())

1. Usar collect() en DataFrames grandes

# ❌ PELIGROSO - DataFrame de 10GB
df_gigante.collect() # OutOfMemoryError!
# ✅ MEJOR - Usa show() para ver una muestra
df_gigante.show(20)
# ✅ O toma solo lo necesario
df_gigante.limit(100).collect()
# ✅ O guarda a disco
df_gigante.write.parquet("resultado/")

2. Error con operadores lógicos

# ❌ INCORRECTO
df.filter(df.age > 18 and df.city == "Madrid") # SyntaxError
# ✅ CORRECTO
df.filter((df.age > 18) & (df.city == "Madrid"))

3. Problemas con tipos de datos

# Ver el esquema para detectar tipos incorrectos
df.printSchema()
# Convertir tipos explícitamente
from pyspark.sql.functions import col
df = df.withColumn("precio", col("precio").cast("double"))

4. Shuffle excesivo en Joins

# ❌ LENTO - Join sin optimizar
df_grande.join(df_grande2, "id") # Shuffle masivo
# ✅ RÁPIDO - Broadcast join si uno es pequeño
from pyspark.sql.functions import broadcast
df_grande.join(broadcast(df_pequeño), "id")
  1. Accede a Spark UI: http://localhost:4040 (mientras tu aplicación corre)
  2. Revisa:
    • Jobs: Cuánto tarda cada job
    • Stages: Identifica stages lentos
    • Storage: Verifica qué DataFrames están cacheados
    • SQL: Ve el plan de ejecución optimizado
# Ver el plan de ejecución físico
df.explain()
# Ver el plan de ejecución completo (lógico + físico)
df.explain(True)

  • SparkSession: Punto de entrada moderno (v2.0+). Usar siempre.
  • SparkContext: Legacy. Solo si trabajas con codigo antiguo.
  • Estructura inmutable y tolerante a fallos.
  • Evaluacion perezosa: Se ejecuta solo con acciones.
  • Excelente para datos no estructurados y control fino.
  • reduceByKey(): Agrupa y reduce valores por clave.
  • groupByKey(): Agrupa todos los valores por clave (cuidado: memoria).
  • sortByKey(): Ordena por clave.
  • API de alto nivel con optimizacion automatica.
  • Estructura de tabla con esquema definido.
  • Mejor rendimiento que RDD.
Transformacion -> Transformacion -> Accion
(No se ejecuta) (No se ejecuta) (Se ejecuta todo!)

Ejercicio 1: Creación y Exploración de DataFrames

Section titled “Ejercicio 1: Creación y Exploración de DataFrames”

Crea un DataFrame a partir de la siguiente lista de empleados y explora su estructura.

Datos de entrada:

empleados = [
("Ana", 28, "Ventas", 35000),
("Juan", 35, "IT", 45000),
("María", 42, "IT", 55000),
("Pedro", 31, "Ventas", 38000),
("Laura", 29, "Marketing", 40000)
]

Tareas:

  1. Crea un DataFrame con las columnas: nombre, edad, departamento, salario
  2. Muestra el esquema del DataFrame
  3. Muestra las primeras 3 filas
  4. Cuenta cuántos empleados hay en total

Resultado esperado:

  • DataFrame con 5 filas y 4 columnas
  • Esquema mostrando los tipos de datos
  • Total: 5 empleados

Ejercicio 2: Filtrado con Operadores Lógicos

Section titled “Ejercicio 2: Filtrado con Operadores Lógicos”

Usando el DataFrame del Ejercicio 1, aplica filtros combinados.

Tareas:

  1. Filtra empleados del departamento IT con salario mayor a 40000
  2. Filtra empleados de Ventas O Marketing
  3. Filtra empleados que NO sean del departamento IT
  4. Filtra empleados con edad entre 30 y 40 años (inclusive)

Resultado esperado:

  • Primera consulta: 2 empleados (Juan y María)
  • Segunda consulta: 3 empleados (Ana, Pedro, Laura)
  • Tercera consulta: 3 empleados (Ana, Pedro, Laura)
  • Cuarta consulta: 2 empleados (Juan y Pedro)

Usando el DataFrame del Ejercicio 1, realiza agregaciones.

Tareas:

  1. Calcula el salario promedio por departamento
  2. Encuentra el salario máximo y mínimo por departamento
  3. Cuenta cuántos empleados hay en cada departamento
  4. Calcula el salario total (suma) por departamento y ordena de mayor a menor

Resultado esperado:

  • IT: salario promedio = 50000
  • Ventas: 2 empleados
  • IT tiene el salario total más alto (100000)

Crea un DataFrame de ventas mensuales y aplica funciones de ventana.

Datos de entrada:

ventas = [
("Electrónica", "Enero", 15000),
("Electrónica", "Febrero", 18000),
("Electrónica", "Marzo", 12000),
("Hogar", "Enero", 8000),
("Hogar", "Febrero", 9500),
("Hogar", "Marzo", 11000)
]

Tareas:

  1. Crea un DataFrame con columnas: categoria, mes, ventas
  2. Añade una columna ranking que muestre el ranking de ventas dentro de cada categoría (mayor venta = rank 1)
  3. Añade una columna ventas_mes_anterior usando la función lag()
  4. Calcula la diferencia de ventas respecto al mes anterior

Resultado esperado:

  • Febrero de Electrónica debe tener rank = 1 (18000 es el máximo)
  • Enero no debe tener valor en ventas_mes_anterior (es el primero)
  • Diferencia de Febrero vs Enero en Electrónica = 3000

Crea dos DataFrames y combínalos usando diferentes tipos de joins.

Datos de entrada:

# DataFrame de productos
productos = [
(1, "Laptop", "Electrónica"),
(2, "Mouse", "Accesorios"),
(3, "Teclado", "Accesorios")
]
# DataFrame de ventas
ventas = [
(1, 5, 6000),
(2, 10, 250),
(4, 3, 150) # Producto ID 4 no existe en productos
]

Tareas:

  1. Crea dos DataFrames: df_productos (id, nombre, categoria) y df_ventas (producto_id, cantidad, total)
  2. Realiza un INNER JOIN (solo productos que tienen ventas)
  3. Realiza un LEFT JOIN (todos los productos, con o sin ventas)
  4. Realiza un RIGHT JOIN (todas las ventas, con o sin producto)

Resultado esperado:

  • INNER JOIN: 2 filas (Laptop y Mouse)
  • LEFT JOIN: 3 filas (Teclado tendrá ventas = null)
  • RIGHT JOIN: 3 filas (producto_id 4 tendrá nombre = null)

Ejercicio 6: Debugging - Encuentra el Error

Section titled “Ejercicio 6: Debugging - Encuentra el Error”

El siguiente código tiene errores. Identifícalos y corrígelos.

Código con errores:

df = spark.createDataFrame([
("Ana", 25, 3000),
("Juan", 30, 4000)
], ["nombre", "edad", "salario"])
# Error 1: Operador lógico incorrecto
resultado1 = df.filter(df.edad > 20 and df.salario > 3500)
# Error 2: Uso de collect() en DataFrame grande
resultado2 = df.select("nombre").collect()
# Error 3: Sintaxis incorrecta de agregación
resultado3 = df.groupBy("edad").avg()

Tareas:

  1. Identifica los 3 errores
  2. Explica por qué están mal
  3. Proporciona el código corregido

Resultado esperado:

  • Error 1: Debe usar & en lugar de and, con paréntesis
  • Error 2: collect() es peligroso en DataFrames grandes, usar show() o limit()
  • Error 3: avg() necesita especificar la columna

Ejercicio 7: Optimización - Broadcast Join

Section titled “Ejercicio 7: Optimización - Broadcast Join”

Tienes dos DataFrames: uno grande (1 millón de transacciones) y uno pequeño (100 productos). Optimiza el join.

Código inicial (ineficiente):

# DataFrame grande
transacciones = spark.read.csv("transacciones.csv") # 1M filas
# DataFrame pequeño
productos = spark.read.csv("productos.csv") # 100 filas
# Join sin optimizar
resultado = transacciones.join(productos, "producto_id")

Tareas:

  1. Identifica el problema de rendimiento
  2. Reescribe el código usando broadcast join
  3. Explica por qué es más eficiente

Resultado esperado:

  • Uso de broadcast() en el DataFrame pequeño
  • Explicación: Evita shuffle enviando la tabla pequeña a todos los nodos

Ejercicio 8: Análisis Completo - Window Functions Avanzadas

Section titled “Ejercicio 8: Análisis Completo - Window Functions Avanzadas”

Crea un análisis de ventas con media móvil y suma acumulada.

Datos de entrada:

ventas_diarias = [
("2024-01-01", 1000),
("2024-01-02", 1200),
("2024-01-03", 900),
("2024-01-04", 1100),
("2024-01-05", 1300),
("2024-01-06", 1150),
("2024-01-07", 1250)
]

Tareas:

  1. Crea un DataFrame con columnas: fecha, ventas
  2. Añade una columna media_movil_3dias (promedio de los últimos 3 días)
  3. Añade una columna ventas_acumuladas (suma desde el inicio)
  4. Añade una columna diferencia_vs_promedio (ventas del día - media móvil)

Resultado esperado:

  • Media móvil del día 3 = (1000 + 1200 + 900) / 3 = 1033.33
  • Ventas acumuladas del día 3 = 1000 + 1200 + 900 = 3100
  • Diferencia del día 3 = 900 - 1033.33 = -133.33

Proyecto: Análisis de Ventas con DataFrames

Section titled “Proyecto: Análisis de Ventas con DataFrames”

Realizar un análisis completo de ventas combinando múltiples DataFrames, aplicando agregaciones, window functions y joins. Este proyecto integra todos los conceptos vistos en el documento.

Para este proyecto utilizaremos los datasets pdi_sales.csv y pdi_product.csv disponibles en: https://github.com/josepgarcia/datos

Descripción de los datasets:

pdi_sales.csv - Registro de ventas

  • OrderID: ID único de la orden
  • ProductID: ID del producto vendido
  • Quantity: Cantidad vendida
  • UnitPrice: Precio unitario
  • OrderDate: Fecha de la orden
  • Country: País de la venta

pdi_product.csv - Catálogo de productos

  • ProductID: ID único del producto
  • ProductName: Nombre del producto
  • Category: Categoría del producto
  • SupplierID: ID del proveedor

Tamaño aproximado: ~10,000 registros de ventas, ~100 productos

Descarga:

# Descargar los datasets
!wget https://raw.githubusercontent.com/josepgarcia/datos/main/pdi_sales.csv
!wget https://raw.githubusercontent.com/josepgarcia/datos/main/pdi_product.csv
# Cargar en Spark
df_sales = spark.read.csv("pdi_sales.csv", header=True, inferSchema=True)
df_products = spark.read.csv("pdi_product.csv", header=True, inferSchema=True)

Objetivo: Familiarizarte con la estructura y calidad de los datos.

Instrucciones:

  1. Carga ambos DataFrames y muestra sus esquemas
  2. Muestra las primeras 5 filas de cada DataFrame
  3. Cuenta cuántas filas tiene cada DataFrame
  4. Identifica si hay valores nulos en columnas clave

Código guía:

# Cargar los datos
df_sales = spark.read.csv("pdi_sales.csv", header=True, inferSchema=True)
df_products = spark.read.csv("pdi_product.csv", header=True, inferSchema=True)
# TODO: Mostrar esquemas
# TODO: Mostrar primeras filas
# TODO: Contar filas
# TODO: Verificar valores nulos

Pista: Usa printSchema(), show(), count() y filter() con isNull()

Objetivo: Limpiar los datos y crear columnas calculadas necesarias.

Instrucciones:

  1. Elimina filas con valores nulos en columnas críticas (ProductID, Quantity, UnitPrice)
  2. Crea una nueva columna TotalAmount = Quantity × UnitPrice
  3. Convierte la columna OrderDate a tipo fecha (si no lo está)
  4. Filtra solo las ventas con Quantity > 0 y UnitPrice > 0

Código guía:

from pyspark.sql.functions import col, to_date
# TODO: Eliminar nulos
df_sales_clean = df_sales.filter(...)
# TODO: Crear columna TotalAmount
df_sales_clean = df_sales_clean.withColumn("TotalAmount", ...)
# TODO: Convertir fecha
df_sales_clean = df_sales_clean.withColumn("OrderDate", to_date(col("OrderDate"), "yyyy-MM-dd"))
# TODO: Filtrar ventas válidas

Pista: Usa filter(), withColumn() y operaciones aritméticas con columnas

Objetivo: Enriquecer los datos de ventas con información de productos.

Instrucciones:

  1. Realiza un LEFT JOIN entre df_sales_clean y df_products usando ProductID
  2. Verifica si hay ventas de productos que no existen en el catálogo
  3. Selecciona las columnas relevantes: OrderID, ProductName, Category, Quantity, TotalAmount, Country, OrderDate
  4. Guarda el resultado en df_sales_enriched

Código guía:

# TODO: Realizar LEFT JOIN
df_sales_enriched = df_sales_clean.join(...)
# TODO: Verificar productos sin información
productos_sin_info = df_sales_enriched.filter(col("ProductName").isNull())
print(f"Ventas sin producto: {productos_sin_info.count()}")
# TODO: Seleccionar columnas relevantes
df_sales_enriched = df_sales_enriched.select(...)

Pista: Para DataFrames pequeños como df_products, considera usar broadcast() para optimizar

Objetivo: Calcular métricas de ventas agregadas por país.

Instrucciones:

  1. Agrupa las ventas por Country
  2. Calcula para cada país:
    • Total de ventas (sum(TotalAmount))
    • Número de órdenes (count(OrderID))
    • Cantidad total de productos vendidos (sum(Quantity))
    • Ticket promedio (avg(TotalAmount))
  3. Ordena los resultados por total de ventas descendente
  4. Muestra los top 10 países

Código guía:

from pyspark.sql.functions import sum, count, avg
# TODO: Agrupar y agregar
ventas_por_pais = df_sales_enriched.groupBy("Country").agg(...)
# TODO: Ordenar y mostrar top 10

Pista: Usa alias() para renombrar las columnas agregadas y hacerlas más legibles

Paso 5: Análisis de Productos Más Vendidos

Section titled “Paso 5: Análisis de Productos Más Vendidos”

Objetivo: Identificar los productos con mejor desempeño.

Instrucciones:

  1. Agrupa por ProductName y Category
  2. Calcula el total de ventas y cantidad vendida por producto
  3. Usa window functions para crear un ranking de productos dentro de cada categoría
  4. Muestra los top 3 productos de cada categoría

Código guía:

from pyspark.sql.window import Window
from pyspark.sql.functions import rank, desc
# TODO: Agregar por producto
ventas_por_producto = df_sales_enriched.groupBy(...).agg(...)
# TODO: Crear ventana por categoría
window_spec = Window.partitionBy("Category").orderBy(...)
# TODO: Añadir ranking
ventas_por_producto = ventas_por_producto.withColumn("rank", rank().over(window_spec))
# TODO: Filtrar top 3 por categoría
top_productos = ventas_por_producto.filter(col("rank") <= 3)

Pista: Ordena por total de ventas descendente para que el rank 1 sea el más vendido

Paso 6: Análisis Temporal - Tendencias de Ventas

Section titled “Paso 6: Análisis Temporal - Tendencias de Ventas”

Objetivo: Analizar la evolución de ventas en el tiempo.

Instrucciones:

  1. Extrae el mes y año de OrderDate (crea columnas Year y Month)
  2. Agrupa por año y mes, calculando el total de ventas
  3. Usa window functions para calcular:
    • Ventas del mes anterior (lag)
    • Crecimiento respecto al mes anterior (%)
    • Ventas acumuladas del año
  4. Ordena por año y mes

Código guía:

from pyspark.sql.functions import year, month, lag
from pyspark.sql.window import Window
# TODO: Extraer año y mes
df_temporal = df_sales_enriched.withColumn("Year", year(col("OrderDate"))) \
.withColumn("Month", month(col("OrderDate")))
# TODO: Agregar por mes
ventas_mensuales = df_temporal.groupBy("Year", "Month").agg(...)
# TODO: Crear ventana ordenada por tiempo
window_spec = Window.orderBy("Year", "Month")
# TODO: Calcular métricas temporales
ventas_mensuales = ventas_mensuales.withColumn("ventas_mes_anterior", lag(...).over(window_spec))
# TODO: Calcular crecimiento %
# TODO: Calcular ventas acumuladas

Pista: Para el crecimiento porcentual: ((ventas_actual - ventas_anterior) / ventas_anterior) * 100

Paso 7: Identificar Anomalías - Ventas Atípicas

Section titled “Paso 7: Identificar Anomalías - Ventas Atípicas”

Objetivo: Detectar órdenes con ventas inusualmente altas o bajas.

Instrucciones:

  1. Calcula la media y desviación estándar del TotalAmount
  2. Identifica órdenes que están más de 2 desviaciones estándar por encima de la media (ventas muy altas)
  3. Identifica órdenes que están más de 2 desviaciones estándar por debajo de la media (ventas muy bajas)
  4. Muestra información de estas órdenes anómalas

Código guía:

from pyspark.sql.functions import mean, stddev
# TODO: Calcular estadísticas
stats = df_sales_enriched.select(
mean("TotalAmount").alias("media"),
stddev("TotalAmount").alias("desviacion")
).collect()[0]
media = stats["media"]
desviacion = stats["desviacion"]
# TODO: Identificar anomalías
ventas_altas = df_sales_enriched.filter(col("TotalAmount") > media + 2 * desviacion)
ventas_bajas = df_sales_enriched.filter(col("TotalAmount") < media - 2 * desviacion)
# TODO: Mostrar resultados

Pista: Las anomalías pueden indicar errores de datos o transacciones especiales que requieren atención

Paso 8: Análisis Final y Visualización de Resultados

Section titled “Paso 8: Análisis Final y Visualización de Resultados”

Objetivo: Consolidar los hallazgos y preparar un resumen ejecutivo.

Instrucciones:

  1. Crea un DataFrame resumen con las métricas clave:
    • Total de ventas global
    • Número total de órdenes
    • Ticket promedio global
    • País con más ventas
    • Producto más vendido
    • Categoría más rentable
  2. Guarda los resultados principales en archivos Parquet para análisis futuro
  3. Responde las preguntas de reflexión

Código guía:

# TODO: Calcular métricas globales
total_ventas = df_sales_enriched.agg(sum("TotalAmount")).collect()[0][0]
total_ordenes = df_sales_enriched.count()
# ... más métricas
# TODO: Guardar resultados
ventas_por_pais.write.mode("overwrite").parquet("resultados/ventas_por_pais")
top_productos.write.mode("overwrite").parquet("resultados/top_productos")
# TODO: Crear resumen ejecutivo
print("=== RESUMEN EJECUTIVO ===")
print(f"Total de ventas: ${total_ventas:,.2f}")
# ... más información
  1. Optimización: ¿Qué optimizaciones aplicaste? ¿Usaste broadcast joins? ¿Por qué?
  2. Rendimiento: ¿En qué paso tardó más tiempo el procesamiento? ¿Por qué crees que fue así?
  3. Calidad de datos: ¿Encontraste problemas de calidad en los datos? ¿Cómo los manejaste?
  4. Window Functions: ¿En qué casos usaste window functions en lugar de groupBy? ¿Cuál fue la ventaja?
  5. Mejoras: Si tuvieras que procesar 100x más datos, ¿qué cambiarías en tu código?
  6. Insights: ¿Qué insights de negocio descubriste? ¿Qué recomendaciones harías?

La solución completa del proyecto está disponible en 03dataframes_PROYECTO_SOL.txt