Desafío: Optimización End-to-End
En este reto final, construirás y optimizarás un pipeline completo de machine learning desde feature engineering hasta el modelo predictivo. Tunearás múltiples stages simultáneamente, analizarás los resultados, y validarás la mejora sobre un baseline.
Contexto del Problema
Trabajas en una empresa de telecomunicaciones que quiere predecir churn (abandono de clientes). Tienes un dataset con 100,000 clientes y 20 features (demográficos, uso de servicios, facturación). Tu objetivo: construir el mejor modelo posible optimizando TODO el pipeline.
Dataset
- • N = 100,000 clientes
- • 15 features numéricas
- • 5 features categóricas
- • Target: churn (0/1)
- • Desbalance: 20% churn
Métrica Objetivo
- • AUC-ROC (principal)
- • Precision@20% (secundaria)
- • Baseline: AUC = 0.73
- • Meta: AUC ≥ 0.85
Restricciones
- • Presupuesto: 200 configs
- • Tiempo: < 4 horas
- • Inferencia: < 100ms
- • Interpretabilidad: Media
Fase 1: Construir Pipeline Base
Objetivo de la Fase
Construir un pipeline robusto con preprocesamiento de features numéricas y categóricas, seguido de un modelo clasificador. Este pipeline será la base para el tuning.
Arquitectura del Pipeline
graph LR
A[Raw Data] --> B[StringIndexer
Categóricas]
B --> C[OneHotEncoder
Sparse Vectors]
C --> D[VectorAssembler
Features + Encoded]
D --> E[StandardScaler
μ=0, σ=1]
E --> F[PCA
Reducción Dim]
F --> G[GBTClassifier
Gradient Boosting]
G --> H[Predictions]
style A fill:#9333ea,stroke:#a855f7,color:#fff
style B fill:#3b82f6,stroke:#60a5fa,color:#fff
style C fill:#3b82f6,stroke:#60a5fa,color:#fff
style D fill:#10b981,stroke:#34d399,color:#fff
style E fill:#10b981,stroke:#34d399,color:#fff
style F fill:#f59e0b,stroke:#fbbf24,color:#000
style G fill:#ef4444,stroke:#f87171,color:#fff
style H fill:#8b5cf6,stroke:#a78bfa,color:#fff
Pipeline de 7 stages: preprocesamiento categórico → ensamblaje → escalado → PCA → modelo
Código: Construcción del Pipeline
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler, PCA
from pyspark.ml.classification import GBTClassifier
# ============================================================
# DATOS DE ENTRADA
# ============================================================
# df_train con columnas:
# - Numéricas: age, tenure, monthly_charges, total_charges, ... (15 features)
# - Categóricas: contract, payment_method, internet_service, ... (5 features)
# - Target: churn (0 o 1)
numeric_features = [
"age", "tenure", "monthly_charges", "total_charges",
"num_services", "avg_call_duration", "data_usage_gb",
"num_support_tickets", "avg_bill_amount", "credit_score",
"num_devices", "streaming_hours", "roaming_minutes",
"voicemail_messages", "international_calls"
] # 15 features numéricas
categorical_features = [
"contract", "payment_method", "internet_service",
"device_type", "region"
] # 5 features categóricas
# ============================================================
# STAGE 1-2: Procesar features categóricas
# ============================================================
# StringIndexer: convertir strings a índices numéricos
indexers = [
StringIndexer(inputCol=col, outputCol=f"{col}_index", handleInvalid="keep")
for col in categorical_features
]
# OneHotEncoder: convertir índices a vectores sparse one-hot
encoder = OneHotEncoder(
inputCols=[f"{col}_index" for col in categorical_features],
outputCols=[f"{col}_encoded" for col in categorical_features],
dropLast=True # Evitar multicolinealidad
)
# ============================================================
# STAGE 3: Ensamblar todas las features
# ============================================================
assembler = VectorAssembler(
inputCols=numeric_features + [f"{col}_encoded" for col in categorical_features],
outputCol="raw_features",
handleInvalid="skip" # Saltar filas con NaN
)
# ============================================================
# STAGE 4: Estandarizar features numéricas
# ============================================================
scaler = StandardScaler(
inputCol="raw_features",
outputCol="scaled_features",
withMean=True, # Centrar en 0
withStd=True # Escalar a std=1
)
# ============================================================
# STAGE 5: PCA para reducción dimensional (opcional pero tuneable)
# ============================================================
pca = PCA(
inputCol="scaled_features",
outputCol="features"
# k se definirá en el grid de tuning
)
# ============================================================
# STAGE 6: Modelo Gradient Boosting Trees
# ============================================================
gbt = GBTClassifier(
featuresCol="features",
labelCol="churn",
seed=42,
predictionCol="prediction",
rawPredictionCol="rawPrediction",
probabilityCol="probability"
)
# ============================================================
# CONSTRUIR PIPELINE COMPLETO
# ============================================================
pipeline_stages = indexers + [encoder, assembler, scaler, pca, gbt]
pipeline = Pipeline(stages=pipeline_stages)
print(f"Pipeline construido con {len(pipeline_stages)} stages")
print("Stages:")
for idx, stage in enumerate(pipeline_stages):
print(f" {idx+1}. {stage.__class__.__name__}")
# Output:
# Pipeline construido con 10 stages
# Stages:
# 1. StringIndexer
# 2. StringIndexer
# 3. StringIndexer
# 4. StringIndexer
# 5. StringIndexer
# 6. OneHotEncoder
# 7. VectorAssembler
# 8. StandardScaler
# 9. PCA
# 10. GBTClassifier
Criterios de Éxito - Fase 1
- ✓ Pipeline compila sin errores
- ✓ Maneja features numéricas y categóricas correctamente
- ✓ Incluye al menos 3 stages tuneables (PCA, GBT params)
- ✓ Preparado para ser pasado a CrossValidator
Fase 2: Definir Grid de Hiperparámetros
Objetivo de la Fase
Diseñar un grid inteligente que explore el espacio de hiperparámetros de manera eficiente. Debes tunear al menos 6 hiperparámetros de diferentes stages del pipeline, balanceando exploración y costo computacional.
Estrategia: Coarse-to-Fine Grid Search
Con presupuesto limitado (200 configs), usa estrategia de 2 rondas:
- 1. Grid grueso: Pocos valores (3 por dim), amplio rango → identificar región prometedora (~80 configs)
- 2. Grid fino: Más valores (4-5 por dim), rango estrecho alrededor del mejor → refinar (~120 configs)
Para el reto, implementaremos directamente un grid inteligente que balancea ambas estrategias.
Código: Grid Multidimensional
from pyspark.ml.tuning import ParamGridBuilder
# ============================================================
# ESTRATEGIA DE SELECCIÓN DE HIPERPARÁMETROS
# ============================================================
# Hiperparámetros a tunear (6 total):
# 1. PCA.k: Dimensionalidad reducida (stage 5)
# 2. GBT.maxIter: Número de árboles (stage 6)
# 3. GBT.maxDepth: Profundidad máxima de árboles (stage 6)
# 4. GBT.stepSize: Learning rate (stage 6)
# 5. GBT.subsamplingRate: Fracción de datos por iteración (stage 6)
# 6. GBT.maxBins: Bins para discretización (stage 6)
# ============================================================
# CONSTRUCCIÓN DEL GRID
# ============================================================
paramGrid = ParamGridBuilder() \
.addGrid(pca.k, [10, 20, 30, 40]) \
.addGrid(gbt.maxIter, [50, 100, 150]) \
.addGrid(gbt.maxDepth, [4, 5, 6, 7]) \
.addGrid(gbt.stepSize, [0.05, 0.1, 0.2]) \
.addGrid(gbt.subsamplingRate, [0.7, 0.85, 1.0]) \
.addGrid(gbt.maxBins, [32, 64]) \
.build()
# ============================================================
# ANÁLISIS DEL ESPACIO DE BÚSQUEDA
# ============================================================
grid_size = len(paramGrid)
print(f"Tamaño del grid: {grid_size} configuraciones")
print(f"Cálculo: 4 (k) × 3 (maxIter) × 4 (maxDepth) × 3 (stepSize) × 3 (subsamplingRate) × 2 (maxBins)")
print(f" = {4*3*4*3*3*2} configuraciones")
# Con 5-fold CV: 288 configs × 5 folds = 1,440 entrenamientos
print(f"\nCon 5-fold CV: {grid_size} × 5 = {grid_size * 5} entrenamientos totales")
# ============================================================
# JUSTIFICACIÓN DE CADA HIPERPARÁMETRO
# ============================================================
print("\n========== JUSTIFICACIÓN DE VALORES ==========\n")
print("1. PCA.k (dimensionalidad):")
print(" Valores: [10, 20, 30, 40]")
print(" Razón: Dataset tiene ~50 features post-encoding.")
print(" Explorar reducción agresiva (k=10, 80% reducción)")
print(" hasta conservadora (k=40, 20% reducción)")
print(" Escala: Lineal (enteros)")
print("\n2. GBT.maxIter (número de árboles):")
print(" Valores: [50, 100, 150]")
print(" Razón: Más árboles → mayor capacidad pero riesgo de overfitting")
print(" 50 árboles: baseline rápido")
print(" 150 árboles: máximo poder representacional")
print(" Escala: Lineal")
print("\n3. GBT.maxDepth (profundidad de árboles):")
print(" Valores: [4, 5, 6, 7]")
print(" Razón: Controla complejidad de cada árbol individual")
print(" depth=4: árboles simples (16 hojas max)")
print(" depth=7: árboles complejos (128 hojas max)")
print(" Escala: Lineal (exponencial en hojas: 2^depth)")
print("\n4. GBT.stepSize (learning rate):")
print(" Valores: [0.05, 0.1, 0.2]")
print(" Razón: Controla agresividad del boosting")
print(" 0.05: conservador, requiere más árboles")
print(" 0.2: agresivo, puede divergir")
print(" Escala: Logarítmica parcial")
print("\n5. GBT.subsamplingRate (fracción de datos):")
print(" Valores: [0.7, 0.85, 1.0]")
print(" Razón: Stochastic gradient boosting para regularización")
print(" 1.0: sin subsampling (determinístico)")
print(" 0.7: 30% de datos descartados por iteración")
print(" Escala: Lineal en [0.5, 1.0]")
print("\n6. GBT.maxBins (discretización):")
print(" Valores: [32, 64]")
print(" Razón: Granularidad de splits en features continuas")
print(" Más bins → mayor precisión pero más costo")
print(" 32 bins suficiente para mayoría de casos")
print(" Escala: Potencias de 2")
# ============================================================
# VERIFICAR PRESUPUESTO
# ============================================================
budget = 200 # Presupuesto máximo de configs
if grid_size <= budget:
print(f"\n✓ Grid dentro del presupuesto: {grid_size} ≤ {budget}")
else:
print(f"\n✗ Grid excede presupuesto: {grid_size} > {budget}")
print(" Considera reducir valores o usar Random Search")
# ============================================================
# ESTIMACIÓN DE TIEMPO
# ============================================================
# Supongamos cada entrenamiento toma 2 minutos en promedio
time_per_training = 2 # minutos
total_trainings = grid_size * 5 # 5-fold CV
total_time_sequential = total_trainings * time_per_training # minutos
# Con paralelización (supongamos 10 executors)
parallelism = 10
total_time_parallel = total_time_sequential / parallelism
print(f"\n========== ESTIMACIÓN DE TIEMPO ==========")
print(f"Tiempo por entrenamiento: {time_per_training} min")
print(f"Total entrenamientos: {total_trainings}")
print(f"Tiempo secuencial: {total_time_sequential} min ({total_time_sequential/60:.1f} horas)")
print(f"Tiempo con {parallelism} executors: {total_time_parallel} min ({total_time_parallel/60:.1f} horas)")
if total_time_parallel <= 240: # 4 horas
print(f"✓ Tiempo estimado dentro de restricción (< 4 horas)")
else:
print(f"✗ Tiempo estimado excede restricción (> 4 horas)")
print(" Considera TrainValidationSplit o reducir grid")
Alternativa: Random Search para Mayor Exploración
Si quieres explorar más dimensiones con el mismo presupuesto, implementa Random Search:
import numpy as np
# Generar 200 configuraciones aleatorias
num_configs = 200
random_params = []
for _ in range(num_configs):
config = ParamMap()
config[pca.k] = np.random.choice([10, 15, 20, 25, 30, 35, 40])
config[gbt.maxIter] = np.random.choice([30, 50, 75, 100, 150, 200])
config[gbt.maxDepth] = np.random.randint(3, 8) # [3, 7]
config[gbt.stepSize] = np.random.choice([0.01, 0.05, 0.1, 0.15, 0.2, 0.3])
config[gbt.subsamplingRate] = np.random.uniform(0.6, 1.0)
config[gbt.maxBins] = np.random.choice([16, 32, 64, 128])
random_params.append(config)
# Usar en CrossValidator
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=random_params, ...)
Random Search explora 200 configs en espacio continuo/discreto mixto, mejor coverage que grid regular.
Criterios de Éxito - Fase 2
- ✓ Grid explora al menos 6 hiperparámetros diferentes
- ✓ Tamaño del grid ≤ 300 configuraciones
- ✓ Valores seleccionados con justificación (escala log/linear apropiada)
- ✓ Estimación de tiempo computacional < 4 horas
Fase 3: Ejecutar Cross-Validation y Estrategia de Búsqueda
Objetivo de la Fase
Configurar y ejecutar CrossValidator con estrategia eficiente. Implementar paralelización, caching, y monitoreo para optimizar el tiempo de ejecución.
Código: CrossValidator Optimizado
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
import time
# ============================================================
# PASO 1: Configurar Evaluador
# ============================================================
# Métrica principal: AUC-ROC
evaluator = BinaryClassificationEvaluator(
labelCol="churn",
rawPredictionCol="rawPrediction",
metricName="areaUnderROC" # Maximizar AUC
)
# ============================================================
# PASO 2: Configurar CrossValidator
# ============================================================
cv = CrossValidator(
estimator=pipeline, # Pipeline completo (10 stages)
estimatorParamMaps=paramGrid, # Grid de 288 configuraciones
evaluator=evaluator, # AUC-ROC
numFolds=5, # K=5 folds (balance sesgo-varianza)
parallelism=10, # 10 configs en paralelo
seed=42, # Reproducibilidad
collectSubModels=False # No guardar sub-modelos (ahorra memoria)
)
print("CrossValidator configurado:")
print(f" - Configuraciones: {len(paramGrid)}")
print(f" - Folds: 5")
print(f" - Total entrenamientos: {len(paramGrid) * 5}")
print(f" - Paralelismo: 10")
# ============================================================
# PASO 3: Optimizaciones Pre-Entrenamiento
# ============================================================
# 3.1. Cachear dataset en memoria para evitar recomputación
df_train_cached = df_train.cache()
df_train_cached.count() # Forzar materialización
print(f"\n✓ Dataset cacheado: {df_train_cached.count()} filas")
# 3.2. Repartir dataset para balancear carga
optimal_partitions = 200 # Regla: ~10K filas por partición
df_train_cached = df_train_cached.repartition(optimal_partitions)
print(f"✓ Dataset reparticionado: {optimal_partitions} particiones")
# 3.3. Checkpoint para tolerancia a fallos (opcional)
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoint")
# df_train_cached = df_train_cached.checkpoint()
# ============================================================
# PASO 4: Ejecutar Grid Search (¡el momento de la verdad!)
# ============================================================
print("\n" + "="*60)
print("INICIANDO GRID SEARCH CON CROSS-VALIDATION")
print("="*60)
print(f"Hora de inicio: {time.strftime('%Y-%m-%d %H:%M:%S')}\n")
start_time = time.time()
# ENTRENAR (esto puede tomar varias horas)
cvModel = cv.fit(df_train_cached)
end_time = time.time()
elapsed_time = (end_time - start_time) / 60 # minutos
print("\n" + "="*60)
print("GRID SEARCH COMPLETADO")
print("="*60)
print(f"Hora de fin: {time.strftime('%Y-%m-%d %H:%M:%S')}")
print(f"Tiempo total: {elapsed_time:.1f} minutos ({elapsed_time/60:.2f} horas)")
print(f"Tiempo promedio por entrenamiento: {elapsed_time / (len(paramGrid) * 5):.2f} min")
# ============================================================
# PASO 5: Extraer Mejor Modelo
# ============================================================
bestPipeline = cvModel.bestModel
best_auc_cv = max(cvModel.avgMetrics)
print(f"\n{'='*60}")
print("MEJOR MODELO ENCONTRADO")
print(f"{'='*60}")
print(f"AUC-ROC (Cross-Validation): {best_auc_cv:.4f}")
# Extraer hiperparámetros óptimos del pipeline
best_pca = bestPipeline.stages[-2] # PCA es penúltimo stage
best_gbt = bestPipeline.stages[-1] # GBT es último stage
print("\nHiperparámetros óptimos:")
print(f" PCA.k (componentes): {best_pca.getK()}")
print(f" GBT.maxIter (árboles): {best_gbt.getMaxIter()}")
print(f" GBT.maxDepth: {best_gbt.getMaxDepth()}")
print(f" GBT.stepSize: {best_gbt.getStepSize():.3f}")
print(f" GBT.subsamplingRate: {best_gbt.getSubsamplingRate():.2f}")
print(f" GBT.maxBins: {best_gbt.getMaxBins()}")
# ============================================================
# PASO 6: Comparación con Baseline
# ============================================================
baseline_auc = 0.73 # AUC del modelo baseline (dado en el problema)
improvement = ((best_auc_cv - baseline_auc) / baseline_auc) * 100
print(f"\n{'='*60}")
print("COMPARACIÓN CON BASELINE")
print(f"{'='*60}")
print(f"Baseline AUC: {baseline_auc:.4f}")
print(f"Mejor AUC (CV): {best_auc_cv:.4f}")
print(f"Mejora absoluta: +{best_auc_cv - baseline_auc:.4f}")
print(f"Mejora relativa: +{improvement:.1f}%")
if best_auc_cv >= 0.85:
print("\n🎉 ¡META ALCANZADA! AUC ≥ 0.85")
elif best_auc_cv >= 0.80:
print("\n✓ Buen resultado, AUC ≥ 0.80 (cerca de la meta)")
else:
print("\n⚠ AUC por debajo de 0.80, considera:")
print(" - Ingeniería de features adicional")
print(" - Probar otros modelos (Random Forest, XGBoost)")
print(" - Ampliar grid de búsqueda")
# ============================================================
# PASO 7: Limpiar cache
# ============================================================
df_train_cached.unpersist()
print("\n✓ Cache limpiado")
Optimizaciones Avanzadas para Producción
-
1.
Early Stopping Condicional: Monitorear métricas en fold 1-2, abortar configs claramente malas.
-
2.
Adaptive Parallelism: Ajustar paralelismo según uso de recursos (empezar con parallelism=20, reducir si hay OOM).
-
3.
Stratified Folds: Garantizar distribución balanceada de churn en cada fold (especialmente importante con desbalance 20%).
-
4.
Guardar Checkpoints: Serializar cvModel cada 50 configs para no perder progreso en caso de fallo.
-
5.
TrainValidationSplit Inicial: Hacer exploración rápida con TVS (1 fold), luego refinar top-10 con CV (5 folds).
Criterios de Éxito - Fase 3
- ✓ CrossValidator ejecuta sin errores
- ✓ Tiempo total < 4 horas (con paralelización)
- ✓ Mejor AUC (CV) > Baseline (0.73)
- ✓ Hiperparámetros óptimos extraídos correctamente
Fase 4: Análisis de Resultados y Validación Final
Objetivo de la Fase
Analizar exhaustivamente los resultados del tuning, identificar patrones de sensibilidad, validar el modelo final en test set, y documentar insights para producción.
Análisis Completo de Resultados
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
# ============================================================
# 1. EXTRAER TODOS LOS RESULTADOS
# ============================================================
results = []
for idx, (params, avg_auc) in enumerate(zip(cvModel.getEstimatorParamMaps(), cvModel.avgMetrics)):
config = {
"config_id": idx,
"avg_auc": avg_auc,
"pca_k": params[pca.k],
"gbt_maxIter": params[gbt.maxIter],
"gbt_maxDepth": params[gbt.maxDepth],
"gbt_stepSize": params[gbt.stepSize],
"gbt_subsamplingRate": params[gbt.subsamplingRate],
"gbt_maxBins": params[gbt.maxBins]
}
results.append(config)
results_df = pd.DataFrame(results)
print("="*60)
print("ANÁLISIS DE RESULTADOS - ESTADÍSTICAS GENERALES")
print("="*60)
print(f"\nTotal configuraciones evaluadas: {len(results_df)}")
print(f"\nEstadísticas de AUC-ROC:")
print(results_df["avg_auc"].describe())
# ============================================================
# 2. TOP 10 CONFIGURACIONES
# ============================================================
print("\n" + "="*60)
print("TOP 10 CONFIGURACIONES")
print("="*60)
top10 = results_df.nlargest(10, "avg_auc")
print(top10.to_string(index=False))
# ============================================================
# 3. ANÁLISIS DE SENSIBILIDAD POR HIPERPARÁMETRO
# ============================================================
print("\n" + "="*60)
print("SENSIBILIDAD POR HIPERPARÁMETRO")
print("="*60)
for param in ["pca_k", "gbt_maxIter", "gbt_maxDepth", "gbt_stepSize",
"gbt_subsamplingRate", "gbt_maxBins"]:
print(f"\n{param}:")
sensitivity = results_df.groupby(param)["avg_auc"].agg(["mean", "std", "min", "max", "count"])
print(sensitivity.sort_values("mean", ascending=False))
# Ejemplo Output:
# mean std min max count
# gbt_stepSize
# 0.10 0.8456 0.0123 0.8123 0.8623 96
# 0.05 0.8389 0.0145 0.8045 0.8598 96
# 0.20 0.8312 0.0167 0.7989 0.8534 96
# ↑ stepSize=0.10 tiene mejor promedio
# ============================================================
# 4. IDENTIFICAR HIPERPARÁMETROS CRÍTICOS
# ============================================================
print("\n" + "="*60)
print("RANKING DE IMPORTANCIA (por varianza explicada)")
print("="*60)
# Calcular varianza de AUC agrupado por cada hiperparámetro
importance = {}
for param in ["pca_k", "gbt_maxIter", "gbt_maxDepth", "gbt_stepSize",
"gbt_subsamplingRate", "gbt_maxBins"]:
group_means = results_df.groupby(param)["avg_auc"].mean()
variance_explained = group_means.var()
importance[param] = variance_explained
importance_df = pd.DataFrame(list(importance.items()), columns=["param", "variance"])
importance_df = importance_df.sort_values("variance", ascending=False)
print(importance_df.to_string(index=False))
# Ejemplo Output:
# param variance
# gbt_maxDepth 0.001234 ← Más importante
# gbt_stepSize 0.000987
# pca_k 0.000654
# gbt_maxIter 0.000432
# gbt_subsamplingRate 0.000234
# gbt_maxBins 0.000123 ← Menos importante
print("\nInterpretación:")
top_param = importance_df.iloc[0]["param"]
print(f" - Hiperparámetro más crítico: {top_param}")
print(f" - Tunear este parámetro tiene mayor impacto en rendimiento")
# ============================================================
# 5. VISUALIZACIÓN: HEATMAP DE INTERACCIONES
# ============================================================
# Heatmap: gbt_maxDepth vs gbt_stepSize (los 2 más importantes)
pivot = results_df.pivot_table(
values="avg_auc",
index="gbt_maxDepth",
columns="gbt_stepSize",
aggfunc="mean"
)
plt.figure(figsize=(10, 8))
sns.heatmap(pivot, annot=True, fmt=".4f", cmap="RdYlGn", vmin=0.70, vmax=0.90,
cbar_kws={"label": "AUC-ROC"})
plt.title("Heatmap: Interacción gbt_maxDepth vs gbt_stepSize")
plt.xlabel("gbt_stepSize (learning rate)")
plt.ylabel("gbt_maxDepth")
plt.tight_layout()
plt.savefig("heatmap_depth_stepsize.png", dpi=150)
print("\n✓ Heatmap guardado: heatmap_depth_stepsize.png")
# ============================================================
# 6. DISTRIBUCIÓN DE MÉTRICAS
# ============================================================
plt.figure(figsize=(12, 5))
plt.subplot(1, 2, 1)
plt.hist(results_df["avg_auc"], bins=30, color="#a855f7", alpha=0.7, edgecolor="black")
plt.axvline(baseline_auc, color="red", linestyle="--", linewidth=2, label=f"Baseline: {baseline_auc:.4f}")
plt.axvline(best_auc_cv, color="green", linestyle="--", linewidth=2, label=f"Best: {best_auc_cv:.4f}")
plt.xlabel("AUC-ROC (Cross-Validation)")
plt.ylabel("Frecuencia")
plt.title("Distribución de Rendimiento: Todas las Configuraciones")
plt.legend()
plt.grid(alpha=0.3)
plt.subplot(1, 2, 2)
results_df.boxplot(column="avg_auc", by="gbt_maxDepth", ax=plt.gca())
plt.title("Impacto de gbt_maxDepth en AUC")
plt.suptitle("")
plt.xlabel("gbt_maxDepth")
plt.ylabel("AUC-ROC")
plt.tight_layout()
plt.savefig("distribution_analysis.png", dpi=150)
print("✓ Análisis de distribución guardado: distribution_analysis.png")
# ============================================================
# 7. VALIDACIÓN FINAL EN TEST SET
# ============================================================
print("\n" + "="*60)
print("VALIDACIÓN FINAL EN TEST SET")
print("="*60)
# Aplicar pipeline completo al test set
predictions_test = bestPipeline.transform(df_test)
# Evaluar AUC en test
test_auc = evaluator.evaluate(predictions_test)
print(f"AUC-ROC (Test Set): {test_auc:.4f}")
print(f"AUC-ROC (CV): {best_auc_cv:.4f}")
# Verificar overfitting
gap = abs(test_auc - best_auc_cv)
relative_gap = (gap / best_auc_cv) * 100
print(f"\nGap CV-Test: {gap:.4f} ({relative_gap:.1f}%)")
if gap < 0.02:
print("✓ Generalización excelente (gap < 2%)")
elif gap < 0.05:
print("✓ Generalización aceptable (gap < 5%)")
else:
print("⚠ Posible overfitting (gap ≥ 5%)")
print(" Considera mayor regularización o más datos")
# ============================================================
# 8. MÉTRICAS ADICIONALES EN TEST
# ============================================================
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Accuracy
acc_evaluator = MulticlassClassificationEvaluator(
labelCol="churn",
predictionCol="prediction",
metricName="accuracy"
)
accuracy = acc_evaluator.evaluate(predictions_test)
# F1-Score
f1_evaluator = MulticlassClassificationEvaluator(
labelCol="churn",
predictionCol="prediction",
metricName="f1"
)
f1_score = f1_evaluator.evaluate(predictions_test)
print("\n" + "="*60)
print("MÉTRICAS ADICIONALES (TEST SET)")
print("="*60)
print(f"Accuracy: {accuracy:.4f}")
print(f"F1-Score: {f1_score:.4f}")
print(f"AUC-ROC: {test_auc:.4f}")
# ============================================================
# 9. MATRIZ DE CONFUSIÓN
# ============================================================
from sklearn.metrics import confusion_matrix
# Convertir a Pandas para usar sklearn
predictions_pd = predictions_test.select("churn", "prediction").toPandas()
cm = confusion_matrix(predictions_pd["churn"], predictions_pd["prediction"])
print("\nMatriz de Confusión:")
print(cm)
print("\nInterpretación:")
print(f" True Negatives (no churn correcto): {cm[0, 0]}")
print(f" False Positives (predijo churn, no era): {cm[0, 1]}")
print(f" False Negatives (no predijo churn, sí era): {cm[1, 0]}")
print(f" True Positives (churn correcto): {cm[1, 1]}")
precision = cm[1, 1] / (cm[1, 1] + cm[0, 1])
recall = cm[1, 1] / (cm[1, 1] + cm[1, 0])
print(f"\nPrecision (de churners detectados): {precision:.4f}")
print(f"Recall (cobertura de churners): {recall:.4f}")
# ============================================================
# 10. CONCLUSIONES Y RECOMENDACIONES
# ============================================================
print("\n" + "="*60)
print("CONCLUSIONES Y RECOMENDACIONES")
print("="*60)
print("\n1. Rendimiento General:")
if test_auc >= 0.85:
print(" ✓ Modelo alcanza la meta (AUC ≥ 0.85)")
print(" ✓ Listo para despliegue en producción")
else:
print(f" • AUC = {test_auc:.4f} (meta: 0.85)")
print(" • Posibles mejoras: feature engineering, ensemble methods")
print("\n2. Hiperparámetros Críticos:")
print(f" • Más importante: {importance_df.iloc[0]['param']}")
print(" • Revisar estos primero en futuras iteraciones")
print("\n3. Generalización:")
if gap < 0.03:
print(" ✓ Modelo generaliza bien (gap CV-Test < 3%)")
else:
print(f" ⚠ Gap CV-Test = {gap:.4f}, considerar más regularización")
print("\n4. Desbalance de Clases:")
print(f" • Precision: {precision:.4f}, Recall: {recall:.4f}")
if recall < 0.70:
print(" ⚠ Recall bajo, muchos churners no detectados")
print(" • Considerar ajustar threshold o usar SMOTE para balanceo")
else:
print(" ✓ Recall aceptable")
print("\n5. Próximos Pasos:")
print(" 1. Guardar modelo final: bestPipeline.save('hdfs://path/to/model')")
print(" 2. Documentar hiperparámetros óptimos")
print(" 3. Configurar monitoreo de drift en producción")
print(" 4. A/B testing vs modelo baseline")
Guardar Modelo para Producción
# Guardar pipeline completo (incluye todos los transformadores + modelo)
model_path = "hdfs://cluster/models/churn_pipeline_v1"
bestPipeline.write().overwrite().save(model_path)
print(f"✓ Modelo guardado en: {model_path}")
# Cargar en producción
from pyspark.ml import PipelineModel
loaded_pipeline = PipelineModel.load(model_path)
# Usar para inferencia
new_predictions = loaded_pipeline.transform(df_new_customers)
Criterios de Éxito - Fase 4
- ✓ Test AUC > 0.80 (mejora ≥ 10% sobre baseline 0.73)
- ✓ Gap CV-Test < 5% (sin overfitting significativo)
- ✓ Análisis de sensibilidad completado e interpretado
- ✓ Visualizaciones generadas (heatmaps, distribuciones)
- ✓ Modelo final guardado y documentado
Criterios de Evaluación del Reto
Tu solución será evaluada según los siguientes criterios cuantitativos y cualitativos:
Métricas Cuantitativas (60%)
- • AUC ≥ 0.85: 30 pts (excelente)
- • AUC 0.80-0.85: 20 pts (bueno)
- • AUC 0.75-0.80: 10 pts (aceptable)
- • AUC < 0.75: 0 pts
- • Mejora ≥ 15%: 15 pts
- • Mejora 10-15%: 10 pts
- • Mejora 5-10%: 5 pts
- • Gap < 3%: 15 pts
- • Gap 3-5%: 10 pts
- • Gap > 5%: 5 pts
Criterios Cualitativos (40%)
- • ≥6 hiperparámetros tunedos
- • Valores justificados (log/linear)
- • Presupuesto respetado
- • Sensibilidad por hiperparámetro
- • Visualizaciones interpretables
- • Insights documentados
- • Código limpio y comentado
- • Optimizaciones aplicadas
- • Reproducibilidad (seeds)
- • Modelo guardado correctamente
Calificación Final
90-100
Excelente
75-89
Bueno
60-74
Aceptable