Skip to content

DataFrames en Spark

En este módulo vamos a sumergirnos en el mundo de los DataFrames. Si vienes de usar RDDs, verás que esto es como cambiar de ir en bicicleta a conducir un deportivo.

Los DataFrames son la estructura principal para trabajar con datos estructurados en Spark SQL. Imagínatelos como una tabla en una base de datos relacional o un DataFrame en pandas, pero con superpoderes de optimización bajo el capó.

Los RDD (Resilient Distributed Datasets) son el bloque fundamental de Spark, pero trabajar con ellos puede ser tedioso y menos eficiente para ciertas tareas. Los DataFrames añaden una capa de abstracción que permite a Spark entender qué quieres hacer con los datos, no solo cómo.

graph TD
subgraph RDD[RDD: Caja Negra]
A[Datos opacos] --- B[Datos opacos]
style RDD fill:#f8d7da,stroke:#c62828
end
subgraph DF[DataFrame: Estructurado]
C[Col: Nombre] --- D[Col: Edad]
D --- E[Col: Ciudad]
style DF fill:#c8e6c9,stroke:#388e3c
end
RDD -->|Optimización manual| Result1[Lento en Python]
DF -->|Catalyst Optimizer| Result2[Rápido y Optimizado]
  1. Optimización Automática (Catalyst): Spark optimiza tus consultas por ti.
  2. Gestión de Memoria (Tungsten): Uso mucho más eficiente de la memoria que los objetos Java/Python.
  3. Facilidad de uso: Sintaxis muy parecida a SQL o Pandas.

Hay varias formas de crear un DataFrame. Vamos a ver las más comunes.

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("DataFrame-Demo").getOrCreate()
data = [("Aitor", 33), ("Maria", 28), ("Juan", 45)]
columns = ["Nombre", "Edad"]
# ✅ OK: Crear DataFrame directamente
df = spark.createDataFrame(data, columns)
df.show()

A veces tendrás un RDD y querrás convertirlo.

rdd = spark.sparkContext.parallelize(data)
# ✅ OK: Convertir RDD a DataFrame
df_from_rdd = rdd.toDF(columns)

Spark puede leer multitud de formatos: CSV, JSON, Parquet, ORC, etc.

# Leyendo un CSV
df_csv = spark.read \
.option("header", "true") \
.option("inferSchema", "true") \
.csv("datos/ventas.csv")
# ❌ INCORRECTO: No especificar opciones importantes como header en CSV
# df_malo = spark.read.csv("datos/ventas.csv")
# Leyendo un Parquet (Recomendado)
df_parquet = spark.read.parquet("datos/ventas.parquet")

Una vez tienes tu DataFrame, ¡toca jugar con él!

  • printSchema(): Muestra la estructura (columnas y tipos).
  • show(): Muestra las primeras filas (por defecto 20).
  • count(): Cuenta el número total de filas.
  • describe(): Resumen estadístico (count, mean, stddev, min, max).
df.printSchema()
df.show(5, truncate=False) # Muestra 5 filas y no corta el texto
print(f"Total filas: {df.count()}")
from pyspark.sql.functions import col
# Selección de columnas
df.select("Nombre", "Edad").show()
# Filtrado (Where y Filter son alias, usa el que prefieras)
# ✅ OK: Usar sintaxis de columna (col) para mayor seguridad
df.filter(col("Edad") > 30).show()
# También puedes usar expresiones SQL
df.filter("Edad > 30").show()

Añadir, renombrar o borrar columnas es muy fácil.

from pyspark.sql.functions import lit
# Añadir una columna constante
df_nuevo = df.withColumn("Pais", lit("España"))
# Renombrar
df_renombrado = df.withColumnRenamed("Nombre", "Nom")
# Borrar
df_final = df.drop("Edad")

Agrupar datos es esencial para el análisis. Funciona igual que en SQL: groupBy + función de agregación.

Funciones comunes: count, sum, avg, min, max.

from pyspark.sql.functions import count, avg, desc
# Agrupar por país y contar
df.groupBy("Pais").count().show()
# Agregaciones múltiples
# ✅ OK: Usar agg() para múltiples cálculos limpios
df.groupBy("Pais").agg(
count("*").alias("Total_Personas"),
avg("Edad").alias("Edad_Media"),
min("Edad").alias("Edad_Minima")
).orderBy(desc("Total_Personas")).show()

Unir datos de diferentes fuentes es el pan de cada día. Spark soporta los tipos estándar de JOIN.

Supongamos:

  • Empleados: [id, nombre, dep_id]
  • Departamentos: [dep_id, nombre_dep]
