Reto: Tuning Completo de Pipeline

Desafío Integral de Optimización de Hiperparámetros

Volver

Desafío: Optimización End-to-End

En este reto final, construirás y optimizarás un pipeline completo de machine learning desde feature engineering hasta el modelo predictivo. Tunearás múltiples stages simultáneamente, analizarás los resultados, y validarás la mejora sobre un baseline.

Contexto del Problema

Trabajas en una empresa de telecomunicaciones que quiere predecir churn (abandono de clientes). Tienes un dataset con 100,000 clientes y 20 features (demográficos, uso de servicios, facturación). Tu objetivo: construir el mejor modelo posible optimizando TODO el pipeline.

Dataset

  • • N = 100,000 clientes
  • • 15 features numéricas
  • • 5 features categóricas
  • • Target: churn (0/1)
  • • Desbalance: 20% churn

Métrica Objetivo

  • • AUC-ROC (principal)
  • • Precision@20% (secundaria)
  • • Baseline: AUC = 0.73
  • • Meta: AUC ≥ 0.85

Restricciones

  • • Presupuesto: 200 configs
  • • Tiempo: < 4 horas
  • • Inferencia: < 100ms
  • • Interpretabilidad: Media

Fase 1: Construir Pipeline Base

Objetivo de la Fase

Construir un pipeline robusto con preprocesamiento de features numéricas y categóricas, seguido de un modelo clasificador. Este pipeline será la base para el tuning.

Arquitectura del Pipeline

graph LR
    A[Raw Data] --> B[StringIndexer
Categóricas] B --> C[OneHotEncoder
Sparse Vectors] C --> D[VectorAssembler
Features + Encoded] D --> E[StandardScaler
μ=0, σ=1] E --> F[PCA
Reducción Dim] F --> G[GBTClassifier
Gradient Boosting] G --> H[Predictions] style A fill:#9333ea,stroke:#a855f7,color:#fff style B fill:#3b82f6,stroke:#60a5fa,color:#fff style C fill:#3b82f6,stroke:#60a5fa,color:#fff style D fill:#10b981,stroke:#34d399,color:#fff style E fill:#10b981,stroke:#34d399,color:#fff style F fill:#f59e0b,stroke:#fbbf24,color:#000 style G fill:#ef4444,stroke:#f87171,color:#fff style H fill:#8b5cf6,stroke:#a78bfa,color:#fff

Pipeline de 7 stages: preprocesamiento categórico → ensamblaje → escalado → PCA → modelo

Código: Construcción del Pipeline

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler, PCA
from pyspark.ml.classification import GBTClassifier

# ============================================================
# DATOS DE ENTRADA
# ============================================================
# df_train con columnas:
# - Numéricas: age, tenure, monthly_charges, total_charges, ...  (15 features)
# - Categóricas: contract, payment_method, internet_service, ... (5 features)
# - Target: churn (0 o 1)

numeric_features = [
    "age", "tenure", "monthly_charges", "total_charges",
    "num_services", "avg_call_duration", "data_usage_gb",
    "num_support_tickets", "avg_bill_amount", "credit_score",
    "num_devices", "streaming_hours", "roaming_minutes",
    "voicemail_messages", "international_calls"
]  # 15 features numéricas

categorical_features = [
    "contract", "payment_method", "internet_service",
    "device_type", "region"
]  # 5 features categóricas

# ============================================================
# STAGE 1-2: Procesar features categóricas
# ============================================================
# StringIndexer: convertir strings a índices numéricos
indexers = [
    StringIndexer(inputCol=col, outputCol=f"{col}_index", handleInvalid="keep")
    for col in categorical_features
]

# OneHotEncoder: convertir índices a vectores sparse one-hot
encoder = OneHotEncoder(
    inputCols=[f"{col}_index" for col in categorical_features],
    outputCols=[f"{col}_encoded" for col in categorical_features],
    dropLast=True  # Evitar multicolinealidad
)

# ============================================================
# STAGE 3: Ensamblar todas las features
# ============================================================
assembler = VectorAssembler(
    inputCols=numeric_features + [f"{col}_encoded" for col in categorical_features],
    outputCol="raw_features",
    handleInvalid="skip"  # Saltar filas con NaN
)

