SECCIÓN 13

MÓDULO 1.Fundamentos()

ÁLGEBRA LINEAL EN UN CLUSTER

Fundamentos de ML Distribuido

Entiende cómo escalar álgebra lineal y cálculo estadístico desde un vector en memoria hasta particiones distribuidas en miles de nodos.

01. De Vectores a Particiones

📊 Matemática Clásica

$$ \mathbf{x} \in \mathbb{R}^n, \quad \mathbf{X} \in \mathbb{R}^{n \times p} $$
Un vector \(\mathbf{x}\) o matriz \(\mathbf{X}\) completo en memoria RAM.
import numpy as np

# Vector de 1 millón de elementos
x = np.random.randn(1_000_000)

# Calcular media
mu = np.mean(x)  # Todo en memoria
⚠️ Problema: Si x tiene 10 mil millones de elementos (80GB), no cabe en RAM de laptop.

⚙️ Ingeniería Distribuida

$$ \mathbf{x} = \begin{bmatrix} \mathbf{x}_1 \\ \mathbf{x}_2 \\ \vdots \\ \mathbf{x}_k \end{bmatrix} \quad \text{(k particiones)} $$
El vector se divide en \(k\) particiones, cada una en un nodo diferente.
from pyspark.sql import functions as F

# DataFrame con 10 mil millones de filas
# Distribuido en 100 particiones
df = spark.read.parquet("s3://data.parquet")

# Calcular media (distribuido)
mu = df.select(F.avg("value")).collect()[0][0]
✅ Solución: Spark divide en particiones, calcula sumas parciales en paralelo, luego agrega.

🔗 Por qué np.dot(A, B) no funciona con A de 100GB

NumPy asume que \(A\) y \(B\) están completos en memoria contigua. Con 100GB de datos, el sistema operativo empezará a hacer swap (usar disco como RAM), reduciendo velocidad 1000x. Spark mantiene particiones en disco y procesa solo lo necesario en cada momento.

02. MapReduce Explicado Matemáticamente

Ejemplo: Calcular Media μ = (1/n)Σxᵢ

Matemática clásica:
$$ \mu = \frac{1}{n} \sum_{i=1}^{n} x_i $$
Requiere recorrer todos los \(n\) elementos secuencialmente.
Versión distribuida (MapReduce):
$$ \begin{aligned} \text{Map: } &\quad s_j = \sum_{i \in P_j} x_i \quad \text{(suma parcial en partición } j\text{)} \\[0.5em] \text{Reduce: } &\quad \mu = \frac{1}{n} \sum_{j=1}^{k} s_j \quad \text{(agregar sumas parciales)} \end{aligned} $$
Cada partición \(P_j\) calcula su suma local \(s_j\) en paralelo. Luego se agregan.
🗺️ Fase MAP (Paralelo)
# En Worker 1 (Partición 1)
sum_p1 = 0
for x in partition_1:
    sum_p1 += x
# sum_p1 = 12345.67

# En Worker 2 (Partición 2)
sum_p2 = 0
for x in partition_2:
    sum_p2 += x
# sum_p2 = 23456.78

# ... (100 workers en total)
📉 Fase REDUCE (Secuencial)
# En el Driver (nodo maestro)
partial_sums = [
    sum_p1,   # 12345.67
    sum_p2,   # 23456.78
    # ...
    sum_p100  # 98765.43
]

total = sum(partial_sums)
n = 10_000_000_000
mu = total / n

print(f"Media: {mu}")

Implementación PySpark Real

from pyspark.sql import functions as F

# Dataset distribuido
df = spark.createDataFrame([(1,), (2,), (3,), (4,), (5,)], ["value"])

# Spark ejecuta MapReduce automáticamente
result = df.agg(
    F.avg("value").alias("media"),      # Media: (1+2+3+4+5)/5 = 3.0
    F.sum("value").alias("suma"),       # Suma: 15
    F.stddev("value").alias("desv_std") # Desviación estándar
).collect()[0]

print(f"Media: {result['media']}")  # 3.0
💡 F.avg() ejecuta MapReduce internamente: suma parcial en cada partición, luego divide por n total.

