HUB CENTRAL

SECCIÓN 11.medallion()

BRONZE → SILVER → GOLD

ARQUITECTURA: MEDALLION

Arquitectura Medallion

El estándar de la industria para organizar Data Lakehouses.
Refina tus datos en capas incrementales: Bronze, Silver y Gold.

Popularizado por Databricks | Optimizado con Delta Lake

01. El Paradigma de Capas

La arquitectura Medallion estructura tu lakehouse en tres capas de refinamiento progresivo, donde cada capa transforma y mejora la calidad de los datos.

Bronze (Raw)

Datos crudos sin transformar. Copia exacta de las fuentes. Inmutables y con schema completo.

bronze.orders_raw
Silver (Cleaned)

Datos limpios, validados y deduplicados. Aplica reglas de negocio. Enterprise-ready.

silver.orders_clean
Gold (Aggregated)

KPIs, métricas y features ML. Optimizadas para consultas analíticas. Business-ready.

gold.monthly_revenue

Flujo de Refinamiento

graph TD Sources[Fuentes Externas
APIs, Files, DBs] --> Bronze[🥉 BRONZE
Raw Data] Bronze --> Silver[🥈 SILVER
Cleaned & Validated] Silver --> Gold[🥇 GOLD
Business Metrics] Bronze -.->|Datos Corruptos| Q[⚠️ QUARANTINE] Silver -.->|Reglas Fallidas| Q Gold --> BI[📊 Dashboards] Gold --> ML[🤖 ML Models] style Sources fill:#1e293b,stroke:#64748b,stroke-width:2px style Bronze fill:#CD7F32,stroke:#fff,color:#000 style Silver fill:#C0C0C0,stroke:#fff,color:#000 style Gold fill:#FFD700,stroke:#fff,color:#000 style Q fill:#DC2626,stroke:#fff,color:#fff style BI fill:#3b82f6,stroke:#fff style ML fill:#8b5cf6,stroke:#fff

02. Capa Bronze 🥉

Características

  • Schema-on-read: Carga todos los campos tal cual vienen
  • Append-only: Nunca actualiza ni borra (inmutable)
  • Metadata: Agrega columnas de auditoría (_ingestion_time, _source_file)
  • Delta Lake: Control de versiones y Time Travel
💡 Principio Clave Bronze es tu "seguro de vida". Si algo falla en capas superiores, siempre puedes reproducir el pipeline desde aquí.
bronze_ingestion.py BRONZE LAYER
from pyspark.sql import functions as F

# Leer desde fuente cruda (CSV, JSON, Parquet)
df_raw = (spark.read
    .format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("s3://landing/orders/*.csv")
)

# Agregar metadata de auditoría
df_bronze = df_raw.withColumn(
    "_ingestion_time",
    F.current_timestamp()
).withColumn(
    "_source_file",
    F.input_file_name()
)

# Escribir a Bronze (Delta Lake)
(df_bronze.write
    .format("delta")
    .mode("append")
    .partitionBy("_ingestion_date")
    .saveAsTable("bronze.orders_raw")
)

print(f"✅ Loaded {df_bronze.count()} rows to Bronze")

03. Capa Silver 🥈

silver_cleaning.py SILVER LAYER
# Leer desde Bronze
df_bronze = spark.table("bronze.orders_raw")

# TRANSFORMACIONES SILVER
df_silver = (df_bronze
    # 1. Limpieza de tipos
    .withColumn("amount", F.col("amount").cast("double"))
    .withColumn("order_date", F.to_date("order_date"))

    # 2. Deduplicación
    .dropDuplicates(["order_id"])

    # 3. Validaciones básicas
    .filter(F.col("amount") > 0)
    .filter(F.col("customer_id").isNotNull())

    # 4. Normalización
    .withColumn("email", F.lower(F.trim("email")))

    # 5. Enriquecimiento
    .withColumn("year_month", F.date_format("order_date", "yyyy-MM"))
)

# Escribir a Silver (MERGE para upserts)
(df_silver.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .saveAsTable("silver.orders_clean")
)

Operaciones Típicas

1
Type Casting

Convertir strings a tipos correctos (dates, decimals)