# ============================================================
# STAGE 4: Estandarizar features numéricas
# ============================================================
scaler = StandardScaler(
    inputCol="raw_features",
    outputCol="scaled_features",
    withMean=True,   # Centrar en 0
    withStd=True     # Escalar a std=1
)

# ============================================================
# STAGE 5: PCA para reducción dimensional (opcional pero tuneable)
# ============================================================
pca = PCA(
    inputCol="scaled_features",
    outputCol="features"
    # k se definirá en el grid de tuning
)

# ============================================================
# STAGE 6: Modelo Gradient Boosting Trees
# ============================================================
gbt = GBTClassifier(
    featuresCol="features",
    labelCol="churn",
    seed=42,
    predictionCol="prediction",
    rawPredictionCol="rawPrediction",
    probabilityCol="probability"
)

# ============================================================
# CONSTRUIR PIPELINE COMPLETO
# ============================================================
pipeline_stages = indexers + [encoder, assembler, scaler, pca, gbt]

pipeline = Pipeline(stages=pipeline_stages)

print(f"Pipeline construido con {len(pipeline_stages)} stages")
print("Stages:")
for idx, stage in enumerate(pipeline_stages):
    print(f"  {idx+1}. {stage.__class__.__name__}")

# Output:
# Pipeline construido con 10 stages
# Stages:
#   1. StringIndexer
#   2. StringIndexer
#   3. StringIndexer
#   4. StringIndexer
#   5. StringIndexer
#   6. OneHotEncoder
#   7. VectorAssembler
#   8. StandardScaler
#   9. PCA
#   10. GBTClassifier

Criterios de Éxito - Fase 1

  • Pipeline compila sin errores
  • Maneja features numéricas y categóricas correctamente
  • Incluye al menos 3 stages tuneables (PCA, GBT params)
  • Preparado para ser pasado a CrossValidator

Fase 2: Definir Grid de Hiperparámetros

Objetivo de la Fase

Diseñar un grid inteligente que explore el espacio de hiperparámetros de manera eficiente. Debes tunear al menos 6 hiperparámetros de diferentes stages del pipeline, balanceando exploración y costo computacional.

Estrategia: Coarse-to-Fine Grid Search

Con presupuesto limitado (200 configs), usa estrategia de 2 rondas:

  1. 1. Grid grueso: Pocos valores (3 por dim), amplio rango → identificar región prometedora (~80 configs)
  2. 2. Grid fino: Más valores (4-5 por dim), rango estrecho alrededor del mejor → refinar (~120 configs)

Para el reto, implementaremos directamente un grid inteligente que balancea ambas estrategias.

Código: Grid Multidimensional

from pyspark.ml.tuning import ParamGridBuilder

# ============================================================
# ESTRATEGIA DE SELECCIÓN DE HIPERPARÁMETROS
# ============================================================

# Hiperparámetros a tunear (6 total):
# 1. PCA.k: Dimensionalidad reducida (stage 5)
# 2. GBT.maxIter: Número de árboles (stage 6)
# 3. GBT.maxDepth: Profundidad máxima de árboles (stage 6)
# 4. GBT.stepSize: Learning rate (stage 6)
# 5. GBT.subsamplingRate: Fracción de datos por iteración (stage 6)
# 6. GBT.maxBins: Bins para discretización (stage 6)

# ============================================================
# CONSTRUCCIÓN DEL GRID
# ============================================================

paramGrid = ParamGridBuilder() \
    .addGrid(pca.k, [10, 20, 30, 40]) \
    .addGrid(gbt.maxIter, [50, 100, 150]) \
    .addGrid(gbt.maxDepth, [4, 5, 6, 7]) \
    .addGrid(gbt.stepSize, [0.05, 0.1, 0.2]) \
    .addGrid(gbt.subsamplingRate, [0.7, 0.85, 1.0]) \
    .addGrid(gbt.maxBins, [32, 64]) \
    .build()

# ============================================================
# ANÁLISIS DEL ESPACIO DE BÚSQUEDA
# ============================================================

