Skip to content

如何解决 Amazon EMR 上 Apache Spark 作业中的“no space left on device”(设备上没有剩余空间)阶段故障?

3 分钟阅读
0

当我向 Amazon EMR 集群提交 Apache Spark 应用程序时,该应用程序因“no space left on device”(设备上没有剩余空间)阶段失败而失败。

简短描述

由于以下原因之一,您的 Apache Spark 应用程序可能会遇到 no space left on device(设备上没有剩余空间)错误:

  • 由于存在重组连接,在重组过程中会生成大量的中间数据
  • 数据分区和执行程序工作负载分配不均
  • 分区大小和数量不正确
  • 磁盘和内存等资源可用性不足

Apache Spark 使用核心和任务节点上的本地存储来存储中间(重组)数据。当实例上的磁盘空间耗尽时,任务将失败,并出现 no space left on device(设备上没有剩余空间)错误。

解决方法

本文探讨了“设备上没有剩余空间”错误的最常见原因和解决方案。您必须确定根本原因才能实施相应的修复。

**注意:**如果您在运行 AWS 命令行界面 (AWS CLI) 命令时收到错误,请参阅 AWS CLI 错误故障排除。此外,请确保您使用的是最新版本的 AWS CLI

重新分区

根据集群中有多少核心和任务节点,您可能需要增加 Spark 分区的数量。要添加更多 Spark 分区,请运行以下命令:

val numPartitions = 500
val newDF = df.repartition(numPartitions)

**注意:**将 500 替换为适合您的用例的分区数量。

调整 Spark 配置

分区管理

如果在重组期间出现过多磁盘溢出或分区间数据分配不均的情况,请调整以下参数:

spark.default.parallelism=${NUM_CORES * 2} #no. of partitions in RDDs
spark.sql.shuffle.partitions=${NUM_CORES * 2} #no. of shuffle partitions
spark.sql.files.maxPartitionBytes=256MB #max. no. of bytes in a partition when reading files
spark.sql.files.maxRecordsPerFile=10000000

如果满足以下条件之一,请增加并行度和分区数量:

  • 任务持续时间偏差大于平均持续时间的 3 倍
  • 每项任务的溢出量大于 20%

如果平均分区大小小于 50 MB 或小文件过多,请减少并行度和分区数量。

要计算最佳分区数,请使用以下公式:

Initial Partitions = Number of Cores * 2
Optimal Partitions = max(
    Total Input Size / Target Partition Size,
    Initial Partitions
)

根据数据量进行调整

以下是不同大小的数据集的配置参数:

小型数据集 (<100 GB):

spark.sql.files.maxPartitionBytes=128MB
spark.sql.shuffle.partitions=NUM_CORES * 2
spark.sql.files.maxRecordsPerFile=5000000

中型数据集 (100 GB-1 TB):

spark.sql.files.maxPartitionBytes=256MB
spark.sql.shuffle.partitions=NUM_CORES * 3
spark.sql.files.maxRecordsPerFile=10000000

大型数据集 (>1 TB):

spark.sql.files.maxPartitionBytes=512MB
spark.sql.shuffle.partitions=NUM_CORES * 4
spark.sql.files.maxRecordsPerFile=20000000

内存和存储优化

要优化内存和存储,请更新您的配置参数:

spark.memory.fraction=0.8 # Higher for compute-intensive jobs
spark.memory.storageFraction=0.3 # Lower for shuffle-heavy workloads
spark.executor.memoryOverhead=0.2 # 20% of executor memory
spark.memory.offHeap.enabled=true
spark.memory.offHeap.size=${EXECUTOR_MEMORY * 0.2}

要计算 Spark 执行程序容器的总内存分配,以下四个内存组件组合在一起:

  • 执行程序内存 (spark.executor.memory)
  • 内存开销 (spark.executor.memoryOverhead)
  • 堆外内存 (spark.memory.offHeap.size)
  • PySpark 内存 (spark.executor.pyspark.memory)

执行程序容器总内存 = spark.executor.memory + spark.executor.memoryOverhead + spark.memory.offHeap.size + spark.executor.pyspark.memory

以下计算决定了 Spark 的内部内存分配:

Storage Memory = Executor Memory * memory.fraction * memory.storageFraction
Execution Memory = Executor Memory * memory.fraction * (1 - memory.storageFraction)
Off-Heap Memory = Executor Memory * 0.2

文件和磁盘管理

如果存在分区倾斜或分区数量过高或过低,请调整文件管理配置。将 maxPartitionNum 设置为总内核数的 2 倍,将 minPartitionNum 设置为 1,除非您的用例需要不同的值。

