Reto de Ingeniería
Implementa One-Hot Encoding manualmente sin usar OneHotEncoder.
Objetivo: Entender matrices sparse y optimización distribuida desde cero.
01. Contexto del Problema
📊 Dataset: Transacciones de E-commerce
Tienes un dataset con millones de transacciones. Cada fila contiene información categórica de alta cardinalidad:
| transaction_id | producto | categoria | marca | monto |
|---|---|---|---|---|
| TXN001 | Laptop Dell XPS 15 | Electrónica | Dell | $1,299 |
| TXN002 | Nike Air Max | Ropa | Nike | $150 |
| TXN003 | iPhone 15 Pro | Electrónica | Apple | $999 |
- • producto: 50,000 únicos
- • categoria: 200 únicas
- • marca: 5,000 únicas
•
OneHotEncoder• Librerías de encoding
¿Por qué este reto es importante?
Usar OneHotEncoder es fácil, pero no entiendes lo que sucede internamente.
Al implementarlo manualmente, aprenderás:
• Cómo se construyen matrices sparse (formato CSR)
• Por qué broadcast es crítico para evitar shuffles
• Cómo optimizar memoria con alta cardinalidad
• Técnicas de ingeniería que aplicarás en producción
02. El Desafío: 4 Fases
Fase 1: Crear Mapeo Manual (String → Índice)
Sin usar StringIndexer, crea un diccionario que mapee cada string único a un índice entero.
{"Laptop": 0, "Smartphone": 1, "Tablet": 2, ...}
# Pista: Necesitas extraer valores únicos
productos_unicos = df.select("producto").distinct().collect()
# TODO: Crear diccionario {producto: índice}
# Desafío: ¿Cómo evitar collect() que trae TODO al driver?
mapeo_producto = # ...
collect() con 50,000 productos puede saturar la memoria del driver.
Considera usar broadcast después de crear el mapeo.
Fase 2: Convertir a SparseVector
Dada una fila con producto = "Laptop" y el mapeo dice "Laptop" → índice 342,
crea un SparseVector(50000, [342], [1.0]).
• indices: Posiciones de valores no-cero
• values: Valores en esas posiciones
from pyspark.ml.linalg import Vectors, SparseVector
# UDF para convertir string → SparseVector
def string_to_sparse(producto, mapeo_bc, total_dim):
# TODO:
# 1. Obtener índice desde mapeo_bc.value
# 2. Crear SparseVector(total_dim, [indice], [1.0])
# 3. Manejar caso: producto no existe en mapeo (categoría nueva)
pass
# Broadcast del mapeo
bc_mapeo = spark.sparkContext.broadcast(mapeo_producto)
# Registrar UDF
from pyspark.sql.types import *
# ... (necesitas schema para SparseVector)
Vectors.sparse() acepta (size, indices, values). Asegúrate que indices y values sean listas/arrays.
Fase 3: Optimización - Evitar collect() en el Loop
El enfoque naive usa collect() para crear el mapeo, lo cual falla con millones de categorías.
Optimiza usando broadcast + zipWithIndex.
# Enfoque optimizado (sin collect en loop)
from pyspark.sql import functions as F
# 1. Extraer categorías únicas CON índices
productos_indexed = (
df.select("producto")
.distinct()
.rdd
.zipWithIndex() # Asigna índice automáticamente
.map(lambda x: (x[0][0], int(x[1]))) # (producto, index)
)
# 2. Convertir a diccionario (esto sí requiere collect, pero solo 50k elementos)
mapeo_producto = dict(productos_indexed.collect())
# 3. Broadcast
bc_mapeo = spark.sparkContext.broadcast(mapeo_producto)
# TODO: Aplicar UDF a DataFrame sin usar collect() en cada fila
collect() dentro del loop• Enviar mapeo en cada UDF call
• Memoria del driver explota
zipWithIndex() distribuido• Broadcast del mapeo (1 vez)
• UDF accede a broadcast.value
Fase 4: Validación y Comparación
Compara tu implementación manual con OneHotEncoder oficial de Spark para verificar correctitud.
# Tu implementación manual
df_manual = tu_funcion_ohe(df, "producto", "producto_ohe_manual")
# Implementación oficial de Spark
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline
indexer = StringIndexer(inputCol="producto", outputCol="producto_idx")
encoder = OneHotEncoder(inputCol="producto_idx", outputCol="producto_ohe_oficial")
pipeline = Pipeline(stages=[indexer, encoder])
df_oficial = pipeline.fit(df).transform(df)
# Comparar: ¿Son idénticos los vectores?
# TODO: Escribir función de comparación
03. Dataset de Prueba
Código para generar dataset sintético
import random
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("OHE_Challenge").getOrCreate()
# Generar 1 millón de transacciones con alta cardinalidad
productos = [f"Producto_{i}" for i in range(50000)] # 50k productos únicos
categorias = [f"Categoria_{i}" for i in range(200)] # 200 categorías
marcas = [f"Marca_{i}" for i in range(5000)] # 5k marcas
# Generar datos
data = [
(
f"TXN{i:08d}",
random.choice(productos),
random.choice(categorias),
random.choice(marcas),
round(random.uniform(10, 2000), 2)
)
for i in range(1_000_000)
]
df = spark.createDataFrame(
data,
["transaction_id", "producto", "categoria", "marca", "monto"]
)
# Cachear para evitar recomputación
df.cache()
df.count() # Trigger cache
print(f"Dataset generado: {df.count()} filas")
df.show(5)
+-------------+-----------------+-------------+---------+-------+
|transaction_id|producto |categoria |marca |monto |
+-------------+-----------------+-------------+---------+-------+
|TXN00000000 |Producto_12345 |Categoria_42 |Marca_789|123.45 |
|TXN00000001 |Producto_9876 |Categoria_15 |Marca_234|456.78 |
+-------------+-----------------+-------------+---------+-------+
04. Criterios de Evaluación
Correctitud (40%)
- ✓ Vector sparse correctamente formado (dimensión, índices, valores)
- ✓ Mapeo string → índice sin colisiones
-
✓
Resultado idéntico a
OneHotEncoderoficial - ✓ Manejo de categorías no vistas en test set
Eficiencia (30%)
-
⚡
No usar
collect()dentro del loop principal - ⚡ Broadcast del mapeo (evitar transferencias repetidas)
- ⚡ Uso de SparseVector (no DenseVector)
- ⚡ Tiempo comparable a OneHotEncoder (±20%)
Manejo de Memoria (20%)
- 💾 Calcular % de ceros en vector final
- 💾 Comparar tamaño Dense vs Sparse (en bytes)
- 💾 Demostrar que sparse ahorra >90% memoria
Comprensión (10%)
- 📖 Comentarios explicando por qué sparse es mejor
- 📖 Documentación de tradeoffs (memoria vs velocidad)
- 📖 Análisis de complejidad (tiempo/espacio)
05. Desafíos Bonus
Bonus 1: k-1 Encoding
Implementa k-1 encoding (eliminar última columna para evitar colinealidad).
# Con 3 categorías, genera 2 columnas
# [1, 0] = categoria 0
# [0, 1] = categoria 1
# [0, 0] = categoria 2 (implícito)
Bonus 2: Categorías Desconocidas
Maneja categorías nuevas en test set que no existían en train set.
• Asignar a índice especial "unknown"
• Vector de ceros (ignorar categoría)
• Lanzar error (strict mode)
Bonus 3: Frecuencia de Categorías
En vez de [0, 1] binario, usa frecuencia relativa [0, 0.15] (15% de apariciones).
# "Laptop" aparece en 15% de transacciones
# SparseVector(n, [342], [0.15])
Bonus 4: Pipeline Serializable
Envuelve tu implementación en una clase que herede de Transformer y sea serializable.
class CustomOHE(Transformer):
def _transform(self, df):
# tu código aquí
🎯 Objetivo de Aprendizaje
Al completar este reto, habrás dominado:
- • Matrices sparse (formato CSR)
- • Indicadores binarios \(\mathbb{1}(x = c)\)
- • Ahorro de memoria: O(n) vs O(n×k)
- • Broadcast variables
- • UDFs distribuidas
- • zipWithIndex para asignación de IDs
- • Evitar shuffles innecesarios
- • Tradeoffs memoria/velocidad
- • Profiling de Spark jobs
- • Serialización de modelos
- • Manejo de datos nuevos
- • Testing y validación
💡 Insight clave: Este reto representa el 90% de lo que necesitas saber
para implementar feature engineering personalizado en producción. Las librerías como OneHotEncoder
son convenientes, pero entender lo que hacen internamente te permite debuggear, optimizar, y adaptar cuando
los requisitos no encajan con las herramientas estándar.