2
Deduplicación

Eliminar duplicados por clave primaria

3
Normalización

Lowercase, trim, formato consistente

4
Joins Empresariales

Unir con tablas de referencia (dim_customers, dim_products)

⚠️ Importante Silver NO elimina datos por defecto. Los datos inválidos deben desviarse a quarantine para auditoría posterior.

04. Capa Gold 🥇

La capa Gold contiene métricas de negocio listas para consumo. Aquí viven los KPIs, dashboards de BI y features para modelos ML.

Casos de Uso

📊
Revenue YTD
Ingresos acumulados
👥
Customer LTV
Valor de vida del cliente
📈
Churn Rate
Tasa de abandono
🎯
RFM Score
Recency-Frequency-Monetary
Optimizaciones Gold
  • Z-Ordering en columnas de filtro frecuente
  • Particionamiento temporal (año/mes)
  • Agregaciones pre-calculadas
gold_metrics.py GOLD LAYER
# Leer desde Silver
df_silver = spark.table("silver.orders_clean")

# AGREGACIÓN: Revenue mensual por región
df_gold_revenue = (df_silver
    .groupBy("year_month", "region")
    .agg(
        F.sum("amount").alias("total_revenue"),
        F.count("order_id").alias("order_count"),
        F.countDistinct("customer_id").alias("unique_customers"),
        F.avg("amount").alias("avg_order_value")
    )
    # Window function: Growth MoM
    .withColumn(
        "prev_month_revenue",
        F.lag("total_revenue").over(
            Window.partitionBy("region")
                  .orderBy("year_month")
        )
    )
    .withColumn(
        "growth_pct",
        ((F.col("total_revenue") - F.col("prev_month_revenue"))
         / F.col("prev_month_revenue") * 100)
    )
)

# Escribir a Gold
(df_gold_revenue.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("gold.monthly_revenue")
)

print("🥇 Gold metrics generated successfully")

05. Data Quality Gates 🚨

Un Quality Gate es un checkpoint de validación que decide si un registro puede avanzar a la siguiente capa o debe desviarse a cuarentena.

graph LR Bronze[🥉 Bronze Raw] --> QG{Quality Gate
Validations} QG -->|✅ PASS| Silver[🥈 Silver Clean] QG -->|❌ FAIL| Quarantine[⚠️ Quarantine Table] Quarantine -.->|Manual Review| Fix[🔧 Data Fix] Fix -.->|Re-ingest| Bronze style Bronze fill:#CD7F32,stroke:#fff,color:#000 style QG fill:#3b82f6,stroke:#fff,color:#fff style Silver fill:#C0C0C0,stroke:#fff,color:#000 style Quarantine fill:#DC2626,stroke:#fff,color:#fff style Fix fill:#eab308,stroke:#fff,color:#000

Schema Validation

Verificar que existan columnas requeridas y tipos correctos.

amount IS NOT NULL

Business Rules

Lógica de negocio específica del dominio.

amount > 0 AND amount < 1000000

Referential Integrity

Verificar que FKs existan en tablas relacionadas.

customer_id IN dim_customers
quality_gate.py QUALITY GATE
from pyspark.sql import functions as F

df_bronze = spark.table("bronze.orders_raw")

# DEFINIR REGLAS DE VALIDACIÓN
df_validated = df_bronze.withColumn(
    "_quality_checks",
    F.struct(
        # Regla 1: Amount positivo
        (F.col("amount") > 0).alias("valid_amount"),

        # Regla 2: Email no nulo
        F.col("email").isNotNull().alias("valid_email"),

        # Regla 3: Fecha no futura
        (F.col("order_date") <= F.current_date()).alias("valid_date"),

        # Regla 4: Customer ID existe
        F.col("customer_id").isNotNull().alias("valid_customer")
    )
)

# Calcular si TODAS las reglas pasan
df_validated = df_validated.withColumn(
    "_is_valid",
    (F.col("_quality_checks.valid_amount") &
     F.col("_quality_checks.valid_email") &
     F.col("_quality_checks.valid_date") &
     F.col("_quality_checks.valid_customer"))
)