grid_size = len(paramGrid)
print(f"Tamaño del grid: {grid_size} configuraciones")
print(f"Cálculo: 4 (k) × 3 (maxIter) × 4 (maxDepth) × 3 (stepSize) × 3 (subsamplingRate) × 2 (maxBins)")
print(f"       = {4*3*4*3*3*2} configuraciones")

# Con 5-fold CV: 288 configs × 5 folds = 1,440 entrenamientos
print(f"\nCon 5-fold CV: {grid_size} × 5 = {grid_size * 5} entrenamientos totales")

# ============================================================
# JUSTIFICACIÓN DE CADA HIPERPARÁMETRO
# ============================================================

print("\n========== JUSTIFICACIÓN DE VALORES ==========\n")

print("1. PCA.k (dimensionalidad):")
print("   Valores: [10, 20, 30, 40]")
print("   Razón: Dataset tiene ~50 features post-encoding.")
print("          Explorar reducción agresiva (k=10, 80% reducción)")
print("          hasta conservadora (k=40, 20% reducción)")
print("   Escala: Lineal (enteros)")

print("\n2. GBT.maxIter (número de árboles):")
print("   Valores: [50, 100, 150]")
print("   Razón: Más árboles → mayor capacidad pero riesgo de overfitting")
print("          50 árboles: baseline rápido")
print("          150 árboles: máximo poder representacional")
print("   Escala: Lineal")

print("\n3. GBT.maxDepth (profundidad de árboles):")
print("   Valores: [4, 5, 6, 7]")
print("   Razón: Controla complejidad de cada árbol individual")
print("          depth=4: árboles simples (16 hojas max)")
print("          depth=7: árboles complejos (128 hojas max)")
print("   Escala: Lineal (exponencial en hojas: 2^depth)")

print("\n4. GBT.stepSize (learning rate):")
print("   Valores: [0.05, 0.1, 0.2]")
print("   Razón: Controla agresividad del boosting")
print("          0.05: conservador, requiere más árboles")
print("          0.2: agresivo, puede divergir")
print("   Escala: Logarítmica parcial")

print("\n5. GBT.subsamplingRate (fracción de datos):")
print("   Valores: [0.7, 0.85, 1.0]")
print("   Razón: Stochastic gradient boosting para regularización")
print("          1.0: sin subsampling (determinístico)")
print("          0.7: 30% de datos descartados por iteración")
print("   Escala: Lineal en [0.5, 1.0]")

print("\n6. GBT.maxBins (discretización):")
print("   Valores: [32, 64]")
print("   Razón: Granularidad de splits en features continuas")
print("          Más bins → mayor precisión pero más costo")
print("          32 bins suficiente para mayoría de casos")
print("   Escala: Potencias de 2")

# ============================================================
# VERIFICAR PRESUPUESTO
# ============================================================

budget = 200  # Presupuesto máximo de configs
if grid_size <= budget:
    print(f"\n✓ Grid dentro del presupuesto: {grid_size} ≤ {budget}")
else:
    print(f"\n✗ Grid excede presupuesto: {grid_size} > {budget}")
    print("   Considera reducir valores o usar Random Search")

# ============================================================
# ESTIMACIÓN DE TIEMPO
# ============================================================

# Supongamos cada entrenamiento toma 2 minutos en promedio
time_per_training = 2  # minutos
total_trainings = grid_size * 5  # 5-fold CV
total_time_sequential = total_trainings * time_per_training  # minutos

# Con paralelización (supongamos 10 executors)
parallelism = 10
total_time_parallel = total_time_sequential / parallelism

print(f"\n========== ESTIMACIÓN DE TIEMPO ==========")
print(f"Tiempo por entrenamiento: {time_per_training} min")
print(f"Total entrenamientos: {total_trainings}")
print(f"Tiempo secuencial: {total_time_sequential} min ({total_time_sequential/60:.1f} horas)")
print(f"Tiempo con {parallelism} executors: {total_time_parallel} min ({total_time_parallel/60:.1f} horas)")

if total_time_parallel <= 240:  # 4 horas
    print(f"✓ Tiempo estimado dentro de restricción (< 4 horas)")
else:
    print(f"✗ Tiempo estimado excede restricción (> 4 horas)")
    print("   Considera TrainValidationSplit o reducir grid")

Alternativa: Random Search para Mayor Exploración

