Saltar al contenido

¿Cómo soluciono los errores de fase "No queda espacio en el dispositivo" en el trabajo de Apache Spark en Amazon EMR?

8 minutos de lectura
0

Cuando envío una aplicación Apache Spark a un clúster de Amazon EMR, la aplicación devuelve un error en la fase "No queda espacio en el dispositivo".

Descripción corta

Es posible que la aplicación Apache Spark detecte un error que indica que no queda espacio en el dispositivo por uno de los siguientes motivos:

  • Durante el proceso de mezcla se generan datos intermedios importantes debido a la presencia de combinaciones de mezcla.
  • Existe una distribución desigual de las particiones de datos y de la distribución de la carga de trabajo del ejecutor
  • El tamaño y recuento no son correctos en las particiones
  • La disponibilidad de recursos como el disco y la memoria no es adecuada

Apache Spark usa un almacenamiento local en los nodos principales y de tareas para almacenar datos intermedios (aleatorios). Si los discos se quedan sin espacio en la instancia, se produce un error en el trabajo y aparece el error No queda espacio en el dispositivo.

Resolución

En este artículo se abordan las causas y soluciones más frecuentes de los errores por falta de espacio en los dispositivos. Debes identificar la causa raíz para implementar la solución adecuada.

Nota: Si se muestran errores al ejecutar comandos de la Interfaz de la línea de comandos de AWS (AWS CLI), consulta Solución de problemas de AWS CLI. Además, asegúrate de utilizar la versión más reciente de AWS CLI.

Volver a realizar la partición

En función del número de nodos principales y de tareas que haya en el clúster, es posible que tengas que aumentar el número de particiones de Spark. Para añadir más particiones de Spark, ejecuta el siguiente comando:

val numPartitions = 500
val newDF = df.repartition(numPartitions)

Nota: Sustituye 500 por la cantidad de particiones que se adapte a tu situación.

Configuraciones de Tune Spark

Administración de particiones

Cuando se produzcan desbordamientos de disco excesivos durante las mezclas o si se produce una distribución desigual de los datos entre las particiones, ajusta los siguientes parámetros:

spark.default.parallelism=${NUM_CORES * 2} #no. of partitions in RDDs
spark.sql.shuffle.partitions=${NUM_CORES * 2} #no. of shuffle partitions
spark.sql.files.maxPartitionBytes=256MB #max. no. of bytes in a partition when reading files
spark.sql.files.maxRecordsPerFile=10000000

Aumenta el paralelismo y el número de particiones si se cumple una de las siguientes condiciones:

  • La varianza de la duración de la tarea es 3 veces superior a la duración media
  • El desbordamiento por tarea es superior al 20 %

Si el tamaño medio de las particiones es inferior a 50 MB o hay demasiados archivos pequeños, reduce el paralelismo y el número de particiones.

Para calcular la cantidad óptima de particiones, utiliza la siguiente fórmula:

Initial Partitions = Number of Cores * 2
Optimal Partitions = max(
    Total Input Size / Target Partition Size,
    Initial Partitions
)

Ajuste basado en el volumen de datos

Los siguientes parámetros te permiten configurar diferentes tamaños de conjuntos de datos:

Conjunto de datos pequeño (<100 GB):

spark.sql.files.maxPartitionBytes=128MB
spark.sql.shuffle.partitions=NUM_CORES * 2
spark.sql.files.maxRecordsPerFile=5000000

Conjunto de datos mediano (100 GB-1 TB):

spark.sql.files.maxPartitionBytes=256MB
spark.sql.shuffle.partitions=NUM_CORES * 3
spark.sql.files.maxRecordsPerFile=10000000

Conjunto de datos grande (>1 TB):

spark.sql.files.maxPartitionBytes=512MB
spark.sql.shuffle.partitions=NUM_CORES * 4
spark.sql.files.maxRecordsPerFile=20000000

**Optimización de memoria y almacenamiento **

Para optimizar la memoria y el almacenamiento, actualiza los parámetros de configuración:

spark.memory.fraction=0.8 # Higher for compute-intensive jobs
spark.memory.storageFraction=0.3 # Lower for shuffle-heavy workloads
spark.executor.memoryOverhead=0.2 # 20% of executor memory
spark.memory.offHeap.enabled=true
spark.memory.offHeap.size=${EXECUTOR_MEMORY * 0.2}

Para calcular la asignación total de memoria para un contenedor ejecutor de Spark, se combinan los cuatro componentes de memoria siguientes:

  • Memoria del ejecutor (spark.executor.memory)
  • Sobrecarga de memoria (spark.executor.memoryOverhead)
  • Memoria disponible (spark.memory.offHeap.size)
  • Memoria PySpark (spark.executor.pyspark.memory)

Memoria total del contenedor del ejecutor = spark.executor.memory + spark.executor.memoryOverhead + spark.memory.offHeap.size + spark.executor.pyspark.memory

El siguiente cálculo determina la asignación de memoria interna de Spark:

Storage Memory = Executor Memory * memory.fraction * memory.storageFraction
Execution Memory = Executor Memory * memory.fraction * (1 - memory.storageFraction)
Off-Heap Memory = Executor Memory * 0.2

Administración de archivos y discos

Si hay un sesgo de partición o el número de particiones es demasiado alto o demasiado bajo, ajusta las configuraciones de administración de archivos. Establece el valor de maxPartitionNum en 2 veces el total de núcleos y el de minPartitionNum en 1, a menos que tus casos de uso requieran un valor diferente.