# Agregar razón de rechazo para registros fallidos
df_validated = df_validated.withColumn(
    "_rejection_reason",
    F.when(~F.col("_is_valid"), F.concat_ws(", ",
        F.when(~F.col("_quality_checks.valid_amount"), "Invalid amount"),
        F.when(~F.col("_quality_checks.valid_email"), "Missing email"),
        F.when(~F.col("_quality_checks.valid_date"), "Future date"),
        F.when(~F.col("_quality_checks.valid_customer"), "Invalid customer")
    ))
)

# SEPARAR: Válidos a Silver, Inválidos a Quarantine
df_clean = df_validated.filter(F.col("_is_valid"))
df_quarantine = df_validated.filter(~F.col("_is_valid"))

# Escribir a Silver
(df_clean
    .drop("_quality_checks", "_is_valid", "_rejection_reason")
    .write.format("delta").mode("append")
    .saveAsTable("silver.orders_clean")
)

# Escribir a Quarantine (con razón de rechazo)
(df_quarantine
    .withColumn("_quarantine_timestamp", F.current_timestamp())
    .write.format("delta").mode("append")
    .saveAsTable("silver.quarantine")
)

print(f"✅ Clean records: {df_clean.count()}")
print(f"❌ Quarantine records: {df_quarantine.count()}")

🎯 RETO DE INGENIERÍA

Implementa un pipeline Medallion completo con Quality Gate robusto

Desafío: Quality Gate con Cuarentena

📋 Objetivo

Crear un pipeline que procese datos de bronze.transactions aplicando validaciones estrictas. Los registros válidos avanzan a Silver, los inválidos se desvían a silver.quarantine con metadata de auditoría.

✅ Requisitos Técnicos
1
Validar Schema

Campos obligatorios: transaction_id, amount, customer_id, transaction_date

2
Reglas de Negocio

amount entre 0 y 100,000 | date no futura | customer_id formato válido

3
Metadata de Auditoría

Agregar: _validation_timestamp, _pipeline_version, _rejection_code

4
Reportes

Generar métricas: % de rechazo por tipo de error, top 3 errores frecuentes

🌟 Bonus Points
  • • Implementar re-processing: script para reintentar registros en quarantine después de corrección
  • • Agregar alertas: enviar notificación si tasa de rechazo > 5%
  • • Crear dashboard en Databricks SQL mostrando tendencias de calidad
📦 Entregables
medallion_pipeline.py - Script completo del pipeline
quality_report.sql - Query de análisis de calidad
README.md - Documentación de decisiones técnicas
📊 Criterios de Evaluación
25%
Correctitud del código
25%
Robustez de validaciones
25%
Calidad de metadata
25%
Documentación

🏗️ Arquitectura Medallion Completa

graph TB subgraph Sources[" "] S1[CSV Files] S2[REST APIs] S3[Databases] end subgraph Bronze[" 🥉 BRONZE LAYER "] B1[raw_customers] B2[raw_orders] B3[raw_products] end subgraph QualityGate[" 🚨 QUALITY GATE "] QG[Validation Engine] Q[Quarantine Table] end subgraph Silver[" 🥈 SILVER LAYER "] S1L[clean_customers] S2L[clean_orders] S3L[clean_products] end subgraph Gold[" 🥇 GOLD LAYER "] G1[monthly_revenue] G2[customer_ltv] G3[product_analytics] end subgraph Consumers[" "] C1[📊 BI Dashboards] C2[🤖 ML Models] C3[📈 Reports] end S1 & S2 & S3 --> B1 & B2 & B3 B1 & B2 & B3 --> QG QG -->|Valid| S1L & S2L & S3L QG -->|Invalid| Q S1L & S2L & S3L --> G1 & G2 & G3 G1 & G2 & G3 --> C1 & C2 & C3 style Sources fill:#1e293b,stroke:#64748b style Bronze fill:#CD7F32,stroke:#fff,color:#000 style QualityGate fill:#3b82f6,stroke:#fff style Silver fill:#C0C0C0,stroke:#fff,color:#000 style Gold fill:#FFD700,stroke:#fff,color:#000 style Consumers fill:#10b981,stroke:#fff style Q fill:#DC2626,stroke:#fff,color:#fff