SECCIÓN 13

MÓDULO 2.Pipelines()

COMPOSICIÓN DE TRANSFORMACIONES

Pipelines de Machine Learning

Construye flujos end-to-end componiendo transformaciones matemáticas como funciones encadenadas.

01. Concepto Matemático de Pipeline

Pipeline = Composición de Funciones

$$ h(\mathbf{x}) = f_n \circ f_{n-1} \circ \cdots \circ f_2 \circ f_1(\mathbf{x}) $$
Cada función \(f_i: \mathbb{R}^{p_i} \to \mathbb{R}^{p_{i+1}}\) transforma el espacio de features.
Ejemplo 1: Normalizar
$$ f_1(\mathbf{x}) = \frac{\mathbf{x} - \boldsymbol{\mu}}{\boldsymbol{\sigma}} $$
Entrada: \(\mathbf{x} \in \mathbb{R}^p\)
Salida: \(\mathbf{z} \in \mathbb{R}^p\) (normalizado)
Ejemplo 2: PCA
$$ f_2(\mathbf{z}) = \mathbf{W}^T \mathbf{z} $$
Entrada: \(\mathbf{z} \in \mathbb{R}^p\)
Salida: \(\mathbf{z}' \in \mathbb{R}^k\) (k componentes)
Ejemplo 3: Regresión
$$ f_3(\mathbf{z}') = \boldsymbol{\beta}^T \mathbf{z}' $$
Entrada: \(\mathbf{z}' \in \mathbb{R}^k\)
Salida: \(\hat{y} \in \mathbb{R}\) (predicción)
graph LR X[📊 x raw] -->|f₁: Normalizar| Z[z normalized] Z -->|f₂: PCA| ZP[z' reduced] ZP -->|f₃: Regresión| Y[ŷ prediction] style X fill:#3b82f6,stroke:#fff style Y fill:#10b981,stroke:#fff

Código: Pipeline en PySpark

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler, PCA
from pyspark.ml.regression import LinearRegression

# Definir etapas (stages)
assembler = VectorAssembler(
    inputCols=["edad", "salario", "antiguedad"],
    outputCol="features_raw"
)

scaler = StandardScaler(
    inputCol="features_raw",
    outputCol="features_scaled"
)

pca = PCA(
    k=2,  # Reducir a 2 dimensiones
    inputCol="features_scaled",
    outputCol="features_pca"
)

lr = LinearRegression(
    featuresCol="features_pca",
    labelCol="target"
)

# Crear pipeline (composición h = f₃ ∘ f₂ ∘ f₁)
pipeline = Pipeline(stages=[assembler, scaler, pca, lr])

# Entrenar (fit ejecuta todas las etapas en secuencia)
model = pipeline.fit(train_df)

# Predecir (transform aplica toda la cadena)
predictions = model.transform(test_df)
💡 pipeline.fit() entrena cada etapa usando la salida de la anterior. model.transform() aplica todas las transformaciones automáticamente.

02. VectorAssembler: De Columnas a Vectores

Matemática: Construcción de Matriz

$$ \mathbf{X} = [\mathbf{x}_1 \; \mathbf{x}_2 \; \cdots \; \mathbf{x}_p] $$
Donde cada \(\mathbf{x}_j\) es una columna del DataFrame.

Spark ML requiere que todas las features estén en una sola columna como vector. VectorAssembler "apila" columnas horizontalmente.

Antes:
| edad | salario | antiguedad |
|------|---------|------------|
| 25 | 50000 | 2 |

Después:
| features |
|--------------------|
| [25.0, 50000.0, 2.0] |

Ingeniería: Código PySpark

from pyspark.ml.feature import VectorAssembler

# DataFrame con columnas separadas
df = spark.createDataFrame([
    (25, 50000, 2, 1),
    (30, 60000, 5, 0),
], ["edad", "salario", "antiguedad", "target"])

# Ensamblar features
assembler = VectorAssembler(
    inputCols=["edad", "salario", "antiguedad"],
    outputCol="features"  # Nueva columna vector
)

df_assembled = assembler.transform(df)
df_assembled.select("features", "target").show(truncate=False)

# +-------------------+------+
# |features           |target|
# +-------------------+------+
# |[25.0,50000.0,2.0] |1     |
# |[30.0,60000.0,5.0] |0     |
# +-------------------+------+
⚠️ VectorAssembler es siempre la primera etapa en un pipeline de ML.

Dense vs Sparse Vectors

Dense Vector
DenseVector([1.0, 2.0, 3.0])
Almacena todos los valores. Usa más memoria pero es más rápido para vectores pequeños.
Sparse Vector
SparseVector(1000, [0, 5], [1.0, 2.0])
Solo almacena índices no-cero. Ideal para one-hot encoding (mayoría ceros).

03. Transformers vs Estimators

Transformer

$$ T: \mathcal{X} \to \mathcal{Y} \quad \text{(función determinista)} $$
No necesita entrenamiento. Aplica transformación directamente.

Método principal: transform(df)

Ejemplos:
VectorAssembler - Combina columnas
Normalizer - Divide por norma L2
Binarizer - Convierte a 0/1
SQLTransformer - Ejecuta SQL

Estimator

$$ E: \mathcal{D} \to \mathcal{M} \quad \text{(optimización)} $$
Requiere entrenamiento con datos. Produce un modelo (Transformer).

Métodos principales: fit(df)transform(df)

Ejemplos:
StandardScaler - Aprende μ, σ
PCA - Calcula eigenvectors
LinearRegression - Optimiza β
StringIndexer - Mapea strings

Ejemplo: Flujo Completo

from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression

# 1. Transformer (no necesita fit)
assembler = VectorAssembler(
    inputCols=["x1", "x2"],
    outputCol="features_raw"
)
df_assembled = assembler.transform(df)  # Directo

# 2. Estimator (necesita fit para aprender μ, σ)
scaler = StandardScaler(
    inputCol="features_raw",
    outputCol="features_scaled"
)
scaler_model = scaler.fit(df_assembled)  # Aprende parámetros
df_scaled = scaler_model.transform(df_assembled)  # Aplica

# 3. Estimator (optimiza β)
lr = LinearRegression(
    featuresCol="features_scaled",
    labelCol="label"
)
lr_model = lr.fit(df_scaled)  # Entrena
predictions = lr_model.transform(test_df)  # Predice

04. StandardScaler: Normalización Z-Score

Matemática: Z-Score

$$ z_i = \frac{x_i - \mu}{\sigma}, \quad \mu = \frac{1}{n}\sum_{i=1}^{n} x_i, \quad \sigma = \sqrt{\frac{1}{n-1}\sum_{i=1}^{n}(x_i - \mu)^2} $$
Transforma cada feature a media 0 y desviación estándar 1.
❓ ¿Por qué normalizar?
Problema: Si edad ∈ [20, 60] y salario ∈ [30000, 100000], salario domina el cálculo de distancias porque su escala es 1000x mayor.
Solución: Normalizar pone todas las features en la misma escala [-3, 3] aproximadamente.
⚙️ Implementación Distribuida
Fase 1 (Map): Calcular suma parcial \(\sum_{i \in P_j} x_i\) en cada partición.
Fase 2 (Reduce): Agregar sumas → \(\mu = \frac{\sum}{n}\).
Fase 3 (Map): Calcular \((x_i - \mu)^2\) en cada partición.
Fase 4 (Reduce): Agregar → \(\sigma = \sqrt{\sum(x_i - \mu)^2 / (n-1)}\).

Código: StandardScaler en PySpark

from pyspark.ml.feature import VectorAssembler, StandardScaler

# Dataset ejemplo
df = spark.createDataFrame([
    (25, 50000),
    (30, 60000),
    (35, 70000),
    (40, 80000)
], ["edad", "salario"])

# Paso 1: Ensamblar features
assembler = VectorAssembler(
    inputCols=["edad", "salario"],
    outputCol="features"
)
df_assembled = assembler.transform(df)

# Paso 2: Normalizar
scaler = StandardScaler(
    inputCol="features",
    outputCol="features_scaled",
    withMean=True,   # Centrar en media 0
    withStd=True     # Escalar a std 1
)

# fit() aprende μ y σ del training set
scaler_model = scaler.fit(df_assembled)

# transform() aplica z = (x - μ) / σ
df_scaled = scaler_model.transform(df_assembled)

df_scaled.select("features", "features_scaled").show(truncate=False)

# +----------------+----------------------------------------------------+
# |features        |features_scaled                                      |
# +----------------+----------------------------------------------------+
# |[25.0,50000.0]  |[-1.3416..., -1.3416...]  # Ambos ~-1.34 (normalizados)|
# |[30.0,60000.0]  |[-0.4472..., -0.4472...]  # Ahora en misma escala     |
# |[35.0,70000.0]  |[0.4472..., 0.4472...]                               |
# |[40.0,80000.0]  |[1.3416..., 1.3416...]                               |
# +----------------+----------------------------------------------------+
⚠️ Importante: Usa los mismos μ y σ del training set para test set (scaler_model.transform(test_df)). No reajustes en test.

05. PCA: Reducción Dimensional

Matemática: Autovectores de la Covarianza

$$ \begin{aligned} \boldsymbol{\Sigma} &= \frac{1}{n} \mathbf{X}^T \mathbf{X} \quad \text{(matriz de covarianza)} \\[0.5em] \boldsymbol{\Sigma} \mathbf{v}_i &= \lambda_i \mathbf{v}_i \quad \text{(eigenvectors)} \\[0.5em] \mathbf{X}_{\text{reduced}} &= \mathbf{X} \mathbf{W}, \quad \mathbf{W} = [\mathbf{v}_1 \; \mathbf{v}_2 \; \cdots \; \mathbf{v}_k] \end{aligned} $$
PCA encuentra las \(k\) direcciones de máxima varianza. Proyectar en esas direcciones reduce dimensionalidad de \(p\) a \(k\).
❓ ¿Por qué reducir dimensiones?
  • Velocidad: Menos features → menos cómputo (regresión de \(p=1000\) a \(k=50\)).
  • Visualización: Proyectar a 2D/3D para graficar.
  • Regularización: Eliminar ruido y colinealidad.
⚙️ Problema Distribuido
Desafío: Calcular \(\boldsymbol{\Sigma}\) requiere \(\mathbf{X}^T \mathbf{X}\). Si \(p = 10,000\), entonces \(\boldsymbol{\Sigma}\) es 10,000 × 10,000.
Solución: Spark usa SVD (Singular Value Decomposition) distribuido, más eficiente que calcular eigenvectors directamente.

Código: PCA en PySpark

from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors

# Dataset con 5 features
df = spark.createDataFrame([
    (Vectors.dense([1.0, 0.0, 3.0, 0.0, 5.0]),),
    (Vectors.dense([2.0, 1.0, 4.0, 1.0, 6.0]),),
    (Vectors.dense([3.0, 2.0, 5.0, 2.0, 7.0]),)
], ["features"])

# Reducir de 5 a 2 componentes principales
pca = PCA(
    k=2,  # Número de componentes
    inputCol="features",
    outputCol="pca_features"
)

# fit() calcula eigenvectors de Σ
pca_model = pca.fit(df)

# Mostrar varianza explicada
print("Varianza explicada:", pca_model.explainedVariance)
# DenseVector([0.92, 0.05])  # 92% + 5% = 97% de varianza total

# transform() proyecta X en los 2 eigenvectors principales
df_pca = pca_model.transform(df)

df_pca.select("features", "pca_features").show(truncate=False)

# +--------------------+-------------------+
# |features            |pca_features       |
# +--------------------+-------------------+
# |[1.0,0.0,3.0,0.0,5.0]|[-4.47, 0.16]      |  # 5D → 2D
# |[2.0,1.0,4.0,1.0,6.0]|[-3.14, 0.11]      |
# |[3.0,2.0,5.0,2.0,7.0]|[-1.81, 0.06]      |
# +--------------------+-------------------+
💡 explainedVariance indica cuánta información retienes. 0.92 = 92% de varianza. Ajusta \(k\) según cuánto quieras comprimir.

06. Pipeline Completo (End-to-End)

Ejemplo: Predicción de Salario

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler, PCA
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# Cargar datos
df = spark.read.csv("salarios.csv", header=True, inferSchema=True)

# Split train/test (80/20)
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)

# ======================================
# Etapa 1: Ensamblar features
# ======================================
assembler = VectorAssembler(
    inputCols=["edad", "antiguedad", "educacion_años", "horas_semana"],
    outputCol="features_raw"
)

# ======================================
# Etapa 2: Normalizar (z-score)
# ======================================
scaler = StandardScaler(
    inputCol="features_raw",
    outputCol="features_scaled",
    withMean=True,
    withStd=True
)

# ======================================
# Etapa 3: Reducir dimensiones (PCA)
# ======================================
pca = PCA(
    k=3,  # 4 features → 3 componentes
    inputCol="features_scaled",
    outputCol="features_pca"
)

# ======================================
# Etapa 4: Regresión Lineal
# ======================================
lr = LinearRegression(
    featuresCol="features_pca",
    labelCol="salario",
    maxIter=100,
    regParam=0.01  # Regularización L2
)

# ======================================
# Crear Pipeline (composición)
# ======================================
pipeline = Pipeline(stages=[assembler, scaler, pca, lr])

# ======================================
# Entrenar (fit ejecuta todas las etapas)
# ======================================
print("Entrenando pipeline...")
model = pipeline.fit(train_df)

# ======================================
# Predecir en test set
# ======================================
predictions = model.transform(test_df)

# ======================================
# Evaluar (RMSE)
# ======================================
evaluator = RegressionEvaluator(
    labelCol="salario",
    predictionCol="prediction",
    metricName="rmse"
)
rmse = evaluator.evaluate(predictions)
print(f"RMSE: {rmse:.2f}")

# ======================================
# Guardar modelo entrenado
# ======================================
model.save("models/salary_pipeline")
✅ Ventajas del Pipeline
  • • Código limpio y modular
  • • Evita data leakage (μ, σ del train)
  • • Fácil de serializar y desplegar
📦 Despliegue
  • model.save(path) guarda todo
  • PipelineModel.load(path)
  • model.transform(new_data)
🔍 Inspección
  • model.stages[3] → LR model
  • .coefficients → β
  • .explainedVariance (PCA)