DataFrames en Spark
Objetivo: Transición hacia la API moderna de Spark para obtener mejor rendimiento y productividad.
1. ¿Por qué DataFrames?
Section titled “1. ¿Por qué DataFrames?”Mientras que los RDDs son instrucciones de “cómo hacer” (bajo nivel), los DataFrames son instrucciones de “qué quiero” (alto nivel).
Comparación Visual: RDD vs DataFrame
Section titled “Comparación Visual: RDD vs DataFrame”graph TB subgraph RDD["RDD - Control Manual"] R1["📝 Tu código:<br/>map(), filter(), reduce()"] R2["🔧 Tú decides:<br/>- Cómo particionar<br/>- Cuándo hacer shuffle<br/>- Qué optimizar"] R3["⚙️ Spark ejecuta<br/>exactamente lo que escribiste"]
R1 --> R2 --> R3 end
style R1 fill:#ffcdd2,stroke:#c62828 style R2 fill:#ffcdd2,stroke:#c62828 style R3 fill:#ffcdd2,stroke:#c62828graph TB
subgraph DF["DataFrame - Optimización Automática"] D1["📝 Tu código:<br/>select(), where(), groupBy()"] D2["🤖 Catalyst Optimizer:<br/>- Analiza tu query<br/>- Reordena operaciones<br/>- Elimina pasos innecesarios"] D3["⚡ Tungsten Engine:<br/>- Genera código optimizado<br/>- Gestión eficiente de memoria"] D4["🚀 Ejecución optimizada<br/>(hasta 10x más rápido)"]
D1 --> D2 --> D3 --> D4 end
style D1 fill:#c8e6c9,stroke:#388e3c style D2 fill:#c8e6c9,stroke:#388e3c style D3 fill:#c8e6c9,stroke:#388e3c style D4 fill:#a5d6a7,stroke:#2e7d32,stroke-width:3pxTabla Comparativa Detallada
Section titled “Tabla Comparativa Detallada”| Aspecto | RDD | DataFrame |
|---|---|---|
| Nivel de Abstracción | Bajo (cómo) | Alto (qué) |
| Estructura | Sin esquema | Con esquema (columnas con tipos) |
| Optimización | Manual | Automática (Catalyst Optimizer) |
| Rendimiento | Más lento | Hasta 10x más rápido |
| SQL Nativo | ❌ No | ✅ Sí |
| Gestión de Memoria | JVM Heap | Tungsten (off-heap, más eficiente) |
| Serialización | Java/Kryo | Tungsten Binary Format (más rápido) |
| Caso de Uso | Control total, datos no estructurados | Análisis, agregaciones, joins |
| Cuándo usar | Transformaciones muy personalizadas | El 95% de los casos |
1.5. Catalyst Optimizer - El Cerebro de DataFrames
Section titled “1.5. Catalyst Optimizer - El Cerebro de DataFrames”El Catalyst Optimizer es el motor de optimización que hace que DataFrames sean tan rápidos. Tú escribes QUÉ quieres, Catalyst decide CÓMO hacerlo de la forma más eficiente.
Flujo de Optimización de Catalyst
Section titled “Flujo de Optimización de Catalyst”graph TB Q["📝 Tu Query<br/>(SQL o DataFrame API)"] LP["🔍 Logical Plan<br/>(Representación abstracta)"] OLP["⚡ Optimized Logical Plan<br/>- Predicate Pushdown<br/>- Column Pruning<br/>- Constant Folding<br/>- Join Reordering"] PP["🎯 Physical Plans<br/>(Múltiples estrategias)"] BP["✅ Best Physical Plan<br/>(Basado en costes)"] CODE["🚀 Generated Code<br/>(JVM bytecode optimizado)"]
Q --> LP LP --> OLP OLP --> PP PP --> BP BP --> CODE
style Q fill:#e3f2fd,stroke:#1976d2 style LP fill:#fff9c4,stroke:#f57c00 style OLP fill:#c8e6c9,stroke:#388e3c,stroke-width:2px style PP fill:#fff9c4,stroke:#f57c00 style BP fill:#a5d6a7,stroke:#2e7d32,stroke-width:3px style CODE fill:#a5d6a7,stroke:#2e7d32,stroke-width:3pxExplicación del flujo paso a paso:
1. 📝 Tu Query (SQL o DataFrame API)
- Escribes tu código usando la API de DataFrames o SQL
- Ejemplo:
df.filter(df.edad > 18).select("nombre") - Catalyst recibe esta query como entrada
2. 🔍 Logical Plan (Plan Lógico)
- Catalyst convierte tu query en un árbol de operaciones abstractas
- Representa QUÉ quieres hacer, no CÓMO hacerlo
- Todavía no hay decisiones sobre ejecución física
3. ⚡ Optimized Logical Plan (Plan Lógico Optimizado)
- Aquí ocurre la magia - Catalyst aplica reglas de optimización:
- Predicate Pushdown: Mueve filtros lo antes posible
- Column Pruning: Elimina columnas que no se usan
- Constant Folding: Precalcula expresiones constantes
- Join Reordering: Reordena joins para minimizar datos intermedios
- El plan sigue siendo abstracto, pero mucho más eficiente
4. 🎯 Physical Plans (Planes Físicos)
- Catalyst genera múltiples estrategias de ejecución física
- Cada plan representa una forma diferente de ejecutar la query
- Ejemplo: ¿Usar hash join o sort-merge join?
5. ✅ Best Physical Plan (Mejor Plan Físico)
- Catalyst usa un modelo de costes para elegir el mejor plan
- Considera: tamaño de datos, memoria disponible, particiones, etc.
- Selecciona la estrategia más eficiente
6. 🚀 Generated Code (Código Generado)
- Tungsten Code Generation: Genera bytecode JVM optimizado
- Elimina overhead de interpretación
- El código resultante es tan rápido como si lo hubieras escrito a mano en Java
Optimizaciones Automáticas de Catalyst
Section titled “Optimizaciones Automáticas de Catalyst”1. Predicate Pushdown - Filtrar lo antes posible
# Ejemplo de códigodf = spark.read.parquet("ventas.parquet")resultado = df.filter(df.pais == "España").select("producto", "precio")
# Lo que Catalyst hace automáticamente:# 1. Mueve el filtro ANTES de leer todos los datos# 2. Solo lee las columnas "producto", "precio", "pais" (Column Pruning)# 3. Aplica el filtro a nivel de archivo Parquet (más eficiente)
# Ver el plan de ejecuciónresultado.explain()# Verás que el filtro (Filter) aparece ANTES de la lectura completa2. Column Pruning - Solo leer columnas necesarias
# Ejemplo de códigodf.select("nombre", "edad").show()
# Catalyst automáticamente:# - Solo lee las columnas "nombre" y "edad" del archivo# - Ignora todas las demás columnas# - Ahorra memoria y tiempo de I/O3. Constant Folding - Precalcular constantes
# Tu códigodf.filter(df.precio > 100 * 2)
# Catalyst automáticamente:# - Calcula 100 * 2 = 200 UNA VEZ antes de ejecutar# - Convierte a: df.filter(df.precio > 200)# - No recalcula en cada filaUsar explain() para Ver la Optimización
Section titled “Usar explain() para Ver la Optimización”# Crear un DataFrame de ejemplodf = spark.read.csv("ventas.csv", header=True, inferSchema=True)
# Query complejaresultado = df.filter(df.cantidad > 10) \ .select("producto", "precio") \ .groupBy("producto") \ .agg({"precio": "avg"}) \ .filter(col("avg(precio)") > 100)
# Ver el plan de ejecución COMPLETOresultado.explain(True)
# Salida (simplificada):# == Parsed Logical Plan == (Tu query original)# == Analyzed Logical Plan == (Con tipos de datos resueltos)# == Optimized Logical Plan == (Después de optimizaciones de Catalyst)# == Physical Plan == (Plan de ejecución real)
# Ver solo el plan físico (más conciso)resultado.explain()2. Crear DataFrames
Section titled “2. Crear DataFrames”Desde un RDD
Section titled “Desde un RDD”En la vida real, usaremos RDDs para limpiar datos muy sucios y luego los convertiremos inmediatamente a DataFrames para aprovechar la optimización automática.
# Convertimos nuestro RDD de tuplas a un DataFrame con nombre de columnasdf_ventas = rdd_ventas.toDF(["Vehiculo", "Precio"])
df_ventas.show()df_ventas.printSchema()Desde una lista de tuplas
Section titled “Desde una lista de tuplas”# Opcion 1: Inferir esquema automaticamentedatos = [("Angel", 9), ("Maria", 3), ("Ramon", 7)]df = spark.createDataFrame(datos, ["nombre", "nota"])df.show()Desde un archivo CSV
Section titled “Desde un archivo CSV”df = spark.read.csv("ventas.csv", header=True, inferSchema=True, sep=";")df.show()Desde un RDD
Section titled “Desde un RDD”rdd_pares = sc.parallelize([("Angel", 9), ("Maria", 3)])df = rdd_pares.toDF(["nombre", "nota"])df.show()4.3 Operaciones Basicas
Section titled “4.3 Operaciones Basicas”Mostrar datos
Section titled “Mostrar datos”df.show() # Primeras 20 filasdf.show(5) # Primeras 5 filasdf.printSchema() # Estructura (esquema)df.head(3) # Array de primeras 3 filasSeleccionar columnas
Section titled “Seleccionar columnas”df.select("nombre").show()df.select(["nombre", "nota"]).show()df.select(df["nombre"], df["nota"] * 2).show()Filtrado (WHERE)
Section titled “Filtrado (WHERE)”# ❌ INCORRECTO - Esto dará errordf.filter(df.nota > 5 and df.nota < 9).show() # SyntaxError!
# ✅ CORRECTO - Usa & con paréntesisdf.filter((df.nota > 5) & (df.nota < 9)).show()
# Ejemplos prácticos# Filtro simpledf.where(df.nota > 7).show()# Salida esperada: Solo filas donde nota > 7
# Filtro con AND - ambas condiciones deben cumplirsedf.filter((df.Country == "Germany") & (df.Units > 20)).show()# Salida: Solo productos de Alemania CON más de 20 unidades
# Filtro con OR - al menos una condición debe cumplirsedf.filter((df.ProductID == 2314) | (df.ProductID == 1322)).show()# Salida: Productos con ID 2314 O 1322
# Filtro con NOT - negar una condicióndf.filter(~(df.Country == "Germany")).show()# Salida: Todos los países EXCEPTO Alemania
# Filtros combinados complejosdf.filter( ((df.Country == "Germany") | (df.Country == "France")) & (df.Revenue > 1000)).show()# Salida: Ventas de Alemania o Francia con ingresos > 1000Eliminar duplicados
Section titled “Eliminar duplicados”df.select("Country").distinct().show()df.dropDuplicates(["Country"]).show()Ordenar (SORT / ORDER BY)
Section titled “Ordenar (SORT / ORDER BY)”df.sort("nota").show() # Ascendentedf.sort(df.nota.desc()).show() # Descendentedf.orderBy("nota", ascending=False).show() # DescendenteAgregaciones (GROUP BY)
Section titled “Agregaciones (GROUP BY)”from pyspark.sql.functions import avg, count, sum, max, min
# Media de notasdf.groupBy().avg("nota").show()
# Nota media por cada paisdf.groupBy("Country").avg("Revenue").show()
# Multiples agregacionesdf.groupBy("Country").agg( count("ProductID").alias("total_ventas"), avg("Revenue").alias("revenue_promedio"), sum("Units").alias("unidades_totales")).show()2.5. Window Functions - Análisis Avanzado
Section titled “2.5. Window Functions - Análisis Avanzado”Las Window Functions (funciones de ventana) son una de las características más potentes de DataFrames. Permiten realizar cálculos sobre un conjunto de filas relacionadas sin colapsar el resultado como hace groupBy.
Concepto de Ventana (Window)
Section titled “Concepto de Ventana (Window)”graph TB subgraph "DataFrame Original" D1["producto | fecha | ventas<br/>A | 2024-01 | 100<br/>A | 2024-02 | 150<br/>A | 2024-03 | 120<br/>B | 2024-01 | 200<br/>B | 2024-02 | 180"] end
subgraph "Window: partitionBy('producto')" P1["PARTICIÓN A<br/>A | 2024-01 | 100<br/>A | 2024-02 | 150<br/>A | 2024-03 | 120"] P2["PARTICIÓN B<br/>B | 2024-01 | 200<br/>B | 2024-02 | 180"] end
subgraph "Resultado con rank()" R["producto | fecha | ventas | rank<br/>A | 2024-02 | 150 | 1<br/>A | 2024-03 | 120 | 2<br/>A | 2024-01 | 100 | 3<br/>B | 2024-01 | 200 | 1<br/>B | 2024-02 | 180 | 2"] end
D1 --> P1 D1 --> P2 P1 --> R P2 --> R
style D1 fill:#e3f2fd,stroke:#1976d2 style P1 fill:#fff9c4,stroke:#f57c00 style P2 fill:#fff9c4,stroke:#f57c00 style R fill:#c8e6c9,stroke:#388e3c,stroke-width:2pxEjemplos Prácticos de Window Functions
Section titled “Ejemplos Prácticos de Window Functions”1. Ranking - Clasificar productos por ventas
from pyspark.sql.window import Windowfrom pyspark.sql.functions import rank, dense_rank, row_number, col
# Datos de ejemploventas = spark.createDataFrame([ ("Electrónica", "Laptop", 1200), ("Electrónica", "Mouse", 25), ("Electrónica", "Teclado", 75), ("Hogar", "Sofá", 800), ("Hogar", "Mesa", 300)], ["categoria", "producto", "precio"])
# Definir ventana: particionar por categoría, ordenar por precio descendentewindow_spec = Window.partitionBy("categoria").orderBy(col("precio").desc())
# Aplicar funciones de rankingresultado = ventas.withColumn("rank", rank().over(window_spec)) \ .withColumn("dense_rank", dense_rank().over(window_spec)) \ .withColumn("row_number", row_number().over(window_spec))
resultado.show()# Resultado:# categoria | producto | precio | rank | dense_rank | row_number# Electrónica | Laptop | 1200 | 1 | 1 | 1# Electrónica | Teclado | 75 | 2 | 2 | 2# Electrónica | Mouse | 25 | 3 | 3 | 3# Hogar | Sofá | 800 | 1 | 1 | 1# Hogar | Mesa | 300 | 2 | 2 | 22. Lag y Lead - Comparar con filas anteriores/siguientes
from pyspark.sql.functions import lag, lead
# Datos de ventas mensualesventas_mes = spark.createDataFrame([ ("2024-01", 1000), ("2024-02", 1200), ("2024-03", 1100), ("2024-04", 1300)], ["mes", "ventas"])
# Ventana ordenada por meswindow_spec = Window.orderBy("mes")
# Comparar con mes anterior y siguienteresultado = ventas_mes.withColumn("ventas_mes_anterior", lag("ventas", 1).over(window_spec)) \ .withColumn("ventas_mes_siguiente", lead("ventas", 1).over(window_spec)) \ .withColumn("diferencia_vs_anterior", col("ventas") - lag("ventas", 1).over(window_spec))
resultado.show()# Resultado:# mes | ventas | ventas_mes_anterior | ventas_mes_siguiente | diferencia_vs_anterior# 2024-01 | 1000 | null | 1200 | null# 2024-02 | 1200 | 1000 | 1100 | 200# 2024-03 | 1100 | 1200 | 1300 | -100# 2024-04 | 1300 | 1100 | null | 2003. Moving Average - Media móvil
from pyspark.sql.functions import avg
# Datos de ventas diariasventas_diarias = spark.createDataFrame([ ("2024-01-01", 100), ("2024-01-02", 120), ("2024-01-03", 90), ("2024-01-04", 110), ("2024-01-05", 130)], ["fecha", "ventas"])
# Ventana: últimas 3 filas (incluyendo la actual)window_spec = Window.orderBy("fecha").rowsBetween(-2, 0)
# Calcular media móvil de 3 díasresultado = ventas_diarias.withColumn("media_movil_3dias", avg("ventas").over(window_spec))
resultado.show()# Resultado:# fecha | ventas | media_movil_3dias# 2024-01-01 | 100 | 100.0 (solo 1 día)# 2024-01-02 | 120 | 110.0 (promedio de 100, 120)# 2024-01-03 | 90 | 103.3 (promedio de 100, 120, 90)# 2024-01-04 | 110 | 106.7 (promedio de 120, 90, 110)# 2024-01-05 | 130 | 110.0 (promedio de 90, 110, 130)4. Cumulative Sum - Suma acumulada
from pyspark.sql.functions import sum as _sum
# Ventana: desde el inicio hasta la fila actualwindow_spec = Window.orderBy("fecha").rowsBetween(Window.unboundedPreceding, 0)
resultado = ventas_diarias.withColumn("ventas_acumuladas", _sum("ventas").over(window_spec))
resultado.show()# Resultado:# fecha | ventas | ventas_acumuladas# 2024-01-01 | 100 | 100# 2024-01-02 | 120 | 220# 2024-01-03 | 90 | 310# 2024-01-04 | 110 | 420# 2024-01-05 | 130 | 5503. Joins - Combinar DataFrames
Section titled “3. Joins - Combinar DataFrames”Los joins son una de las operaciones más potentes en DataFrames. Permiten combinar datos de múltiples tablas basándose en una clave común.
Tipos de Joins Visualizados
Section titled “Tipos de Joins Visualizados”graph TB subgraph "Datos de Ejemplo" DF1["DataFrame 1: Empleados<br/>id | nombre<br/>1 | Ana<br/>2 | Juan<br/>3 | María"] DF2["DataFrame 2: Salarios<br/>id | salario<br/>2 | 3000<br/>3 | 2500<br/>4 | 2800"] end
subgraph "INNER JOIN" I["Solo registros que coinciden<br/>en AMBOS DataFrames<br/><br/>Resultado:<br/>2 | Juan | 3000<br/>3 | María | 2500"] end
subgraph "LEFT JOIN" L["TODOS los registros del DF izquierdo<br/>+ coincidencias del derecho<br/><br/>Resultado:<br/>1 | Ana | null<br/>2 | Juan | 3000<br/>3 | María | 2500"] end
subgraph "RIGHT JOIN" R["TODOS los registros del DF derecho<br/>+ coincidencias del izquierdo<br/><br/>Resultado:<br/>2 | Juan | 3000<br/>3 | María | 2500<br/>4 | null | 2800"] end
subgraph "OUTER JOIN" O["TODOS los registros de AMBOS<br/><br/>Resultado:<br/>1 | Ana | null<br/>2 | Juan | 3000<br/>3 | María | 2500<br/>4 | null | 2800"] end
style I fill:#c8e6c9,stroke:#388e3c style L fill:#fff9c4,stroke:#f57c00 style R fill:#fff9c4,stroke:#f57c00 style O fill:#e1bee7,stroke:#8e24aaEjemplos Prácticos de Joins
Section titled “Ejemplos Prácticos de Joins”# Datos de ejemploempleados = spark.createDataFrame([ (1, "Ana"), (2, "Juan"), (3, "María")], ["id", "nombre"])
salarios = spark.createDataFrame([ (2, 3000), (3, 2500), (4, 2800)], ["id", "salario"])
# INNER JOIN - Solo coincidencias (por defecto)empleados.join(salarios, "id").show()# Resultado: Juan y María (tienen salario)
# LEFT JOIN - Todos los empleados, con o sin salarioempleados.join(salarios, "id", "left").show()# Resultado: Ana (null), Juan (3000), María (2500)
# RIGHT JOIN - Todos los salarios, con o sin empleadoempleados.join(salarios, "id", "right").show()# Resultado: Juan (3000), María (2500), null (2800)
# OUTER JOIN - Todos los registros de ambosempleados.join(salarios, "id", "outer").show()# Resultado: Ana (null), Juan (3000), María (2500), null (2800)
# Join con múltiples columnasdf1.join(df2, ["columna1", "columna2"], "inner").show()
# Join con condiciones complejasdf1.join(df2, (df1.id == df2.emp_id) & (df1.dept == df2.dept)).show()
:::tip**Optimización de Joins**: Para DataFrames pequeños (<10MB), usa `broadcast()` para evitar shuffle. Para grandes volúmenes, considera particionar por la clave de join.:::
```pythonfrom pyspark.sql.functions import broadcast
# Broadcast join para DataFrames pequeños (MUY rápido)df_grande.join(broadcast(df_pequeño), "id").show()4. RDD vs DataFrame: Cuando usar cada uno
Section titled “4. RDD vs DataFrame: Cuando usar cada uno”Usar DataFrames cuando:
- Tienes datos estructurados (filas, columnas)
- Necesitas rendimiento optimo
- Quieres escribir codigo SQL
- Trabajas con JSON, CSV, Parquet
Usar RDDs cuando:
- Tienes datos no estructurados
- Necesitas control total a bajo nivel
- Trabajas con objetos complejos o heterogeneos
- Haces transformaciones muy personalizadas
4.5 Transicion RDD -> DataFrame
Section titled “4.5 Transicion RDD -> DataFrame”# Paso 1: Crear RDDrdd = sc.parallelize([("Angel", 9), ("Maria", 3)])
# Paso 2: Convertir a DataFramedf = rdd.toDF(["nombre", "nota"])
# Paso 3: Operar con API DataFrame (mas simple)df.filter(df.nota > 5).show()
# Paso 4: Volver a RDD si es necesariordd_filtrado = df.rddprint(rdd_filtrado.collect())5. Errores Comunes y Debugging
Section titled “5. Errores Comunes y Debugging”Errores Típicos en DataFrames
Section titled “Errores Típicos en DataFrames”1. Usar collect() en DataFrames grandes
# ❌ PELIGROSO - DataFrame de 10GBdf_gigante.collect() # OutOfMemoryError!
# ✅ MEJOR - Usa show() para ver una muestradf_gigante.show(20)
# ✅ O toma solo lo necesariodf_gigante.limit(100).collect()
# ✅ O guarda a discodf_gigante.write.parquet("resultado/")2. Error con operadores lógicos
# ❌ INCORRECTOdf.filter(df.age > 18 and df.city == "Madrid") # SyntaxError
# ✅ CORRECTOdf.filter((df.age > 18) & (df.city == "Madrid"))3. Problemas con tipos de datos
# Ver el esquema para detectar tipos incorrectosdf.printSchema()
# Convertir tipos explícitamentefrom pyspark.sql.functions import coldf = df.withColumn("precio", col("precio").cast("double"))4. Shuffle excesivo en Joins
# ❌ LENTO - Join sin optimizardf_grande.join(df_grande2, "id") # Shuffle masivo
# ✅ RÁPIDO - Broadcast join si uno es pequeñofrom pyspark.sql.functions import broadcastdf_grande.join(broadcast(df_pequeño), "id")Debugging con Spark UI
Section titled “Debugging con Spark UI”- Accede a Spark UI:
http://localhost:4040(mientras tu aplicación corre) - Revisa:
- Jobs: Cuánto tarda cada job
- Stages: Identifica stages lentos
- Storage: Verifica qué DataFrames están cacheados
- SQL: Ve el plan de ejecución optimizado
# Ver el plan de ejecución físicodf.explain()
# Ver el plan de ejecución completo (lógico + físico)df.explain(True)Resumen de Conceptos Clave
Section titled “Resumen de Conceptos Clave”SparkSession vs SparkContext
Section titled “SparkSession vs SparkContext”- SparkSession: Punto de entrada moderno (v2.0+). Usar siempre.
- SparkContext: Legacy. Solo si trabajas con codigo antiguo.
RDD: El Corazon de Spark
Section titled “RDD: El Corazon de Spark”- Estructura inmutable y tolerante a fallos.
- Evaluacion perezosa: Se ejecuta solo con acciones.
- Excelente para datos no estructurados y control fino.
Transformaciones Clave-Valor
Section titled “Transformaciones Clave-Valor”reduceByKey(): Agrupa y reduce valores por clave.groupByKey(): Agrupa todos los valores por clave (cuidado: memoria).sortByKey(): Ordena por clave.
DataFrame: El Presente de Spark
Section titled “DataFrame: El Presente de Spark”- API de alto nivel con optimizacion automatica.
- Estructura de tabla con esquema definido.
- Mejor rendimiento que RDD.
Evaluacion Perezosa (Lazy Evaluation)
Section titled “Evaluacion Perezosa (Lazy Evaluation)”Transformacion -> Transformacion -> Accion(No se ejecuta) (No se ejecuta) (Se ejecuta todo!)Ejercicios Prácticos
Section titled “Ejercicios Prácticos”Ejercicio 1: Creación y Exploración de DataFrames
Section titled “Ejercicio 1: Creación y Exploración de DataFrames”Crea un DataFrame a partir de la siguiente lista de empleados y explora su estructura.
Datos de entrada:
empleados = [ ("Ana", 28, "Ventas", 35000), ("Juan", 35, "IT", 45000), ("María", 42, "IT", 55000), ("Pedro", 31, "Ventas", 38000), ("Laura", 29, "Marketing", 40000)]Tareas:
- Crea un DataFrame con las columnas:
nombre,edad,departamento,salario - Muestra el esquema del DataFrame
- Muestra las primeras 3 filas
- Cuenta cuántos empleados hay en total
Resultado esperado:
- DataFrame con 5 filas y 4 columnas
- Esquema mostrando los tipos de datos
- Total: 5 empleados
Ejercicio 2: Filtrado con Operadores Lógicos
Section titled “Ejercicio 2: Filtrado con Operadores Lógicos”Usando el DataFrame del Ejercicio 1, aplica filtros combinados.
Tareas:
- Filtra empleados del departamento IT con salario mayor a 40000
- Filtra empleados de Ventas O Marketing
- Filtra empleados que NO sean del departamento IT
- Filtra empleados con edad entre 30 y 40 años (inclusive)
Resultado esperado:
- Primera consulta: 2 empleados (Juan y María)
- Segunda consulta: 3 empleados (Ana, Pedro, Laura)
- Tercera consulta: 3 empleados (Ana, Pedro, Laura)
- Cuarta consulta: 2 empleados (Juan y Pedro)
Ejercicio 3: Agregaciones por Grupo
Section titled “Ejercicio 3: Agregaciones por Grupo”Usando el DataFrame del Ejercicio 1, realiza agregaciones.
Tareas:
- Calcula el salario promedio por departamento
- Encuentra el salario máximo y mínimo por departamento
- Cuenta cuántos empleados hay en cada departamento
- Calcula el salario total (suma) por departamento y ordena de mayor a menor
Resultado esperado:
- IT: salario promedio = 50000
- Ventas: 2 empleados
- IT tiene el salario total más alto (100000)
Ejercicio 4: Window Functions - Ranking
Section titled “Ejercicio 4: Window Functions - Ranking”Crea un DataFrame de ventas mensuales y aplica funciones de ventana.
Datos de entrada:
ventas = [ ("Electrónica", "Enero", 15000), ("Electrónica", "Febrero", 18000), ("Electrónica", "Marzo", 12000), ("Hogar", "Enero", 8000), ("Hogar", "Febrero", 9500), ("Hogar", "Marzo", 11000)]Tareas:
- Crea un DataFrame con columnas:
categoria,mes,ventas - Añade una columna
rankingque muestre el ranking de ventas dentro de cada categoría (mayor venta = rank 1) - Añade una columna
ventas_mes_anteriorusando la funciónlag() - Calcula la diferencia de ventas respecto al mes anterior
Resultado esperado:
- Febrero de Electrónica debe tener rank = 1 (18000 es el máximo)
- Enero no debe tener valor en
ventas_mes_anterior(es el primero) - Diferencia de Febrero vs Enero en Electrónica = 3000
Ejercicio 5: Joins - Combinar DataFrames
Section titled “Ejercicio 5: Joins - Combinar DataFrames”Crea dos DataFrames y combínalos usando diferentes tipos de joins.
Datos de entrada:
# DataFrame de productosproductos = [ (1, "Laptop", "Electrónica"), (2, "Mouse", "Accesorios"), (3, "Teclado", "Accesorios")]
# DataFrame de ventasventas = [ (1, 5, 6000), (2, 10, 250), (4, 3, 150) # Producto ID 4 no existe en productos]Tareas:
- Crea dos DataFrames:
df_productos(id, nombre, categoria) ydf_ventas(producto_id, cantidad, total) - Realiza un INNER JOIN (solo productos que tienen ventas)
- Realiza un LEFT JOIN (todos los productos, con o sin ventas)
- Realiza un RIGHT JOIN (todas las ventas, con o sin producto)
Resultado esperado:
- INNER JOIN: 2 filas (Laptop y Mouse)
- LEFT JOIN: 3 filas (Teclado tendrá ventas = null)
- RIGHT JOIN: 3 filas (producto_id 4 tendrá nombre = null)
Ejercicio 6: Debugging - Encuentra el Error
Section titled “Ejercicio 6: Debugging - Encuentra el Error”El siguiente código tiene errores. Identifícalos y corrígelos.
Código con errores:
df = spark.createDataFrame([ ("Ana", 25, 3000), ("Juan", 30, 4000)], ["nombre", "edad", "salario"])
# Error 1: Operador lógico incorrectoresultado1 = df.filter(df.edad > 20 and df.salario > 3500)
# Error 2: Uso de collect() en DataFrame granderesultado2 = df.select("nombre").collect()
# Error 3: Sintaxis incorrecta de agregaciónresultado3 = df.groupBy("edad").avg()Tareas:
- Identifica los 3 errores
- Explica por qué están mal
- Proporciona el código corregido
Resultado esperado:
- Error 1: Debe usar
&en lugar deand, con paréntesis - Error 2:
collect()es peligroso en DataFrames grandes, usarshow()olimit() - Error 3:
avg()necesita especificar la columna
Ejercicio 7: Optimización - Broadcast Join
Section titled “Ejercicio 7: Optimización - Broadcast Join”Tienes dos DataFrames: uno grande (1 millón de transacciones) y uno pequeño (100 productos). Optimiza el join.
Código inicial (ineficiente):
# DataFrame grandetransacciones = spark.read.csv("transacciones.csv") # 1M filas
# DataFrame pequeñoproductos = spark.read.csv("productos.csv") # 100 filas
# Join sin optimizarresultado = transacciones.join(productos, "producto_id")Tareas:
- Identifica el problema de rendimiento
- Reescribe el código usando broadcast join
- Explica por qué es más eficiente
Resultado esperado:
- Uso de
broadcast()en el DataFrame pequeño - Explicación: Evita shuffle enviando la tabla pequeña a todos los nodos
Ejercicio 8: Análisis Completo - Window Functions Avanzadas
Section titled “Ejercicio 8: Análisis Completo - Window Functions Avanzadas”Crea un análisis de ventas con media móvil y suma acumulada.
Datos de entrada:
ventas_diarias = [ ("2024-01-01", 1000), ("2024-01-02", 1200), ("2024-01-03", 900), ("2024-01-04", 1100), ("2024-01-05", 1300), ("2024-01-06", 1150), ("2024-01-07", 1250)]Tareas:
- Crea un DataFrame con columnas:
fecha,ventas - Añade una columna
media_movil_3dias(promedio de los últimos 3 días) - Añade una columna
ventas_acumuladas(suma desde el inicio) - Añade una columna
diferencia_vs_promedio(ventas del día - media móvil)
Resultado esperado:
- Media móvil del día 3 = (1000 + 1200 + 900) / 3 = 1033.33
- Ventas acumuladas del día 3 = 1000 + 1200 + 900 = 3100
- Diferencia del día 3 = 900 - 1033.33 = -133.33
Proyecto: Análisis de Ventas con DataFrames
Section titled “Proyecto: Análisis de Ventas con DataFrames”Objetivo
Section titled “Objetivo”Realizar un análisis completo de ventas combinando múltiples DataFrames, aplicando agregaciones, window functions y joins. Este proyecto integra todos los conceptos vistos en el documento.
Dataset
Section titled “Dataset”Para este proyecto utilizaremos los datasets pdi_sales.csv y pdi_product.csv disponibles en:
https://github.com/josepgarcia/datos
Descripción de los datasets:
pdi_sales.csv - Registro de ventas
OrderID: ID único de la ordenProductID: ID del producto vendidoQuantity: Cantidad vendidaUnitPrice: Precio unitarioOrderDate: Fecha de la ordenCountry: País de la venta
pdi_product.csv - Catálogo de productos
ProductID: ID único del productoProductName: Nombre del productoCategory: Categoría del productoSupplierID: ID del proveedor
Tamaño aproximado: ~10,000 registros de ventas, ~100 productos
Descarga:
# Descargar los datasets!wget https://raw.githubusercontent.com/josepgarcia/datos/main/pdi_sales.csv!wget https://raw.githubusercontent.com/josepgarcia/datos/main/pdi_product.csv
# Cargar en Sparkdf_sales = spark.read.csv("pdi_sales.csv", header=True, inferSchema=True)df_products = spark.read.csv("pdi_product.csv", header=True, inferSchema=True)Paso 1: Exploración Inicial de los Datos
Section titled “Paso 1: Exploración Inicial de los Datos”Objetivo: Familiarizarte con la estructura y calidad de los datos.
Instrucciones:
- Carga ambos DataFrames y muestra sus esquemas
- Muestra las primeras 5 filas de cada DataFrame
- Cuenta cuántas filas tiene cada DataFrame
- Identifica si hay valores nulos en columnas clave
Código guía:
# Cargar los datosdf_sales = spark.read.csv("pdi_sales.csv", header=True, inferSchema=True)df_products = spark.read.csv("pdi_product.csv", header=True, inferSchema=True)
# TODO: Mostrar esquemas# TODO: Mostrar primeras filas# TODO: Contar filas# TODO: Verificar valores nulosPista: Usa printSchema(), show(), count() y filter() con isNull()
Paso 2: Limpieza y Preparación de Datos
Section titled “Paso 2: Limpieza y Preparación de Datos”Objetivo: Limpiar los datos y crear columnas calculadas necesarias.
Instrucciones:
- Elimina filas con valores nulos en columnas críticas (
ProductID,Quantity,UnitPrice) - Crea una nueva columna
TotalAmount=Quantity×UnitPrice - Convierte la columna
OrderDatea tipo fecha (si no lo está) - Filtra solo las ventas con
Quantity> 0 yUnitPrice> 0
Código guía:
from pyspark.sql.functions import col, to_date
# TODO: Eliminar nulosdf_sales_clean = df_sales.filter(...)
# TODO: Crear columna TotalAmountdf_sales_clean = df_sales_clean.withColumn("TotalAmount", ...)
# TODO: Convertir fechadf_sales_clean = df_sales_clean.withColumn("OrderDate", to_date(col("OrderDate"), "yyyy-MM-dd"))
# TODO: Filtrar ventas válidasPista: Usa filter(), withColumn() y operaciones aritméticas con columnas
Paso 3: Join de Ventas con Productos
Section titled “Paso 3: Join de Ventas con Productos”Objetivo: Enriquecer los datos de ventas con información de productos.
Instrucciones:
- Realiza un LEFT JOIN entre
df_sales_cleanydf_productsusandoProductID - Verifica si hay ventas de productos que no existen en el catálogo
- Selecciona las columnas relevantes:
OrderID,ProductName,Category,Quantity,TotalAmount,Country,OrderDate - Guarda el resultado en
df_sales_enriched
Código guía:
# TODO: Realizar LEFT JOINdf_sales_enriched = df_sales_clean.join(...)
# TODO: Verificar productos sin informaciónproductos_sin_info = df_sales_enriched.filter(col("ProductName").isNull())print(f"Ventas sin producto: {productos_sin_info.count()}")
# TODO: Seleccionar columnas relevantesdf_sales_enriched = df_sales_enriched.select(...)Pista: Para DataFrames pequeños como df_products, considera usar broadcast() para optimizar
Paso 4: Análisis de Ventas por País
Section titled “Paso 4: Análisis de Ventas por País”Objetivo: Calcular métricas de ventas agregadas por país.
Instrucciones:
- Agrupa las ventas por
Country - Calcula para cada país:
- Total de ventas (
sum(TotalAmount)) - Número de órdenes (
count(OrderID)) - Cantidad total de productos vendidos (
sum(Quantity)) - Ticket promedio (
avg(TotalAmount))
- Total de ventas (
- Ordena los resultados por total de ventas descendente
- Muestra los top 10 países
Código guía:
from pyspark.sql.functions import sum, count, avg
# TODO: Agrupar y agregarventas_por_pais = df_sales_enriched.groupBy("Country").agg(...)
# TODO: Ordenar y mostrar top 10Pista: Usa alias() para renombrar las columnas agregadas y hacerlas más legibles
Paso 5: Análisis de Productos Más Vendidos
Section titled “Paso 5: Análisis de Productos Más Vendidos”Objetivo: Identificar los productos con mejor desempeño.
Instrucciones:
- Agrupa por
ProductNameyCategory - Calcula el total de ventas y cantidad vendida por producto
- Usa window functions para crear un ranking de productos dentro de cada categoría
- Muestra los top 3 productos de cada categoría
Código guía:
from pyspark.sql.window import Windowfrom pyspark.sql.functions import rank, desc
# TODO: Agregar por productoventas_por_producto = df_sales_enriched.groupBy(...).agg(...)
# TODO: Crear ventana por categoríawindow_spec = Window.partitionBy("Category").orderBy(...)
# TODO: Añadir rankingventas_por_producto = ventas_por_producto.withColumn("rank", rank().over(window_spec))
# TODO: Filtrar top 3 por categoríatop_productos = ventas_por_producto.filter(col("rank") <= 3)Pista: Ordena por total de ventas descendente para que el rank 1 sea el más vendido
Paso 6: Análisis Temporal - Tendencias de Ventas
Section titled “Paso 6: Análisis Temporal - Tendencias de Ventas”Objetivo: Analizar la evolución de ventas en el tiempo.
Instrucciones:
- Extrae el mes y año de
OrderDate(crea columnasYearyMonth) - Agrupa por año y mes, calculando el total de ventas
- Usa window functions para calcular:
- Ventas del mes anterior (
lag) - Crecimiento respecto al mes anterior (%)
- Ventas acumuladas del año
- Ventas del mes anterior (
- Ordena por año y mes
Código guía:
from pyspark.sql.functions import year, month, lagfrom pyspark.sql.window import Window
# TODO: Extraer año y mesdf_temporal = df_sales_enriched.withColumn("Year", year(col("OrderDate"))) \ .withColumn("Month", month(col("OrderDate")))
# TODO: Agregar por mesventas_mensuales = df_temporal.groupBy("Year", "Month").agg(...)
# TODO: Crear ventana ordenada por tiempowindow_spec = Window.orderBy("Year", "Month")
# TODO: Calcular métricas temporalesventas_mensuales = ventas_mensuales.withColumn("ventas_mes_anterior", lag(...).over(window_spec))# TODO: Calcular crecimiento %# TODO: Calcular ventas acumuladasPista: Para el crecimiento porcentual: ((ventas_actual - ventas_anterior) / ventas_anterior) * 100
Paso 7: Identificar Anomalías - Ventas Atípicas
Section titled “Paso 7: Identificar Anomalías - Ventas Atípicas”Objetivo: Detectar órdenes con ventas inusualmente altas o bajas.
Instrucciones:
- Calcula la media y desviación estándar del
TotalAmount - Identifica órdenes que están más de 2 desviaciones estándar por encima de la media (ventas muy altas)
- Identifica órdenes que están más de 2 desviaciones estándar por debajo de la media (ventas muy bajas)
- Muestra información de estas órdenes anómalas
Código guía:
from pyspark.sql.functions import mean, stddev
# TODO: Calcular estadísticasstats = df_sales_enriched.select( mean("TotalAmount").alias("media"), stddev("TotalAmount").alias("desviacion")).collect()[0]
media = stats["media"]desviacion = stats["desviacion"]
# TODO: Identificar anomalíasventas_altas = df_sales_enriched.filter(col("TotalAmount") > media + 2 * desviacion)ventas_bajas = df_sales_enriched.filter(col("TotalAmount") < media - 2 * desviacion)
# TODO: Mostrar resultadosPista: Las anomalías pueden indicar errores de datos o transacciones especiales que requieren atención
Paso 8: Análisis Final y Visualización de Resultados
Section titled “Paso 8: Análisis Final y Visualización de Resultados”Objetivo: Consolidar los hallazgos y preparar un resumen ejecutivo.
Instrucciones:
- Crea un DataFrame resumen con las métricas clave:
- Total de ventas global
- Número total de órdenes
- Ticket promedio global
- País con más ventas
- Producto más vendido
- Categoría más rentable
- Guarda los resultados principales en archivos Parquet para análisis futuro
- Responde las preguntas de reflexión
Código guía:
# TODO: Calcular métricas globalestotal_ventas = df_sales_enriched.agg(sum("TotalAmount")).collect()[0][0]total_ordenes = df_sales_enriched.count()# ... más métricas
# TODO: Guardar resultadosventas_por_pais.write.mode("overwrite").parquet("resultados/ventas_por_pais")top_productos.write.mode("overwrite").parquet("resultados/top_productos")
# TODO: Crear resumen ejecutivoprint("=== RESUMEN EJECUTIVO ===")print(f"Total de ventas: ${total_ventas:,.2f}")# ... más informaciónPreguntas de Reflexión
Section titled “Preguntas de Reflexión”- Optimización: ¿Qué optimizaciones aplicaste? ¿Usaste broadcast joins? ¿Por qué?
- Rendimiento: ¿En qué paso tardó más tiempo el procesamiento? ¿Por qué crees que fue así?
- Calidad de datos: ¿Encontraste problemas de calidad en los datos? ¿Cómo los manejaste?
- Window Functions: ¿En qué casos usaste window functions en lugar de groupBy? ¿Cuál fue la ventaja?
- Mejoras: Si tuvieras que procesar 100x más datos, ¿qué cambiarías en tu código?
- Insights: ¿Qué insights de negocio descubriste? ¿Qué recomendaciones harías?
Solución Completa
Section titled “Solución Completa”La solución completa del proyecto está disponible en 03dataframes_PROYECTO_SOL.txt