Si quieres explorar más dimensiones con el mismo presupuesto, implementa Random Search:

import numpy as np

# Generar 200 configuraciones aleatorias
num_configs = 200

random_params = []
for _ in range(num_configs):
    config = ParamMap()
    config[pca.k] = np.random.choice([10, 15, 20, 25, 30, 35, 40])
    config[gbt.maxIter] = np.random.choice([30, 50, 75, 100, 150, 200])
    config[gbt.maxDepth] = np.random.randint(3, 8)  # [3, 7]
    config[gbt.stepSize] = np.random.choice([0.01, 0.05, 0.1, 0.15, 0.2, 0.3])
    config[gbt.subsamplingRate] = np.random.uniform(0.6, 1.0)
    config[gbt.maxBins] = np.random.choice([16, 32, 64, 128])
    random_params.append(config)

# Usar en CrossValidator
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=random_params, ...)

Random Search explora 200 configs en espacio continuo/discreto mixto, mejor coverage que grid regular.

Criterios de Éxito - Fase 2

  • Grid explora al menos 6 hiperparámetros diferentes
  • Tamaño del grid ≤ 300 configuraciones
  • Valores seleccionados con justificación (escala log/linear apropiada)
  • Estimación de tiempo computacional < 4 horas

Fase 3: Ejecutar Cross-Validation y Estrategia de Búsqueda

Objetivo de la Fase

Configurar y ejecutar CrossValidator con estrategia eficiente. Implementar paralelización, caching, y monitoreo para optimizar el tiempo de ejecución.

Código: CrossValidator Optimizado

from pyspark.ml.tuning import CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
import time

# ============================================================
# PASO 1: Configurar Evaluador
# ============================================================

# Métrica principal: AUC-ROC
evaluator = BinaryClassificationEvaluator(
    labelCol="churn",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC"  # Maximizar AUC
)

# ============================================================
# PASO 2: Configurar CrossValidator
# ============================================================

cv = CrossValidator(
    estimator=pipeline,              # Pipeline completo (10 stages)
    estimatorParamMaps=paramGrid,    # Grid de 288 configuraciones
    evaluator=evaluator,             # AUC-ROC
    numFolds=5,                      # K=5 folds (balance sesgo-varianza)
    parallelism=10,                  # 10 configs en paralelo
    seed=42,                         # Reproducibilidad
    collectSubModels=False           # No guardar sub-modelos (ahorra memoria)
)

print("CrossValidator configurado:")
print(f"  - Configuraciones: {len(paramGrid)}")
print(f"  - Folds: 5")
print(f"  - Total entrenamientos: {len(paramGrid) * 5}")
print(f"  - Paralelismo: 10")

# ============================================================
# PASO 3: Optimizaciones Pre-Entrenamiento
# ============================================================

# 3.1. Cachear dataset en memoria para evitar recomputación
df_train_cached = df_train.cache()
df_train_cached.count()  # Forzar materialización
print(f"\n✓ Dataset cacheado: {df_train_cached.count()} filas")

# 3.2. Repartir dataset para balancear carga
optimal_partitions = 200  # Regla: ~10K filas por partición
df_train_cached = df_train_cached.repartition(optimal_partitions)
print(f"✓ Dataset reparticionado: {optimal_partitions} particiones")

# 3.3. Checkpoint para tolerancia a fallos (opcional)
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoint")
# df_train_cached = df_train_cached.checkpoint()

# ============================================================
# PASO 4: Ejecutar Grid Search (¡el momento de la verdad!)
# ============================================================

print("\n" + "="*60)
print("INICIANDO GRID SEARCH CON CROSS-VALIDATION")
print("="*60)
print(f"Hora de inicio: {time.strftime('%Y-%m-%d %H:%M:%S')}\n")

start_time = time.time()

# ENTRENAR (esto puede tomar varias horas)
cvModel = cv.fit(df_train_cached)

end_time = time.time()
elapsed_time = (end_time - start_time) / 60  # minutos

print("\n" + "="*60)
print("GRID SEARCH COMPLETADO")
print("="*60)
print(f"Hora de fin: {time.strftime('%Y-%m-%d %H:%M:%S')}")
print(f"Tiempo total: {elapsed_time:.1f} minutos ({elapsed_time/60:.2f} horas)")
print(f"Tiempo promedio por entrenamiento: {elapsed_time / (len(paramGrid) * 5):.2f} min")