# File Management
spark.sql.files.minPartitionNum=${NUM_CORES}
spark.sql.files.maxPartitionNum=${NUM_CORES * 4}
spark.shuffle.file.buffer=64k

如果 maxPartitionNum 设置得过低,则可能会限制并行度,可能无法阻止所有偏差情境。

AQE 和偏差处理

Spark 中的 AQE(自适应查询执行)是一种运行时优化,可根据实时统计数据调整查询计划。

在 Amazon EMR 版本 5.30.0 及更高版本中,AQE 默认处于开启状态。Spark 中的 AQE 可以自动优化连接策略和重组。它还可以通过动态分区拆分有效地处理数据偏差。这样可提高负载平衡和查询性能。

# Skew Management
spark.sql.adaptive.enabled=true
spark.sql.adaptive.skewJoin.enabled=true
spark.sql.adaptive.skewJoin.skewedPartitionFactor=10
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes=256MB

如果您使用不支持 AQE 的早期版本的 Spark,请使用以下方法之一来管理数据偏差:

  • 调整广播连接的 spark.sql.autoBroadcastJoinThreshold 阈值。当连接中一个数据集明显小于另一个数据集时,这很有用。
  • 在代码中使用 repartition()coalesce() 改进数据分配。
  • 将 SKEW 提示应用于较大或偏差的表,并广播较小的表。SKEW 提示通知 Spark 优化器表存在偏差数据,有助于优化连接策略。

以下是 Spark SQL 查询中的 SKEW 提示示例:

-- Using SKEW hint in Spark SQL
SELECT /*+ SKEW('t1') */
    t1.key, t2.value
FROM table1 t1 JOIN table2 t2 ON t1.key = t2.key

-- Using MAPJOIN hint along with SKEW
SELECT /*+ SKEW('t1'), MAPJOIN(t2) */
    t1.key, t2.value
FROM table1 t1
JOIN table2 t2
ON t1.key = t2.key

用于动态纵向扩展存储空间的引导操作

您可以通过 Amazon CloudWatch 监控和 Lambda 自动化使用 Bootstrap 操作来自动扩展 Amazon EMR 集群的存储。当可用磁盘空间低于设定的阈值时,Amazon CloudWatch 会启动 AWS Lambda 函数。此函数将新的 Amazon Elastic Block Store (Amazon EBS) 卷连接到集群节点。然后,此函数运行脚本来格式化、挂载卷并将其集成到 Hadoop Distributed File System (HDFS) 中。

这种自动化方法可以防止因存储限制而导致的集群故障,并通过仅在需要时增加容量来保持成本效益。实施需要适当的 Identity and Access Management (IAM) 角色、Amazon CloudWatch 警报、AWS Lambda 配置和用于卷管理的自定义脚本。有关详细信息,请参阅 Dynamically scale up storage on Amazon EMR clusters

增加更多 Amazon EBS 容量

对于新集群,使用更大的 EBS 卷

启动 Amazon EMR 集群并选择具有更大 EBS 卷的 Amazon Elastic Compute Cloud (Amazon EC2) 实例类型。有关详细信息,请参阅实例的默认 Amazon EBS 存储

对于正在运行的集群,请添加更多 EBS 卷

完成以下步骤:

  1. 如果更大的 EBS 卷不能解决问题,请将更多 EBS 卷连接到核心和任务节点。

  2. 格式化并挂载连接的卷。确保使用正确的磁盘编号,例如 /mnt1/mnt2 而不是 /data

  3. 使用 SSH 客户端连接到节点

  4. 创建 /mnt2/yarn 目录,然后将该目录的所有权设置为 YARN 用户:

    sudo mkdir /mnt2/yarn
    sudo chown yarn:yarn /mnt2/yarn
  5. /mnt2/yarn 目录添加到 /etc/hadoop/conf/yarn-site.xmlyarn.nodemanager.local-dirs 属性中。
    示例:

    <property>
        <name>yarn.nodemanager.local-dirs</name>
        <value>/mnt/yarn,/mnt1/yarn,/mnt2/yarn</value>
    </property>
  6. 重启 NodeManager 服务:
    Amazon EMR 4.x-5.29.0 发布版本

    sudo stop hadoop-yarn-nodemanager
    sudo start hadoop-yarn-nodemanager

    Amazon EMR 5.30.0 及更高版本的发布版本

    sudo systemctl stop hadoop-yarn-nodemanager
    sudo systemctl start hadoop-yarn-nodemanager

相关信息

如何解决 Amazon EMR 上 Spark 作业中的阶段故障?

什么是 EKS 上的 Amazon EMR?

什么是 Amazon EMR Serverless?

AWS Open Data Analytics 网站上的最佳实践

AWS 官方已更新 3 个月前