Delta Live Tables
Pipelines declarativos de datos que se autoconstruyen, autovalidan y autorrecuperan.
Define qué quieres lograr, no cómo hacerlo. DLT maneja el resto.
01. ¿Qué es DLT?
Delta Live Tables (DLT) es el framework de Databricks para construir pipelines de datos confiables, escalables y mantenibles. Transforma código imperativo complejo en declaraciones SQL o Python simples.
-
DAG Automático
Define tablas con
@dlt.table. DLT calcula automáticamente dependencias y orden de ejecución. -
Calidad Incorporada
Usa
expect()para definir reglas. DLT monitorea y alerta violaciones automáticamente. -
Reintento Inteligente
Si falla una tabla, DLT reintenta solo lo necesario, sin reprocesar todo el pipeline.
import dlt
from pyspark.sql import functions as F
# Define una tabla en streaming
@dlt.table(
name="ventas_silver",
comment="Ventas limpias en tiempo real"
)
@dlt.expect_or_drop("valid_amount", "amount > 0")
def ventas_silver():
return (
dlt.read_stream("ventas_bronze")
.withColumn("processed_at", F.current_timestamp())
)
02. Anatomía de un Pipeline DLT
@dlt.table
Materializa el resultado como tabla Delta. Soporta streaming y batch.
@dlt.table(name="mi_tabla")
def mi_tabla(): ...
@dlt.view
Crea una vista temporal (no persiste). Ideal para transformaciones intermedias.
@dlt.view()
def staging_view(): ...
dlt.read_stream
Lee datos en modo streaming (incremental). Procesa solo datos nuevos.
dlt.read_stream("table")
dlt.read("batch_table")
03. Streaming vs Batch
DLT permite cambiar entre modos con una línea de código. La misma lógica funciona en ambos escenarios.
Streaming
INCREMENTAL- Procesa solo datos nuevos
- Latencia ultra-baja (segundos)
- Checkpoints automáticos
dlt.read_stream("source")
Batch
FULL REFRESH- Reprocesa todo el dataset
- Ideal para dimensiones
- Control total del estado
dlt.read("source")
04. Expectations: Calidad de Datos
expect
Monitorea violaciones pero permite que pasen los datos.
@dlt.expect("valid_email",
"email IS NOT NULL")
expect_or_drop
Descarta registros que no cumplan la expectativa.
@dlt.expect_or_drop(
"valid_amount",
"amount > 0")
expect_or_fail
Detiene todo el pipeline si encuentra una sola violación.
@dlt.expect_or_fail(
"no_duplicates",
"COUNT(*) = COUNT(DISTINCT id)")
@dlt.table(
name="customers_silver",
comment="Clientes validados con reglas de calidad"
)
@dlt.expect_or_drop("valid_email", "email RLIKE '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\\.[A-Z|a-z]{2,}$'")
@dlt.expect_or_drop("valid_age", "age >= 18 AND age <= 120")
@dlt.expect("has_phone", "phone IS NOT NULL") # Solo alerta
def customers_silver():
return (
dlt.read_stream("customers_bronze")
.select("id", "email", "age", "phone")
)
05. Change Data Capture (CDC)
DLT simplifica CDC con dlt.apply_changes().
Captura inserciones, actualizaciones y eliminaciones desde un stream de eventos, manteniendo historial completo (SCD Type 2).
Tipos de SCD
- SCD Type 1: Sobreescribe valores antiguos (no historial)
- SCD Type 2: Mantiene historial completo con fechas de vigencia
import dlt
# Aplicar CDC en streaming
dlt.create_streaming_table("customers_scd2")
dlt.apply_changes(
target="customers_scd2",
source="customers_cdc_stream",
keys=["customer_id"],
sequence_by="event_timestamp",
stored_as_scd_type=2, # Type 2 = historial
except_column_list=["_metadata"]
)
# Resultado: tabla con columnas automáticas:
# - __START_AT (fecha inicio vigencia)
# - __END_AT (fecha fin vigencia)
# - __CURRENT (bool: registro actual?)
Caso de Uso Real
Un sistema de e-commerce actualiza datos de clientes constantemente (cambios de dirección, email, categoría). Con CDC, puedes responder: "¿Cuál era la categoría de este cliente el 15 de marzo?" consultando la tabla SCD2.
06. Monitoreo y Observabilidad
Event Log
DLT genera automáticamente una tabla event_log con métricas detalladas de cada ejecución.
SELECT
timestamp,
details:flow_name,
details:num_output_rows
FROM event_log
WHERE event_type = 'flow_progress'
Expectation Metrics
Cada expect*() registra violaciones. Úsalas para alertas.
SELECT
dataset,
name AS expectation,
failed_records
FROM event_log
WHERE event_type = 'flow_progress'
AND failed_records > 0
Dashboard de Linaje
La UI de DLT muestra automáticamente el linaje completo: qué tablas dependen de cuáles, estado de cada transformación, y métricas en tiempo real.
07. Reto de Ingeniería
Pipeline CDC de Órdenes con DLT
📋 Contexto del Problema
Tienes un sistema de órdenes que emite eventos cada vez que una orden cambia de estado:
created, shipped, delivered, cancelled.
Los eventos llegan como JSON a una carpeta en S3. Necesitas construir un pipeline DLT que:
order_id no sea nulo y event_timestamp sea válido.
🧪 Datos de Ejemplo
// s3://mi-bucket/ordenes/evento_1.json
{
"order_id": "ORD-001",
"customer_id": "CUST-456",
"status": "created",
"amount": 129.99,
"event_timestamp": "2024-03-15T10:30:00Z"
}
// s3://mi-bucket/ordenes/evento_2.json
{
"order_id": "ORD-001",
"customer_id": "CUST-456",
"status": "shipped",
"amount": 129.99,
"event_timestamp": "2024-03-16T14:22:00Z"
}
✅ Criterios de Evaluación
Usa dlt.read_stream() con formato JSON.
Implementa al menos 2 expect_or_drop().
Usa apply_changes() con SCD Type 2.
SQL que detecte órdenes con múltiples cambios rápidos.
🎯 Objetivo de Aprendizaje
Al completar este reto, habrás dominado: ingesta streaming con DLT, validación declarativa de calidad, implementación de SCD Type 2 con CDC, y consultas analíticas sobre datos históricos. Esto representa el 80% de casos de uso reales en ingeniería de datos moderna.