Arquitectura Medallion
El estándar de la industria para organizar Data Lakehouses.
Refina tus datos en capas incrementales: Bronze, Silver y Gold.
01. El Paradigma de Capas
La arquitectura Medallion estructura tu lakehouse en tres capas de refinamiento progresivo, donde cada capa transforma y mejora la calidad de los datos.
Datos crudos sin transformar. Copia exacta de las fuentes. Inmutables y con schema completo.
bronze.orders_raw
Datos limpios, validados y deduplicados. Aplica reglas de negocio. Enterprise-ready.
silver.orders_clean
KPIs, métricas y features ML. Optimizadas para consultas analíticas. Business-ready.
gold.monthly_revenue
Flujo de Refinamiento
APIs, Files, DBs] --> Bronze[🥉 BRONZE
Raw Data] Bronze --> Silver[🥈 SILVER
Cleaned & Validated] Silver --> Gold[🥇 GOLD
Business Metrics] Bronze -.->|Datos Corruptos| Q[⚠️ QUARANTINE] Silver -.->|Reglas Fallidas| Q Gold --> BI[📊 Dashboards] Gold --> ML[🤖 ML Models] style Sources fill:#1e293b,stroke:#64748b,stroke-width:2px style Bronze fill:#CD7F32,stroke:#fff,color:#000 style Silver fill:#C0C0C0,stroke:#fff,color:#000 style Gold fill:#FFD700,stroke:#fff,color:#000 style Q fill:#DC2626,stroke:#fff,color:#fff style BI fill:#3b82f6,stroke:#fff style ML fill:#8b5cf6,stroke:#fff
02. Capa Bronze 🥉
Características
- Schema-on-read: Carga todos los campos tal cual vienen
- Append-only: Nunca actualiza ni borra (inmutable)
- Metadata: Agrega columnas de auditoría (_ingestion_time, _source_file)
- Delta Lake: Control de versiones y Time Travel
from pyspark.sql import functions as F
# Leer desde fuente cruda (CSV, JSON, Parquet)
df_raw = (spark.read
.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("s3://landing/orders/*.csv")
)
# Agregar metadata de auditoría
df_bronze = df_raw.withColumn(
"_ingestion_time",
F.current_timestamp()
).withColumn(
"_source_file",
F.input_file_name()
)
# Escribir a Bronze (Delta Lake)
(df_bronze.write
.format("delta")
.mode("append")
.partitionBy("_ingestion_date")
.saveAsTable("bronze.orders_raw")
)
print(f"✅ Loaded {df_bronze.count()} rows to Bronze")
03. Capa Silver 🥈
# Leer desde Bronze
df_bronze = spark.table("bronze.orders_raw")
# TRANSFORMACIONES SILVER
df_silver = (df_bronze
# 1. Limpieza de tipos
.withColumn("amount", F.col("amount").cast("double"))
.withColumn("order_date", F.to_date("order_date"))
# 2. Deduplicación
.dropDuplicates(["order_id"])
# 3. Validaciones básicas
.filter(F.col("amount") > 0)
.filter(F.col("customer_id").isNotNull())
# 4. Normalización
.withColumn("email", F.lower(F.trim("email")))
# 5. Enriquecimiento
.withColumn("year_month", F.date_format("order_date", "yyyy-MM"))
)
# Escribir a Silver (MERGE para upserts)
(df_silver.write
.format("delta")
.mode("overwrite")
.option("mergeSchema", "true")
.saveAsTable("silver.orders_clean")
)
Operaciones Típicas
Convertir strings a tipos correctos (dates, decimals)
Eliminar duplicados por clave primaria
Lowercase, trim, formato consistente
Unir con tablas de referencia (dim_customers, dim_products)
04. Capa Gold 🥇
La capa Gold contiene métricas de negocio listas para consumo. Aquí viven los KPIs, dashboards de BI y features para modelos ML.
Casos de Uso
Optimizaciones Gold
- Z-Ordering en columnas de filtro frecuente
- Particionamiento temporal (año/mes)
- Agregaciones pre-calculadas
# Leer desde Silver
df_silver = spark.table("silver.orders_clean")
# AGREGACIÓN: Revenue mensual por región
df_gold_revenue = (df_silver
.groupBy("year_month", "region")
.agg(
F.sum("amount").alias("total_revenue"),
F.count("order_id").alias("order_count"),
F.countDistinct("customer_id").alias("unique_customers"),
F.avg("amount").alias("avg_order_value")
)
# Window function: Growth MoM
.withColumn(
"prev_month_revenue",
F.lag("total_revenue").over(
Window.partitionBy("region")
.orderBy("year_month")
)
)
.withColumn(
"growth_pct",
((F.col("total_revenue") - F.col("prev_month_revenue"))
/ F.col("prev_month_revenue") * 100)
)
)
# Escribir a Gold
(df_gold_revenue.write
.format("delta")
.mode("overwrite")
.saveAsTable("gold.monthly_revenue")
)
print("🥇 Gold metrics generated successfully")
05. Data Quality Gates 🚨
Un Quality Gate es un checkpoint de validación que decide si un registro puede avanzar a la siguiente capa o debe desviarse a cuarentena.
Validations} QG -->|✅ PASS| Silver[🥈 Silver Clean] QG -->|❌ FAIL| Quarantine[⚠️ Quarantine Table] Quarantine -.->|Manual Review| Fix[🔧 Data Fix] Fix -.->|Re-ingest| Bronze style Bronze fill:#CD7F32,stroke:#fff,color:#000 style QG fill:#3b82f6,stroke:#fff,color:#fff style Silver fill:#C0C0C0,stroke:#fff,color:#000 style Quarantine fill:#DC2626,stroke:#fff,color:#fff style Fix fill:#eab308,stroke:#fff,color:#000
Schema Validation
Verificar que existan columnas requeridas y tipos correctos.
amount IS NOT NULL
Business Rules
Lógica de negocio específica del dominio.
amount > 0 AND amount < 1000000
Referential Integrity
Verificar que FKs existan en tablas relacionadas.
customer_id IN dim_customers
from pyspark.sql import functions as F
df_bronze = spark.table("bronze.orders_raw")
# DEFINIR REGLAS DE VALIDACIÓN
df_validated = df_bronze.withColumn(
"_quality_checks",
F.struct(
# Regla 1: Amount positivo
(F.col("amount") > 0).alias("valid_amount"),
# Regla 2: Email no nulo
F.col("email").isNotNull().alias("valid_email"),
# Regla 3: Fecha no futura
(F.col("order_date") <= F.current_date()).alias("valid_date"),
# Regla 4: Customer ID existe
F.col("customer_id").isNotNull().alias("valid_customer")
)
)
# Calcular si TODAS las reglas pasan
df_validated = df_validated.withColumn(
"_is_valid",
(F.col("_quality_checks.valid_amount") &
F.col("_quality_checks.valid_email") &
F.col("_quality_checks.valid_date") &
F.col("_quality_checks.valid_customer"))
)
# Agregar razón de rechazo para registros fallidos
df_validated = df_validated.withColumn(
"_rejection_reason",
F.when(~F.col("_is_valid"), F.concat_ws(", ",
F.when(~F.col("_quality_checks.valid_amount"), "Invalid amount"),
F.when(~F.col("_quality_checks.valid_email"), "Missing email"),
F.when(~F.col("_quality_checks.valid_date"), "Future date"),
F.when(~F.col("_quality_checks.valid_customer"), "Invalid customer")
))
)
# SEPARAR: Válidos a Silver, Inválidos a Quarantine
df_clean = df_validated.filter(F.col("_is_valid"))
df_quarantine = df_validated.filter(~F.col("_is_valid"))
# Escribir a Silver
(df_clean
.drop("_quality_checks", "_is_valid", "_rejection_reason")
.write.format("delta").mode("append")
.saveAsTable("silver.orders_clean")
)
# Escribir a Quarantine (con razón de rechazo)
(df_quarantine
.withColumn("_quarantine_timestamp", F.current_timestamp())
.write.format("delta").mode("append")
.saveAsTable("silver.quarantine")
)
print(f"✅ Clean records: {df_clean.count()}")
print(f"❌ Quarantine records: {df_quarantine.count()}")
🎯 RETO DE INGENIERÍA
Implementa un pipeline Medallion completo con Quality Gate robusto
Desafío: Quality Gate con Cuarentena
📋 Objetivo
Crear un pipeline que procese datos de bronze.transactions
aplicando validaciones estrictas. Los registros válidos avanzan a Silver,
los inválidos se desvían a silver.quarantine con metadata de auditoría.
✅ Requisitos Técnicos
Campos obligatorios: transaction_id, amount, customer_id, transaction_date
amount entre 0 y 100,000 | date no futura | customer_id formato válido
Agregar: _validation_timestamp, _pipeline_version, _rejection_code
Generar métricas: % de rechazo por tipo de error, top 3 errores frecuentes
- • Implementar re-processing: script para reintentar registros en quarantine después de corrección
- • Agregar alertas: enviar notificación si tasa de rechazo > 5%
- • Crear dashboard en Databricks SQL mostrando tendencias de calidad
📦 Entregables
medallion_pipeline.py
- Script completo del pipeline
quality_report.sql
- Query de análisis de calidad
README.md
- Documentación de decisiones técnicas