Pipelines de Machine Learning
Construye flujos end-to-end componiendo transformaciones matemáticas como funciones encadenadas.
01. Concepto Matemático de Pipeline
Pipeline = Composición de Funciones
$$
h(\mathbf{x}) = f_n \circ f_{n-1} \circ \cdots \circ f_2 \circ f_1(\mathbf{x})
$$
Cada función \(f_i: \mathbb{R}^{p_i} \to \mathbb{R}^{p_{i+1}}\) transforma el espacio de features.
Ejemplo 1: Normalizar
$$
f_1(\mathbf{x}) = \frac{\mathbf{x} - \boldsymbol{\mu}}{\boldsymbol{\sigma}}
$$
Entrada: \(\mathbf{x} \in \mathbb{R}^p\)
Salida: \(\mathbf{z} \in \mathbb{R}^p\) (normalizado)
Salida: \(\mathbf{z} \in \mathbb{R}^p\) (normalizado)
Ejemplo 2: PCA
$$
f_2(\mathbf{z}) = \mathbf{W}^T \mathbf{z}
$$
Entrada: \(\mathbf{z} \in \mathbb{R}^p\)
Salida: \(\mathbf{z}' \in \mathbb{R}^k\) (k componentes)
Salida: \(\mathbf{z}' \in \mathbb{R}^k\) (k componentes)
Ejemplo 3: Regresión
$$
f_3(\mathbf{z}') = \boldsymbol{\beta}^T \mathbf{z}'
$$
Entrada: \(\mathbf{z}' \in \mathbb{R}^k\)
Salida: \(\hat{y} \in \mathbb{R}\) (predicción)
Salida: \(\hat{y} \in \mathbb{R}\) (predicción)
graph LR
X[📊 x raw] -->|f₁: Normalizar| Z[z normalized]
Z -->|f₂: PCA| ZP[z' reduced]
ZP -->|f₃: Regresión| Y[ŷ prediction]
style X fill:#3b82f6,stroke:#fff
style Y fill:#10b981,stroke:#fff
Código: Pipeline en PySpark
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler, PCA
from pyspark.ml.regression import LinearRegression
# Definir etapas (stages)
assembler = VectorAssembler(
inputCols=["edad", "salario", "antiguedad"],
outputCol="features_raw"
)
scaler = StandardScaler(
inputCol="features_raw",
outputCol="features_scaled"
)
pca = PCA(
k=2, # Reducir a 2 dimensiones
inputCol="features_scaled",
outputCol="features_pca"
)
lr = LinearRegression(
featuresCol="features_pca",
labelCol="target"
)
# Crear pipeline (composición h = f₃ ∘ f₂ ∘ f₁)
pipeline = Pipeline(stages=[assembler, scaler, pca, lr])
# Entrenar (fit ejecuta todas las etapas en secuencia)
model = pipeline.fit(train_df)
# Predecir (transform aplica toda la cadena)
predictions = model.transform(test_df)
💡
pipeline.fit() entrena cada etapa usando la salida de la anterior. model.transform() aplica todas las transformaciones automáticamente.
02. VectorAssembler: De Columnas a Vectores
Matemática: Construcción de Matriz
$$
\mathbf{X} = [\mathbf{x}_1 \; \mathbf{x}_2 \; \cdots \; \mathbf{x}_p]
$$
Donde cada \(\mathbf{x}_j\) es una columna del DataFrame.
Spark ML requiere que todas las features estén en una sola columna como vector. VectorAssembler "apila" columnas horizontalmente.
Antes:
| edad | salario | antiguedad |
|------|---------|------------|
| 25 | 50000 | 2 |
Después:
| features |
|--------------------|
| [25.0, 50000.0, 2.0] |
| edad | salario | antiguedad |
|------|---------|------------|
| 25 | 50000 | 2 |
Después:
| features |
|--------------------|
| [25.0, 50000.0, 2.0] |
Ingeniería: Código PySpark
from pyspark.ml.feature import VectorAssembler
# DataFrame con columnas separadas
df = spark.createDataFrame([
(25, 50000, 2, 1),
(30, 60000, 5, 0),
], ["edad", "salario", "antiguedad", "target"])
# Ensamblar features
assembler = VectorAssembler(
inputCols=["edad", "salario", "antiguedad"],
outputCol="features" # Nueva columna vector
)
df_assembled = assembler.transform(df)
df_assembled.select("features", "target").show(truncate=False)
# +-------------------+------+
# |features |target|
# +-------------------+------+
# |[25.0,50000.0,2.0] |1 |
# |[30.0,60000.0,5.0] |0 |
# +-------------------+------+
⚠️
VectorAssembler es siempre la primera etapa en un pipeline de ML.
Dense vs Sparse Vectors
Dense Vector
DenseVector([1.0, 2.0, 3.0])
Almacena todos los valores. Usa más memoria pero es más rápido para vectores pequeños.
Sparse Vector
SparseVector(1000, [0, 5], [1.0, 2.0])
Solo almacena índices no-cero. Ideal para one-hot encoding (mayoría ceros).
03. Transformers vs Estimators
Transformer
$$
T: \mathcal{X} \to \mathcal{Y} \quad \text{(función determinista)}
$$
No necesita entrenamiento. Aplica transformación directamente.
Método principal: transform(df)
Ejemplos:
•
VectorAssembler - Combina columnas•
Normalizer - Divide por norma L2•
Binarizer - Convierte a 0/1•
SQLTransformer - Ejecuta SQLEstimator
$$
E: \mathcal{D} \to \mathcal{M} \quad \text{(optimización)}
$$
Requiere entrenamiento con datos. Produce un modelo (Transformer).
Métodos principales: fit(df) → transform(df)
Ejemplos:
•
StandardScaler - Aprende μ, σ•
PCA - Calcula eigenvectors•
LinearRegression - Optimiza β•
StringIndexer - Mapea stringsEjemplo: Flujo Completo
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression
# 1. Transformer (no necesita fit)
assembler = VectorAssembler(
inputCols=["x1", "x2"],
outputCol="features_raw"
)
df_assembled = assembler.transform(df) # Directo
# 2. Estimator (necesita fit para aprender μ, σ)
scaler = StandardScaler(
inputCol="features_raw",
outputCol="features_scaled"
)
scaler_model = scaler.fit(df_assembled) # Aprende parámetros
df_scaled = scaler_model.transform(df_assembled) # Aplica
# 3. Estimator (optimiza β)
lr = LinearRegression(
featuresCol="features_scaled",
labelCol="label"
)
lr_model = lr.fit(df_scaled) # Entrena
predictions = lr_model.transform(test_df) # Predice
04. StandardScaler: Normalización Z-Score
Matemática: Z-Score
$$
z_i = \frac{x_i - \mu}{\sigma}, \quad
\mu = \frac{1}{n}\sum_{i=1}^{n} x_i, \quad
\sigma = \sqrt{\frac{1}{n-1}\sum_{i=1}^{n}(x_i - \mu)^2}
$$
Transforma cada feature a media 0 y desviación estándar 1.
❓ ¿Por qué normalizar?
Problema: Si
edad ∈ [20, 60] y salario ∈ [30000, 100000],
salario domina el cálculo de distancias porque su escala es 1000x mayor.
Solución: Normalizar pone todas las features en la misma escala [-3, 3] aproximadamente.
⚙️ Implementación Distribuida
Fase 1 (Map): Calcular suma parcial \(\sum_{i \in P_j} x_i\) en cada partición.
Fase 2 (Reduce): Agregar sumas → \(\mu = \frac{\sum}{n}\).
Fase 3 (Map): Calcular \((x_i - \mu)^2\) en cada partición.
Fase 4 (Reduce): Agregar → \(\sigma = \sqrt{\sum(x_i - \mu)^2 / (n-1)}\).
Código: StandardScaler en PySpark
from pyspark.ml.feature import VectorAssembler, StandardScaler
# Dataset ejemplo
df = spark.createDataFrame([
(25, 50000),
(30, 60000),
(35, 70000),
(40, 80000)
], ["edad", "salario"])
# Paso 1: Ensamblar features
assembler = VectorAssembler(
inputCols=["edad", "salario"],
outputCol="features"
)
df_assembled = assembler.transform(df)
# Paso 2: Normalizar
scaler = StandardScaler(
inputCol="features",
outputCol="features_scaled",
withMean=True, # Centrar en media 0
withStd=True # Escalar a std 1
)
# fit() aprende μ y σ del training set
scaler_model = scaler.fit(df_assembled)
# transform() aplica z = (x - μ) / σ
df_scaled = scaler_model.transform(df_assembled)
df_scaled.select("features", "features_scaled").show(truncate=False)
# +----------------+----------------------------------------------------+
# |features |features_scaled |
# +----------------+----------------------------------------------------+
# |[25.0,50000.0] |[-1.3416..., -1.3416...] # Ambos ~-1.34 (normalizados)|
# |[30.0,60000.0] |[-0.4472..., -0.4472...] # Ahora en misma escala |
# |[35.0,70000.0] |[0.4472..., 0.4472...] |
# |[40.0,80000.0] |[1.3416..., 1.3416...] |
# +----------------+----------------------------------------------------+
⚠️ Importante: Usa los mismos μ y σ del training set para test set (
scaler_model.transform(test_df)). No reajustes en test.
05. PCA: Reducción Dimensional
Matemática: Autovectores de la Covarianza
$$
\begin{aligned}
\boldsymbol{\Sigma} &= \frac{1}{n} \mathbf{X}^T \mathbf{X} \quad \text{(matriz de covarianza)} \\[0.5em]
\boldsymbol{\Sigma} \mathbf{v}_i &= \lambda_i \mathbf{v}_i \quad \text{(eigenvectors)} \\[0.5em]
\mathbf{X}_{\text{reduced}} &= \mathbf{X} \mathbf{W}, \quad \mathbf{W} = [\mathbf{v}_1 \; \mathbf{v}_2 \; \cdots \; \mathbf{v}_k]
\end{aligned}
$$
PCA encuentra las \(k\) direcciones de máxima varianza. Proyectar en esas direcciones reduce dimensionalidad de \(p\) a \(k\).
❓ ¿Por qué reducir dimensiones?
- Velocidad: Menos features → menos cómputo (regresión de \(p=1000\) a \(k=50\)).
- Visualización: Proyectar a 2D/3D para graficar.
- Regularización: Eliminar ruido y colinealidad.
⚙️ Problema Distribuido
Desafío: Calcular \(\boldsymbol{\Sigma}\) requiere \(\mathbf{X}^T \mathbf{X}\).
Si \(p = 10,000\), entonces \(\boldsymbol{\Sigma}\) es 10,000 × 10,000.
Solución: Spark usa SVD (Singular Value Decomposition) distribuido, más eficiente que calcular eigenvectors directamente.
Código: PCA en PySpark
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors
# Dataset con 5 features
df = spark.createDataFrame([
(Vectors.dense([1.0, 0.0, 3.0, 0.0, 5.0]),),
(Vectors.dense([2.0, 1.0, 4.0, 1.0, 6.0]),),
(Vectors.dense([3.0, 2.0, 5.0, 2.0, 7.0]),)
], ["features"])
# Reducir de 5 a 2 componentes principales
pca = PCA(
k=2, # Número de componentes
inputCol="features",
outputCol="pca_features"
)
# fit() calcula eigenvectors de Σ
pca_model = pca.fit(df)
# Mostrar varianza explicada
print("Varianza explicada:", pca_model.explainedVariance)
# DenseVector([0.92, 0.05]) # 92% + 5% = 97% de varianza total
# transform() proyecta X en los 2 eigenvectors principales
df_pca = pca_model.transform(df)
df_pca.select("features", "pca_features").show(truncate=False)
# +--------------------+-------------------+
# |features |pca_features |
# +--------------------+-------------------+
# |[1.0,0.0,3.0,0.0,5.0]|[-4.47, 0.16] | # 5D → 2D
# |[2.0,1.0,4.0,1.0,6.0]|[-3.14, 0.11] |
# |[3.0,2.0,5.0,2.0,7.0]|[-1.81, 0.06] |
# +--------------------+-------------------+
💡
explainedVariance indica cuánta información retienes. 0.92 = 92% de varianza. Ajusta \(k\) según cuánto quieras comprimir.
06. Pipeline Completo (End-to-End)
Ejemplo: Predicción de Salario
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler, PCA
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
# Cargar datos
df = spark.read.csv("salarios.csv", header=True, inferSchema=True)
# Split train/test (80/20)
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)
# ======================================
# Etapa 1: Ensamblar features
# ======================================
assembler = VectorAssembler(
inputCols=["edad", "antiguedad", "educacion_años", "horas_semana"],
outputCol="features_raw"
)
# ======================================
# Etapa 2: Normalizar (z-score)
# ======================================
scaler = StandardScaler(
inputCol="features_raw",
outputCol="features_scaled",
withMean=True,
withStd=True
)
# ======================================
# Etapa 3: Reducir dimensiones (PCA)
# ======================================
pca = PCA(
k=3, # 4 features → 3 componentes
inputCol="features_scaled",
outputCol="features_pca"
)
# ======================================
# Etapa 4: Regresión Lineal
# ======================================
lr = LinearRegression(
featuresCol="features_pca",
labelCol="salario",
maxIter=100,
regParam=0.01 # Regularización L2
)
# ======================================
# Crear Pipeline (composición)
# ======================================
pipeline = Pipeline(stages=[assembler, scaler, pca, lr])
# ======================================
# Entrenar (fit ejecuta todas las etapas)
# ======================================
print("Entrenando pipeline...")
model = pipeline.fit(train_df)
# ======================================
# Predecir en test set
# ======================================
predictions = model.transform(test_df)
# ======================================
# Evaluar (RMSE)
# ======================================
evaluator = RegressionEvaluator(
labelCol="salario",
predictionCol="prediction",
metricName="rmse"
)
rmse = evaluator.evaluate(predictions)
print(f"RMSE: {rmse:.2f}")
# ======================================
# Guardar modelo entrenado
# ======================================
model.save("models/salary_pipeline")
✅ Ventajas del Pipeline
- • Código limpio y modular
- • Evita data leakage (μ, σ del train)
- • Fácil de serializar y desplegar
📦 Despliegue
- •
model.save(path)guarda todo - •
PipelineModel.load(path) - •
model.transform(new_data)
🔍 Inspección
- •
model.stages[3]→ LR model - •
.coefficients→ β - •
.explainedVariance(PCA)