graph LR
subgraph Inner
A((A)) ---|Match| B((B))
end
subgraph Left
C((A)) -->|Todo A + Match| D((B))
end
  1. Inner Join (Por defecto): Solo filas que coinciden en ambos.
  2. Left Join: Todas las del izquierdo, y las que coincidan del derecho.
  3. Right Join: Todas las del derecho, y las que coincidan del izquierdo.
  4. Full Outer Join: Todas las filas de ambos.
# Sintaxis: df1.join(df2, condicion, tipo)
# ✅ OK: Especificar explícitamente la condición
df_join = empleados.join(departamentos,
empleados.dep_id == departamentos.dep_id,
"left") # "inner", "outer", "right", "left_anti", "left_semi"
# Selección de columnas tras el join para evitar duplicados de IDs
df_join.select(empleados["nombre"], departamentos["nombre_dep"]).show()

Si una de las tablas es muy pequeña (ej. tabla maestra de departamentos), Spark puede enviarla a todos los nodos para evitar mover la tabla grande.

from pyspark.sql.functions import broadcast
# Forzamos un broadcast join (aunque Spark suele hacerlo solo si es pequeña)
df_optim = empleados.join(broadcast(departamentos), "dep_id")

Spark tiene cientos de funciones integradas en pyspark.sql.functions. ¡No reinventes la rueda con UDFs (User Defined Functions) si no es estrictamente necesario! Las nativas son mucho más rápidas.

from pyspark.sql.functions import year, month, upper, split
# Extraer año de una fecha
df.select(year("fecha_compra")).show()
# Convertir a mayúsculas
df.select(upper("nombre")).show()
# Dividir texto
df.select(split("nombre_completo", " ").alias("Array_Nombres")).show()
from pyspark.sql.functions import when
df.withColumn("Categoria",
when(col("edad") < 18, "Menor")
.when(col("edad") < 65, "Adulto")
.otherwise("Jubilado")
).show()
  1. Filtra pronto: Aplica filter o where lo antes posible para reducir la cantidad de datos a procesar.
  2. Evita collect(): collect() trae TODOS los datos al nodo driver. Si el DF es grande, tu programa explotará (OOM). Usa take(n) o show() para inspeccionar.
  3. Usa explain(): Si una consulta va lenta, usa df.explain() para ver el plan físico y ver si está haciendo lo que esperas.
  4. Caché con cabeza: Si vas a usar el mismo DF varias veces, usa df.cache(). Recuerda hacer df.unpersist() cuando acabes.

Para estos ejercicios, necesitarás cargar datos. Puedes crear los DataFrames manualmente o cargar un CSV de prueba.

Datos: Crea un DataFrame con: [("Ana", 25, "Madrid"), ("Luis", 35, "Barcelona"), ("Pepe", 40, "Madrid")]. Columnas: Nombre, Edad, Ciudad. Reto: Muestra solo las personas de “Madrid” mayores de 30 años.

Datos: Mismo DF anterior. Reto: Calcula la edad media de las personas agrupadas por Ciudad.

Reto: Añade una columna nueva “Edad_en_10_años” que sea la columna Edad + 10. Luego renombra “Nombre” a “Usuario”.

Datos:

  • DF1 (Personas): [("Ana", 1), ("Luis", 2), ("Marta", 3)] (Cols: Nombre, DepID)
  • DF2 (Departamentos): [(1, "IT"), (2, "HR")] (Cols: DepID, NombreDep) Reto: Haz un Left Join para ver en qué departamento trabaja cada uno. ¿Qué pasa con Marta?

Reto: Registra el DF del Ej. 1 como tabla temporal (createOrReplaceTempView) y haz la misma consulta (Madrid > 30) usando spark.sql().

Datos: [(" Juan ", "García"), ("Ana", " Pérez ")] Reto: Limpia los espacios en blanco (trim) de ambos campos y crea una columna nueva “Nombre_Completo” concatenando ambos con un espacio en medio.


Dataset: Vamos a usar el fichero pdi_sales.csv (ventas). Objetivo: Analizar el rendimiento de ventas por país y tipo de envío.

Pasos:

  1. Carga el CSV pdi_sales.csv con inferencia de esquema y cabeceras.
  2. Muestra el esquema y cuenta cuántas filas tiene.
  3. Calcula el total de ingresos (Revenue) por país (Country). Ordena de mayor a menor.
  4. Filtra las ventas que sean de tipo “FedEx” (busca la columna de envío adecuada, quizás Ship Mode).
  5. Encuentra cuál es el Estado (State) con mayor número de ventas (filas) en Estados Unidos (Country = 'USA' - verifica cómo está escrito en los datos).
  6. Añade una columna que clasifique la venta: Si Revenue > 500 es “Alta”, si no “Normal”.
  7. (Bonus) Calcula la media de ingresos (Revenue) por año (tendrás que transformar la columna fecha si no la ha detectado bien, o usar funciones de año).

¡Suerte! 🚀