DataFrames en Spark
DataFrames en Apache Spark
Section titled “DataFrames en Apache Spark”En este módulo vamos a sumergirnos en el mundo de los DataFrames. Si vienes de usar RDDs, verás que esto es como cambiar de ir en bicicleta a conducir un deportivo.
Los DataFrames son la estructura principal para trabajar con datos estructurados en Spark SQL. Imagínatelos como una tabla en una base de datos relacional o un DataFrame en pandas, pero con superpoderes de optimización bajo el capó.
1. RDD vs DataFrame: ¿Por qué cambiar?
Section titled “1. RDD vs DataFrame: ¿Por qué cambiar?”Los RDD (Resilient Distributed Datasets) son el bloque fundamental de Spark, pero trabajar con ellos puede ser tedioso y menos eficiente para ciertas tareas. Los DataFrames añaden una capa de abstracción que permite a Spark entender qué quieres hacer con los datos, no solo cómo.
Concepto Clave
Section titled “Concepto Clave”graph TD subgraph RDD[RDD: Caja Negra] A[Datos opacos] --- B[Datos opacos] style RDD fill:#f8d7da,stroke:#c62828 end
subgraph DF[DataFrame: Estructurado] C[Col: Nombre] --- D[Col: Edad] D --- E[Col: Ciudad] style DF fill:#c8e6c9,stroke:#388e3c end
RDD -->|Optimización manual| Result1[Lento en Python] DF -->|Catalyst Optimizer| Result2[Rápido y Optimizado]Ventajas de los DataFrames
Section titled “Ventajas de los DataFrames”- Optimización Automática (Catalyst): Spark optimiza tus consultas por ti.
- Gestión de Memoria (Tungsten): Uso mucho más eficiente de la memoria que los objetos Java/Python.
- Facilidad de uso: Sintaxis muy parecida a SQL o Pandas.
2. Creación de DataFrames
Section titled “2. Creación de DataFrames”Hay varias formas de crear un DataFrame. Vamos a ver las más comunes.
Desde una lista (para pruebas)
Section titled “Desde una lista (para pruebas)”from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("DataFrame-Demo").getOrCreate()
data = [("Aitor", 33), ("Maria", 28), ("Juan", 45)]columns = ["Nombre", "Edad"]
# ✅ OK: Crear DataFrame directamentedf = spark.createDataFrame(data, columns)df.show()Desde un RDD (Legacy)
Section titled “Desde un RDD (Legacy)”A veces tendrás un RDD y querrás convertirlo.
rdd = spark.sparkContext.parallelize(data)
# ✅ OK: Convertir RDD a DataFramedf_from_rdd = rdd.toDF(columns)Leyendo Ficheros (Lo más habitual)
Section titled “Leyendo Ficheros (Lo más habitual)”Spark puede leer multitud de formatos: CSV, JSON, Parquet, ORC, etc.
# Leyendo un CSVdf_csv = spark.read \ .option("header", "true") \ .option("inferSchema", "true") \ .csv("datos/ventas.csv")
# ❌ INCORRECTO: No especificar opciones importantes como header en CSV# df_malo = spark.read.csv("datos/ventas.csv")
# Leyendo un Parquet (Recomendado)df_parquet = spark.read.parquet("datos/ventas.parquet")3. Operaciones Básicas
Section titled “3. Operaciones Básicas”Una vez tienes tu DataFrame, ¡toca jugar con él!
Inspección
Section titled “Inspección”printSchema(): Muestra la estructura (columnas y tipos).show(): Muestra las primeras filas (por defecto 20).count(): Cuenta el número total de filas.describe(): Resumen estadístico (count, mean, stddev, min, max).
df.printSchema()df.show(5, truncate=False) # Muestra 5 filas y no corta el textoprint(f"Total filas: {df.count()}")Selección y Filtrado
Section titled “Selección y Filtrado”from pyspark.sql.functions import col
# Selección de columnasdf.select("Nombre", "Edad").show()
# Filtrado (Where y Filter son alias, usa el que prefieras)# ✅ OK: Usar sintaxis de columna (col) para mayor seguridaddf.filter(col("Edad") > 30).show()
# También puedes usar expresiones SQLdf.filter("Edad > 30").show()Manipulación de Columnas
Section titled “Manipulación de Columnas”Añadir, renombrar o borrar columnas es muy fácil.
from pyspark.sql.functions import lit
# Añadir una columna constantedf_nuevo = df.withColumn("Pais", lit("España"))
# Renombrardf_renombrado = df.withColumnRenamed("Nombre", "Nom")
# Borrardf_final = df.drop("Edad")4. Agregaciones (groupBy)
Section titled “4. Agregaciones (groupBy)”Agrupar datos es esencial para el análisis. Funciona igual que en SQL: groupBy + función de agregación.
Funciones comunes: count, sum, avg, min, max.
from pyspark.sql.functions import count, avg, desc
# Agrupar por país y contardf.groupBy("Pais").count().show()
# Agregaciones múltiples# ✅ OK: Usar agg() para múltiples cálculos limpiosdf.groupBy("Pais").agg( count("*").alias("Total_Personas"), avg("Edad").alias("Edad_Media"), min("Edad").alias("Edad_Minima")).orderBy(desc("Total_Personas")).show()5. Joins (Uniones)
Section titled “5. Joins (Uniones)”Unir datos de diferentes fuentes es el pan de cada día. Spark soporta los tipos estándar de JOIN.
Supongamos:
- Empleados:
[id, nombre, dep_id] - Departamentos:
[dep_id, nombre_dep]
graph LR subgraph Inner A((A)) ---|Match| B((B)) end subgraph Left C((A)) -->|Todo A + Match| D((B)) endTipos de Join
Section titled “Tipos de Join”- Inner Join (Por defecto): Solo filas que coinciden en ambos.
- Left Join: Todas las del izquierdo, y las que coincidan del derecho.
- Right Join: Todas las del derecho, y las que coincidan del izquierdo.
- Full Outer Join: Todas las filas de ambos.
Ejemplo Práctico
Section titled “Ejemplo Práctico”# Sintaxis: df1.join(df2, condicion, tipo)
# ✅ OK: Especificar explícitamente la condicióndf_join = empleados.join(departamentos, empleados.dep_id == departamentos.dep_id, "left") # "inner", "outer", "right", "left_anti", "left_semi"
# Selección de columnas tras el join para evitar duplicados de IDsdf_join.select(empleados["nombre"], departamentos["nombre_dep"]).show()Broadcast Join (Optimización)
Section titled “Broadcast Join (Optimización)”Si una de las tablas es muy pequeña (ej. tabla maestra de departamentos), Spark puede enviarla a todos los nodos para evitar mover la tabla grande.
from pyspark.sql.functions import broadcast
# Forzamos un broadcast join (aunque Spark suele hacerlo solo si es pequeña)df_optim = empleados.join(broadcast(departamentos), "dep_id")6. Funciones Útiles
Section titled “6. Funciones Útiles”Spark tiene cientos de funciones integradas en pyspark.sql.functions. ¡No reinventes la rueda con UDFs (User Defined Functions) si no es estrictamente necesario! Las nativas son mucho más rápidas.
Fechas y Strings
Section titled “Fechas y Strings”from pyspark.sql.functions import year, month, upper, split
# Extraer año de una fechadf.select(year("fecha_compra")).show()
# Convertir a mayúsculasdf.select(upper("nombre")).show()
# Dividir textodf.select(split("nombre_completo", " ").alias("Array_Nombres")).show()when / otherwise (El “IF” de Spark)
Section titled “when / otherwise (El “IF” de Spark)”from pyspark.sql.functions import when
df.withColumn("Categoria", when(col("edad") < 18, "Menor") .when(col("edad") < 65, "Adulto") .otherwise("Jubilado")).show()7. Buenas Prácticas y Rendimiento
Section titled “7. Buenas Prácticas y Rendimiento”- Filtra pronto: Aplica
filterowherelo antes posible para reducir la cantidad de datos a procesar. - Evita
collect():collect()trae TODOS los datos al nodo driver. Si el DF es grande, tu programa explotará (OOM). Usatake(n)oshow()para inspeccionar. - Usa
explain(): Si una consulta va lenta, usadf.explain()para ver el plan físico y ver si está haciendo lo que esperas. - Caché con cabeza: Si vas a usar el mismo DF varias veces, usa
df.cache(). Recuerda hacerdf.unpersist()cuando acabes.
Ejercicios
Section titled “Ejercicios”Para estos ejercicios, necesitarás cargar datos. Puedes crear los DataFrames manualmente o cargar un CSV de prueba.
Ej. 1: Creación y Filtrado
Section titled “Ej. 1: Creación y Filtrado”Datos: Crea un DataFrame con: [("Ana", 25, "Madrid"), ("Luis", 35, "Barcelona"), ("Pepe", 40, "Madrid")]. Columnas: Nombre, Edad, Ciudad.
Reto: Muestra solo las personas de “Madrid” mayores de 30 años.
Ej. 2: Agregaciones Básicas
Section titled “Ej. 2: Agregaciones Básicas”Datos: Mismo DF anterior. Reto: Calcula la edad media de las personas agrupadas por Ciudad.
Ej. 3: Manipulación de Columnas
Section titled “Ej. 3: Manipulación de Columnas”Reto: Añade una columna nueva “Edad_en_10_años” que sea la columna Edad + 10. Luego renombra “Nombre” a “Usuario”.
Ej. 4: Joins
Section titled “Ej. 4: Joins”Datos:
- DF1 (Personas):
[("Ana", 1), ("Luis", 2), ("Marta", 3)](Cols: Nombre, DepID) - DF2 (Departamentos):
[(1, "IT"), (2, "HR")](Cols: DepID, NombreDep) Reto: Haz un Left Join para ver en qué departamento trabaja cada uno. ¿Qué pasa con Marta?
Ej. 5: SQL Puro
Section titled “Ej. 5: SQL Puro”Reto: Registra el DF del Ej. 1 como tabla temporal (createOrReplaceTempView) y haz la misma consulta (Madrid > 30) usando spark.sql().
Ej. 6: Funciones de String
Section titled “Ej. 6: Funciones de String”Datos: [(" Juan ", "García"), ("Ana", " Pérez ")]
Reto: Limpia los espacios en blanco (trim) de ambos campos y crea una columna nueva “Nombre_Completo” concatenando ambos con un espacio en medio.
Proyecto: Análisis de Ventas
Section titled “Proyecto: Análisis de Ventas”Dataset: Vamos a usar el fichero pdi_sales.csv (ventas).
Objetivo: Analizar el rendimiento de ventas por país y tipo de envío.
Pasos:
- Carga el CSV
pdi_sales.csvcon inferencia de esquema y cabeceras. - Muestra el esquema y cuenta cuántas filas tiene.
- Calcula el total de ingresos (
Revenue) por país (Country). Ordena de mayor a menor. - Filtra las ventas que sean de tipo “FedEx” (busca la columna de envío adecuada, quizás
Ship Mode). - Encuentra cuál es el Estado (
State) con mayor número de ventas (filas) en Estados Unidos (Country = 'USA'- verifica cómo está escrito en los datos). - Añade una columna que clasifique la venta: Si
Revenue> 500 es “Alta”, si no “Normal”. - (Bonus) Calcula la media de ingresos (
Revenue) por año (tendrás que transformar la columna fecha si no la ha detectado bien, o usar funciones de año).
¡Suerte! 🚀