HUB CENTRAL

SECCIÓN 12.DLT()

DELTA LIVE TABLES & DECLARATIVE PIPELINES

LIVE PIPELINES

Delta Live Tables

Pipelines declarativos de datos que se autoconstruyen, autovalidan y autorrecuperan.
Define qué quieres lograr, no cómo hacerlo. DLT maneja el resto.

01. ¿Qué es DLT?

Delta Live Tables (DLT) es el framework de Databricks para construir pipelines de datos confiables, escalables y mantenibles. Transforma código imperativo complejo en declaraciones SQL o Python simples.

  • DAG Automático

    Define tablas con @dlt.table. DLT calcula automáticamente dependencias y orden de ejecución.

  • Calidad Incorporada

    Usa expect() para definir reglas. DLT monitorea y alerta violaciones automáticamente.

  • Reintento Inteligente

    Si falla una tabla, DLT reintenta solo lo necesario, sin reprocesar todo el pipeline.

pipeline/ventas_dlt.py STREAMING

import dlt
from pyspark.sql import functions as F

# Define una tabla en streaming
@dlt.table(
    name="ventas_silver",
    comment="Ventas limpias en tiempo real"
)
@dlt.expect_or_drop("valid_amount", "amount > 0")
def ventas_silver():
    return (
        dlt.read_stream("ventas_bronze")
        .withColumn("processed_at", F.current_timestamp())
    )
                
DLT maneja orquestación, checkpoints y reintentos.

02. Anatomía de un Pipeline DLT

graph TD Source[📁 Fuente Raw] -->|@dlt.table| Bronze[🥉 BRONZE: ventas_bronze] Bronze -->|@dlt.view + expect| Silver[🥈 SILVER: ventas_silver] Silver -->|@dlt.table| Gold[🥇 GOLD: ventas_kpi] Bronze -.->|Violaciones| Q[⚠️ Expectation Failures] style Source fill:#1e293b,stroke:#fff style Bronze fill:#78350f,stroke:#fff style Silver fill:#475569,stroke:#fff style Gold fill:#854d0e,stroke:#fff style Q fill:#7f1d1d,stroke:#fff,stroke-dasharray: 5 5

@dlt.table

Materializa el resultado como tabla Delta. Soporta streaming y batch.

@dlt.table(name="mi_tabla")
def mi_tabla(): ...

@dlt.view

Crea una vista temporal (no persiste). Ideal para transformaciones intermedias.

@dlt.view()
def staging_view(): ...

dlt.read_stream

Lee datos en modo streaming (incremental). Procesa solo datos nuevos.

dlt.read_stream("table")
dlt.read("batch_table")

03. Streaming vs Batch

DLT permite cambiar entre modos con una línea de código. La misma lógica funciona en ambos escenarios.

Streaming

INCREMENTAL
  • Procesa solo datos nuevos
  • Latencia ultra-baja (segundos)
  • Checkpoints automáticos
dlt.read_stream("source")

Batch

FULL REFRESH
  • Reprocesa todo el dataset
  • Ideal para dimensiones
  • Control total del estado
dlt.read("source")

04. Expectations: Calidad de Datos

expect

Monitorea violaciones pero permite que pasen los datos.

@dlt.expect("valid_email",
  "email IS NOT NULL")

expect_or_drop

Descarta registros que no cumplan la expectativa.

@dlt.expect_or_drop(
  "valid_amount",
  "amount > 0")

expect_or_fail

Detiene todo el pipeline si encuentra una sola violación.

@dlt.expect_or_fail(
  "no_duplicates",
  "COUNT(*) = COUNT(DISTINCT id)")
pipeline/quality_checks.py

@dlt.table(
    name="customers_silver",
    comment="Clientes validados con reglas de calidad"
)
@dlt.expect_or_drop("valid_email", "email RLIKE '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\\.[A-Z|a-z]{2,}$'")
@dlt.expect_or_drop("valid_age", "age >= 18 AND age <= 120")
@dlt.expect("has_phone", "phone IS NOT NULL")  # Solo alerta
def customers_silver():
    return (
        dlt.read_stream("customers_bronze")
        .select("id", "email", "age", "phone")
    )
                

05. Change Data Capture (CDC)

