Fundamentos de ML Distribuido
Entiende cómo escalar álgebra lineal y cálculo estadístico desde un vector en memoria hasta particiones distribuidas en miles de nodos.
01. De Vectores a Particiones
📊 Matemática Clásica
import numpy as np
# Vector de 1 millón de elementos
x = np.random.randn(1_000_000)
# Calcular media
mu = np.mean(x) # Todo en memoria
⚙️ Ingeniería Distribuida
from pyspark.sql import functions as F
# DataFrame con 10 mil millones de filas
# Distribuido en 100 particiones
df = spark.read.parquet("s3://data.parquet")
# Calcular media (distribuido)
mu = df.select(F.avg("value")).collect()[0][0]
🔗 Por qué np.dot(A, B) no funciona con A de 100GB
NumPy asume que \(A\) y \(B\) están completos en memoria contigua. Con 100GB de datos, el sistema operativo empezará a hacer swap (usar disco como RAM), reduciendo velocidad 1000x. Spark mantiene particiones en disco y procesa solo lo necesario en cada momento.
02. MapReduce Explicado Matemáticamente
Ejemplo: Calcular Media μ = (1/n)Σxᵢ
🗺️ Fase MAP (Paralelo)
# En Worker 1 (Partición 1)
sum_p1 = 0
for x in partition_1:
sum_p1 += x
# sum_p1 = 12345.67
# En Worker 2 (Partición 2)
sum_p2 = 0
for x in partition_2:
sum_p2 += x
# sum_p2 = 23456.78
# ... (100 workers en total)
📉 Fase REDUCE (Secuencial)
# En el Driver (nodo maestro)
partial_sums = [
sum_p1, # 12345.67
sum_p2, # 23456.78
# ...
sum_p100 # 98765.43
]
total = sum(partial_sums)
n = 10_000_000_000
mu = total / n
print(f"Media: {mu}")
Implementación PySpark Real
from pyspark.sql import functions as F
# Dataset distribuido
df = spark.createDataFrame([(1,), (2,), (3,), (4,), (5,)], ["value"])
# Spark ejecuta MapReduce automáticamente
result = df.agg(
F.avg("value").alias("media"), # Media: (1+2+3+4+5)/5 = 3.0
F.sum("value").alias("suma"), # Suma: 15
F.stddev("value").alias("desv_std") # Desviación estándar
).collect()[0]
print(f"Media: {result['media']}") # 3.0
F.avg() ejecuta MapReduce internamente: suma parcial en cada partición, luego divide por n total.
03. Particiones y Paralelización
1TB de datos] --> R{Repartition
k=8} R --> P1[Partición 1
125GB] R --> P2[Partición 2
125GB] R --> P3[Partición 3
125GB] R --> P8[Partición 8
125GB] P1 --> C1[Core 1] P2 --> C2[Core 2] P3 --> C3[Core 3] P8 --> C8[Core 8] C1 --> Res[Resultado Final] C2 --> Res C3 --> Res C8 --> Res style DF fill:#E74C3C,stroke:#fff style Res fill:#27AE60,stroke:#fff
Ley de Amdahl (Speedup)
• \(T_1\): Tiempo con 1 core
• \(T_n\): Tiempo con n cores
• \(p\): Fracción paralelizable del código
Código: Reparticionamiento
# Ver particiones actuales
print(df.rdd.getNumPartitions()) # 200 (default)
# Reparticionar a 8 (para cluster pequeño)
df_optimized = df.repartition(8)
# O usar coalesce (sin shuffle, más rápido)
# Solo para reducir particiones
df_reduced = df.coalesce(4)
# Particionar por columna (útil para joins)
df_by_city = df.repartition("city")
⚙️ Cuándo reparticionar
• Cores ociosos
• Solución: Aumentar con
repartition()
• Particiones ≈ 2-4x número de cores
• Ideal para performance
• Overhead de scheduling
• Solución: Reducir con
coalesce()
04. Broadcast Variables
❌ Problema: Envío Repetido
θ coeficientes] -->|Copia| W1[Worker 1] D -->|Copia| W2[Worker 2] D -->|Copia| W3[Worker 3] D -->|Copia| W100[Worker 100] style D fill:#E74C3C,stroke:#fff
✅ Solución: Broadcast
θ broadcast] -->|1 copia| W1[Worker 1
Todas las tareas] D -->|1 copia| W2[Worker 2
Todas las tareas] D -->|1 copia| W3[Worker 3
Todas las tareas] D -->|1 copia| W100[Worker 100
Todas las tareas] style D fill:#27AE60,stroke:#fff
Código: Broadcast en PySpark
from pyspark.sql import functions as F
# Diccionario de parámetros de modelo
# (en la práctica, serían coeficientes de regresión)
model_params = {
"intercept": 2.5,
"coef_edad": 0.3,
"coef_salario": 0.0001
}
# Broadcast (enviar una sola vez a cada worker)
bc_params = spark.sparkContext.broadcast(model_params)
# Usar en UDF (User Defined Function)
def predict_udf(edad, salario):
params = bc_params.value # Acceder a broadcast
return (params["intercept"] +
params["coef_edad"] * edad +
params["coef_salario"] * salario)
# Registrar UDF
predict = F.udf(predict_udf)
# Aplicar a DataFrame (se ejecuta en workers)
df_with_predictions = df.withColumn(
"prediction",
predict(F.col("edad"), F.col("salario"))
)
bc_params.value accede al diccionario sin transferirlo 1000 veces. Cada worker lo tiene en caché.
05. Operaciones Matriciales Distribuidas
Ejemplo: Producto Matriz-Vector y = Ax + b
from pyspark.ml.linalg import Vectors, DenseVector
from pyspark.sql import functions as F
# DataFrame con features como vectores (cada fila = fila de A)
df = spark.createDataFrame([
(1, Vectors.dense([1.0, 2.0, 3.0])),
(2, Vectors.dense([4.0, 5.0, 6.0])),
(3, Vectors.dense([7.0, 8.0, 9.0]))
], ["id", "features"])
# Vector x (coeficientes) - broadcast
x = [0.5, 1.0, 1.5]
bc_x = spark.sparkContext.broadcast(x)
# UDF para producto punto: A_i · x
def dot_product(features):
x_vec = bc_x.value
return float(sum(f * x for f, x in zip(features, x_vec)))
dot_udf = F.udf(dot_product)
# Aplicar: y_i = A_i · x (cada fila en paralelo)
df_result = df.withColumn("y", dot_udf(F.col("features")))
df_result.show()
# +---+-------------+----+
# | id| features| y|
# +---+-------------+----+
# | 1|[1.0,2.0,3.0]| 7.0| # (1×0.5 + 2×1.0 + 3×1.5)
# | 2|[4.0,5.0,6.0]|16.0|
# | 3|[7.0,8.0,9.0]|25.0|
# +---+-------------+----+
🔗 Conexión con Regresión Lineal
En regresión, predecir \(\hat{y} = X\beta\) es exactamente este producto matriz-vector.
Spark ML hace esto internamente cuando llamas model.transform().
06. Persistencia y Caching
Sin Cache (Recomputación)
df = spark.read.parquet("data.parquet")
df_filtered = df.filter(col("age") > 18)
# Operación 1: Cuenta
count1 = df_filtered.count() # Lee y filtra
# Operación 2: Media
avg_salary = df_filtered.agg(
F.avg("salary")
).collect() # Lee y filtra OTRA VEZ
# Operación 3: Máximo
max_age = df_filtered.agg(
F.max("age")
).collect() # Lee y filtra OTRA VEZ
df_filtered se recalcula 3 veces porque Spark no persiste resultados por defecto.
Con Cache (Almacenamiento)
df = spark.read.parquet("data.parquet")
df_filtered = df.filter(col("age") > 18)
# Cachear en memoria
df_filtered.cache() # Marca para persistir
# Primera operación: Lee, filtra, y GUARDA en RAM
count1 = df_filtered.count()
# Segunda operación: Lee desde RAM (sin recomputar)
avg_salary = df_filtered.agg(
F.avg("salary")
).collect()
# Tercera operación: Lee desde RAM
max_age = df_filtered.agg(
F.max("age")
).collect()
# Liberar memoria cuando termines
df_filtered.unpersist()
df_filtered se calcula 1 sola vez, luego se lee desde memoria (mucho más rápido).
Niveles de Persistencia
from pyspark import StorageLevel
# Solo memoria (más rápido, pero puede fallar si no hay RAM)
df.persist(StorageLevel.MEMORY_ONLY)
# Memoria + Disco (si RAM llena, escribe a disco)
df.persist(StorageLevel.MEMORY_AND_DISK)
# Solo disco (para datasets enormes)
df.persist(StorageLevel.DISK_ONLY)
# Memoria serializada (comprime datos, ahorra RAM)
df.persist(StorageLevel.MEMORY_ONLY_SER)
# cache() es equivalente a MEMORY_AND_DISK
df.cache() # Shortcut
• Algoritmos iterativos (ML)
• Datos intermedios costosos de recomputar
• Datos muy grandes y poca RAM
• Transformaciones simples y baratas
07. Reto Conceptual
Gradiente Descendente Distribuido
📋 Problema
Implementa una iteración de gradiente descendente distribuido para regresión lineal:
Desafío: ¿Cómo calcular ∇J en paralelo?
Esqueleto de Código (para completar)
from pyspark.sql import functions as F
import numpy as np
# Parámetros iniciales
theta = np.array([0.0, 0.0, 0.0]) # p=3 features
alpha = 0.01 # Learning rate
for iteration in range(100):
# TODO 1: Broadcast theta
bc_theta = # ...
# TODO 2: Calcular gradiente parcial en cada partición
def compute_gradient(features, label):
# features: Vector de features
# label: y real
# Retornar: (error × features)
pass
# TODO 3: Reduce (agregar gradientes)
gradient = # ...
# TODO 4: Actualizar theta
theta = # ...
print(f"Iteration {iteration}: theta = {theta}")
🎯 Objetivo de Aprendizaje
Este ejercicio demuestra cómo algoritmos iterativos (ML) se ejecutan en Spark. Cada iteración requiere: broadcast de parámetros, map (computar local), reduce (agregar), y actualización. Es el núcleo de algoritmos como SGD, Adam, etc.