03. Particiones y Paralelización

graph TD DF[DataFrame
1TB de datos] --> R{Repartition
k=8} R --> P1[Partición 1
125GB] R --> P2[Partición 2
125GB] R --> P3[Partición 3
125GB] R --> P8[Partición 8
125GB] P1 --> C1[Core 1] P2 --> C2[Core 2] P3 --> C3[Core 3] P8 --> C8[Core 8] C1 --> Res[Resultado Final] C2 --> Res C3 --> Res C8 --> Res style DF fill:#E74C3C,stroke:#fff style Res fill:#27AE60,stroke:#fff

Ley de Amdahl (Speedup)

$$ S(n) = \frac{T_1}{T_n} = \frac{1}{(1-p) + \frac{p}{n}} $$
Donde:
• \(T_1\): Tiempo con 1 core
• \(T_n\): Tiempo con n cores
• \(p\): Fracción paralelizable del código
Ejemplo: Si p=0.9 (90% paralelizable) y n=8 cores:
\( S(8) = \frac{1}{0.1 + 0.9/8} = \frac{1}{0.2125} \approx 4.7x \)
⚡ Con 8 cores, obtienes 4.7x speedup (no 8x por overhead).

Código: Reparticionamiento

# Ver particiones actuales
print(df.rdd.getNumPartitions())  # 200 (default)

# Reparticionar a 8 (para cluster pequeño)
df_optimized = df.repartition(8)

# O usar coalesce (sin shuffle, más rápido)
# Solo para reducir particiones
df_reduced = df.coalesce(4)

# Particionar por columna (útil para joins)
df_by_city = df.repartition("city")
repartition(n): Distribuye datos uniformemente (con shuffle).
ℹ️ coalesce(n): Reduce particiones sin shuffle (más eficiente).

⚙️ Cuándo reparticionar

Muy pocas particiones
• Particiones > 1GB cada una
• Cores ociosos
Solución: Aumentar con repartition()
Balance óptimo
• 128MB - 1GB por partición
• Particiones ≈ 2-4x número de cores
Ideal para performance
Demasiadas particiones
• Miles de archivos pequeños
• Overhead de scheduling
Solución: Reducir con coalesce()

04. Broadcast Variables

❌ Problema: Envío Repetido

graph TD D[Driver
θ coeficientes] -->|Copia| W1[Worker 1] D -->|Copia| W2[Worker 2] D -->|Copia| W3[Worker 3] D -->|Copia| W100[Worker 100] style D fill:#E74C3C,stroke:#fff
Sin broadcast: Envía θ (parámetros modelo) a cada tarea. Si hay 1000 tareas en 100 workers, envía θ 1000 veces por la red.
$$ \text{Tráfico red} = \text{size}(\theta) \times n_{\text{tasks}} $$
Ejemplo: θ = 10MB, 1000 tareas → 10GB de tráfico de red.

✅ Solución: Broadcast

graph TD D[Driver
θ broadcast] -->|1 copia| W1[Worker 1
Todas las tareas] D -->|1 copia| W2[Worker 2
Todas las tareas] D -->|1 copia| W3[Worker 3
Todas las tareas] D -->|1 copia| W100[Worker 100
Todas las tareas] style D fill:#27AE60,stroke:#fff
Con broadcast: Envía θ una sola vez por worker (100 veces para 100 workers), sin importar cuántas tareas.
$$ \text{Tráfico red} = \text{size}(\theta) \times n_{\text{workers}} $$
Ejemplo: θ = 10MB, 100 workers → 1GB de tráfico (10x menos).

Código: Broadcast en PySpark

from pyspark.sql import functions as F

# Diccionario de parámetros de modelo
# (en la práctica, serían coeficientes de regresión)
model_params = {
    "intercept": 2.5,
    "coef_edad": 0.3,
    "coef_salario": 0.0001
}

# Broadcast (enviar una sola vez a cada worker)
bc_params = spark.sparkContext.broadcast(model_params)

# Usar en UDF (User Defined Function)
def predict_udf(edad, salario):
    params = bc_params.value  # Acceder a broadcast
    return (params["intercept"] +
            params["coef_edad"] * edad +
            params["coef_salario"] * salario)