# ============================================================
# PASO 5: Extraer Mejor Modelo
# ============================================================

bestPipeline = cvModel.bestModel
best_auc_cv = max(cvModel.avgMetrics)

print(f"\n{'='*60}")
print("MEJOR MODELO ENCONTRADO")
print(f"{'='*60}")
print(f"AUC-ROC (Cross-Validation): {best_auc_cv:.4f}")

# Extraer hiperparámetros óptimos del pipeline
best_pca = bestPipeline.stages[-2]  # PCA es penúltimo stage
best_gbt = bestPipeline.stages[-1]  # GBT es último stage

print("\nHiperparámetros óptimos:")
print(f"  PCA.k (componentes): {best_pca.getK()}")
print(f"  GBT.maxIter (árboles): {best_gbt.getMaxIter()}")
print(f"  GBT.maxDepth: {best_gbt.getMaxDepth()}")
print(f"  GBT.stepSize: {best_gbt.getStepSize():.3f}")
print(f"  GBT.subsamplingRate: {best_gbt.getSubsamplingRate():.2f}")
print(f"  GBT.maxBins: {best_gbt.getMaxBins()}")

# ============================================================
# PASO 6: Comparación con Baseline
# ============================================================

baseline_auc = 0.73  # AUC del modelo baseline (dado en el problema)
improvement = ((best_auc_cv - baseline_auc) / baseline_auc) * 100

print(f"\n{'='*60}")
print("COMPARACIÓN CON BASELINE")
print(f"{'='*60}")
print(f"Baseline AUC: {baseline_auc:.4f}")
print(f"Mejor AUC (CV): {best_auc_cv:.4f}")
print(f"Mejora absoluta: +{best_auc_cv - baseline_auc:.4f}")
print(f"Mejora relativa: +{improvement:.1f}%")

if best_auc_cv >= 0.85:
    print("\n🎉 ¡META ALCANZADA! AUC ≥ 0.85")
elif best_auc_cv >= 0.80:
    print("\n✓ Buen resultado, AUC ≥ 0.80 (cerca de la meta)")
else:
    print("\n⚠ AUC por debajo de 0.80, considera:")
    print("   - Ingeniería de features adicional")
    print("   - Probar otros modelos (Random Forest, XGBoost)")
    print("   - Ampliar grid de búsqueda")

# ============================================================
# PASO 7: Limpiar cache
# ============================================================

df_train_cached.unpersist()
print("\n✓ Cache limpiado")

Optimizaciones Avanzadas para Producción

  • 1.
    Early Stopping Condicional: Monitorear métricas en fold 1-2, abortar configs claramente malas.
  • 2.
    Adaptive Parallelism: Ajustar paralelismo según uso de recursos (empezar con parallelism=20, reducir si hay OOM).
  • 3.
    Stratified Folds: Garantizar distribución balanceada de churn en cada fold (especialmente importante con desbalance 20%).
  • 4.
    Guardar Checkpoints: Serializar cvModel cada 50 configs para no perder progreso en caso de fallo.
  • 5.
    TrainValidationSplit Inicial: Hacer exploración rápida con TVS (1 fold), luego refinar top-10 con CV (5 folds).

Criterios de Éxito - Fase 3

  • CrossValidator ejecuta sin errores
  • Tiempo total < 4 horas (con paralelización)
  • Mejor AUC (CV) > Baseline (0.73)
  • Hiperparámetros óptimos extraídos correctamente

Fase 4: Análisis de Resultados y Validación Final

Objetivo de la Fase

Analizar exhaustivamente los resultados del tuning, identificar patrones de sensibilidad, validar el modelo final en test set, y documentar insights para producción.

Análisis Completo de Resultados

import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# ============================================================
# 1. EXTRAER TODOS LOS RESULTADOS
# ============================================================

