Construyendo un Cluster Spark desde Cero
Para dominar Spark, debes entender dónde se ejecuta. En este laboratorio, no usaremos imágenes prefabricadas. Construiremos nuestra propia imagen de Docker, instalando manualmente Java, Spark y Python para entender la dependencia entre la JVM y PySpark.
Stack Tecnológico del Taller
El Dockerfile Base
Vamos a crear una imagen "Maestra" que servirá para los tres roles: Master, Worker y Driver (Jupyter). Observa cómo instalamos Java explícitamente: PySpark es solo un "wrapper" de Python que habla con la JVM. Sin Java, Spark no existe.
# 1. Empezamos con un Linux mínimo con Python
FROM python:3.11-slim
# 2. Instalar Java (REQUISITO CRÍTICO)
# Spark corre sobre la JVM (Scala). PySpark necesita Java para funcionar.
RUN apt-get update && \
apt-get install -y default-jdk procps curl && \
apt-get clean
# 3. Configurar Variables de Entorno
ENV JAVA_HOME=/usr/lib/jvm/default-java
ENV SPARK_VERSION=3.5.0
ENV SPARK_HOME=/opt/spark
ENV PATH=$PATH:$SPARK_HOME/bin
# 4. Descargar e instalar Binarios de Spark
# Bajamos Spark compilado para Hadoop 3 desde los espejos de Apache
RUN curl -O https://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop3.tgz && \
tar xvf spark-${SPARK_VERSION}-bin-hadoop3.tgz && \
mv spark-${SPARK_VERSION}-bin-hadoop3 /opt/spark && \
rm spark-${SPARK_VERSION}-bin-hadoop3.tgz
# 5. Instalar librerías de Python
# delta-spark: Permite usar el formato Delta Lake
# jupyterlab: Nuestro entorno de desarrollo (Driver)
RUN pip install pyspark==${SPARK_VERSION} delta-spark==3.0.0 pandas jupyterlab
WORKDIR /app
# Por defecto, el contenedor intentará lanzar Jupyter.
# Pero en docker-compose sobreescribiremos esto para el Master y el Worker.
CMD ["jupyter", "lab", "--ip=0.0.0.0", "--allow-root", "--no-browser", "--NotebookApp.token=''"]
Orquestación del Cluster
Aquí definimos la topología de red. Usaremos la misma imagen construida arriba para levantar 3 servicios distintos. Nota cómo el Worker y Jupyter se conectan al Master usando su nombre de servicio DNS (`spark-master`).
version: '3'
services:
# --- NODO 1: EL MASTER (Gerente) ---
spark-master:
build: .
container_name: spark-master
# Sobreescribimos el CMD para iniciar el proceso Master de Spark
command: /opt/spark/sbin/start-master.sh
ports:
- "8080:8080" # UI Web para monitorear el cluster
- "7077:7077" # Puerto interno de comunicación Spark
# --- NODO 2: EL WORKER (Obrero) ---
spark-worker:
build: .
container_name: spark-worker
# Iniciamos el proceso Worker y le decimos dónde está el Master
command: /opt/spark/sbin/start-worker.sh spark://spark-master:7077
depends_on:
- spark-master
environment:
- SPARK_WORKER_CORES=2 # Limitamos recursos para simulación
- SPARK_WORKER_MEMORY=2G
# --- NODO 3: EL DRIVER (Arquitecto) ---
jupyter-lab:
build: .
container_name: jupyter-lab
# Usa el CMD por defecto del Dockerfile (Jupyter)
ports:
- "8888:8888" # UI para escribir código
- "4040:4040" # UI del Job actual
volumes:
- ./data:/app/data # Mapeamos datos locales al contenedor
- ./notebooks:/app/notebooks
environment:
- SPARK_MASTER_URL=spark://spark-master:7077
🚀 Despliegue
Ejecuta esto en tu terminal para levantar la infraestructura.
Arquitectura Medallion
No vamos a guardar archivos desordenados. Implementaremos un Lakehouse usando la arquitectura Medallion (Bronce, Plata, Oro) sobre Delta Lake.
Capa Bronce
Datos crudos ("As-is"). Copia fiel del origen pero en formato Delta de alto rendimiento. Historial completo.
Capa Plata
Datos limpios. Tipos corregidos, nulos manejados, duplicados eliminados. Fuente de verdad.
Capa Oro
Datos agregados. KPIs de negocio listos para dashboards (PowerBI, Tableau).
Ingesta: De CSV a Bronce
Abre Jupyter (`localhost:8888`). Primero, configuramos la sesión Spark para que soporte Delta Lake y se conecte a nuestro Master.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date
from delta import *
# URL del Master (definida en docker-compose)
master_url = "spark://spark-master:7077"
# Configuración: Añadimos los paquetes JAR de Delta Lake al classpath
builder = SparkSession.builder \
.appName("Lab_SECOP_Bronze") \
.master(master_url) \
.config("spark.jars.packages", "io.delta:delta-spark_2.12:3.0.0") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.config("spark.executor.memory", "1g")
# Iniciar Sesión
spark = configure_spark_with_delta_pip(builder).getOrCreate()
# --- LECTURA ---
# Leemos el CSV mapeado en el volumen. inferSchema es lento pero útil para empezar.
df_raw = spark.read \
.format("csv") \
.option("header", "true") \
.option("delimiter", ",") \
.option("inferSchema", "true") \
.load("data/SECOP_II_Contratos_Electronicos.csv")
# --- ESCRITURA BRONCE ---
# Guardamos en Delta. Esto crea archivos Parquet + _delta_log
df_raw.write.format("delta").mode("overwrite").save("data/lakehouse/bronze/secop")
print(f"Ingesta completada. Registros: {df_raw.count()}")
Limpieza: De Bronce a Plata
Ahora leemos desde la capa Bronce (mucho más rápido que el CSV). Aplicaremos limpieza de nombres, corrección de tipos de datos y filtrado de basura.
# Leer desde Delta Bronce
df_bronze = spark.read.format("delta").load("data/lakehouse/bronze/secop")
# Transformaciones (Lazy - Se ejecutan al guardar)
df_silver = df_bronze \
.withColumnRenamed("Precio Base", "precio_base") \
.withColumnRenamed("Departamento", "departamento") \
.withColumnRenamed("Fecha de Firma", "fecha_firma_str") \
.withColumn("fecha_firma", to_date(col("fecha_firma_str"), "MM/dd/yyyy")) \
.filter(col("precio_base") > 0) \
.select("entidad", "departamento", "precio_base", "fecha_firma")
# Escritura en Plata
df_silver.write.format("delta").mode("overwrite").save("data/lakehouse/silver/secop")
print("Capa Plata generada.")
df_silver.show(5)
Analítica: Capa Oro
Respondemos preguntas de negocio. Aquí ocurre una transformación ancha (Shuffle) ya que agrupamos datos de todo el cluster.
from pyspark.sql.functions import sum, desc
# Agregación: Top departamentos por dinero contratado
df_gold = df_silver \
.groupBy("departamento") \
.agg(sum("precio_base").alias("total_contratado")) \
.orderBy(desc("total_contratado")) \
.limit(10)
# Persistir en Oro
df_gold.write.format("delta").mode("overwrite").save("data/lakehouse/gold/top_deptos")
# Convertir a Pandas para visualizar en Jupyter
print(df_gold.toPandas())
Resultado Esperado (Dashboard)
Reto Final: Auditoría de Datos
Usa tu cluster para encontrar anomalías.
Pregunta: ¿Cuál es el contrato individual más costoso registrado en la historia de tu departamento? ¿Es un error de digitación o un megaproyecto real?