SECCIÓN 13

RETO FINAL.challenge()

ONE-HOT ENCODING MANUAL SIN LIBRERÍAS

ENGINEERING CHALLENGE

Reto de Ingeniería

Implementa One-Hot Encoding manualmente sin usar OneHotEncoder.
Objetivo: Entender matrices sparse y optimización distribuida desde cero.

01. Contexto del Problema

📊 Dataset: Transacciones de E-commerce

Tienes un dataset con millones de transacciones. Cada fila contiene información categórica de alta cardinalidad:

transaction_id producto categoria marca monto
TXN001 Laptop Dell XPS 15 Electrónica Dell $1,299
TXN002 Nike Air Max Ropa Nike $150
TXN003 iPhone 15 Pro Electrónica Apple $999
⚠️ Alto Cardinalidad
  • producto: 50,000 únicos
  • categoria: 200 únicas
  • marca: 5,000 únicas
🎯 Objetivo
Convertir estas columnas categóricas en SparseVectors para entrenar un modelo de ML.
🚫 Restricción
No puedes usar:
OneHotEncoder
• Librerías de encoding

¿Por qué este reto es importante?

Usar OneHotEncoder es fácil, pero no entiendes lo que sucede internamente. Al implementarlo manualmente, aprenderás:

• Cómo se construyen matrices sparse (formato CSR)
• Por qué broadcast es crítico para evitar shuffles
• Cómo optimizar memoria con alta cardinalidad
• Técnicas de ingeniería que aplicarás en producción

02. El Desafío: 4 Fases

1

Fase 1: Crear Mapeo Manual (String → Índice)

Sin usar StringIndexer, crea un diccionario que mapee cada string único a un índice entero.

$$ f: \mathcal{S} \to \mathbb{N}, \quad \text{donde } \mathcal{S} = \{\text{"Laptop"}, \text{"Smartphone"}, \ldots\} $$
Ejemplo: {"Laptop": 0, "Smartphone": 1, "Tablet": 2, ...}
# Pista: Necesitas extraer valores únicos
productos_unicos = df.select("producto").distinct().collect()

# TODO: Crear diccionario {producto: índice}
# Desafío: ¿Cómo evitar collect() que trae TODO al driver?
mapeo_producto = # ...
⚠️ Cuidado: collect() con 50,000 productos puede saturar la memoria del driver. Considera usar broadcast después de crear el mapeo.
2

Fase 2: Convertir a SparseVector

Dada una fila con producto = "Laptop" y el mapeo dice "Laptop" → índice 342, crea un SparseVector(50000, [342], [1.0]).

$$ \text{SparseVector}(n, \text{indices}, \text{values}) $$
• \(n\): Dimensión total (número de categorías)
• indices: Posiciones de valores no-cero
• values: Valores en esas posiciones
from pyspark.ml.linalg import Vectors, SparseVector

# UDF para convertir string → SparseVector
def string_to_sparse(producto, mapeo_bc, total_dim):
    # TODO:
    # 1. Obtener índice desde mapeo_bc.value
    # 2. Crear SparseVector(total_dim, [indice], [1.0])
    # 3. Manejar caso: producto no existe en mapeo (categoría nueva)
    pass

# Broadcast del mapeo
bc_mapeo = spark.sparkContext.broadcast(mapeo_producto)

# Registrar UDF
from pyspark.sql.types import *
# ... (necesitas schema para SparseVector)
💡 Pista: Vectors.sparse() acepta (size, indices, values). Asegúrate que indices y values sean listas/arrays.
3

Fase 3: Optimización - Evitar collect() en el Loop

El enfoque naive usa collect() para crear el mapeo, lo cual falla con millones de categorías. Optimiza usando broadcast + zipWithIndex.

# Enfoque optimizado (sin collect en loop)
from pyspark.sql import functions as F

# 1. Extraer categorías únicas CON índices
productos_indexed = (
    df.select("producto")
    .distinct()
    .rdd
    .zipWithIndex()  # Asigna índice automáticamente
    .map(lambda x: (x[0][0], int(x[1])))  # (producto, index)
)

# 2. Convertir a diccionario (esto sí requiere collect, pero solo 50k elementos)
mapeo_producto = dict(productos_indexed.collect())

# 3. Broadcast
bc_mapeo = spark.sparkContext.broadcast(mapeo_producto)

# TODO: Aplicar UDF a DataFrame sin usar collect() en cada fila
❌ Enfoque Naive
collect() dentro del loop
• Enviar mapeo en cada UDF call
• Memoria del driver explota
✅ Enfoque Optimizado
zipWithIndex() distribuido
• Broadcast del mapeo (1 vez)
• UDF accede a broadcast.value
4

Fase 4: Validación y Comparación

Compara tu implementación manual con OneHotEncoder oficial de Spark para verificar correctitud.

# Tu implementación manual
df_manual = tu_funcion_ohe(df, "producto", "producto_ohe_manual")

# Implementación oficial de Spark
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline

indexer = StringIndexer(inputCol="producto", outputCol="producto_idx")
encoder = OneHotEncoder(inputCol="producto_idx", outputCol="producto_ohe_oficial")
pipeline = Pipeline(stages=[indexer, encoder])
df_oficial = pipeline.fit(df).transform(df)

# Comparar: ¿Son idénticos los vectores?
# TODO: Escribir función de comparación
Validación 1: Verificar que ambos vectores sean SparseVector con misma dimensión.
Validación 2: Comparar índices y valores no-ceros (deben ser idénticos).
Validación 3: Medir tiempo de ejecución: ¿Tu implementación es comparable?

03. Dataset de Prueba

Código para generar dataset sintético

import random
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("OHE_Challenge").getOrCreate()

# Generar 1 millón de transacciones con alta cardinalidad
productos = [f"Producto_{i}" for i in range(50000)]  # 50k productos únicos
categorias = [f"Categoria_{i}" for i in range(200)]      # 200 categorías
marcas = [f"Marca_{i}" for i in range(5000)]            # 5k marcas

# Generar datos
data = [
    (
        f"TXN{i:08d}",
        random.choice(productos),
        random.choice(categorias),
        random.choice(marcas),
        round(random.uniform(10, 2000), 2)
    )
    for i in range(1_000_000)
]

df = spark.createDataFrame(
    data,
    ["transaction_id", "producto", "categoria", "marca", "monto"]
)

# Cachear para evitar recomputación
df.cache()
df.count()  # Trigger cache

print(f"Dataset generado: {df.count()} filas")
df.show(5)
Salida esperada:
+-------------+-----------------+-------------+---------+-------+
|transaction_id|producto         |categoria    |marca    |monto  |
+-------------+-----------------+-------------+---------+-------+
|TXN00000000  |Producto_12345   |Categoria_42 |Marca_789|123.45 |
|TXN00000001  |Producto_9876    |Categoria_15 |Marca_234|456.78 |
+-------------+-----------------+-------------+---------+-------+

04. Criterios de Evaluación

Correctitud (40%)

  • Vector sparse correctamente formado (dimensión, índices, valores)
  • Mapeo string → índice sin colisiones
  • Resultado idéntico a OneHotEncoder oficial
  • Manejo de categorías no vistas en test set

Eficiencia (30%)

  • No usar collect() dentro del loop principal
  • Broadcast del mapeo (evitar transferencias repetidas)
  • Uso de SparseVector (no DenseVector)
  • Tiempo comparable a OneHotEncoder (±20%)

Manejo de Memoria (20%)

  • 💾 Calcular % de ceros en vector final
  • 💾 Comparar tamaño Dense vs Sparse (en bytes)
  • 💾 Demostrar que sparse ahorra >90% memoria

Comprensión (10%)

  • 📖 Comentarios explicando por qué sparse es mejor
  • 📖 Documentación de tradeoffs (memoria vs velocidad)
  • 📖 Análisis de complejidad (tiempo/espacio)

05. Desafíos Bonus

Bonus 1: k-1 Encoding

Implementa k-1 encoding (eliminar última columna para evitar colinealidad).

# Con 3 categorías, genera 2 columnas
# [1, 0] = categoria 0
# [0, 1] = categoria 1
# [0, 0] = categoria 2 (implícito)
+10 puntos extra

Bonus 2: Categorías Desconocidas

Maneja categorías nuevas en test set que no existían en train set.

Estrategias:
• Asignar a índice especial "unknown"
• Vector de ceros (ignorar categoría)
• Lanzar error (strict mode)
+10 puntos extra

Bonus 3: Frecuencia de Categorías

En vez de [0, 1] binario, usa frecuencia relativa [0, 0.15] (15% de apariciones).

# "Laptop" aparece en 15% de transacciones
# SparseVector(n, [342], [0.15])
+15 puntos extra

Bonus 4: Pipeline Serializable

Envuelve tu implementación en una clase que herede de Transformer y sea serializable.

class CustomOHE(Transformer):
    def _transform(self, df):
        # tu código aquí
+20 puntos extra (nivel avanzado)

🎯 Objetivo de Aprendizaje

Al completar este reto, habrás dominado:

✅ Matemática
  • • Matrices sparse (formato CSR)
  • • Indicadores binarios \(\mathbb{1}(x = c)\)
  • • Ahorro de memoria: O(n) vs O(n×k)
✅ Ingeniería
  • • Broadcast variables
  • • UDFs distribuidas
  • • zipWithIndex para asignación de IDs
✅ Optimización
  • • Evitar shuffles innecesarios
  • • Tradeoffs memoria/velocidad
  • • Profiling de Spark jobs
✅ Producción
  • • Serialización de modelos
  • • Manejo de datos nuevos
  • • Testing y validación

💡 Insight clave: Este reto representa el 90% de lo que necesitas saber para implementar feature engineering personalizado en producción. Las librerías como OneHotEncoder son convenientes, pero entender lo que hacen internamente te permite debuggear, optimizar, y adaptar cuando los requisitos no encajan con las herramientas estándar.