Come posso risolvere gli errori di fase "no space left on device" in un processo Apache Spark su Amazon EMR?

7 minuti di lettura
0

Quando invio un'applicazione Apache Spark a un cluster Amazon EMR, l'applicazione ha esito negativo con un errore di fase "no space left on device".

Breve descrizione

L'applicazione Apache Spark potrebbe riscontrare un errore no space left on device per uno dei seguenti motivi:

  • Generazione di una quantità notevole di dati intermedi durante il processo di shuffle a causa della presenza di shuffle join
  • Distribuzione non uniforme delle partizioni di dati e del carico di lavoro degli esecutori
  • Dimensionamento e conteggio delle partizioni non corretti
  • Disponibilità inadeguata di risorse come disco e memoria

Apache Spark utilizza l'archiviazione locale sui nodi core e attività per archiviare i dati intermedi (shuffle). Se lo spazio sui dischi esaurisce, il processo ha esito negativo e viene generato l'errore no space left on device.

Risoluzione

Questo articolo descrive le cause più frequenti degli errori di mancanza di spazio sul dispositivo fornendo le relative soluzioni. Per scegliere il procedimento appropriato, devi prima individuare la causa principale del problema.

Nota: se ricevi errori quando esegui i comandi dell'Interfaccia della linea di comando AWS (AWS CLI), consulta Risoluzione degli errori per AWS CLI. Inoltre, assicurati di utilizzare la versione più recente di AWS CLI.

Ripartizionamento

In base al numero di nodi core e attività presenti nel cluster, potrebbe essere necessario aumentare il numero di partizioni Spark. Per aggiungere altre partizioni Spark, esegui questo comando:

val numPartitions = 500
val newDF = df.repartition(numPartitions)

Nota: sostituisci 500 con il numero di partizioni adatto al caso d'uso.

Configurazioni Tune Spark

Gestione delle partizioni

In caso di fuoriuscite eccessive durante gli shuffle o di una distribuzione non uniforme dei dati tra le partizioni, regola i seguenti parametri:

spark.default.parallelism=${NUM_CORES * 2} #no. of partitions in RDDs
spark.sql.shuffle.partitions=${NUM_CORES * 2} #no. of shuffle partitions
spark.sql.files.maxPartitionBytes=256MB #max. no. of bytes in a partition when reading files
spark.sql.files.maxRecordsPerFile=10000000

Aumenta il parallelismo e il numero di partizioni se si verifica una delle seguenti condizioni:

  • La varianza della durata dell'attività è superiore al triplo della durata media
  • La fuoriuscita per attività è superiore al 20%

Se la dimensione media delle partizioni è inferiore a 50 MB o sono presenti troppi file di piccole dimensioni, riduci il parallelismo e il numero di partizioni.

Per calcolare il numero ottimale delle partizioni, utilizza la seguente formula:

Initial Partitions = Number of Cores * 2
Optimal Partitions = max(
    Total Input Size / Target Partition Size,
    Initial Partitions
)

Ottimizzazione basata sul volume di dati

Di seguito sono riportati i parametri di configurazione per set di dati di diverse dimensioni:

Set di dati di piccole dimensioni (<100 GB):

spark.sql.files.maxPartitionBytes=128MB
spark.sql.shuffle.partitions=NUM_CORES * 2
spark.sql.files.maxRecordsPerFile=5000000

Set di dati di medie dimensioni (100 GB-1 TB):

spark.sql.files.maxPartitionBytes=256MB
spark.sql.shuffle.partitions=NUM_CORES * 3
spark.sql.files.maxRecordsPerFile=10000000

Set di dati di grandi dimensioni (>1 TB):

spark.sql.files.maxPartitionBytes=512MB
spark.sql.shuffle.partitions=NUM_CORES * 4
spark.sql.files.maxRecordsPerFile=20000000

Ottimizzazione della memoria e dell'archiviazione

Per ottimizzare la memoria e l'archiviazione, aggiorna i parametri di configurazione:

spark.memory.fraction=0.8 # Higher for compute-intensive jobs
spark.memory.storageFraction=0.3 # Lower for shuffle-heavy workloads
spark.executor.memoryOverhead=0.2 # 20% of executor memory
spark.memory.offHeap.enabled=true
spark.memory.offHeap.size=${EXECUTOR_MEMORY * 0.2}

Per calcolare l'allocazione totale di memoria per il container di un esecutore Spark, vengono combinati i seguenti quattro elementi:

  • Memoria dell'esecutore (spark.executor.memory)
  • Sovraccarico di memoria (spark.executor.memoryOverhead)
  • Memoria off-heap (spark.memory.offHeap.size)
  • Memoria PySpark (spark.executor.pyspark.memory)

Memoria totale del container dell'esecutore = spark.executor.memory + spark.executor.memoryOverhead + spark.memory.offHeap.size + spark.executor.pyspark.memory

Il seguente calcolo determina l'allocazione di memoria interna di Spark:

Storage Memory = Executor Memory * memory.fraction * memory.storageFraction
Execution Memory = Executor Memory * memory.fraction * (1 - memory.storageFraction)
Off-Heap Memory = Executor Memory * 0.2

Gestione di file e dischi

Se riscontri un'asimmetria nelle partizioni o se il numero di partizioni è troppo alto o troppo basso, ottimizza le configurazioni di gestione dei file. Imposta maxPartitionNum sul doppio dei core totali e minPartitionNum su 1, a meno che i casi d'uso non richiedano un valore diverso.

