Challenge: CI/CD for ML

Automated Continuous Integration and Deployment Pipeline

Challenge: A Complete CI/CD Pipeline for ML

Build an end-to-end Continuous Integration / Continuous Deployment pipeline that automates testing, validation, deployment, and rollback of ML models. The goal is to guarantee that only high-quality models reach production, with automatic reversion if problems are detected.

Problem Context

You are the MLOps Engineer at a fintech that deploys fraud-detection models. Deployment is currently manual and slow (1-2 weeks), which gives fraudsters time to exploit new patterns. You must fully automate the model lifecycle to cut time-to-production to under 24 hours while maintaining quality and security.

Requirements

  • Automated tests (unit + integration)
  • Model validation (AUC > threshold)
  • Canary deployment (5% of traffic first)
  • Post-deploy monitoring
  • Automatic rollback on failure

Metrics

  • Time-to-production: <24h
  • Test coverage: >80%
  • Zero-downtime deployment
  • Rollback time: <5 min
  • AUC-ROC > 0.92

Tech Stack

  • GitHub Actions (CI/CD)
  • MLflow (Model Registry)
  • Spark ML (training)
  • pytest (testing)
  • Prometheus (monitoring)

Phase 1: Continuous Integration - Automated Testing

Phase Objective

Implement a complete test suite that validates data transformations, schema, and preprocessing logic. The tests must run automatically on every commit.

CI Pipeline Architecture

graph LR
    A[Git Push] --> B[GitHub Actions Trigger]
    B --> C[Setup Environment]
    C --> D[Install Dependencies]
    D --> E[Run Unit Tests]
    E --> F{Tests Pass?}
    F -->|No| G[❌ Fail Build]
    F -->|Yes| H[Run Integration Tests]
    H --> I{Tests Pass?}
    I -->|No| G
    I -->|Yes| J[Data Validation]
    J --> K{Schema Valid?}
    K -->|No| G
    K -->|Yes| L[✓ CI Success]
    L --> M[Trigger CD Pipeline]

    style A fill:#3b82f6,stroke:#60a5fa,color:#fff
    style G fill:#ef4444,stroke:#f87171,color:#fff
    style L fill:#10b981,stroke:#34d399,color:#fff
    style M fill:#f59e0b,stroke:#fbbf24,color:#000
                        

Code: pytest Test Suite

# tests/test_transformations.py
import pytest
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
from ml_pipeline.transformations import (
    clean_data, encode_categorical, validate_schema
)

# ============================================================
# FIXTURE: Spark session for tests
# ============================================================
@pytest.fixture(scope="session")
def spark():
    """Create Spark session for testing"""
    spark = SparkSession.builder \
        .master("local[2]") \
        .appName("ML_Tests") \
        .getOrCreate()
    yield spark
    spark.stop()

# ============================================================
# UNIT TESTS: Individual Transformations
# ============================================================

def test_clean_data_removes_nulls(spark):
    """Test that clean_data removes rows with nulls in critical columns"""
    # Arrange: create a dataframe with nulls
    data = [
        (1, "john@email.com", 25, 100.0),
        (2, None, 30, 150.0),           # null email
        (3, "jane@email.com", None, 200.0),  # null age
        (4, "bob@email.com", 35, 250.0)
    ]
    schema = StructType([
        StructField("id", IntegerType(), True),
        StructField("email", StringType(), True),
        StructField("age", IntegerType(), True),
        StructField("amount", DoubleType(), True)
    ])
    df = spark.createDataFrame(data, schema)

    # Act: apply the cleaning
    df_cleaned = clean_data(df, required_cols=["email", "age"])

    # Assert: only the 2 valid rows remain
    assert df_cleaned.count() == 2, "Should remove rows with nulls"
    assert df_cleaned.filter("email IS NULL").count() == 0
    assert df_cleaned.filter("age IS NULL").count() == 0

def test_clean_data_removes_duplicates(spark):
    """Test that clean_data removes duplicates"""
    data = [
        (1, "john@email.com", 25),
        (1, "john@email.com", 25),  # duplicate
        (2, "jane@email.com", 30)
    ]
    schema = StructType([
        StructField("id", IntegerType(), True),
        StructField("email", StringType(), True),
        StructField("age", IntegerType(), True)
    ])
    df = spark.createDataFrame(data, schema)

    df_cleaned = clean_data(df, dedup_cols=["id", "email"])

    assert df_cleaned.count() == 2, "Should remove duplicates"

def test_encode_categorical_preserves_count(spark):
    """Test that encoding does not lose rows"""
    data = [
        (1, "premium"),
        (2, "basic"),
        (3, "premium"),
        (4, "enterprise")
    ]
    schema = StructType([
        StructField("id", IntegerType(), True),
        StructField("plan_type", StringType(), True)
    ])
    df = spark.createDataFrame(data, schema)

    df_encoded = encode_categorical(df, categorical_cols=["plan_type"])

    assert df_encoded.count() == df.count(), "Encoding should preserve row count"

def test_encode_categorical_creates_correct_columns(spark):
    """Test that encoding creates the correct columns"""
    data = [(1, "A"), (2, "B"), (3, "A")]
    schema = StructType([
        StructField("id", IntegerType()),
        StructField("category", StringType())
    ])
    df = spark.createDataFrame(data, schema)

    df_encoded = encode_categorical(df, categorical_cols=["category"])

    # Verify that the index and encoded columns exist
    assert "category_index" in df_encoded.columns
    assert "category_encoded" in df_encoded.columns

def test_validate_schema_passes_correct_schema(spark):
    """Test that a correct schema passes validation"""
    expected_schema = StructType([
        StructField("id", IntegerType(), False),
        StructField("email", StringType(), False),
        StructField("age", IntegerType(), True)
    ])

    data = [(1, "test@email.com", 25)]
    df = spark.createDataFrame(data, expected_schema)

    # Must not raise an exception
    validate_schema(df, expected_schema)

def test_validate_schema_fails_missing_column(spark):
    """Test that an incorrect schema fails validation"""
    expected_schema = StructType([
        StructField("id", IntegerType()),
        StructField("email", StringType()),
        StructField("age", IntegerType())
    ])

    # DataFrame without the 'age' column
    data = [(1, "test@email.com")]
    schema = StructType([
        StructField("id", IntegerType()),
        StructField("email", StringType())
    ])
    df = spark.createDataFrame(data, schema)

    with pytest.raises(ValueError, match="Missing columns"):
        validate_schema(df, expected_schema)

# ============================================================
# INTEGRATION TESTS: Full Pipeline
# ============================================================

def test_full_pipeline_end_to_end(spark):
    """End-to-end test of the full preprocessing pipeline"""
    from ml_pipeline.pipeline import build_preprocessing_pipeline

    # Create simulated raw data
    data = [
        (1, "john@email.com", 25, "premium", 100.0),
        (2, "jane@email.com", 30, "basic", 50.0),
        (3, "bob@email.com", 35, "premium", 150.0)
    ]
    schema = StructType([
        StructField("id", IntegerType()),
        StructField("email", StringType()),
        StructField("age", IntegerType()),
        StructField("plan_type", StringType()),
        StructField("amount", DoubleType())
    ])
    df_raw = spark.createDataFrame(data, schema)

    # Build and apply the pipeline
    pipeline = build_preprocessing_pipeline()
    df_processed = pipeline.transform(df_raw)

    # Assertions on the output
    assert df_processed.count() == 3, "Pipeline should preserve all valid rows"
    assert "features" in df_processed.columns, "Pipeline should create features column"

    # Verify that features is a vector
    from pyspark.ml.linalg import VectorUDT
    features_type = df_processed.schema["features"].dataType
    assert isinstance(features_type, VectorUDT), "features should be VectorUDT"

# ============================================================
# PROPERTY TESTS: System Invariants
# ============================================================

def test_transformation_is_idempotent(spark):
    """Test that applying the transformation twice yields the same result"""
    data = [(1, "test", 25)]
    schema = StructType([
        StructField("id", IntegerType()),
        StructField("name", StringType()),
        StructField("age", IntegerType())
    ])
    df = spark.createDataFrame(data, schema)

    df_once = clean_data(df, required_cols=["name"])
    df_twice = clean_data(df_once, required_cols=["name"])

    # Verify idempotence
    assert df_once.count() == df_twice.count()
    assert df_once.collect() == df_twice.collect()

# ============================================================
# CONFIGURATION: pytest.ini
# ============================================================
# [pytest]
# testpaths = tests
# python_files = test_*.py
# python_functions = test_*
# markers =
#     unit: Unit tests
#     integration: Integration tests
#     slow: Slow running tests
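The tests above import helpers such as `validate_schema` from `ml_pipeline.transformations`, whose implementations are not shown in this challenge. As a hedged sketch of what that helper might look like, note that the error message must contain "Missing columns" for `pytest.raises(ValueError, match="Missing columns")` to pass; this is an illustrative assumption, not the challenge's reference implementation:

```python
# Illustrative sketch of validate_schema (assumption: the real
# ml_pipeline.transformations implementation is not provided here).
# It checks column presence only; a fuller version would also compare types.
def validate_schema(df, expected_schema):
    """Raise ValueError if df lacks any column required by expected_schema."""
    expected_cols = {field.name for field in expected_schema.fields}
    missing = sorted(expected_cols - set(df.columns))
    if missing:
        # The message must contain "Missing columns" so the pytest.raises
        # match in test_validate_schema_fails_missing_column succeeds
        raise ValueError(f"Missing columns: {', '.join(missing)}")
```

A type-aware version would additionally compare each `StructField.dataType` against the DataFrame's schema before declaring it valid.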

GitHub Actions Workflow

# .github/workflows/ci.yml
name: ML Pipeline CI

on:
  push:
    branches: [ main, develop ]
  pull_request:
    branches: [ main ]

jobs:
  test:
    runs-on: ubuntu-latest

    steps:
    - name: Checkout code
      uses: actions/checkout@v3

    - name: Set up Python
      uses: actions/setup-python@v4
      with:
        python-version: '3.9'

    - name: Install dependencies
      run: |
        pip install -r requirements.txt
        pip install pytest pytest-cov

    - name: Run unit tests
      run: |
        pytest tests/ -v --cov=ml_pipeline --cov-report=xml

    - name: Upload coverage
      uses: codecov/codecov-action@v3
      with:
        file: ./coverage.xml

    - name: Data validation
      run: |
        python scripts/validate_data_schema.py

    - name: Lint code
      run: |
        pip install flake8
        flake8 ml_pipeline/ --max-line-length=100

  build:
    needs: test
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'

    steps:
    - name: Trigger CD pipeline
      run: echo "CI passed, triggering CD..."
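The workflow's "Data validation" step invokes `scripts/validate_data_schema.py`, which is not shown in this challenge. A minimal sketch of what that script could do, assuming a hard-coded data contract (the column names and types below are illustrative, not the real fraud dataset's schema):

```python
# scripts/validate_data_schema.py -- illustrative sketch (assumption: the real
# script called by the CI workflow is not provided in this challenge).

# Expected schema for the raw dataset; align with your actual data contract
EXPECTED_COLUMNS = {
    "id": "int",
    "email": "string",
    "age": "int",
    "plan_type": "string",
    "amount": "double",
}

def check_columns(actual: dict) -> list:
    """Compare an observed {column: dtype} mapping against the contract.

    Returns a list of human-readable problems; an empty list means valid.
    """
    problems = []
    for name, dtype in EXPECTED_COLUMNS.items():
        if name not in actual:
            problems.append(f"missing column: {name}")
        elif actual[name] != dtype:
            problems.append(f"type mismatch for {name}: {actual[name]} != {dtype}")
    return problems

def main(actual: dict) -> int:
    """Return the exit code CI expects: 0 = schema valid, 1 = invalid."""
    problems = check_columns(actual)
    for problem in problems:
        print(problem)
    return 1 if problems else 0
```

In the real script, `actual` would be read from the dataset (e.g. a Parquet file's schema) and the entry point would end with `sys.exit(main(actual))` so the CI step fails on a schema mismatch.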

Success Criteria - Phase 1

  • At least 15 tests implemented (unit + integration)
  • Test coverage > 80%
  • All tests pass in CI
  • Schema validation implemented

Phase 2: Model Validation - Quality Gates

Phase Objective

Implement automatic validations that ensure only high-quality models are promoted to production. Includes performance tests, fairness checks, and A/B testing.

Code: Model Validation Script

# scripts/validate_model.py
import sys

import mlflow
import pandas as pd
from mlflow.tracking import MlflowClient
from pyspark.sql import SparkSession
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Spark session used to read the test data (missing in the original script)
spark = SparkSession.builder.appName("model_validation").getOrCreate()

def validate_model(model_name, new_version, test_data_path, thresholds):
    """
    Validate a model against performance thresholds

    Args:
        model_name: Model name in the registry
        new_version: Version to validate
        test_data_path: Path to the test data
        thresholds: Dict with the minimum required metrics

    Returns:
        bool: True if the model passes validation
    """
    print(f"Validating model {model_name} v{new_version}...")

    # ============================================================
    # 1. LOAD MODEL AND DATA
    # ============================================================
    model_uri = f"models:/{model_name}/{new_version}"
    model = mlflow.spark.load_model(model_uri)

    df_test = spark.read.parquet(test_data_path)
    print(f"Test set size: {df_test.count():,} rows")

    # ============================================================
    # 2. GENERATE PREDICTIONS
    # ============================================================
    predictions = model.transform(df_test)

    # ============================================================
    # 3. COMPUTE METRICS
    # ============================================================
    evaluator_auc = BinaryClassificationEvaluator(
        labelCol="fraud_label",
        rawPredictionCol="rawPrediction",
        metricName="areaUnderROC"
    )
    auc_roc = evaluator_auc.evaluate(predictions)

    evaluator_pr = BinaryClassificationEvaluator(
        labelCol="fraud_label",
        metricName="areaUnderPR"
    )
    auc_pr = evaluator_pr.evaluate(predictions)

    from sklearn.metrics import precision_score, recall_score, f1_score
    pred_pd = predictions.select("fraud_label", "prediction").toPandas()
    precision = precision_score(pred_pd["fraud_label"], pred_pd["prediction"])
    recall = recall_score(pred_pd["fraud_label"], pred_pd["prediction"])
    f1 = f1_score(pred_pd["fraud_label"], pred_pd["prediction"])

    metrics = {
        "auc_roc": auc_roc,
        "auc_pr": auc_pr,
        "precision": precision,
        "recall": recall,
        "f1_score": f1
    }

    print("\n" + "="*60)
    print("MODEL METRICS")
    print("="*60)
    for metric, value in metrics.items():
        print(f"{metric}: {value:.4f}")

    # ============================================================
    # 4. VALIDATE AGAINST THRESHOLDS
    # ============================================================
    validation_results = {}
    all_passed = True

    print("\n" + "="*60)
    print("VALIDATION AGAINST THRESHOLDS")
    print("="*60)

    for metric, threshold in thresholds.items():
        actual_value = metrics.get(metric, 0)
        passed = actual_value >= threshold

        validation_results[metric] = {
            "threshold": threshold,
            "actual": actual_value,
            "passed": passed
        }

        status = "✓ PASS" if passed else "✗ FAIL"
        print(f"{metric}: {actual_value:.4f} >= {threshold:.4f} ? {status}")

        if not passed:
            all_passed = False

    # ============================================================
    # 5. COMPARE WITH THE PRODUCTION MODEL
    # ============================================================
    client = MlflowClient()
    try:
        # Fetch the current production model
        prod_versions = client.get_latest_versions(model_name, stages=["Production"])
        if prod_versions:
            prod_version = prod_versions[0].version
            prod_model = mlflow.spark.load_model(f"models:/{model_name}/{prod_version}")

            # Generate predictions with the production model
            predictions_prod = prod_model.transform(df_test)
            auc_prod = evaluator_auc.evaluate(predictions_prod)

            print("\n" + "="*60)
            print("COMPARISON WITH PRODUCTION")
            print("="*60)
            print(f"Production (v{prod_version}): AUC = {auc_prod:.4f}")
            print(f"New (v{new_version}): AUC = {auc_roc:.4f}")

            improvement = ((auc_roc - auc_prod) / auc_prod) * 100
            print(f"Improvement: {improvement:+.2f}%")

            # Require an improvement of at least 1%
            if improvement < 1.0:
                print("⚠ Warning: improvement over production is below 1%")
                # Do not block the pipeline, but alert
    except Exception as e:
        print(f"Could not compare with production: {e}")

    # ============================================================
    # 6. LOG RESULTS TO MLFLOW
    # ============================================================
    with mlflow.start_run():
        for metric, value in metrics.items():
            mlflow.log_metric(f"validation_{metric}", value)

        mlflow.log_param("validation_passed", all_passed)
        mlflow.set_tag("validation_timestamp", str(pd.Timestamp.now()))

    # ============================================================
    # 7. FINAL RESULT
    # ============================================================
    print("\n" + "="*60)
    if all_passed:
        print("✓✓✓ VALIDATION PASSED - MODEL APPROVED")
        print("="*60)
        return True
    else:
        print("✗✗✗ VALIDATION FAILED - MODEL REJECTED")
        print("="*60)
        return False

# ============================================================
# RUN VALIDATION
# ============================================================
if __name__ == "__main__":
    thresholds = {
        "auc_roc": 0.92,      # AUC > 92%
        "precision": 0.85,    # Precision > 85%
        "recall": 0.80,       # Recall > 80%
        "f1_score": 0.82      # F1 > 82%
    }

    passed = validate_model(
        model_name="fraud_detector",
        new_version="7",
        test_data_path="s3://data/fraud/test/",
        thresholds=thresholds
    )

    # Exit with the appropriate code for CI/CD
    sys.exit(0 if passed else 1)

Success Criteria - Phase 2

  • The model meets all metric thresholds
  • Comparison with the production model implemented
  • Results logged to MLflow
  • Script integrated into the CI/CD pipeline
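The quality gates above feed into the canary deployment the challenge requires (5% of traffic first, automatic rollback in under 5 minutes). Traffic splitting and the actual rollback call depend on your serving platform, but the promote-or-rollback decision can be isolated as pure logic. A hedged sketch, where the metric names and thresholds are assumptions rather than values prescribed by the challenge:

```python
# Illustrative canary promote/rollback decision (assumption: the challenge
# does not prescribe these metric names or thresholds). The caller would
# gather these metrics from monitoring, e.g. Prometheus, after the canary
# has served its 5% traffic slice.
def canary_decision(canary, production,
                    max_error_rate=0.01,
                    max_p99_latency_ms=200,
                    min_auc_ratio=0.99):
    """Return 'promote' or 'rollback' from canary vs production metrics."""
    if canary["error_rate"] > max_error_rate:
        return "rollback"  # canary is failing requests
    if canary["p99_latency_ms"] > max_p99_latency_ms:
        return "rollback"  # canary is too slow
    # Tolerate tiny AUC regressions: 5% of traffic gives noisy estimates
    if canary["auc"] < production["auc"] * min_auc_ratio:
        return "rollback"
    return "promote"
```

On "promote", the CD pipeline would shift 100% of traffic to the new version (e.g. via an MLflow stage transition); on "rollback", it would restore the previous production version, which is what keeps rollback time under the 5-minute target.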

Challenge Evaluation Criteria

Your CI/CD solution will be evaluated against the following criteria:

Functionality (50%)

1. CI Pipeline (15 pts)
  • Automated tests (10 pts)
  • Coverage >80% (5 pts)
2. Model Validation (15 pts)
  • Quality gates (10 pts)
  • Comparison with production (5 pts)
3. Automated CD (10 pts)
4. Monitoring (10 pts)

Quality (50%)

5. Clean Code (15 pts)
  • Modular and documented
  • Robust error handling
6. Configurability (15 pts)
  • Parameterizable thresholds
  • Environment configs
7. Documentation (10 pts)
8. Innovation (10 pts)

Final Grade

  • 90-100: Production-ready
  • 75-89: Very Good
  • 60-74: Acceptable