results = []
for idx, (params, avg_auc) in enumerate(zip(cvModel.getEstimatorParamMaps(), cvModel.avgMetrics)):
    config = {
        "config_id": idx,
        "avg_auc": avg_auc,
        "pca_k": params[pca.k],
        "gbt_maxIter": params[gbt.maxIter],
        "gbt_maxDepth": params[gbt.maxDepth],
        "gbt_stepSize": params[gbt.stepSize],
        "gbt_subsamplingRate": params[gbt.subsamplingRate],
        "gbt_maxBins": params[gbt.maxBins]
    }
    results.append(config)

results_df = pd.DataFrame(results)

print("="*60)
print("ANÁLISIS DE RESULTADOS - ESTADÍSTICAS GENERALES")
print("="*60)
print(f"\nTotal configuraciones evaluadas: {len(results_df)}")
print(f"\nEstadísticas de AUC-ROC:")
print(results_df["avg_auc"].describe())

# ============================================================
# 2. TOP 10 CONFIGURACIONES
# ============================================================

print("\n" + "="*60)
print("TOP 10 CONFIGURACIONES")
print("="*60)
top10 = results_df.nlargest(10, "avg_auc")
print(top10.to_string(index=False))

# ============================================================
# 3. ANÁLISIS DE SENSIBILIDAD POR HIPERPARÁMETRO
# ============================================================

print("\n" + "="*60)
print("SENSIBILIDAD POR HIPERPARÁMETRO")
print("="*60)

for param in ["pca_k", "gbt_maxIter", "gbt_maxDepth", "gbt_stepSize",
              "gbt_subsamplingRate", "gbt_maxBins"]:
    print(f"\n{param}:")
    sensitivity = results_df.groupby(param)["avg_auc"].agg(["mean", "std", "min", "max", "count"])
    print(sensitivity.sort_values("mean", ascending=False))

# Ejemplo Output:
#              mean       std       min       max    count
# gbt_stepSize
# 0.10         0.8456    0.0123    0.8123    0.8623    96
# 0.05         0.8389    0.0145    0.8045    0.8598    96
# 0.20         0.8312    0.0167    0.7989    0.8534    96
# ↑ stepSize=0.10 tiene mejor promedio

# ============================================================
# 4. IDENTIFICAR HIPERPARÁMETROS CRÍTICOS
# ============================================================

print("\n" + "="*60)
print("RANKING DE IMPORTANCIA (por varianza explicada)")
print("="*60)

# Calcular varianza de AUC agrupado por cada hiperparámetro
importance = {}
for param in ["pca_k", "gbt_maxIter", "gbt_maxDepth", "gbt_stepSize",
              "gbt_subsamplingRate", "gbt_maxBins"]:
    group_means = results_df.groupby(param)["avg_auc"].mean()
    variance_explained = group_means.var()
    importance[param] = variance_explained

importance_df = pd.DataFrame(list(importance.items()), columns=["param", "variance"])
importance_df = importance_df.sort_values("variance", ascending=False)

print(importance_df.to_string(index=False))

# Ejemplo Output:
#              param     variance
#      gbt_maxDepth    0.001234  ← Más importante
#       gbt_stepSize   0.000987
#         pca_k         0.000654
#      gbt_maxIter     0.000432
# gbt_subsamplingRate  0.000234
#       gbt_maxBins    0.000123  ← Menos importante

print("\nInterpretación:")
top_param = importance_df.iloc[0]["param"]
print(f"  - Hiperparámetro más crítico: {top_param}")
print(f"  - Tunear este parámetro tiene mayor impacto en rendimiento")

# ============================================================
# 5. VISUALIZACIÓN: HEATMAP DE INTERACCIONES
# ============================================================

# Heatmap: gbt_maxDepth vs gbt_stepSize (los 2 más importantes)
pivot = results_df.pivot_table(
    values="avg_auc",
    index="gbt_maxDepth",
    columns="gbt_stepSize",
    aggfunc="mean"
)

plt.figure(figsize=(10, 8))
sns.heatmap(pivot, annot=True, fmt=".4f", cmap="RdYlGn", vmin=0.70, vmax=0.90,
            cbar_kws={"label": "AUC-ROC"})
plt.title("Heatmap: Interacción gbt_maxDepth vs gbt_stepSize")
plt.xlabel("gbt_stepSize (learning rate)")
plt.ylabel("gbt_maxDepth")
plt.tight_layout()
plt.savefig("heatmap_depth_stepsize.png", dpi=150)
print("\n✓ Heatmap guardado: heatmap_depth_stepsize.png")