# File Management
spark.sql.files.minPartitionNum=${NUM_CORES}
spark.sql.files.maxPartitionNum=${NUM_CORES * 4}
spark.shuffle.file.buffer=64k

Se maxPartitionNum è impostato su un valore troppo basso, potrebbe limitare il parallelismo e non impedire tutti gli scenari di asimmetria.

AQE e gestione delle asimmetrie

L’esecuzione adattiva delle query (AQE) in Spark è un'ottimizzazione del runtime che regola i piani di query in base a statistiche in tempo reale.

L'AQE è attivata per impostazione predefinita in Amazon EMR versione 5.30.0 e successive. L'AQE in Spark può ottimizzare automaticamente le strategie di join e lo shuffling. Può anche gestire efficacemente l'asimmetria dei dati tramite la suddivisione dinamica delle partizioni. Questo migliora il bilanciamento del carico e le prestazioni delle query.

# Skew Management
spark.sql.adaptive.enabled=true
spark.sql.adaptive.skewJoin.enabled=true
spark.sql.adaptive.skewJoin.skewedPartitionFactor=10
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes=256MB

Se utilizzi una versione precedente di Spark che non supporta l'AQE, scegli uno dei seguenti metodi per gestire l'asimmetria dei dati:

  • Regola la soglia spark.sql.autoBroadcastJoinThreshold per i join di trasmissione. Questo è utile quando un set di dati è notevolmente più piccolo dell'altro nei join.
  • Utilizza repartition() o coalesce() nel codice per migliorare la distribuzione dei dati.
  • Applica i suggerimenti SKEW alla tabella più grande o asimmetrica e trasmetti la tabella più piccola. I suggerimenti SKEW notificano a Spark Optimizer che una tabella ha dati asimmetrici contribuendo a ottimizzare le strategie di join.

Di seguito è riportato un esempio di suggerimenti SKEW nelle query SQL di Spark:

-- Using SKEW hint in Spark SQL
SELECT /*+ SKEW('t1') */
    t1.key, t2.value
FROM table1 t1 JOIN table2 t2 ON t1.key = t2.key

-- Using MAPJOIN hint along with SKEW
SELECT /*+ SKEW('t1'), MAPJOIN(t2) */
    t1.key, t2.value
FROM table1 t1
JOIN table2 t2
ON t1.key = t2.key

Azione Bootstrap per l’aumento verticale dinamico dello spazio di archiviazione

Puoi utilizzare l'azione Bootstrap tramite il monitoraggio di Amazon CloudWatch e l'automazione Lambda per scalare automaticamente lo spazio di archiviazione per i cluster Amazon EMR. Quando lo spazio disponibile su disco scende al di sotto di una soglia prestabilita, Amazon CloudWatch avvia una funzione AWS Lambda. Tale funzione collega nuovi volumi Amazon Elastic Block Store (Amazon EBS) ai nodi del cluster. La funzione esegue quindi gli script per formattare, montare e integrare i volumi nel File system distribuito Hadoop (HDFS).

Questo approccio automatizzato previene gli errori del cluster causati da vincoli di archiviazione e mantiene l'efficacia dei costi aggiungendo capacità solo quando è necessaria. La sua implementazione richiede ruoli Identity and Access Management (IAM) appropriati, l'impostazione di allarmi in Amazon CloudWatch, la configurazione di AWS Lambda e script personalizzati per la gestione dei volumi. Per ulteriori informazioni, consulta Dynamically scale up storage on Amazon EMR clusters (Aumento verticale dinamico dello spazio di archiviazione su cluster Amazon EMR).

Aumenta la capacità a disposizione di Amazon EBS

Per i nuovi cluster utilizza volumi EBS più grandi

Avvia un cluster Amazon EMR e scegli un tipo di istanza Amazon Elastic Compute Cloud (Amazon EC2) con volumi EBS più grandi. Per ulteriori informazioni, consulta Spazio di archiviazione predefinito in Amazon EBS per le istanze.

Per i cluster in esecuzione aggiungi altri volumi EBS

Completa i seguenti passaggi:

  1. Se un volume EBS più grande non risolve il problema, collega più volumi EBS ai nodi core e attività.

  2. Formatta e monta i volumi collegati. Assicurati di utilizzare il numero di disco corretto (ad esempio, /mnt1 o /mnt2 anziché /data).

  3. Utilizza un client SSH per connetterti al nodo.

  4. Crea una directory /mnt2/yarn, quindi imposta la proprietà della directory per l'utente YARN:

    sudo mkdir /mnt2/yarn
    sudo chown yarn:yarn /mnt2/yarn
  5. Aggiungi la directory /mnt2/yarn alla proprietà yarn.nodemanager.local-dirs di /etc/hadoop/conf/yarn-site.xml.
    Esempio:

    <property>
        <name>yarn.nodemanager.local-dirs</name>
        <value>/mnt/yarn,/mnt1/yarn,/mnt2/yarn</value>
    </property>
  6. Riavvia il servizio NodeManager:
    Amazon EMR versioni 4.x-5.29.0

    sudo stop hadoop-yarn-nodemanager
    sudo start hadoop-yarn-nodemanager

    Amazon EMR 5.30.0 e versioni successive

    sudo systemctl stop hadoop-yarn-nodemanager
    sudo systemctl start hadoop-yarn-nodemanager

Informazioni correlate

Come posso risolvere gli errori di fase nei processi Spark in Amazon EMR?

Che cos'è Amazon EMR su EKS?

Cos'è Amazon EMR Serverless?

Best practices (Best practice) sul sito web di AWS Open Data Analytics

AWS UFFICIALE
AWS UFFICIALEAggiornata un mese fa