Short description
Your Apache Spark application might encounter a no space left on device error for one of the following reasons:
- Substantial intermediate data is generated during the shuffle process because of the presence of shuffle joins
- Uneven distribution of data across partitions and uneven workload distribution across executors
- Improper partition sizing and count
- Inadequate availability of resources such as disk and memory
Apache Spark uses local storage on the core and task nodes to store intermediate (shuffle) data. When disks run out of space on the instance, the job fails with a no space left on device error.
Resolution
This article addresses the most frequent causes and solutions for no space left on device errors. You must identify the root cause to implement the appropriate fix.
Note: If you receive errors when you run AWS Command Line Interface (AWS CLI) commands, then see Troubleshooting errors for the AWS CLI. Also, make sure that you're using the most recent AWS CLI version.
Repartitioning
Based on how many core and task nodes are in the cluster, you might need to increase the number of Spark partitions. To add more Spark partitions, run the following command:
val numPartitions = 500
val newDF = df.repartition(numPartitions)
Note: Replace 500 with the number of partitions that fits your use case.
Tune Spark configurations
Partition management
When there are excessive disk spills during shuffles or an uneven data distribution across partitions, tune the following parameters:
spark.default.parallelism=${NUM_CORES * 2} #no. of partitions in RDDs
spark.sql.shuffle.partitions=${NUM_CORES * 2} #no. of shuffle partitions
spark.sql.files.maxPartitionBytes=256MB #max. no. of bytes in a partition when reading files
spark.sql.files.maxRecordsPerFile=10000000
Increase the parallelism and number of partitions if one of the following is true:
- The Task Duration Variance is greater than 3 times the mean duration
- The Spill per Task is greater than 20%
If the average partition size is less than 50 MB or there are too many small files, then decrease the parallelism and the number of partitions.
To calculate optimal partition count, use the following formula:
Initial Partitions = Number of Cores * 2
Optimal Partitions = max(
Total Input Size / Target Partition Size,
Initial Partitions
)
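To apply this formula at runtime, you can calculate the partition count in your job and repartition accordingly. The following Scala sketch assumes a DataFrame named df; the input size and target partition size (totalInputBytes, targetPartitionBytes) are example values, not recommendations:
// Minimal sketch of the partition-count formula. Replace the example sizes
// with the actual input size and target partition size for your job.
val numCores = spark.sparkContext.defaultParallelism   // cores available to the job
val initialPartitions = numCores * 2                   // Initial Partitions = Number of Cores * 2
val totalInputBytes = 500L * 1024 * 1024 * 1024        // example: 500 GB of input data
val targetPartitionBytes = 256L * 1024 * 1024          // example: 256 MB target partition size

// Optimal Partitions = max(Total Input Size / Target Partition Size, Initial Partitions)
val optimalPartitions = math.max((totalInputBytes / targetPartitionBytes).toInt, initialPartitions)

spark.conf.set("spark.sql.shuffle.partitions", optimalPartitions.toString)
val newDF = df.repartition(optimalPartitions)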
Tuning based on data volume
The following are configuration parameters for different sizes of datasets:
Small dataset (<100 GB):
spark.sql.files.maxPartitionBytes=128MB
spark.sql.shuffle.partitions=NUM_CORES * 2
spark.sql.files.maxRecordsPerFile=5000000
Medium dataset (100 GB-1 TB):
spark.sql.files.maxPartitionBytes=256MB
spark.sql.shuffle.partitions=NUM_CORES * 3
spark.sql.files.maxRecordsPerFile=10000000
Large dataset (>1 TB):
spark.sql.files.maxPartitionBytes=512MB
spark.sql.shuffle.partitions=NUM_CORES * 4
spark.sql.files.maxRecordsPerFile=20000000
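You can set these values when you create the Spark session. The following Scala sketch applies the medium dataset values; the application name and the numCores value are example assumptions:
// Minimal sketch that applies the medium dataset (100 GB-1 TB) settings above.
import org.apache.spark.sql.SparkSession

val numCores = 64  // example: total executor cores available to the job

val spark = SparkSession.builder()
  .appName("medium-dataset-job")
  .config("spark.sql.files.maxPartitionBytes", "256MB")
  .config("spark.sql.shuffle.partitions", (numCores * 3).toString)
  .config("spark.sql.files.maxRecordsPerFile", "10000000")
  .getOrCreate()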
Memory and storage optimization
To optimize memory and storage, update your configuration parameters:
spark.memory.fraction=0.8 # Higher for compute-intensive jobs
spark.memory.storageFraction=0.3 # Lower for shuffle-heavy workloads
spark.executor.memoryOverhead=${EXECUTOR_MEMORY * 0.2} # 20% of executor memory
spark.memory.offHeap.enabled=true
spark.memory.offHeap.size=${EXECUTOR_MEMORY * 0.2}
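These settings take effect when the application starts, so set them before the SparkContext is created. The following Scala sketch assumes a 10g executor heap; because spark.executor.memoryOverhead and spark.memory.offHeap.size take absolute sizes, 20% of 10g is written out as 2g (all values here are example assumptions):
// Minimal sketch, assuming a 10g executor heap. Set these values before the
// SparkContext starts, for example in an application submitted with spark-submit.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("memory-tuned-job")
  .config("spark.executor.memory", "10g")
  .config("spark.executor.memoryOverhead", "2g")   // about 20% of executor memory
  .config("spark.memory.fraction", "0.8")          // higher for compute-intensive jobs
  .config("spark.memory.storageFraction", "0.3")   // lower for shuffle-heavy workloads
  .config("spark.memory.offHeap.enabled", "true")
  .config("spark.memory.offHeap.size", "2g")       // 20% of the 10g executor memory
  .getOrCreate()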
The total memory allocation for a Spark executor container combines the following four memory components:
- Executor Memory (spark.executor.memory)
- Memory Overhead (spark.executor.memoryOverhead)
- Off-Heap Memory (spark.memory.offHeap.size)
- PySpark Memory (spark.executor.pyspark.memory)
Total Executor Container Memory = spark.executor.memory + spark.executor.memoryOverhead + spark.memory.offHeap.size + spark.executor.pyspark.memory
The following calculation determines Spark's internal memory allocation:
Storage Memory = Executor Memory * memory.fraction * memory.storageFraction
Execution Memory = Executor Memory * memory.fraction * (1 - memory.storageFraction)
Off-Heap Memory = Executor Memory * 0.2
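For example, following the formulas above and assuming spark.executor.memory=10g, spark.executor.memoryOverhead=2g, spark.memory.offHeap.size=2g, no PySpark memory, spark.memory.fraction=0.8, and spark.memory.storageFraction=0.3:
Total Executor Container Memory = 10 GB + 2 GB + 2 GB + 0 GB = 14 GB
Storage Memory = 10 GB * 0.8 * 0.3 = 2.4 GB
Execution Memory = 10 GB * 0.8 * (1 - 0.3) = 5.6 GB
Off-Heap Memory = 10 GB * 0.2 = 2 GB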
Files and disk management
If there is partition skew, or if the number of partitions is too high or too low, then tune the file management configurations. Set maxPartitionNum to 2 times the total cores and minPartitionNum to 1, unless your use cases require a different value.
# File Management
spark.sql.files.minPartitionNum=${NUM_CORES}
spark.sql.files.maxPartitionNum=${NUM_CORES * 4}
spark.shuffle.file.buffer=64k
If the maxPartitionNum is set too low, then it could limit parallelism and might not prevent all skew scenarios.
AQE and skew handling
Adaptive Query Execution (AQE) in Spark is a runtime optimization that adjusts query plans based on real-time statistics.
AQE is turned on by default in Amazon EMR version 5.30.0 and later. AQE in Spark can optimize join strategies and shuffling automatically. It can also effectively handle data skew through dynamic partition splitting. This improves load balancing and query performance.
# Skew Management
spark.sql.adaptive.enabled=true
spark.sql.adaptive.skewJoin.enabled=true
spark.sql.adaptive.skewJoin.skewedPartitionFactor=10
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes=256MB
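These are SQL configurations, so you can also turn them on for an existing session in application code. The following Scala sketch uses the same example values:
// Minimal sketch: turn on AQE and skew join handling for the current session.
// The factor and threshold values mirror the example settings above.
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "10")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")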
If you use an earlier version of Spark that doesn't support AQE, then use one of the following methods to manage the data skew:
- Tune the spark.sql.autoBroadcastJoinThreshold setting for broadcast joins. This is useful when one dataset is significantly smaller than the other in a join.
- Use repartition() or coalesce() in your code to improve data distribution. (A DataFrame example of the broadcast and repartition options follows the SQL queries below.)
- Apply SKEW hints to the larger or skewed table and broadcast the smaller table. SKEW hints notify the Spark optimizer that a table has skewed data and help it optimize join strategies.
The following is an example of SKEW hints in Spark SQL queries:
-- Using SKEW hint in Spark SQL
SELECT /*+ SKEW('t1') */
t1.key, t2.value
FROM table1 t1 JOIN table2 t2 ON t1.key = t2.key
-- Using MAPJOIN hint along with SKEW
SELECT /*+ SKEW('t1'), MAPJOIN(t2) */
t1.key, t2.value
FROM table1 t1
JOIN table2 t2
ON t1.key = t2.key
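For the broadcast and repartition options in the preceding list, the following Scala sketch shows one possible approach. The DataFrame names (largeDF, smallDF), the join column, the 100 MB threshold, and the 400 partition count are example assumptions:
// Minimal sketch for the broadcast and repartition options, assuming two
// DataFrames named largeDF and smallDF that join on a "key" column.
import org.apache.spark.sql.functions.broadcast

// Raise the automatic broadcast threshold (example: 100 MB), or broadcast the
// smaller table explicitly. The broadcast() hint overrides the threshold.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", (100L * 1024 * 1024).toString)
val joinedDF = largeDF.join(broadcast(smallDF), "key")

// Repartition the larger table on the join key to spread the data more evenly
// before the shuffle. Replace 400 with a count that fits your job.
val redistributedDF = largeDF.repartition(400, largeDF("key"))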
Bootstrap action to dynamically scale up storage
You can use a bootstrap action with Amazon CloudWatch monitoring and AWS Lambda automation to automatically scale storage for Amazon EMR clusters. When available disk space falls below a set threshold, an Amazon CloudWatch alarm invokes an AWS Lambda function. This function attaches new Amazon Elastic Block Store (Amazon EBS) volumes to the cluster nodes. The function then runs scripts to format, mount, and integrate the volumes into Hadoop Distributed File System (HDFS).
This automated approach prevents cluster failures caused by storage constraints and maintains cost-effectiveness by adding capacity only when it's needed. The implementation requires proper AWS Identity and Access Management (IAM) roles, Amazon CloudWatch alarms, AWS Lambda configuration, and custom scripts for volume management. For more information, see Dynamically scale up storage on Amazon EMR clusters.
Add more Amazon EBS capacity
For new clusters, use larger EBS volumes
Launch an Amazon EMR cluster and choose an Amazon Elastic Compute Cloud (Amazon EC2) instance type with larger EBS volumes. For more information, see Default Amazon EBS storage for instances.
For running clusters, add more EBS volumes
Complete the following steps:
- If a larger EBS volume doesn't resolve the problem, then attach more EBS volumes to the core and task nodes.
- Format and mount the attached volumes. Be sure to use the correct disk number, for example, /mnt1 or /mnt2 instead of /data.
- Use an SSH client to connect to the node.
- Create a /mnt2/yarn directory, and then set ownership of the directory to the YARN user:
sudo mkdir /mnt2/yarn
sudo chown yarn:yarn /mnt2/yarn
- Add the /mnt2/yarn directory to the yarn.nodemanager.local-dirs property of /etc/hadoop/conf/yarn-site.xml.
Example:
<property>
  <name>yarn.nodemanager.local-dirs</name>
  <value>/mnt/yarn,/mnt1/yarn,/mnt2/yarn</value>
</property>
- Restart the NodeManager service:
Amazon EMR 4.x-5.29.0 release versions
sudo stop hadoop-yarn-nodemanager
sudo start hadoop-yarn-nodemanager
Amazon EMR 5.30.0 and later release versions
sudo systemctl stop hadoop-yarn-nodemanager
sudo systemctl start hadoop-yarn-nodemanager
Related information
How do I troubleshoot stage failures in Spark jobs on Amazon EMR?
What is Amazon EMR on EKS?
What is Amazon EMR Serverless?
Best practices on the AWS Open Data Analytics website