Deployment and Monitoring

Deployment Patterns and Drift Detection


1. Deployment Patterns

Three Inference Paradigms

The deployment pattern depends on latency and volume requirements:

Batch Inference

Processes large volumes in scheduled batches (daily, hourly)

Characteristics:

  • Latency: minutes/hours
  • Throughput: millions/batch
  • Cost: low (spot instances)

Use cases:

  • Email campaigns
  • Customer scoring
  • ETL with predictions

Real-time API

Serves individual requests with ultra-low latency

Characteristics:

  • Latency: <100ms (p95)
  • Throughput: 1K-10K req/s
  • Cost: high (always-on)

Use cases:

  • Fraud detection
  • Web recommendations
  • Credit scoring
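
A minimal real-time serving sketch, assuming FastAPI as the web framework and a registry model loadable through the pyfunc flavor (the endpoint name and feature schema are illustrative, not the actual model signature):

import mlflow.pyfunc
import pandas as pd
from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

# Load once at startup; the model name matches the batch example below
model = mlflow.pyfunc.load_model("models:/churn_predictor/Production")

# Hypothetical request schema -- adjust to the model's real features
class CustomerFeatures(BaseModel):
    age: float
    tenure: float
    monthly_charges: float

@app.post("/predict")
def predict(features: CustomerFeatures):
    df = pd.DataFrame([features.dict()])
    prediction = model.predict(df)
    return {"churn_prediction": float(prediction[0])}

Note that loading a Spark pipeline through pyfunc starts a local Spark session, which is heavy for low-latency serving; dedicated real-time deployments typically log the model in a lighter flavor or use mlflow models serve.
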
Streaming

Processes continuous event streams in real time

Characteristics:

  • Latency: seconds
  • Throughput: 100K+ evt/s
  • Cost: medium

Use cases:

  • IoT sensor analysis
  • Real-time analytics
  • Clickstream processing
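
A minimal streaming sketch, assuming a Kafka source and reusing the spark session and pipeline_model from the batch example below (broker address, topic, event schema, and paths are illustrative):

from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# Hypothetical event payload -- adjust to the real schema
event_schema = StructType([
    StructField("customer_id", StringType()),
    StructField("monthly_charges", DoubleType()),
    StructField("data_usage_gb", DoubleType()),
])

# Read a continuous stream of events from Kafka
events = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka.company.com:9092")
    .option("subscribe", "customer-events")
    .load()
    .select(from_json(col("value").cast("string"), event_schema).alias("e"))
    .select("e.*"))

# Most fitted Spark ML pipelines can transform a streaming DataFrame directly
query = (pipeline_model.transform(events)
    .writeStream
    .format("parquet")
    .option("path", "s3://predictions/churn-stream/")
    .option("checkpointLocation", "s3://checkpoints/churn-stream/")
    .start())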

Code: Batch Inference with Spark

import mlflow
from pyspark.sql import SparkSession

# ============================================================
# STEP 1: Load the model from the registry
# ============================================================
model_name = "churn_predictor"
model_stage = "Production"

# Load the model currently in production
model_uri = f"models:/{model_name}/{model_stage}"
pipeline_model = mlflow.spark.load_model(model_uri)

print(f"✓ Model loaded: {model_name} ({model_stage})")

# ============================================================
# STEP 2: Load the data to score (daily batch)
# ============================================================
spark = SparkSession.builder.appName("ChurnBatchScoring").getOrCreate()

# Read the current day's data
df_to_score = spark.read.parquet("s3://data-lake/customers/daily/2024-01-15/")

print(f"Customers to score: {df_to_score.count():,}")

# ============================================================
# STEP 3: Apply the model (distributed inference)
# ============================================================
predictions = pipeline_model.transform(df_to_score)

# Select the relevant columns
output = predictions.select(
    "customer_id",
    "prediction",           # 0 or 1
    "probability",          # probability vector [P(no_churn), P(churn)]
    "rawPrediction"
)

# Extract the churn probability (index 1 of the vector)
from pyspark.sql.functions import col, udf
from pyspark.sql.types import DoubleType

def extract_prob_churn(probability_vector):
    return float(probability_vector[1])

extract_prob_udf = udf(extract_prob_churn, DoubleType())

output = output.withColumn(
    "churn_probability",
    extract_prob_udf(col("probability"))
)

# ============================================================
# STEP 4: Apply business logic
# ============================================================
# Segment customers by risk
from pyspark.sql.functions import when

output = output.withColumn(
    "risk_segment",
    when(col("churn_probability") >= 0.7, "HIGH")
    .when(col("churn_probability") >= 0.4, "MEDIUM")
    .otherwise("LOW")
)

# ============================================================
# STEP 5: Save the results
# ============================================================
# Save to S3 for consumption by downstream systems
output.select("customer_id", "churn_probability", "risk_segment") \
    .write \
    .mode("overwrite") \
    .partitionBy("risk_segment") \
    .parquet("s3://predictions/churn/2024-01-15/")

print("✓ Predictions saved to S3")

# Save to a database for dashboards
output.select("customer_id", "churn_probability", "risk_segment") \
    .write \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://db.company.com:5432/analytics") \
    .option("dbtable", "churn_scores") \
    .option("user", "ml_service") \
    .option("password", "***") \
    .mode("overwrite") \
    .save()

print("✓ Predictions loaded into PostgreSQL")

# ============================================================
# STEP 6: Monitoring metrics
# ============================================================
# Count predictions per segment
segment_counts = output.groupBy("risk_segment").count()
segment_counts.show()

# Output:
# +------------+-------+
# |risk_segment|  count|
# +------------+-------+
# |        HIGH|  12345|
# |      MEDIUM|  45678|
# |         LOW| 234567|
# +------------+-------+

# Log metrics to a monitoring system (e.g., Prometheus)
high_risk_count = segment_counts.filter(col("risk_segment") == "HIGH").first()["count"]
medium_risk_count = segment_counts.filter(col("risk_segment") == "MEDIUM").first()["count"]

print(f"High-risk customers: {high_risk_count:,} ({high_risk_count/df_to_score.count()*100:.1f}%)")

2. Data Drift Detection

Statistical Detection of Distribution Shifts

Data Drift occurs when the distribution of features in production diverges from the training distribution:

$$ P_{\text{train}}(X) \neq P_{\text{prod}}(X, t) \quad \Rightarrow \quad \text{Data Drift} $$

Even if the model itself is mathematically unchanged, its predictions can degrade when the drift is significant.

Kolmogorov-Smirnov (KS) Test

For numeric features: measures the maximum distance between CDFs

$$ \begin{aligned} D_{KS} &= \sup_x |F_{\text{train}}(x) - F_{\text{prod}}(x)| \\[0.5em] H_0 &: \text{Same distribution} \\ H_1 &: \text{Drift detected if } p\text{-value} < 0.05 \end{aligned} $$

Interpretation: $D_{KS} \in [0, 1]$; values above 0.3 indicate severe drift
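
A quick self-contained check of how the test behaves on synthetic data (a shifted normal distribution stands in for drifted production values):

from scipy.stats import ks_2samp
import numpy as np

rng = np.random.default_rng(42)
train = rng.normal(loc=50, scale=10, size=10_000)   # reference sample
prod = rng.normal(loc=55, scale=12, size=10_000)    # shifted "production" sample

stat, p_value = ks_2samp(train, prod)
print(f"D_KS = {stat:.3f}, p-value = {p_value:.2e}")  # tiny p-value: drift detected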

Population Stability Index (PSI)

For categorical features, or numeric features discretized into bins: compares the proportions per bin (see the worked example below)

$$ PSI = \sum_{i=1}^{n} (P_{\text{actual},i} - P_{\text{expected},i}) \times \ln\left(\frac{P_{\text{actual},i}}{P_{\text{expected},i}}\right) $$

Thresholds:
• PSI < 0.1: no drift (stable)
• 0.1 ≤ PSI < 0.25: moderate drift (investigate)
• PSI ≥ 0.25: severe drift (retrain)
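
A worked example on a hypothetical categorical feature, computing PSI directly from category proportions:

import numpy as np

# Hypothetical "contract_type" proportions: month-to-month, 1-year, 2-year
expected = np.array([0.50, 0.30, 0.20])   # training
actual = np.array([0.40, 0.35, 0.25])     # production

psi = np.sum((actual - expected) * np.log(actual / expected))
print(f"PSI = {psi:.4f}")   # ≈ 0.041 → below 0.1, stable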

Code: Drift Detection

from scipy.stats import ks_2samp
import numpy as np
import pandas as pd

# ============================================================
# FUNCTION: Compute the PSI
# ============================================================
def calculate_psi(expected, actual, bins=10):
    """
    Calculate the Population Stability Index.

    Args:
        expected: array with the reference distribution (training)
        actual: array with the current distribution (production)
        bins: number of bins for discretization

    Returns:
        PSI value
    """
    # Discretize using percentile-based bin edges from the reference
    breakpoints = np.linspace(0, 100, bins + 1)
    breakpoints = np.percentile(expected, breakpoints)

    expected_percents = np.histogram(expected, breakpoints)[0] / len(expected)
    actual_percents = np.histogram(actual, breakpoints)[0] / len(actual)

    # Add a small value to avoid log(0)
    expected_percents = np.where(expected_percents == 0, 0.0001, expected_percents)
    actual_percents = np.where(actual_percents == 0, 0.0001, actual_percents)

    # Compute the PSI
    psi_values = (actual_percents - expected_percents) * np.log(actual_percents / expected_percents)
    psi = np.sum(psi_values)

    return psi

# ============================================================
# DRIFT MONITORING: NUMERIC FEATURES
# ============================================================
# Assume df_train (reference) and df_prod (production) are Spark DataFrames

numeric_features = [
    "age", "tenure", "monthly_charges", "total_charges",
    "num_services", "data_usage_gb"
]

drift_report = []

for feature in numeric_features:
    # Extract the distributions
    train_dist = df_train.select(feature).toPandas()[feature].dropna()
    prod_dist = df_prod.select(feature).toPandas()[feature].dropna()

    # Kolmogorov-Smirnov test
    ks_statistic, p_value = ks_2samp(train_dist, prod_dist)

    # Population Stability Index
    psi = calculate_psi(train_dist.values, prod_dist.values)

    # Compare descriptive statistics
    mean_diff = (prod_dist.mean() - train_dist.mean()) / train_dist.mean() * 100
    std_diff = (prod_dist.std() - train_dist.std()) / train_dist.std() * 100

    # Classify the drift level
    if psi >= 0.25 or p_value < 0.01:
        drift_level = "SEVERE"
    elif psi >= 0.1 or p_value < 0.05:
        drift_level = "MODERATE"
    else:
        drift_level = "STABLE"

    drift_report.append({
        "feature": feature,
        "ks_statistic": ks_statistic,
        "p_value": p_value,
        "psi": psi,
        "mean_change_pct": mean_diff,
        "std_change_pct": std_diff,
        "drift_level": drift_level
    })

# Convert to a DataFrame for display
drift_df = pd.DataFrame(drift_report)

print("="*80)
print("DATA DRIFT REPORT")
print("="*80)
print(drift_df.to_string(index=False))

# Output:
#          feature  ks_statistic  p_value    psi  mean_change_pct  std_change_pct drift_level
#              age         0.145    0.001  0.089             +2.3            +1.2      STABLE
#           tenure         0.234    0.000  0.178             +8.7            +5.4    MODERATE
#  monthly_charges         0.312    0.000  0.289            +15.2           +12.1      SEVERE
#    total_charges         0.089    0.123  0.045             -1.2            -0.8      STABLE
#     num_services         0.198    0.000  0.145             +6.5            +4.2    MODERATE
#    data_usage_gb         0.421    0.000  0.456            +32.1           +28.9      SEVERE

# ============================================================
# AUTOMATIC ALERTS
# ============================================================
severe_drifts = drift_df[drift_df["drift_level"] == "SEVERE"]

if len(severe_drifts) > 0:
    print("\n" + "="*80)
    print("⚠ ALERT: SEVERE DRIFT DETECTED")
    print("="*80)
    print(f"Affected features: {len(severe_drifts)}")
    print(severe_drifts[["feature", "psi", "mean_change_pct"]].to_string(index=False))
    print("\n📧 Notifying the ML team...")
    print("🔄 Retraining recommended")

    # This is where an alerting system would be integrated (PagerDuty, Slack, email)
    # send_alert(
    #     channel="ml-alerts",
    #     message=f"Severe drift in {len(severe_drifts)} features",
    #     severity="high"
    # )

Best Practices for Drift Monitoring

  1. Frequency: monitor daily or weekly, depending on the expected rate of change
  2. Rolling baseline: refresh the reference distribution quarterly
  3. Feature importance: prioritize drift on the model's most important features
  4. Adaptive thresholds: tune thresholds to the business context
  5. Visualization: dashboards with overlaid train vs. production distributions (see the sketch below)
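
For practice 5, a minimal sketch of the overlaid view with matplotlib, using train_dist and prod_dist as extracted in the monitoring loop above (the feature label and output path are illustrative):

import matplotlib.pyplot as plt

fig, ax = plt.subplots(figsize=(8, 4))
ax.hist(train_dist, bins=50, alpha=0.5, density=True, label="train")
ax.hist(prod_dist, bins=50, alpha=0.5, density=True, label="production")
ax.set_xlabel("monthly_charges")
ax.set_ylabel("density")
ax.legend()
fig.savefig("drift_monthly_charges.png")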

3. Model Drift: Performance Degradation

Concept Drift and Metric Monitoring

Model Drift (or Concept Drift) occurs when the relationship between features and target changes:

$$ \begin{aligned} &P_{\text{train}}(Y|X) \neq P_{\text{prod}}(Y|X, t) \quad \Rightarrow \quad \text{Model Drift} \\[0.5em] &\text{Manifestation: } \text{Accuracy}(t) < \text{Accuracy}_{\text{train}} - \epsilon \end{aligned} $$

Code: Performance Monitoring in Production

from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.sql.functions import col, lit, current_timestamp
import matplotlib.pyplot as plt
import pandas as pd

# ============================================================
# FUNCTION: Compute performance metrics
# ============================================================
def calculate_performance_metrics(predictions_df):
    """Compute a full set of classification metrics"""

    evaluator_auc = BinaryClassificationEvaluator(
        labelCol="churn_actual",
        rawPredictionCol="rawPrediction",
        metricName="areaUnderROC"
    )
    auc_roc = evaluator_auc.evaluate(predictions_df)

    evaluator_pr = BinaryClassificationEvaluator(
        labelCol="churn_actual",
        metricName="areaUnderPR"
    )
    auc_pr = evaluator_pr.evaluate(predictions_df)

    # Compute precision, recall, and F1 manually (sklearn, on the driver)
    from sklearn.metrics import precision_score, recall_score, f1_score

    pred_pd = predictions_df.select("churn_actual", "prediction").toPandas()
    precision = precision_score(pred_pd["churn_actual"], pred_pd["prediction"])
    recall = recall_score(pred_pd["churn_actual"], pred_pd["prediction"])
    f1 = f1_score(pred_pd["churn_actual"], pred_pd["prediction"])

    return {
        "auc_roc": auc_roc,
        "auc_pr": auc_pr,
        "precision": precision,
        "recall": recall,
        "f1_score": f1
    }

# ============================================================
# CONTINUOUS MONITORING: Compare against real labels
# ============================================================
# In production, true labels arrive with a delay (e.g., one week later)
# Once available, compare the predictions against the ground truth

# Load last week's predictions
predictions_last_week = spark.read.parquet("s3://predictions/churn/2024-01-08/")

# Load the actual labels that are now available
actuals = spark.read.parquet("s3://data-lake/churn-actuals/2024-01-08/")

# Join the predictions with the actuals
df_with_labels = predictions_last_week.join(
    actuals.select("customer_id", col("churn").alias("churn_actual")),
    on="customer_id",
    how="inner"
)

# Compute the current metrics
current_metrics = calculate_performance_metrics(df_with_labels)

print("Last week's metrics:")
for metric, value in current_metrics.items():
    print(f"  {metric}: {value:.4f}")

# ============================================================
# COMPARE AGAINST THE BASELINE (training metrics)
# ============================================================
baseline_metrics = {
    "auc_roc": 0.8523,    # From training
    "auc_pr": 0.8012,
    "precision": 0.7845,
    "recall": 0.8234,
    "f1_score": 0.8035
}

print("\n" + "="*80)
print("COMPARISON AGAINST BASELINE")
print("="*80)

alerts = []
for metric in baseline_metrics.keys():
    current = current_metrics[metric]
    baseline = baseline_metrics[metric]
    diff = current - baseline
    pct_change = (diff / baseline) * 100

    print(f"{metric}:")
    print(f"  Baseline: {baseline:.4f}")
    print(f"  Current:  {current:.4f}")
    print(f"  Change:   {diff:+.4f} ({pct_change:+.1f}%)")

    # Flag degradation of more than 5%
    if pct_change < -5:
        alerts.append({
            "metric": metric,
            "degradation_pct": abs(pct_change),
            "current": current,
            "baseline": baseline
        })
    print()

# ============================================================
# DEGRADATION ALERTS
# ============================================================
if len(alerts) > 0:
    print("="*80)
    print("⚠ ALERT: MODEL DEGRADATION DETECTED")
    print("="*80)
    print(f"Degraded metrics: {len(alerts)}")
    for alert in alerts:
        print(f"  - {alert['metric']}: -{alert['degradation_pct']:.1f}% "
              f"({alert['baseline']:.4f} → {alert['current']:.4f})")
    print("\n🔄 Recommended action: RETRAIN THE MODEL")
    print("📊 Investigate causes: data drift, concept drift, outliers")

    # Trigger the retraining pipeline
    # trigger_retraining_job(model_name="churn_predictor", reason="performance_degradation")

# ============================================================
# STORE HISTORICAL METRICS FOR TREND ANALYSIS
# ============================================================
metrics_history = pd.DataFrame([{
    "timestamp": "2024-01-15",
    "week": "2024-W02",
    **current_metrics
}])

# Append to the history table in the database
# (db_connection: an existing SQLAlchemy engine/connection is assumed)
metrics_history.to_sql(
    name="model_performance_history",
    con=db_connection,
    if_exists="append",
    index=False
)

print("✓ Metrics appended to the performance history")