Como soluciono falhas de estágio “no space left on device” em uma tarefa do Apache Spark no Amazon EMR?

7 minuto de leitura
0

Quando eu envio uma aplicação Apache Spark para um cluster do Amazon EMR, a aplicação apresenta a falha de estágio “no space left on device” (não há mais espaço no dispositivo).

Breve descrição

A aplicação Apache Spark pode encontrar um erro de falta de espaço no dispositivo por um dos seguintes motivos:

  • Dados intermediários substanciais são gerados durante o processo de embaralhamento devido à presença de junções aleatórias
  • Distribuição desigual de partições de dados e distribuição do workload do executor
  • Dimensionamento e contagem incorretos de partições
  • Disponibilidade inadequada de recursos, como disco e memória

O Apache Spark usa armazenamento local nos nós principais e de tarefa para armazenar dados intermediários (embaralhamento). Quando os discos ficam sem espaço na instância, o trabalho falha com um erro de falta de espaço no dispositivo.

Resolução

Este artigo aborda as causas e soluções mais frequentes para o erro de falta de espaço no dispositivo. Você deve identificar a causa raiz para implementar a correção apropriada.

Observação: Se você receber erros ao executar comandos da AWS Command Line Interface (AWS CLI), consulte Solução de problemas da AWS CLI. Além disso, verifique se você está usando a versão mais recente da AWS CLI.

Reparticionamento

Com base em quantos nós principais e de tarefa existem no cluster, talvez seja necessário aumentar o número de partições do Spark. Para adicionar mais partições do Spark, execute o seguinte comando:

val numPartitions = 500
val newDF = df.repartition(numPartitions)

Observação: substitua 500 pelo número de partições adequado ao seu caso de uso.

Ajustar as configurações do Spark

Gerenciamento de partições

Quando houver vazamentos excessivos de disco durante a tarefa de embaralhamento ou uma distribuição desigual de dados entre partições, ajuste os seguintes parâmetros:

spark.default.parallelism=${NUM_CORES * 2} #no. of partitions in RDDs
spark.sql.shuffle.partitions=${NUM_CORES * 2} #no. of shuffle partitions
spark.sql.files.maxPartitionBytes=256MB #max. no. of bytes in a partition when reading files
spark.sql.files.maxRecordsPerFile=10000000

Aumente o paralelismo e o número de partições se uma das seguintes opções for verdadeira:

  • A variação da duração da tarefa é maior que 3 vezes a duração média
  • O vazamento por tarefa é maior que 20%

Se o tamanho médio da partição for menor que 50 MB ou se houver muitos arquivos pequenos, diminua o paralelismo e o número de partições.

Para calcular a contagem ideal de partições, use a seguinte fórmula:

Initial Partitions = Number of Cores * 2
Optimal Partitions = max(
    Total Input Size / Target Partition Size,
    Initial Partitions
)

Ajuste com base no volume de dados

Seguem os parâmetros de configuração para diferentes tamanhos de conjuntos de dados:

Conjunto de dados pequeno (<100 GB):

spark.sql.files.maxPartitionBytes=128MB
spark.sql.shuffle.partitions=NUM_CORES * 2
spark.sql.files.maxRecordsPerFile=5000000

Conjunto de dados médio (100 GB - 1 TB):

spark.sql.files.maxPartitionBytes=256MB
spark.sql.shuffle.partitions=NUM_CORES * 3
spark.sql.files.maxRecordsPerFile=10000000

Grande conjunto de dados (>1 TB):

spark.sql.files.maxPartitionBytes=512MB
spark.sql.shuffle.partitions=NUM_CORES * 4
spark.sql.files.maxRecordsPerFile=20000000

Otimização de memória e armazenamento

Para otimizar a memória e o armazenamento, atualize seus parâmetros de configuração:

spark.memory.fraction=0.8 # Higher for compute-intensive jobs
spark.memory.storageFraction=0.3 # Lower for shuffle-heavy workloads
spark.executor.memoryOverhead=0.2 # 20% of executor memory
spark.memory.offHeap.enabled=true
spark.memory.offHeap.size=${EXECUTOR_MEMORY * 0.2}

Para calcular a alocação total de memória para um contêiner de executor do Spark, os quatro componentes de memória a seguir são combinados:

  • Memória do executor (spark.executor.memory)
  • Sobrecarga de memória (spark.executor.memoryOverhead)
  • Memória externa (spark.memory.offHeap.size)
  • Memória PySpark (spark.executor.pyspark.memory)

Memória total do contêiner do executor = spark.executor.memory + spark.executor.memoryOverhead + spark.memory.offHeap.size + spark.executor.pyspark.memory

O cálculo a seguir determina a alocação de memória interna do Spark:

Storage Memory = Executor Memory * memory.fraction * memory.storageFraction
Execution Memory = Executor Memory * memory.fraction * (1 - memory.storageFraction)
Off-Heap Memory = Executor Memory * 0.2

Gerenciamento de arquivos e discos

Se houver uma distorção na partição ou se o número de partições for muito alto ou muito baixo, ajuste as configurações de gerenciamento de arquivos. Defina maxPartitionNum como 2 vezes o total de núcleos e minPartitionNum como 1, a menos que seus casos de uso exijam um valor diferente.