# ============================================================
# 6. DISTRIBUCIÓN DE MÉTRICAS
# ============================================================

plt.figure(figsize=(12, 5))

plt.subplot(1, 2, 1)
plt.hist(results_df["avg_auc"], bins=30, color="#a855f7", alpha=0.7, edgecolor="black")
plt.axvline(baseline_auc, color="red", linestyle="--", linewidth=2, label=f"Baseline: {baseline_auc:.4f}")
plt.axvline(best_auc_cv, color="green", linestyle="--", linewidth=2, label=f"Best: {best_auc_cv:.4f}")
plt.xlabel("AUC-ROC (Cross-Validation)")
plt.ylabel("Frecuencia")
plt.title("Distribución de Rendimiento: Todas las Configuraciones")
plt.legend()
plt.grid(alpha=0.3)

plt.subplot(1, 2, 2)
results_df.boxplot(column="avg_auc", by="gbt_maxDepth", ax=plt.gca())
plt.title("Impacto de gbt_maxDepth en AUC")
plt.suptitle("")
plt.xlabel("gbt_maxDepth")
plt.ylabel("AUC-ROC")

plt.tight_layout()
plt.savefig("distribution_analysis.png", dpi=150)
print("✓ Análisis de distribución guardado: distribution_analysis.png")

# ============================================================
# 7. VALIDACIÓN FINAL EN TEST SET
# ============================================================

print("\n" + "="*60)
print("VALIDACIÓN FINAL EN TEST SET")
print("="*60)

# Aplicar pipeline completo al test set
predictions_test = bestPipeline.transform(df_test)

# Evaluar AUC en test
test_auc = evaluator.evaluate(predictions_test)

print(f"AUC-ROC (Test Set): {test_auc:.4f}")
print(f"AUC-ROC (CV): {best_auc_cv:.4f}")

# Verificar overfitting
gap = abs(test_auc - best_auc_cv)
relative_gap = (gap / best_auc_cv) * 100

print(f"\nGap CV-Test: {gap:.4f} ({relative_gap:.1f}%)")

if gap < 0.02:
    print("✓ Generalización excelente (gap < 2%)")
elif gap < 0.05:
    print("✓ Generalización aceptable (gap < 5%)")
else:
    print("⚠ Posible overfitting (gap ≥ 5%)")
    print("  Considera mayor regularización o más datos")

# ============================================================
# 8. MÉTRICAS ADICIONALES EN TEST
# ============================================================

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Accuracy
acc_evaluator = MulticlassClassificationEvaluator(
    labelCol="churn",
    predictionCol="prediction",
    metricName="accuracy"
)
accuracy = acc_evaluator.evaluate(predictions_test)

# F1-Score
f1_evaluator = MulticlassClassificationEvaluator(
    labelCol="churn",
    predictionCol="prediction",
    metricName="f1"
)
f1_score = f1_evaluator.evaluate(predictions_test)

print("\n" + "="*60)
print("MÉTRICAS ADICIONALES (TEST SET)")
print("="*60)
print(f"Accuracy: {accuracy:.4f}")
print(f"F1-Score: {f1_score:.4f}")
print(f"AUC-ROC: {test_auc:.4f}")

# ============================================================
# 9. MATRIZ DE CONFUSIÓN
# ============================================================

from sklearn.metrics import confusion_matrix

# Convertir a Pandas para usar sklearn
predictions_pd = predictions_test.select("churn", "prediction").toPandas()
cm = confusion_matrix(predictions_pd["churn"], predictions_pd["prediction"])

print("\nMatriz de Confusión:")
print(cm)
print("\nInterpretación:")
print(f"  True Negatives (no churn correcto): {cm[0, 0]}")
print(f"  False Positives (predijo churn, no era): {cm[0, 1]}")
print(f"  False Negatives (no predijo churn, sí era): {cm[1, 0]}")
print(f"  True Positives (churn correcto): {cm[1, 1]}")

precision = cm[1, 1] / (cm[1, 1] + cm[0, 1])
recall = cm[1, 1] / (cm[1, 1] + cm[1, 0])