# Registrar UDF
predict = F.udf(predict_udf)

# Aplicar a DataFrame (se ejecuta en workers)
df_with_predictions = df.withColumn(
    "prediction",
    predict(F.col("edad"), F.col("salario"))
)
💡 bc_params.value accede al diccionario sin transferirlo 1000 veces. Cada worker lo tiene en caché.

05. Operaciones Matriciales Distribuidas

Ejemplo: Producto Matriz-Vector y = Ax + b

Matemática:
$$ \mathbf{y} = \mathbf{A}\mathbf{x} + \mathbf{b}, \quad \mathbf{A} \in \mathbb{R}^{n \times p}, \; \mathbf{x} \in \mathbb{R}^{p}, \; \mathbf{b} \in \mathbb{R}^{n} $$
Si \(n = 10^9\), la matriz \(\mathbf{A}\) tiene \(10^9 \times p\) elementos. No cabe en memoria.
Estrategia Distribuida (Partición por Filas):
$$ \mathbf{A} = \begin{bmatrix} \mathbf{A}_1 \\ \mathbf{A}_2 \\ \vdots \\ \mathbf{A}_k \end{bmatrix}, \quad \mathbf{y} = \begin{bmatrix} \mathbf{y}_1 \\ \mathbf{y}_2 \\ \vdots \\ \mathbf{y}_k \end{bmatrix} $$
Cada partición \(\mathbf{A}_i\) calcula \(\mathbf{y}_i = \mathbf{A}_i \mathbf{x} + \mathbf{b}_i\) independientemente.
from pyspark.ml.linalg import Vectors, DenseVector
from pyspark.sql import functions as F

# DataFrame con features como vectores (cada fila = fila de A)
df = spark.createDataFrame([
    (1, Vectors.dense([1.0, 2.0, 3.0])),
    (2, Vectors.dense([4.0, 5.0, 6.0])),
    (3, Vectors.dense([7.0, 8.0, 9.0]))
], ["id", "features"])

# Vector x (coeficientes) - broadcast
x = [0.5, 1.0, 1.5]
bc_x = spark.sparkContext.broadcast(x)

# UDF para producto punto: A_i · x
def dot_product(features):
    x_vec = bc_x.value
    return float(sum(f * x for f, x in zip(features, x_vec)))

dot_udf = F.udf(dot_product)

# Aplicar: y_i = A_i · x (cada fila en paralelo)
df_result = df.withColumn("y", dot_udf(F.col("features")))

df_result.show()
# +---+-------------+----+
# | id|     features|   y|
# +---+-------------+----+
# |  1|[1.0,2.0,3.0]| 7.0|  # (1×0.5 + 2×1.0 + 3×1.5)
# |  2|[4.0,5.0,6.0]|16.0|
# |  3|[7.0,8.0,9.0]|25.0|
# +---+-------------+----+

🔗 Conexión con Regresión Lineal

En regresión, predecir \(\hat{y} = X\beta\) es exactamente este producto matriz-vector. Spark ML hace esto internamente cuando llamas model.transform().

$$ \hat{\mathbf{y}} = \mathbf{X}\boldsymbol{\beta}, \quad \text{con } \mathbf{X} \text{ particionada por filas} $$

06. Persistencia y Caching

Sin Cache (Recomputación)

df = spark.read.parquet("data.parquet")
df_filtered = df.filter(col("age") > 18)

# Operación 1: Cuenta
count1 = df_filtered.count()  # Lee y filtra

# Operación 2: Media
avg_salary = df_filtered.agg(
    F.avg("salary")
).collect()  # Lee y filtra OTRA VEZ

# Operación 3: Máximo
max_age = df_filtered.agg(
    F.max("age")
).collect()  # Lee y filtra OTRA VEZ
⚠️ df_filtered se recalcula 3 veces porque Spark no persiste resultados por defecto.

Con Cache (Almacenamiento)

df = spark.read.parquet("data.parquet")
df_filtered = df.filter(col("age") > 18)

# Cachear en memoria
df_filtered.cache()  # Marca para persistir

# Primera operación: Lee, filtra, y GUARDA en RAM
count1 = df_filtered.count()