DLT simplifica CDC con dlt.apply_changes(). Captura inserciones, actualizaciones y eliminaciones desde un stream de eventos, manteniendo historial completo (SCD Type 2).

graph LR CDC[CDC Stream] -->|INSERT| T[Target Table] CDC -->|UPDATE| T CDC -->|DELETE| T T --> Hist[Historical Records] T --> Curr[Current Records] style CDC fill:#6366f1,stroke:#fff style T fill:#00A972,stroke:#fff style Hist fill:#475569,stroke:#fff style Curr fill:#854d0e,stroke:#fff

Tipos de SCD

  • SCD Type 1: Sobreescribe valores antiguos (no historial)
  • SCD Type 2: Mantiene historial completo con fechas de vigencia
pipeline/cdc_customers.py

import dlt

# Aplicar CDC en streaming
dlt.create_streaming_table("customers_scd2")

dlt.apply_changes(
    target="customers_scd2",
    source="customers_cdc_stream",
    keys=["customer_id"],
    sequence_by="event_timestamp",
    stored_as_scd_type=2,  # Type 2 = historial
    except_column_list=["_metadata"]
)

# Resultado: tabla con columnas automáticas:
# - __START_AT (fecha inicio vigencia)
# - __END_AT (fecha fin vigencia)
# - __CURRENT (bool: registro actual?)
                    

Caso de Uso Real

Un sistema de e-commerce actualiza datos de clientes constantemente (cambios de dirección, email, categoría). Con CDC, puedes responder: "¿Cuál era la categoría de este cliente el 15 de marzo?" consultando la tabla SCD2.

06. Monitoreo y Observabilidad

Event Log

DLT genera automáticamente una tabla event_log con métricas detalladas de cada ejecución.

SELECT
  timestamp,
  details:flow_name,
  details:num_output_rows
FROM event_log
WHERE event_type = 'flow_progress'

Expectation Metrics

Cada expect*() registra violaciones. Úsalas para alertas.

SELECT
  dataset,
  name AS expectation,
  failed_records
FROM event_log
WHERE event_type = 'flow_progress'
  AND failed_records > 0

Dashboard de Linaje

La UI de DLT muestra automáticamente el linaje completo: qué tablas dependen de cuáles, estado de cada transformación, y métricas en tiempo real.

99.7%
Success Rate
2.3s
Avg Latency
48
Active Tables

07. Reto de Ingeniería

Pipeline CDC de Órdenes con DLT

📋 Contexto del Problema

Tienes un sistema de órdenes que emite eventos cada vez que una orden cambia de estado: created, shipped, delivered, cancelled.

Los eventos llegan como JSON a una carpeta en S3. Necesitas construir un pipeline DLT que:

1
Bronze: Ingesta incremental (streaming) de los archivos JSON desde S3 a tabla Delta.
2
Silver: Valida que order_id no sea nulo y event_timestamp sea válido.
3
Gold (SCD Type 2): Tabla de órdenes con historial completo. Cada cambio de estado debe quedar registrado con fechas de vigencia.
4
Monitoreo: Query que identifique órdenes con más de 3 cambios de estado en las últimas 24 horas (anomalía).

🧪 Datos de Ejemplo


// s3://mi-bucket/ordenes/evento_1.json
{
  "order_id": "ORD-001",
  "customer_id": "CUST-456",
  "status": "created",
  "amount": 129.99,
  "event_timestamp": "2024-03-15T10:30:00Z"
}

// s3://mi-bucket/ordenes/evento_2.json
{
  "order_id": "ORD-001",
  "customer_id": "CUST-456",
  "status": "shipped",
  "amount": 129.99,
  "event_timestamp": "2024-03-16T14:22:00Z"
}
                    

✅ Criterios de Evaluación

Ingesta Incremental

Usa dlt.read_stream() con formato JSON.

Validaciones

Implementa al menos 2 expect_or_drop().

CDC Correcto

Usa apply_changes() con SCD Type 2.

Query de Anomalía

SQL que detecte órdenes con múltiples cambios rápidos.

🎯 Objetivo de Aprendizaje

Al completar este reto, habrás dominado: ingesta streaming con DLT, validación declarativa de calidad, implementación de SCD Type 2 con CDC, y consultas analíticas sobre datos históricos. Esto representa el 80% de casos de uso reales en ingeniería de datos moderna.