Complete a 3 Question Survey and Earn a re:Post Badge
Help improve AWS Support Official channel in re:Post and share your experience - complete a quick three-question survey to earn a re:Post badge!
Come posso risolvere gli errori di fase "no space left on device" in un processo Apache Spark su Amazon EMR?
Quando invio un'applicazione Apache Spark a un cluster Amazon EMR, l'applicazione ha esito negativo con un errore di fase "no space left on device".
Breve descrizione
L'applicazione Apache Spark potrebbe riscontrare un errore no space left on device per uno dei seguenti motivi:
- Generazione di una quantità notevole di dati intermedi durante il processo di shuffle a causa della presenza di shuffle join
- Distribuzione non uniforme delle partizioni di dati e del carico di lavoro degli esecutori
- Dimensionamento e conteggio delle partizioni non corretti
- Disponibilità inadeguata di risorse come disco e memoria
Apache Spark utilizza l'archiviazione locale sui nodi core e attività per archiviare i dati intermedi (shuffle). Se lo spazio sui dischi esaurisce, il processo ha esito negativo e viene generato l'errore no space left on device.
Risoluzione
Questo articolo descrive le cause più frequenti degli errori di mancanza di spazio sul dispositivo fornendo le relative soluzioni. Per scegliere il procedimento appropriato, devi prima individuare la causa principale del problema.
Nota: se ricevi errori quando esegui i comandi dell'Interfaccia della linea di comando AWS (AWS CLI), consulta Risoluzione degli errori per AWS CLI. Inoltre, assicurati di utilizzare la versione più recente di AWS CLI.
Ripartizionamento
In base al numero di nodi core e attività presenti nel cluster, potrebbe essere necessario aumentare il numero di partizioni Spark. Per aggiungere altre partizioni Spark, esegui questo comando:
val numPartitions = 500 val newDF = df.repartition(numPartitions)
Nota: sostituisci 500 con il numero di partizioni adatto al caso d'uso.
Configurazioni Tune Spark
Gestione delle partizioni
In caso di fuoriuscite eccessive durante gli shuffle o di una distribuzione non uniforme dei dati tra le partizioni, regola i seguenti parametri:
spark.default.parallelism=${NUM_CORES * 2} #no. of partitions in RDDs spark.sql.shuffle.partitions=${NUM_CORES * 2} #no. of shuffle partitions spark.sql.files.maxPartitionBytes=256MB #max. no. of bytes in a partition when reading files spark.sql.files.maxRecordsPerFile=10000000
Aumenta il parallelismo e il numero di partizioni se si verifica una delle seguenti condizioni:
- La varianza della durata dell'attività è superiore al triplo della durata media
- La fuoriuscita per attività è superiore al 20%
Se la dimensione media delle partizioni è inferiore a 50 MB o sono presenti troppi file di piccole dimensioni, riduci il parallelismo e il numero di partizioni.
Per calcolare il numero ottimale delle partizioni, utilizza la seguente formula:
Initial Partitions = Number of Cores * 2 Optimal Partitions = max( Total Input Size / Target Partition Size, Initial Partitions )
Ottimizzazione basata sul volume di dati
Di seguito sono riportati i parametri di configurazione per set di dati di diverse dimensioni:
Set di dati di piccole dimensioni (<100 GB):
spark.sql.files.maxPartitionBytes=128MB spark.sql.shuffle.partitions=NUM_CORES * 2 spark.sql.files.maxRecordsPerFile=5000000
Set di dati di medie dimensioni (100 GB-1 TB):
spark.sql.files.maxPartitionBytes=256MB spark.sql.shuffle.partitions=NUM_CORES * 3 spark.sql.files.maxRecordsPerFile=10000000
Set di dati di grandi dimensioni (>1 TB):
spark.sql.files.maxPartitionBytes=512MB spark.sql.shuffle.partitions=NUM_CORES * 4 spark.sql.files.maxRecordsPerFile=20000000
Ottimizzazione della memoria e dell'archiviazione
Per ottimizzare la memoria e l'archiviazione, aggiorna i parametri di configurazione:
spark.memory.fraction=0.8 # Higher for compute-intensive jobs spark.memory.storageFraction=0.3 # Lower for shuffle-heavy workloads spark.executor.memoryOverhead=0.2 # 20% of executor memory spark.memory.offHeap.enabled=true spark.memory.offHeap.size=${EXECUTOR_MEMORY * 0.2}
Per calcolare l'allocazione totale di memoria per il container di un esecutore Spark, vengono combinati i seguenti quattro elementi:
- Memoria dell'esecutore (spark.executor.memory)
- Sovraccarico di memoria (spark.executor.memoryOverhead)
- Memoria off-heap (spark.memory.offHeap.size)
- Memoria PySpark (spark.executor.pyspark.memory)
Memoria totale del container dell'esecutore = spark.executor.memory + spark.executor.memoryOverhead + spark.memory.offHeap.size + spark.executor.pyspark.memory
Il seguente calcolo determina l'allocazione di memoria interna di Spark:
Storage Memory = Executor Memory * memory.fraction * memory.storageFraction Execution Memory = Executor Memory * memory.fraction * (1 - memory.storageFraction) Off-Heap Memory = Executor Memory * 0.2
Gestione di file e dischi
Se riscontri un'asimmetria nelle partizioni o se il numero di partizioni è troppo alto o troppo basso, ottimizza le configurazioni di gestione dei file. Imposta maxPartitionNum sul doppio dei core totali e minPartitionNum su 1, a meno che i casi d'uso non richiedano un valore diverso.
# File Management spark.sql.files.minPartitionNum=${NUM_CORES} spark.sql.files.maxPartitionNum=${NUM_CORES * 4} spark.shuffle.file.buffer=64k
Se maxPartitionNum è impostato su un valore troppo basso, potrebbe limitare il parallelismo e non impedire tutti gli scenari di asimmetria.
AQE e gestione delle asimmetrie
L’esecuzione adattiva delle query (AQE) in Spark è un'ottimizzazione del runtime che regola i piani di query in base a statistiche in tempo reale.
L'AQE è attivata per impostazione predefinita in Amazon EMR versione 5.30.0 e successive. L'AQE in Spark può ottimizzare automaticamente le strategie di join e lo shuffling. Può anche gestire efficacemente l'asimmetria dei dati tramite la suddivisione dinamica delle partizioni. Questo migliora il bilanciamento del carico e le prestazioni delle query.
# Skew Management spark.sql.adaptive.enabled=true spark.sql.adaptive.skewJoin.enabled=true spark.sql.adaptive.skewJoin.skewedPartitionFactor=10 spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes=256MB
Se utilizzi una versione precedente di Spark che non supporta l'AQE, scegli uno dei seguenti metodi per gestire l'asimmetria dei dati:
- Regola la soglia spark.sql.autoBroadcastJoinThreshold per i join di trasmissione. Questo è utile quando un set di dati è notevolmente più piccolo dell'altro nei join.
- Utilizza repartition() o coalesce() nel codice per migliorare la distribuzione dei dati.
- Applica i suggerimenti SKEW alla tabella più grande o asimmetrica e trasmetti la tabella più piccola. I suggerimenti SKEW notificano a Spark Optimizer che una tabella ha dati asimmetrici contribuendo a ottimizzare le strategie di join.
Di seguito è riportato un esempio di suggerimenti SKEW nelle query SQL di Spark:
-- Using SKEW hint in Spark SQL SELECT /*+ SKEW('t1') */ t1.key, t2.value FROM table1 t1 JOIN table2 t2 ON t1.key = t2.key -- Using MAPJOIN hint along with SKEW SELECT /*+ SKEW('t1'), MAPJOIN(t2) */ t1.key, t2.value FROM table1 t1 JOIN table2 t2 ON t1.key = t2.key
Azione Bootstrap per l’aumento verticale dinamico dello spazio di archiviazione
Puoi utilizzare l'azione Bootstrap tramite il monitoraggio di Amazon CloudWatch e l'automazione Lambda per scalare automaticamente lo spazio di archiviazione per i cluster Amazon EMR. Quando lo spazio disponibile su disco scende al di sotto di una soglia prestabilita, Amazon CloudWatch avvia una funzione AWS Lambda. Tale funzione collega nuovi volumi Amazon Elastic Block Store (Amazon EBS) ai nodi del cluster. La funzione esegue quindi gli script per formattare, montare e integrare i volumi nel File system distribuito Hadoop (HDFS).
Questo approccio automatizzato previene gli errori del cluster causati da vincoli di archiviazione e mantiene l'efficacia dei costi aggiungendo capacità solo quando è necessaria. La sua implementazione richiede ruoli Identity and Access Management (IAM) appropriati, l'impostazione di allarmi in Amazon CloudWatch, la configurazione di AWS Lambda e script personalizzati per la gestione dei volumi. Per ulteriori informazioni, consulta Dynamically scale up storage on Amazon EMR clusters (Aumento verticale dinamico dello spazio di archiviazione su cluster Amazon EMR).
Aumenta la capacità a disposizione di Amazon EBS
Per i nuovi cluster utilizza volumi EBS più grandi
Avvia un cluster Amazon EMR e scegli un tipo di istanza Amazon Elastic Compute Cloud (Amazon EC2) con volumi EBS più grandi. Per ulteriori informazioni, consulta Spazio di archiviazione predefinito in Amazon EBS per le istanze.
Per i cluster in esecuzione aggiungi altri volumi EBS
Completa i seguenti passaggi:
-
Se un volume EBS più grande non risolve il problema, collega più volumi EBS ai nodi core e attività.
-
Formatta e monta i volumi collegati. Assicurati di utilizzare il numero di disco corretto (ad esempio, /mnt1 o /mnt2 anziché /data).
-
Crea una directory /mnt2/yarn, quindi imposta la proprietà della directory per l'utente YARN:
sudo mkdir /mnt2/yarn sudo chown yarn:yarn /mnt2/yarn
-
Aggiungi la directory /mnt2/yarn alla proprietà yarn.nodemanager.local-dirs di /etc/hadoop/conf/yarn-site.xml.
Esempio:<property> <name>yarn.nodemanager.local-dirs</name> <value>/mnt/yarn,/mnt1/yarn,/mnt2/yarn</value> </property>
-
Riavvia il servizio NodeManager:
Amazon EMR versioni 4.x-5.29.0sudo stop hadoop-yarn-nodemanager sudo start hadoop-yarn-nodemanager
Amazon EMR 5.30.0 e versioni successive
sudo systemctl stop hadoop-yarn-nodemanager sudo systemctl start hadoop-yarn-nodemanager
Informazioni correlate
Come posso risolvere gli errori di fase nei processi Spark in Amazon EMR?
Best practices (Best practice) sul sito web di AWS Open Data Analytics
