Challenge: A Complete CI/CD Pipeline for ML
Build an end-to-end Continuous Integration / Continuous Deployment pipeline that automates testing, validation, deployment, and rollback of ML models. The goal is to guarantee that only high-quality models reach production, with automatic reversion if problems are detected.
Problem Context
You are the MLOps Engineer at a fintech that deploys fraud-detection models. Deployment is currently manual and slow (1-2 weeks), which lets fraudsters exploit new patterns. You must fully automate the model lifecycle to cut time-to-production to under 24 hours while maintaining high quality and security.
Requirements
- Automated tests (unit + integration)
- Model validation (AUC above threshold)
- Canary deployment (5% of traffic first)
- Post-deploy monitoring
- Automatic rollback on failure
Metrics
- Time-to-production: <24h
- Test coverage: >80%
- Zero-downtime deployment
- Rollback time: <5 min
- AUC-ROC > 0.92
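A canary split like the 5%-first requirement above is usually implemented with deterministic hashing rather than random sampling, so a given request always hits the same model version during the rollout. A minimal sketch (the function name and bucket scheme are illustrative, not part of the stack listed below):

```python
import hashlib

def route_to_canary(request_id: str, canary_pct: float = 5.0) -> bool:
    """Deterministically route ~canary_pct% of traffic to the canary model.

    Hashing the request id (instead of random sampling) keeps routing
    stable: the same request id always maps to the same model version.
    """
    digest = hashlib.sha256(request_id.encode()).digest()
    bucket = int.from_bytes(digest[:8], "big") % 10_000
    return bucket < canary_pct * 100  # 5.0% -> buckets 0..499

# Roughly 1 in 20 requests should go to the canary
hits = sum(route_to_canary(f"req-{i}") for i in range(100_000))
```

Because the split is a pure function of the request id, the same logic can be re-run offline to attribute production metrics to the correct model version.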
Technology Stack
- GitHub Actions (CI/CD)
- MLflow (Model Registry)
- Spark ML (training)
- pytest (testing)
- Prometheus (monitoring)
Phase 1: Continuous Integration - Automated Testing
Phase Objective
Implement a complete test suite that validates data transformations, schema, and preprocessing logic. The tests must run automatically on every commit.
CI Pipeline Architecture
graph LR
A[Git Push] --> B[GitHub Actions Trigger]
B --> C[Setup Environment]
C --> D[Install Dependencies]
D --> E[Run Unit Tests]
E --> F{Tests Pass?}
F -->|No| G[❌ Fail Build]
F -->|Yes| H[Run Integration Tests]
H --> I{Tests Pass?}
I -->|No| G
I -->|Yes| J[Data Validation]
J --> K{Schema Valid?}
K -->|No| G
K -->|Yes| L[✓ CI Success]
L --> M[Trigger CD Pipeline]
style A fill:#3b82f6,stroke:#60a5fa,color:#fff
style G fill:#ef4444,stroke:#f87171,color:#fff
style L fill:#10b981,stroke:#34d399,color:#fff
style M fill:#f59e0b,stroke:#fbbf24,color:#000
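The tests in the next section import `clean_data`, `encode_categorical`, and `validate_schema` from `ml_pipeline.transformations`, whose implementation is not shown in this document. As one hedged sketch, `validate_schema` could be a column-level check that raises the `ValueError("Missing columns...")` the tests match on (an assumed implementation, not the project's actual code):

```python
def validate_schema(df, expected_schema):
    """Fail fast when the DataFrame is missing expected columns.

    Works with anything exposing a `.columns` sequence (a Spark or
    pandas DataFrame alike), and accepts either a pyspark StructType
    or a plain list of column names as the expected schema.
    """
    if hasattr(expected_schema, "fields"):   # pyspark StructType
        expected_cols = {f.name for f in expected_schema.fields}
    else:                                    # plain list of names
        expected_cols = set(expected_schema)
    missing = expected_cols - set(df.columns)
    if missing:
        # Message prefix matches pytest.raises(..., match="Missing columns")
        raise ValueError(f"Missing columns: {sorted(missing)}")
```

A stricter version would also compare column types and nullability field by field; the column-set check is the minimum the tests below exercise.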
Code: Test Suite with pytest
# tests/test_transformations.py
import pytest
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
from ml_pipeline.transformations import (
    clean_data, encode_categorical, validate_schema
)

# ============================================================
# FIXTURE: Spark session for tests
# ============================================================
@pytest.fixture(scope="session")
def spark():
    """Create a Spark session shared across the test session"""
    spark = SparkSession.builder \
        .master("local[2]") \
        .appName("ML_Tests") \
        .getOrCreate()
    yield spark
    spark.stop()
# ============================================================
# UNIT TESTS: Individual transformations
# ============================================================
def test_clean_data_removes_nulls(spark):
    """clean_data drops rows with nulls in critical columns"""
    # Arrange: build a dataframe containing nulls
    data = [
        (1, "john@email.com", 25, 100.0),
        (2, None, 30, 150.0),                # null email
        (3, "jane@email.com", None, 200.0),  # null age
        (4, "bob@email.com", 35, 250.0)
    ]
    schema = StructType([
        StructField("id", IntegerType(), True),
        StructField("email", StringType(), True),
        StructField("age", IntegerType(), True),
        StructField("amount", DoubleType(), True)
    ])
    df = spark.createDataFrame(data, schema)
    # Act: apply cleaning
    df_cleaned = clean_data(df, required_cols=["email", "age"])
    # Assert: only the 2 valid rows remain
    assert df_cleaned.count() == 2, "Should remove rows with nulls"
    assert df_cleaned.filter("email IS NULL").count() == 0
    assert df_cleaned.filter("age IS NULL").count() == 0
def test_clean_data_removes_duplicates(spark):
    """clean_data drops duplicate rows"""
    data = [
        (1, "john@email.com", 25),
        (1, "john@email.com", 25),  # duplicate
        (2, "jane@email.com", 30)
    ]
    schema = StructType([
        StructField("id", IntegerType(), True),
        StructField("email", StringType(), True),
        StructField("age", IntegerType(), True)
    ])
    df = spark.createDataFrame(data, schema)
    df_cleaned = clean_data(df, dedup_cols=["id", "email"])
    assert df_cleaned.count() == 2, "Should remove duplicates"
def test_encode_categorical_preserves_count(spark):
    """Encoding does not drop rows"""
    data = [
        (1, "premium"),
        (2, "basic"),
        (3, "premium"),
        (4, "enterprise")
    ]
    schema = StructType([
        StructField("id", IntegerType(), True),
        StructField("plan_type", StringType(), True)
    ])
    df = spark.createDataFrame(data, schema)
    df_encoded = encode_categorical(df, categorical_cols=["plan_type"])
    assert df_encoded.count() == df.count(), "Encoding should preserve row count"
def test_encode_categorical_creates_correct_columns(spark):
    """Encoding creates the expected output columns"""
    data = [(1, "A"), (2, "B"), (3, "A")]
    schema = StructType([
        StructField("id", IntegerType()),
        StructField("category", StringType())
    ])
    df = spark.createDataFrame(data, schema)
    df_encoded = encode_categorical(df, categorical_cols=["category"])
    # Both the index and the encoded columns must exist
    assert "category_index" in df_encoded.columns
    assert "category_encoded" in df_encoded.columns
def test_validate_schema_passes_correct_schema(spark):
    """A matching schema passes validation"""
    expected_schema = StructType([
        StructField("id", IntegerType(), False),
        StructField("email", StringType(), False),
        StructField("age", IntegerType(), True)
    ])
    data = [(1, "test@email.com", 25)]
    df = spark.createDataFrame(data, expected_schema)
    # Must not raise
    validate_schema(df, expected_schema)
def test_validate_schema_fails_missing_column(spark):
    """A schema with missing columns fails validation"""
    expected_schema = StructType([
        StructField("id", IntegerType()),
        StructField("email", StringType()),
        StructField("age", IntegerType())
    ])
    # DataFrame without the 'age' column
    data = [(1, "test@email.com")]
    schema = StructType([
        StructField("id", IntegerType()),
        StructField("email", StringType())
    ])
    df = spark.createDataFrame(data, schema)
    with pytest.raises(ValueError, match="Missing columns"):
        validate_schema(df, expected_schema)
# ============================================================
# INTEGRATION TESTS: Full pipeline
# ============================================================
def test_full_pipeline_end_to_end(spark):
    """End-to-end test of the preprocessing pipeline"""
    from ml_pipeline.pipeline import build_preprocessing_pipeline
    # Simulated raw data
    data = [
        (1, "john@email.com", 25, "premium", 100.0),
        (2, "jane@email.com", 30, "basic", 50.0),
        (3, "bob@email.com", 35, "premium", 150.0)
    ]
    schema = StructType([
        StructField("id", IntegerType()),
        StructField("email", StringType()),
        StructField("age", IntegerType()),
        StructField("plan_type", StringType()),
        StructField("amount", DoubleType())
    ])
    df_raw = spark.createDataFrame(data, schema)
    # Build and apply the pipeline
    pipeline = build_preprocessing_pipeline()
    df_processed = pipeline.transform(df_raw)
    # Assertions on the output
    assert df_processed.count() == 3, "Pipeline should preserve all valid rows"
    assert "features" in df_processed.columns, "Pipeline should create features column"
    # The features column must be a vector
    from pyspark.ml.linalg import VectorUDT
    features_type = df_processed.schema["features"].dataType
    assert isinstance(features_type, VectorUDT), "features should be VectorUDT"
# ============================================================
# PROPERTY TESTS: System invariants
# ============================================================
def test_transformation_is_idempotent(spark):
    """Applying the transformation twice yields the same result"""
    data = [(1, "test", 25)]
    schema = StructType([
        StructField("id", IntegerType()),
        StructField("name", StringType()),
        StructField("age", IntegerType())
    ])
    df = spark.createDataFrame(data, schema)
    df_once = clean_data(df, required_cols=["name"])
    df_twice = clean_data(df_once, required_cols=["name"])
    # Verify idempotence
    assert df_once.count() == df_twice.count()
    assert df_once.collect() == df_twice.collect()
# ============================================================
# CONFIGURATION: pytest.ini
# ============================================================
# [pytest]
# testpaths = tests
# python_files = test_*.py
# python_functions = test_*
# markers =
#     unit: Unit tests
#     integration: Integration tests
#     slow: Slow running tests
GitHub Actions Workflow
# .github/workflows/ci.yml
name: ML Pipeline CI

on:
  push:
    branches: [ main, develop ]
  pull_request:
    branches: [ main ]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v3

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.9'

      - name: Install dependencies
        run: |
          pip install -r requirements.txt
          pip install pytest pytest-cov

      - name: Run unit tests
        run: |
          pytest tests/ -v --cov=ml_pipeline --cov-report=xml

      - name: Upload coverage
        uses: codecov/codecov-action@v3
        with:
          file: ./coverage.xml

      - name: Data validation
        run: |
          python scripts/validate_data_schema.py

      - name: Lint code
        run: |
          pip install flake8
          flake8 ml_pipeline/ --max-line-length=100

  build:
    needs: test
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    steps:
      - name: Trigger CD pipeline
        run: echo "CI passed, triggering CD..."
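The `build` job only echoes a message; in a real setup that step would dispatch the CD workflow. One hedged option, assuming a hypothetical `cd.yml` workflow with a `workflow_dispatch` trigger and a token with `actions: write` permission, uses the `gh` CLI preinstalled on GitHub-hosted runners:

```yaml
# Hypothetical replacement for the echo step above
- name: Trigger CD pipeline
  env:
    GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
  run: gh workflow run cd.yml --ref main
```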
Success Criteria - Phase 1
- ✓ At least 15 tests implemented (unit + integration)
- ✓ Test coverage > 80%
- ✓ All tests pass in CI
- ✓ Schema validation implemented
Phase 2: Model Validation - Quality Gates
Phase Objective
Implement automatic validations that ensure only high-quality models are promoted to production, including performance tests, fairness checks, and A/B testing.
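At its core, a quality gate is a pure comparison of metrics against minimum thresholds; a minimal framework-free sketch (the function name `check_quality_gates` is illustrative, not part of the script below) that CI can call before promoting a model:

```python
def check_quality_gates(metrics: dict, thresholds: dict) -> dict:
    """Compare each metric against its minimum threshold.

    Returns {"passed": bool, "failures": [...]} so CI can both gate
    the promotion and report exactly which metric blocked it. A metric
    missing from `metrics` counts as 0.0 and therefore fails its gate.
    """
    failures = [
        f"{name}: {metrics.get(name, 0.0):.4f} < {minimum:.4f}"
        for name, minimum in thresholds.items()
        if metrics.get(name, 0.0) < minimum
    ]
    return {"passed": not failures, "failures": failures}
```

Keeping the gate logic separate from metric computation makes the thresholds trivially parameterizable per environment, one of the evaluation criteria at the end of this challenge.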
Code: Model Validation Script
# scripts/validate_model.py
import sys
import mlflow
import pandas as pd
from mlflow.tracking import MlflowClient
from pyspark.sql import SparkSession
from pyspark.ml.evaluation import BinaryClassificationEvaluator

spark = SparkSession.builder.appName("model_validation").getOrCreate()

def validate_model(model_name, new_version, test_data_path, thresholds):
    """
    Validate a model against performance thresholds

    Args:
        model_name: Model name in the registry
        new_version: Version to validate
        test_data_path: Path to the test data
        thresholds: Dict with the minimum required metrics
    Returns:
        bool: True if the model passes validation
    """
    print(f"Validating model {model_name} v{new_version}...")

    # ============================================================
    # 1. LOAD MODEL AND DATA
    # ============================================================
    model_uri = f"models:/{model_name}/{new_version}"
    model = mlflow.spark.load_model(model_uri)
    df_test = spark.read.parquet(test_data_path)
    print(f"Test set size: {df_test.count():,} rows")
    # ============================================================
    # 2. GENERATE PREDICTIONS
    # ============================================================
    predictions = model.transform(df_test)

    # ============================================================
    # 3. COMPUTE METRICS
    # ============================================================
    evaluator_auc = BinaryClassificationEvaluator(
        labelCol="fraud_label",
        rawPredictionCol="rawPrediction",
        metricName="areaUnderROC"
    )
    auc_roc = evaluator_auc.evaluate(predictions)
    evaluator_pr = BinaryClassificationEvaluator(
        labelCol="fraud_label",
        metricName="areaUnderPR"
    )
    auc_pr = evaluator_pr.evaluate(predictions)

    from sklearn.metrics import precision_score, recall_score, f1_score
    pred_pd = predictions.select("fraud_label", "prediction").toPandas()
    precision = precision_score(pred_pd["fraud_label"], pred_pd["prediction"])
    recall = recall_score(pred_pd["fraud_label"], pred_pd["prediction"])
    f1 = f1_score(pred_pd["fraud_label"], pred_pd["prediction"])

    metrics = {
        "auc_roc": auc_roc,
        "auc_pr": auc_pr,
        "precision": precision,
        "recall": recall,
        "f1_score": f1
    }
    print("\n" + "=" * 60)
    print("MODEL METRICS")
    print("=" * 60)
    for metric, value in metrics.items():
        print(f"{metric}: {value:.4f}")
    # ============================================================
    # 4. VALIDATE AGAINST THRESHOLDS
    # ============================================================
    validation_results = {}
    all_passed = True
    print("\n" + "=" * 60)
    print("THRESHOLD VALIDATION")
    print("=" * 60)
    for metric, threshold in thresholds.items():
        actual_value = metrics.get(metric, 0)
        passed = actual_value >= threshold
        validation_results[metric] = {
            "threshold": threshold,
            "actual": actual_value,
            "passed": passed
        }
        status = "✓ PASS" if passed else "✗ FAIL"
        print(f"{metric}: {actual_value:.4f} >= {threshold:.4f} ? {status}")
        if not passed:
            all_passed = False
    # ============================================================
    # 5. COMPARE AGAINST THE PRODUCTION MODEL
    # ============================================================
    client = MlflowClient()
    try:
        # Fetch the model currently in production
        prod_versions = client.get_latest_versions(model_name, stages=["Production"])
        if prod_versions:
            prod_version = prod_versions[0].version
            prod_model = mlflow.spark.load_model(f"models:/{model_name}/{prod_version}")
            # Score the test set with the production model
            predictions_prod = prod_model.transform(df_test)
            auc_prod = evaluator_auc.evaluate(predictions_prod)
            print("\n" + "=" * 60)
            print("COMPARISON WITH PRODUCTION")
            print("=" * 60)
            print(f"Production (v{prod_version}): AUC = {auc_prod:.4f}")
            print(f"New (v{new_version}): AUC = {auc_roc:.4f}")
            improvement = ((auc_roc - auc_prod) / auc_prod) * 100
            print(f"Improvement: {improvement:+.2f}%")
            # Require at least a 1% improvement
            if improvement < 1.0:
                print("⚠ Warning: less than 1% improvement over production")
                # Do not block, but alert
    except Exception as e:
        print(f"Could not compare against production: {e}")
    # ============================================================
    # 6. LOG RESULTS TO MLFLOW
    # ============================================================
    with mlflow.start_run():
        for metric, value in metrics.items():
            mlflow.log_metric(f"validation_{metric}", value)
        mlflow.log_param("validation_passed", all_passed)
        mlflow.set_tag("validation_timestamp", str(pd.Timestamp.now()))

    # ============================================================
    # 7. FINAL RESULT
    # ============================================================
    print("\n" + "=" * 60)
    if all_passed:
        print("✓✓✓ VALIDATION PASSED - MODEL APPROVED")
        print("=" * 60)
        return True
    else:
        print("✗✗✗ VALIDATION FAILED - MODEL REJECTED")
        print("=" * 60)
        return False
# ============================================================
# RUN VALIDATION
# ============================================================
if __name__ == "__main__":
    thresholds = {
        "auc_roc": 0.92,    # AUC > 92%
        "precision": 0.85,  # Precision > 85%
        "recall": 0.80,     # Recall > 80%
        "f1_score": 0.82    # F1 > 82%
    }
    passed = validate_model(
        model_name="fraud_detector",
        new_version="7",
        test_data_path="s3://data/fraud/test/",
        thresholds=thresholds
    )
    # Exit with the appropriate code for CI/CD
    sys.exit(0 if passed else 1)
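The requirements also call for automatic rollback after the canary deploy, while the script above only gates promotion. A hedged sketch of the post-deploy decision (metric names and default limits are illustrative assumptions, not part of the script) might compare canary health against the stable baseline:

```python
def should_rollback(canary: dict, baseline: dict,
                    max_error_rate: float = 0.01,
                    max_auc_drop: float = 0.02,
                    max_latency_ratio: float = 1.5) -> bool:
    """Decide whether to revert the canary to the stable version.

    Rolls back if the canary's error rate exceeds an absolute limit,
    its AUC drops too far below the baseline, or its p99 latency
    exceeds the baseline's by more than max_latency_ratio.
    """
    if canary["error_rate"] > max_error_rate:
        return True
    if baseline["auc"] - canary["auc"] > max_auc_drop:
        return True
    if canary["p99_latency_ms"] > baseline["p99_latency_ms"] * max_latency_ratio:
        return True
    return False
```

A monitoring job would evaluate this on Prometheus metrics every few minutes during the canary window; any True answer triggers the rollback path, keeping the <5 min rollback target achievable.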
Success Criteria - Phase 2
- ✓ Model meets all metric thresholds
- ✓ Comparison with the production model implemented
- ✓ Results logged to MLflow
- ✓ Script integrated into the CI/CD pipeline
Challenge Evaluation Criteria
Your CI/CD solution will be evaluated against the following criteria:
Functionality (50%)
- Automated tests (10 pts)
- Coverage >80% (5 pts)
- Quality gates (10 pts)
- Comparison with prod (5 pts)
Quality (50%)
- Modular and documented
- Robust error handling
- Parameterizable thresholds
- Environment configs
Final Grade
- 90-100: Production-ready
- 75-89: Very Good
- 60-74: Acceptable