# File Management
spark.sql.files.minPartitionNum=${NUM_CORES}
spark.sql.files.maxPartitionNum=${NUM_CORES * 4}
spark.shuffle.file.buffer=64k

Se maxPartitionNum estiver definido como muito baixo, isso poderá limitar o paralelismo e não impedir todos os cenários de distorção.

AQE e tratamento de distorções

O AQE (Adaptive Query Execution) no Spark é uma otimização do runtime que ajusta os planos de consulta com base em estatísticas em tempo real.

O AQE é ativado por padrão na versão 5.30.0 e posterior do Amazon EMR. O AQE no Spark pode otimizar estratégias de junção e embaralhamento automaticamente. Ele também pode lidar com distorção de dados com eficácia por meio da divisão dinâmica de partições. Isso melhora o balanceamento de carga e o desempenho da consulta.

# Skew Management
spark.sql.adaptive.enabled=true
spark.sql.adaptive.skewJoin.enabled=true
spark.sql.adaptive.skewJoin.skewedPartitionFactor=10
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes=256MB

Se você usa uma versão anterior do Spark que não suporta AQE, use um dos seguintes métodos para gerenciar a distorção de dados:

  • Ajuste o limite de spark.sql.autoBroadcastJoinThreshold para junções de transmissão. Isso é útil quando um conjunto de dados é significativamente menor do que o outro em junções.
  • Use repartition() ou coalesce() em seu código para melhorar a distribuição de dados.
  • Aplique dicas SKEW na tabela maior ou assimétrica e transmita a tabela menor. As dicas SKEW notificam o otimizador do Spark de que uma tabela tem dados distorcidos e ajudam a otimizar as estratégias de junção.

Veja a seguir um exemplo de dicas SKEW nas consultas do Spark SQL:

-- Using SKEW hint in Spark SQL
SELECT /*+ SKEW('t1') */
    t1.key, t2.value
FROM table1 t1 JOIN table2 t2 ON t1.key = t2.key

-- Using MAPJOIN hint along with SKEW
SELECT /*+ SKEW('t1'), MAPJOIN(t2) */
    t1.key, t2.value
FROM table1 t1
JOIN table2 t2
ON t1.key = t2.key

Ação de bootstrap para aumentar de forma dinâmica a escala do armazenamento verticalmente

É possível usar a ação de bootstrap por meio do monitoramento do Amazon CloudWatch e da automação do Lambda para escalar automaticamente o armazenamento para clusters do Amazon EMR. Quando o espaço em disco disponível fica abaixo de um limite definido, o Amazon CloudWatch lança uma função do AWS Lambda. Essa função anexa novos volumes do Amazon Elastic Block Store (Amazon EBS) aos nós do cluster. Em seguida, a função executa scripts para formatar, montar e integrar os volumes no Sistema de Arquivos Distribuído do Hadoop (HDFS).

Essa abordagem automatizada evita falhas de cluster causadas por restrições de armazenamento e mantém a relação custo-benefício adicionando capacidade somente quando necessário. A implementação exige perfis adequados do Identity and Access Management (IAM), alarmes do Amazon CloudWatch, configuração do AWS Lambda e scripts personalizados para gerenciamento de volumes. Para mais informações, consulte Aumentar de forma dinâmica a escala do armazenamento verticalmente em clusters do Amazon EMR.

Adicionar mais capacidade do Amazon EBS

Para novos clusters, use volumes maiores do EBS

Inicie um cluster do Amazon EMR e escolha um tipo de instância do Amazon Elastic Compute Cloud (Amazon EC2) com volumes maiores do EBS. Para mais informações, consulte Armazenamento do Amazon EBS padrão para instâncias.

Para clusters em execução, adicione mais volumes do EBS

Conclua as etapas a seguir:

  1. Se um volume maior do EBS não resolver o problema, anexe mais volumes do EBS aos nós principais e de tarefa.

  2. Formate e monte os volumes anexados. Certifique-se de usar o número de disco correto, por exemplo, /mnt1 ou /mnt2 em vez de /data.

  3. Use um cliente SSH para se conectar ao nó.

  4. Crie um diretório /mnt2/yarn e, em seguida, defina a propriedade do diretório para o usuário do YARN:

    sudo mkdir /mnt2/yarn
    sudo chown yarn:yarn /mnt2/yarn
  5. Adicione o diretório /mnt2/yarn na propriedade yarn.nodemanager.local-dirs de /etc/hadoop/conf/yarn-site.xml.
    Exemplo:

    <property>
        <name>yarn.nodemanager.local-dirs</name>
        <value>/mnt/yarn,/mnt1/yarn,/mnt2/yarn</value>
    </property>
  6. Reinicie o serviço NodeManager:
    Versões do Amazon EMR 4.x-5.29.0

    sudo stop hadoop-yarn-nodemanager
    sudo start hadoop-yarn-nodemanager

    Amazon EMR 5.30.0 e versões posteriores

    sudo systemctl stop hadoop-yarn-nodemanager
    sudo systemctl start hadoop-yarn-nodemanager

Informações relacionadas

Como posso solucionar falhas de estágio em trabalhos do Spark no Amazon EMR?

O que é o Amazon EMR no EKS?

O que é o Amazon EMR Sem Servidor?

Práticas recomendadas no site do AWS Open Data Analytics

AWS OFICIALAtualizada há 2 meses