1. Deployment Patterns
Three Inference Paradigms
The deployment pattern depends on latency and volume requirements:
Batch Inference
Processes large volumes in scheduled batches (daily, hourly)
Characteristics:
- Latency: minutes/hours
- Throughput: millions of records per batch
- Cost: low (spot instances)
Use cases:
- Email campaigns
- Customer scoring
- ETL with predictions
Real-time API
Serves individual requests with very low latency (see the sketch after this list)
Characteristics:
- Latency: <100 ms (p95)
- Throughput: 1K-10K req/s
- Cost: high (always-on)
Use cases:
- Fraud detection
- Web recommendations
- Credit scoring
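A minimal sketch of this pattern (illustrative, not part of the original material): load the registered model through its pyfunc flavor and expose it behind a small Flask endpoint. The route, payload shape, and port are assumptions, and since a Spark ML pipeline is heavyweight for low-latency serving, the sketch simply assumes a model whose pyfunc flavor can score a pandas DataFrame:
import mlflow.pyfunc
import pandas as pd
from flask import Flask, request, jsonify

app = Flask(__name__)

# Load the model once at startup (same registry URI as the batch example below)
model = mlflow.pyfunc.load_model("models:/churn_predictor/Production")

@app.route("/predict", methods=["POST"])
def predict():
    # Expects a JSON list of feature records, e.g. [{"age": 34, "tenure": 12, ...}]
    records = request.get_json()
    features = pd.DataFrame(records)
    preds = model.predict(features)
    return jsonify(predictions=[float(p) for p in preds])

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5000)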
Streaming
Processes continuous event streams in near real time (see the sketch after this list)
Characteristics:
- Latency: seconds
- Throughput: 100K+ events/s
- Cost: medium
Use cases:
- IoT sensor analysis
- Real-time analytics
- Clickstream processing
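An illustrative sketch of streaming inference with Spark Structured Streaming, assuming a Kafka source; the topic name, event schema, broker address, and output paths are assumptions, and the Kafka connector package must be available on the cluster:
import mlflow
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

spark = SparkSession.builder.appName("ChurnStreamingScoring").getOrCreate()

# Same registered model as in the batch example below
pipeline_model = mlflow.spark.load_model("models:/churn_predictor/Production")

# Schema of the incoming events (assumed; must include the features the pipeline expects)
event_schema = StructType([
    StructField("customer_id", StringType()),
    StructField("monthly_charges", DoubleType()),
    StructField("data_usage_gb", DoubleType()),
])

events = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "customer-events")
    .load()
    .select(from_json(col("value").cast("string"), event_schema).alias("e"))
    .select("e.*")
)

# Score each micro-batch with the same PipelineModel used for batch inference
scored = pipeline_model.transform(events)

query = (
    scored.select("customer_id", "prediction")
    .writeStream.format("parquet")
    .option("path", "s3://predictions/churn-stream/")
    .option("checkpointLocation", "s3://predictions/churn-stream/_checkpoints/")
    .outputMode("append")
    .start()
)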
Code: Batch Inference with Spark
import mlflow
from pyspark.sql import SparkSession

# Create the Spark session first so the model loads into it
spark = SparkSession.builder.appName("ChurnBatchScoring").getOrCreate()

# ============================================================
# STEP 1: Load the model from the registry
# ============================================================
model_name = "churn_predictor"
model_stage = "Production"

# Load the model currently in production
model_uri = f"models:/{model_name}/{model_stage}"
pipeline_model = mlflow.spark.load_model(model_uri)
print(f"✓ Model loaded: {model_name} ({model_stage})")

# ============================================================
# STEP 2: Load the data to score (daily batch)
# ============================================================
# Read the current day's data
df_to_score = spark.read.parquet("s3://data-lake/customers/daily/2024-01-15/")
print(f"Customers to score: {df_to_score.count():,}")
# ============================================================
# STEP 3: Apply the model (distributed inference)
# ============================================================
predictions = pipeline_model.transform(df_to_score)

# Select the relevant columns
output = predictions.select(
    "customer_id",
    "prediction",     # 0 or 1
    "probability",    # probability vector [P(no_churn), P(churn)]
    "rawPrediction"
)

# Extract the churn probability (index 1 of the vector)
from pyspark.sql.functions import col, udf
from pyspark.sql.types import DoubleType

def extract_prob_churn(probability_vector):
    return float(probability_vector[1])

extract_prob_udf = udf(extract_prob_churn, DoubleType())
output = output.withColumn(
    "churn_probability",
    extract_prob_udf(col("probability"))
)
# ============================================================
# STEP 4: Apply business logic
# ============================================================
# Segment customers by risk
from pyspark.sql.functions import when

output = output.withColumn(
    "risk_segment",
    when(col("churn_probability") >= 0.7, "HIGH")
    .when(col("churn_probability") >= 0.4, "MEDIUM")
    .otherwise("LOW")
)

# ============================================================
# STEP 5: Save the results
# ============================================================
# Write to S3 for downstream systems (the raw model outputs are kept so that
# performance can be evaluated later, once the true labels arrive)
output.select("customer_id", "churn_probability", "risk_segment",
              "prediction", "rawPrediction") \
    .write \
    .mode("overwrite") \
    .partitionBy("risk_segment") \
    .parquet("s3://predictions/churn/2024-01-15/")
print("✓ Predictions written to S3")

# Write to a database for dashboards
output.select("customer_id", "churn_probability", "risk_segment") \
    .write \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://db.company.com:5432/analytics") \
    .option("dbtable", "churn_scores") \
    .option("user", "ml_service") \
    .option("password", "***") \
    .mode("overwrite") \
    .save()
print("✓ Predictions loaded into PostgreSQL")
# ============================================================
# STEP 6: Monitoring metrics
# ============================================================
# Count predictions per segment
segment_counts = output.groupBy("risk_segment").count()
segment_counts.show()
# Output:
# +------------+-------+
# |risk_segment|  count|
# +------------+-------+
# |        HIGH|  12345|
# |      MEDIUM|  45678|
# |         LOW| 234567|
# +------------+-------+

# Log metrics to the monitoring system (e.g. Prometheus; a sketch follows this listing)
total_count = df_to_score.count()
high_risk_count = segment_counts.filter(col("risk_segment") == "HIGH").first()["count"]
medium_risk_count = segment_counts.filter(col("risk_segment") == "MEDIUM").first()["count"]
print(f"High-risk customers: {high_risk_count:,} ({high_risk_count / total_count * 100:.1f}%)")
print(f"Medium-risk customers: {medium_risk_count:,} ({medium_risk_count / total_count * 100:.1f}%)")
2. Data Drift Detection
Statistical Detection of Distribution Shifts
Data drift occurs when the distribution of the features in production diverges from the distribution seen during training. Even if the model itself is still mathematically sound, its predictions can degrade when the drift is significant.
Kolmogorov-Smirnov (KS) test
For numerical features: measures the maximum distance between the two CDFs, D_KS = max_x |F_train(x) - F_prod(x)|
Interpretation: D_KS ∈ [0, 1]; values > 0.3 indicate severe drift
Population Stability Index (PSI)
For categorical or binned numerical features: compares the proportion of observations per bin, PSI = Σ (actual_i - expected_i) · ln(actual_i / expected_i)
Thresholds:
• PSI < 0.1: no drift (stable)
• 0.1 ≤ PSI < 0.25: moderate drift (investigate)
• PSI ≥ 0.25: severe drift (retrain)
Code: Drift Detection
from scipy.stats import ks_2samp
import numpy as np
import pandas as pd

# ============================================================
# FUNCTION: Compute PSI
# ============================================================
def calculate_psi(expected, actual, bins=10):
    """
    Calculate the Population Stability Index.

    Args:
        expected: reference distribution (training data)
        actual: current distribution (production data)
        bins: number of bins used for discretization

    Returns:
        PSI value
    """
    # Build bin edges from the percentiles of the reference distribution
    breakpoints = np.linspace(0, 100, bins + 1)
    breakpoints = np.percentile(expected, breakpoints)

    # Open the outer edges so production values outside the training range are still counted
    breakpoints[0] = -np.inf
    breakpoints[-1] = np.inf

    expected_percents = np.histogram(expected, breakpoints)[0] / len(expected)
    actual_percents = np.histogram(actual, breakpoints)[0] / len(actual)

    # Add a small value to avoid log(0)
    expected_percents = np.where(expected_percents == 0, 0.0001, expected_percents)
    actual_percents = np.where(actual_percents == 0, 0.0001, actual_percents)

    # Compute PSI
    psi_values = (actual_percents - expected_percents) * np.log(actual_percents / expected_percents)
    psi = np.sum(psi_values)
    return psi
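# (Added illustration, not in the original script) Quick sanity check of calculate_psi on
# synthetic data: an identically distributed sample should give a PSI close to 0, while a
# sample shifted by one standard deviation should land well above the 0.25 "severe" threshold.
_rng = np.random.default_rng(42)
_reference = _rng.normal(50, 10, 10_000)
print(calculate_psi(_reference, _rng.normal(50, 10, 10_000)))   # ~0.0   -> stable
print(calculate_psi(_reference, _rng.normal(60, 10, 10_000)))   # >0.25  -> severe drift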
# ============================================================
# DRIFT MONITORING: NUMERICAL FEATURES
# ============================================================
# Assume df_train (reference) and df_prod (production) are Spark DataFrames
numeric_features = [
    "age", "tenure", "monthly_charges", "total_charges",
    "num_services", "data_usage_gb"
]

drift_report = []
for feature in numeric_features:
    # Extract the two distributions
    train_dist = df_train.select(feature).toPandas()[feature].dropna()
    prod_dist = df_prod.select(feature).toPandas()[feature].dropna()

    # Kolmogorov-Smirnov test
    ks_statistic, p_value = ks_2samp(train_dist, prod_dist)

    # Population Stability Index
    psi = calculate_psi(train_dist.values, prod_dist.values)

    # Compare descriptive statistics
    mean_diff = (prod_dist.mean() - train_dist.mean()) / train_dist.mean() * 100
    std_diff = (prod_dist.std() - train_dist.std()) / train_dist.std() * 100

    # Classify the drift level. With large samples the KS p-value is almost always ~0,
    # so the thresholds rely on PSI and on the KS statistic itself (see above).
    if psi >= 0.25 or ks_statistic > 0.3:
        drift_level = "SEVERE"
    elif psi >= 0.1:
        drift_level = "MODERATE"
    else:
        drift_level = "STABLE"

    drift_report.append({
        "feature": feature,
        "ks_statistic": ks_statistic,
        "p_value": p_value,
        "psi": psi,
        "mean_change_pct": mean_diff,
        "std_change_pct": std_diff,
        "drift_level": drift_level
    })

# Convert to a pandas DataFrame for reporting
drift_df = pd.DataFrame(drift_report)
print("=" * 80)
print("DATA DRIFT REPORT")
print("=" * 80)
print(drift_df.to_string(index=False))
# Output:
#          feature  ks_statistic  p_value    psi  mean_change_pct  std_change_pct  drift_level
#              age         0.145    0.001  0.089             +2.3            +1.2       STABLE
#           tenure         0.234    0.000  0.178             +8.7            +5.4     MODERATE
#  monthly_charges         0.312    0.000  0.289            +15.2           +12.1       SEVERE
#    total_charges         0.089    0.123  0.045             -1.2            -0.8       STABLE
#     num_services         0.198    0.000  0.145             +6.5            +4.2     MODERATE
#    data_usage_gb         0.421    0.000  0.456            +32.1           +28.9       SEVERE
# ============================================================
# AUTOMATIC ALERTS
# ============================================================
severe_drifts = drift_df[drift_df["drift_level"] == "SEVERE"]

if len(severe_drifts) > 0:
    print("\n" + "=" * 80)
    print("⚠ ALERT: SEVERE DRIFT DETECTED")
    print("=" * 80)
    print(f"Affected features: {len(severe_drifts)}")
    print(severe_drifts[["feature", "psi", "mean_change_pct"]].to_string(index=False))
    print("\n📧 Notifying the ML team...")
    print("🔄 Retraining recommended")

    # This is where the alerting integration would go (PagerDuty, Slack, email)
    # send_alert(
    #     channel="ml-alerts",
    #     message=f"Severe drift in {len(severe_drifts)} features",
    #     severity="high"
    # )
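The send_alert helper referenced in the commented-out call does not exist in the original material. A minimal sketch, assuming a Slack incoming webhook (the URL below is a placeholder and the signature is hypothetical):
import requests

def send_alert(channel: str, message: str, severity: str = "info") -> None:
    """Post an alert message to a Slack incoming webhook (hypothetical helper)."""
    webhook_url = "https://hooks.slack.com/services/XXX/YYY/ZZZ"  # placeholder URL
    payload = {"text": f"[{severity.upper()}] {channel}: {message}"}
    requests.post(webhook_url, json=payload, timeout=10)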
Best Practices for Drift Monitoring
1. Frequency: monitor daily or weekly, depending on how quickly the data is expected to change
2. Rolling baseline: refresh the reference distribution quarterly
3. Feature importance: prioritize drift in the features the model relies on most
4. Adaptive thresholds: tune the thresholds to the business context
5. Visualization: dashboards that overlay the training and production distributions (see the sketch after this list)
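A minimal sketch of the overlay mentioned in practice 5, reusing the train_dist / prod_dist pandas Series from the drift loop above; the function name, bin count, and figure size are assumptions:
import matplotlib.pyplot as plt

def plot_distribution_overlay(train_values, prod_values, feature_name):
    """Overlay the reference (training) and current (production) distributions of one feature."""
    fig, ax = plt.subplots(figsize=(8, 4))
    ax.hist(train_values, bins=30, alpha=0.5, density=True, label="training")
    ax.hist(prod_values, bins=30, alpha=0.5, density=True, label="production")
    ax.set_title(f"Distribution drift: {feature_name}")
    ax.set_xlabel(feature_name)
    ax.set_ylabel("density")
    ax.legend()
    return fig

# Example usage:
# plot_distribution_overlay(train_dist, prod_dist, "monthly_charges").savefig("drift_monthly_charges.png")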
3. Model Drift: Performance Degradation
Concept Drift and Metric Monitoring
Model drift (or concept drift) occurs when the relationship between the features and the target changes: for example, after a price change or the entry of a new competitor, customers may start churning for reasons the existing features no longer capture, even if the feature distributions themselves look stable.
Code: Monitoring Performance in Production
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.sql.functions import col, lit, current_timestamp
import matplotlib.pyplot as plt
import pandas as pd

# ============================================================
# FUNCTION: Compute performance metrics
# ============================================================
def calculate_performance_metrics(predictions_df):
    """Compute a full set of binary-classification metrics."""
    evaluator_auc = BinaryClassificationEvaluator(
        labelCol="churn_actual",
        rawPredictionCol="rawPrediction",
        metricName="areaUnderROC"
    )
    auc_roc = evaluator_auc.evaluate(predictions_df)

    evaluator_pr = BinaryClassificationEvaluator(
        labelCol="churn_actual",
        rawPredictionCol="rawPrediction",
        metricName="areaUnderPR"
    )
    auc_pr = evaluator_pr.evaluate(predictions_df)

    # Compute precision, recall and F1 on the driver with scikit-learn
    from sklearn.metrics import precision_score, recall_score, f1_score
    pred_pd = predictions_df.select("churn_actual", "prediction").toPandas()
    precision = precision_score(pred_pd["churn_actual"], pred_pd["prediction"])
    recall = recall_score(pred_pd["churn_actual"], pred_pd["prediction"])
    f1 = f1_score(pred_pd["churn_actual"], pred_pd["prediction"])

    return {
        "auc_roc": auc_roc,
        "auc_pr": auc_pr,
        "precision": precision,
        "recall": recall,
        "f1_score": f1
    }
# ============================================================
# CONTINUOUS MONITORING: compare against the actual labels
# ============================================================
# In production, the true labels arrive with a delay (e.g. one week later).
# Once they are available, compare the stored predictions against the ground truth.

# Load last week's predictions
predictions_last_week = spark.read.parquet("s3://predictions/churn/2024-01-08/")

# Load the actual labels that are now available
actuals = spark.read.parquet("s3://data-lake/churn-actuals/2024-01-08/")

# Join predictions with actuals
df_with_labels = predictions_last_week.join(
    actuals.select("customer_id", col("churn").alias("churn_actual")),
    on="customer_id",
    how="inner"
)

# Compute the current metrics
current_metrics = calculate_performance_metrics(df_with_labels)
print("Metrics for last week's predictions:")
for metric, value in current_metrics.items():
    print(f"  {metric}: {value:.4f}")
# ============================================================
# COMPARE AGAINST THE BASELINE (training metrics)
# ============================================================
baseline_metrics = {
    "auc_roc": 0.8523,   # from training
    "auc_pr": 0.8012,
    "precision": 0.7845,
    "recall": 0.8234,
    "f1_score": 0.8035
}

print("\n" + "=" * 80)
print("COMPARISON AGAINST BASELINE")
print("=" * 80)

alerts = []
for metric in baseline_metrics.keys():
    current = current_metrics[metric]
    baseline = baseline_metrics[metric]
    diff = current - baseline
    pct_change = (diff / baseline) * 100

    print(f"{metric}:")
    print(f"  Baseline: {baseline:.4f}")
    print(f"  Current:  {current:.4f}")
    print(f"  Change:   {diff:+.4f} ({pct_change:+.1f}%)")

    # Flag any degradation larger than 5%
    if pct_change < -5:
        alerts.append({
            "metric": metric,
            "degradation_pct": abs(pct_change),
            "current": current,
            "baseline": baseline
        })
    print()
# ============================================================
# DEGRADATION ALERTS
# ============================================================
if len(alerts) > 0:
    print("=" * 80)
    print("⚠ ALERT: MODEL DEGRADATION DETECTED")
    print("=" * 80)
    print(f"Degraded metrics: {len(alerts)}")
    for alert in alerts:
        print(f"  - {alert['metric']}: -{alert['degradation_pct']:.1f}% "
              f"({alert['baseline']:.4f} → {alert['current']:.4f})")
    print("\n🔄 Recommended action: RETRAIN THE MODEL")
    print("📊 Investigate causes: data drift, concept drift, outliers")

    # Trigger the retraining pipeline
    # trigger_retraining_job(model_name="churn_predictor", reason="performance_degradation")
# ============================================================
# STORE HISTORICAL METRICS FOR TREND ANALYSIS
# ============================================================
metrics_history = pd.DataFrame([{
    "timestamp": "2024-01-15",
    "week": "2024-W02",
    **current_metrics
}])

# Append to the history table in the database
# (db_connection is assumed to be an existing SQLAlchemy engine or connection)
metrics_history.to_sql(
    name="model_performance_history",
    con=db_connection,
    if_exists="append",
    index=False
)
print("✓ Metrics appended to history")