# Segunda operación: Lee desde RAM (sin recomputar)
avg_salary = df_filtered.agg(
    F.avg("salary")
).collect()

# Tercera operación: Lee desde RAM
max_age = df_filtered.agg(
    F.max("age")
).collect()

# Liberar memoria cuando termines
df_filtered.unpersist()
df_filtered se calcula 1 sola vez, luego se lee desde memoria (mucho más rápido).

Niveles de Persistencia

from pyspark import StorageLevel

# Solo memoria (más rápido, pero puede fallar si no hay RAM)
df.persist(StorageLevel.MEMORY_ONLY)

# Memoria + Disco (si RAM llena, escribe a disco)
df.persist(StorageLevel.MEMORY_AND_DISK)

# Solo disco (para datasets enormes)
df.persist(StorageLevel.DISK_ONLY)

# Memoria serializada (comprime datos, ahorra RAM)
df.persist(StorageLevel.MEMORY_ONLY_SER)

# cache() es equivalente a MEMORY_AND_DISK
df.cache()  # Shortcut
Cuándo usar cache()
• Reutilizas un DataFrame múltiples veces
• Algoritmos iterativos (ML)
• Datos intermedios costosos de recomputar
Cuándo NO usar cache()
• Usas el DataFrame una sola vez
• Datos muy grandes y poca RAM
• Transformaciones simples y baratas

07. Reto Conceptual

Gradiente Descendente Distribuido

📋 Problema

Implementa una iteración de gradiente descendente distribuido para regresión lineal:

$$ \begin{aligned} \text{Objetivo: } & \min_{\boldsymbol{\theta}} J(\boldsymbol{\theta}) = \frac{1}{2n} \sum_{i=1}^{n} (h_{\boldsymbol{\theta}}(\mathbf{x}_i) - y_i)^2 \\[0.5em] \text{Actualización: } & \boldsymbol{\theta} := \boldsymbol{\theta} - \alpha \nabla J(\boldsymbol{\theta}) \\[0.5em] \text{Gradiente: } & \nabla J(\boldsymbol{\theta}) = \frac{1}{n} \sum_{i=1}^{n} (h_{\boldsymbol{\theta}}(\mathbf{x}_i) - y_i) \mathbf{x}_i \end{aligned} $$
Donde \(h_{\boldsymbol{\theta}}(\mathbf{x}) = \boldsymbol{\theta}^T \mathbf{x}\) es la predicción.
Desafío: ¿Cómo calcular ∇J en paralelo?
1
Broadcast θ: Enviar parámetros actuales a todos los workers.
2
Map (por partición): Calcular gradiente parcial \(\nabla J_i = \sum_{j \in P_i} (h_{\boldsymbol{\theta}}(\mathbf{x}_j) - y_j) \mathbf{x}_j\).
3
Reduce: Agregar gradientes parciales: \(\nabla J = \frac{1}{n} \sum_{i=1}^{k} \nabla J_i\).
4
Actualizar θ: \(\boldsymbol{\theta}_{\text{new}} = \boldsymbol{\theta}_{\text{old}} - \alpha \nabla J\).

Esqueleto de Código (para completar)

from pyspark.sql import functions as F
import numpy as np

# Parámetros iniciales
theta = np.array([0.0, 0.0, 0.0])  # p=3 features
alpha = 0.01  # Learning rate

for iteration in range(100):
    # TODO 1: Broadcast theta
    bc_theta = # ...

    # TODO 2: Calcular gradiente parcial en cada partición
    def compute_gradient(features, label):
        # features: Vector de features
        # label: y real
        # Retornar: (error × features)
        pass

    # TODO 3: Reduce (agregar gradientes)
    gradient = # ...

    # TODO 4: Actualizar theta
    theta = # ...

    print(f"Iteration {iteration}: theta = {theta}")

🎯 Objetivo de Aprendizaje

Este ejercicio demuestra cómo algoritmos iterativos (ML) se ejecutan en Spark. Cada iteración requiere: broadcast de parámetros, map (computar local), reduce (agregar), y actualización. Es el núcleo de algoritmos como SGD, Adam, etc.