# File Management
spark.sql.files.minPartitionNum=${NUM_CORES}
spark.sql.files.maxPartitionNum=${NUM_CORES * 4}
spark.shuffle.file.buffer=64k

Si el valor de maxPartitionNum es demasiado bajo, podría limitar el paralelismo y no se evitarían todas las situaciones de sesgo.

AQE y gestión de sesgos

AQE (Ejecución adaptativa de consultas) en Spark es una optimización de la versión ejecutable que ajusta los planes de consulta en función de estadísticas en tiempo real.

AQE está activado de forma predeterminada en la versión 5.30.0 y posteriores de Amazon EMR. AQE en Spark puede optimizar las estrategias de unión y mezcla automáticamente. También puede administrar eficazmente el sesgo de datos mediante la división dinámica de particiones. Esto mejora el equilibrio de carga y el rendimiento de las consultas.

# Skew Management
spark.sql.adaptive.enabled=true
spark.sql.adaptive.skewJoin.enabled=true
spark.sql.adaptive.skewJoin.skewedPartitionFactor=10
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes=256MB

Si utilizas una versión anterior de Spark que no admite AQE, utiliza uno de los siguientes métodos para gestionar el sesgo de datos:

  • Ajusta el umbral spark.sql.autoBroadcastJoinThreshold para las uniones de difusión. Esto es útil cuando un conjunto de datos es significativamente más pequeño que el otro en las uniones.
  • Usa repartition() o coalesce() en tu código para mejorar la distribución de datos.
  • Aplica sugerencias de SKEW a la tabla más grande o sesgada y difunde la tabla más pequeña. Las sugerencias de SKEW notifican al optimizador de Spark que una tabla tiene datos sesgados y le permiten optimizar las estrategias de unión.

A continuación se muestra un ejemplo de sugerencias de SKEW en las consultas SQL de Spark:

-- Using SKEW hint in Spark SQL
SELECT /*+ SKEW('t1') */
    t1.key, t2.value
FROM table1 t1 JOIN table2 t2 ON t1.key = t2.key

-- Using MAPJOIN hint along with SKEW
SELECT /*+ SKEW('t1'), MAPJOIN(t2) */
    t1.key, t2.value
FROM table1 t1
JOIN table2 t2
ON t1.key = t2.key

Acción de arranque para ampliar el almacenamiento de forma dinámica

Puedes utilizar la acción de arranque mediante la supervisión de Amazon CloudWatch y la automatización de Lambda para escalar automáticamente el almacenamiento de los clústeres de Amazon EMR. Cuando el espacio disponible en disco cae por debajo de un umbral establecido, Amazon CloudWatch lanza una función de AWS Lambda. Esta función adjunta nuevos volúmenes de Amazon Elastic Block Store (Amazon EBS) a los nodos del clúster. A continuación, la función ejecuta scripts para formatear, montar e integrar los volúmenes en el Sistema de archivos distribuidos de Hadoop (HDFS).

Este enfoque automatizado evita errores en los clústeres causados por restricciones del almacenamiento y mantiene la rentabilidad, ya que solo agrega capacidad cuando es necesaria. La implementación requiere funciones adecuadas para Identity and Access Management (IAM), alarmas de Amazon CloudWatch, configuración de AWS Lambda y scripts personalizados para la administración de volúmenes. Para obtener más información, consulta Escalar vertical y dinámicamente el almacenamiento en clústeres de Amazon EMR.

Añade más capacidad de Amazon EBS

Para clústeres nuevos: utiliza volúmenes de EBS más grandes

Inicia un clúster de Amazon EMR y elige un tipo de instancia de Amazon Elastic Compute Cloud (Amazon EC2) con volúmenes de EBS más grandes. Para más información, consulta Almacenamiento predeterminado de Amazon EBS para instancias.

Para ejecutar clústeres, añade más volúmenes de EBS

Sigue estos pasos:

  1. Si volúmenes EBS más grandes no resuelven el problema,adjunta más volúmenes EBS a los nodos principales y de tareas.

  2. Formatea y monta los volúmenes adjuntos. Asegúrate de utilizar el número de disco correcto (por ejemplo, /mnt1 o /mnt2 en lugar de** /data**).

  3. Usa un cliente SSH para conectarte al nodo.

  4. Crea un directorio /mnt2/yarn y, a continuación, asigna la propiedad del directorio al usuario YARN:

    sudo mkdir /mnt2/yarn
    sudo chown yarn:yarn /mnt2/yarn
  5. Añade el directorio /mnt2/yarn a la propiedad yarn.nodemanager.local-dirs de /etc/hadoop/conf/yarn-site.xml.
    Ejemplo:

    <property>
        <name>yarn.nodemanager.local-dirs</name>
        <value>/mnt/yarn,/mnt1/yarn,/mnt2/yarn</value>
    </property>
  6. Reinicia el servicio NodeManager:
    Versiones 4.x-5.29.0 de Amazon EMR

    sudo stop hadoop-yarn-nodemanager
    sudo start hadoop-yarn-nodemanager

    Versiones 5.30.0 y posteriores de Amazon EMR

    sudo systemctl stop hadoop-yarn-nodemanager
    sudo systemctl start hadoop-yarn-nodemanager

Información relacionada

¿Cómo puedo solucionar los errores de fase de los trabajos de Spark en Amazon EMR?

¿Qué es Amazon EMR en EKS?

¿Qué es Amazon EMR sin servidor?

Prácticas recomendadas en el sitio web de análisis de datos abiertos de AWS

OFICIAL DE AWSActualizada hace 9 meses