print(f"\nPrecision (de churners detectados): {precision:.4f}")
print(f"Recall (cobertura de churners): {recall:.4f}")

# ============================================================
# 10. CONCLUSIONES Y RECOMENDACIONES
# ============================================================

print("\n" + "="*60)
print("CONCLUSIONES Y RECOMENDACIONES")
print("="*60)

print("\n1. Rendimiento General:")
if test_auc >= 0.85:
    print("   ✓ Modelo alcanza la meta (AUC ≥ 0.85)")
    print("   ✓ Listo para despliegue en producción")
else:
    print(f"   • AUC = {test_auc:.4f} (meta: 0.85)")
    print("   • Posibles mejoras: feature engineering, ensemble methods")

print("\n2. Hiperparámetros Críticos:")
print(f"   • Más importante: {importance_df.iloc[0]['param']}")
print("   • Revisar estos primero en futuras iteraciones")

print("\n3. Generalización:")
if gap < 0.03:
    print("   ✓ Modelo generaliza bien (gap CV-Test < 3%)")
else:
    print(f"   ⚠ Gap CV-Test = {gap:.4f}, considerar más regularización")

print("\n4. Desbalance de Clases:")
print(f"   • Precision: {precision:.4f}, Recall: {recall:.4f}")
if recall < 0.70:
    print("   ⚠ Recall bajo, muchos churners no detectados")
    print("   • Considerar ajustar threshold o usar SMOTE para balanceo")
else:
    print("   ✓ Recall aceptable")

print("\n5. Próximos Pasos:")
print("   1. Guardar modelo final: bestPipeline.save('hdfs://path/to/model')")
print("   2. Documentar hiperparámetros óptimos")
print("   3. Configurar monitoreo de drift en producción")
print("   4. A/B testing vs modelo baseline")

Guardar Modelo para Producción

# Guardar pipeline completo (incluye todos los transformadores + modelo)
model_path = "hdfs://cluster/models/churn_pipeline_v1"
bestPipeline.write().overwrite().save(model_path)
print(f"✓ Modelo guardado en: {model_path}")

# Cargar en producción
from pyspark.ml import PipelineModel
loaded_pipeline = PipelineModel.load(model_path)

# Usar para inferencia
new_predictions = loaded_pipeline.transform(df_new_customers)

Criterios de Éxito - Fase 4

  • Test AUC > 0.80 (mejora ≥ 10% sobre baseline 0.73)
  • Gap CV-Test < 5% (sin overfitting significativo)
  • Análisis de sensibilidad completado e interpretado
  • Visualizaciones generadas (heatmaps, distribuciones)
  • Modelo final guardado y documentado

Criterios de Evaluación del Reto

Tu solución será evaluada según los siguientes criterios cuantitativos y cualitativos:

Métricas Cuantitativas (60%)

1. AUC-ROC en Test 30 pts
  • • AUC ≥ 0.85: 30 pts (excelente)
  • • AUC 0.80-0.85: 20 pts (bueno)
  • • AUC 0.75-0.80: 10 pts (aceptable)
  • • AUC < 0.75: 0 pts
2. Mejora sobre Baseline 15 pts
  • • Mejora ≥ 15%: 15 pts
  • • Mejora 10-15%: 10 pts
  • • Mejora 5-10%: 5 pts
3. Generalización (Gap CV-Test) 15 pts
  • • Gap < 3%: 15 pts
  • • Gap 3-5%: 10 pts
  • • Gap > 5%: 5 pts

Criterios Cualitativos (40%)

4. Diseño del Grid 10 pts
  • • ≥6 hiperparámetros tunedos
  • • Valores justificados (log/linear)
  • • Presupuesto respetado
5. Análisis de Resultados 15 pts
  • • Sensibilidad por hiperparámetro
  • • Visualizaciones interpretables
  • • Insights documentados
6. Calidad del Código 15 pts
  • • Código limpio y comentado
  • • Optimizaciones aplicadas
  • • Reproducibilidad (seeds)
  • • Modelo guardado correctamente

Calificación Final

90-100

Excelente

75-89

Bueno

60-74

Aceptable

Anterior: ParamGrid & Evaluadores Volver al Hub