Como impedir que uma mensagem do Amazon SQS invoque minha função do Lambda mais de uma vez?

5 minuto de leitura
0

Configurei minha função do AWS Lambda para processar mensagens em uma fila do Amazon Simple Queue Service (Amazon SQS). Agora, algumas das mensagens válidas do Amazon SQS são recebidas várias vezes até o maxReceiveCount. Como interromper invocações duplicadas de função do Lambda para a mesma mensagem do Amazon SQS?

Breve descrição

O Lambda é compatível com a entrega de mensagens at-least-once. Em alguns casos, o mecanismo de nova tentativa pode enviar duplicatas da mesma mensagem. Em seguida, o Amazon SQS envia as mensagens para sua fila de mensagens não enviadas, caso você tenha configurado.

As invocações duplicadas do Lambda para a mesma mensagem do Amazon SQS podem ocorrer por um destes motivos:

  • Sua função retorna um erro ou atinge o tempo limite.
  • O serviço do Lambda não consegue excluir uma mensagem da fila do Amazon SQS após um lote bem-sucedido antes que o tempo limite de visibilidade expire.
  • O serviço do Lambda enviou o evento para a função, mas não recebeu uma confirmação da função.
  • Um problema intermitente fez com que o Amazon SQS retornasse a mesma mensagem e fosse pesquisado novamente pelo serviço do Lambda.
  • A soma da janela do lote e da duração da função é maior do que o tempo limite de visibilidade da fila do Amazon SQS. O tempo limite de visibilidade do SQS deve ser pelo menos seis vezes o tempo limite total da função e o tempo limite da janela do lote.

Para resolver esses problemas, ative Report Batch item failure (Relatório de falha de item do lote) no acionador SQS da função do Lambda. Em seguida, crie um código de função modular que itera pelo lote, processa e exclui mensagens duplicadas e bem-sucedidas. A função armazena o messageID das mensagens bem-sucedidas em uma tabela do Amazon DynamoDB e verifica se a mensagem foi processada anteriormente.

Importante: a resolução a seguir diminui o tempo de processamento ao enviar várias solicitações ao DynamoDB para cada mensagem recebida. Isso também resulta em custos mais altos com chamadas de API. Por isso, considere o tempo e o custo dessa resolução para seu projeto. Se sua taxa de erro para chamadas Lambda duplicadas for baixa, o tempo e o custo dessa resolução podem não valer a pena.

Resolução

Primeiro, confirme se você está recebendo a mesma mensagem várias vezes verificando o ID da mensagem. Se receber várias cópias da mesma mensagem devido a qualquer um dos motivos listados anteriormente, as mensagens terão o mesmo ID. Nesse caso, siga as etapas abaixo. Se receber várias mensagens com o mesmo conteúdo, mas com IDs de mensagem diferentes, a mensagem está sendo enviada para a fila mais de uma vez. Nesse caso, verifique seu produtor de mensagens.

As etapas a seguir se aplicam somente ao acionador SQS de uma função do Lambda. Elas não funcionam para solicitações pull manuais.

Crie uma tabela do DynamoDB

A tabela do DynamoDB a seguir contém seus IDs de mensagens para que uma função do Lambda possa compará-los quanto à duplicação.

  1. Abra o console do DynamoDB.
  2. Escolha Create table (Criar tabela).
  3. Na tela Create DynamoDB table (Criar tabela do DynamoDB), defina os seguintes valores:
    Em Table (Tabela), insira ProcessedRecords
    Em Primary key (Chave primária), em Partition key (Chave de partição), insira Records (Registros)
    Defina o tipo de dados como String
  4. Insira outras configurações conforme necessário para seu caso de uso. Em seguida, escolha Create (Criar).

Criar uma função do Lambda

Importante: o código da função do Lambda deve ser idempotente. Para obter as práticas recomendadas de idempotência e exemplos de lógica de função, consulte How do I make my Lambda function idempotent? (Como tornar minha função do Lambda idempotente?)

Depois de criar a tabela do DynamoDB, crie uma função do Lambda. Essa função compara mensagens recebidas com mensagens que foram anteriormente bem-sucedidas e, assim, mantidas em sua tabela do DynamoDB. Se uma mensagem foi bem-sucedida anteriormente, a função não permite o processamento de duplicatas. Se mensagens novas e exclusivas forem bem-sucedidas, elas serão enviadas para a tabela para comparação futura. As mensagens com falha são repetidas até serem processadas com sucesso ou até que o ReceiveCount de uma mensagem exceda o maxReceiveCount.

Na função:

No exemplo a seguir, a lógica da função converte o corpo da mensagem em maiúsculas. Está escrito sob o método process_message(...):

import boto3

dynamodb_client = boto3.client('dynamodb')

DDB_TABLE = 'ProcessedRecords'

# Validates if the message is already processed in previous invokes.
# @input string message_id
#
# @param message_id used to query the message from DynamoDB
# @return Boolean
def is_duplicate_message(message_id):
    return dynamodb_client.query(
        TableName='ProcessedRecords',
        Select='COUNT',
        KeyConditionExpression='Records = :Records',
        ExpressionAttributeValues={
            ':Records': {'S': message_id}
        }
    )["Count"] != 0
    
# Processes the message body to upper case.
# @input string body
#
# @param body to be processed
# @return uppercase body
def process_message(body):
    return body.upper()

# Put the message to the DynamoDB Table.
# @input string batch_item_success
#
# @param batch_item_success of the message to put.
# @return Boolean
def push_to_dynamoDB(batch_item_success):
    
    for message_id in batch_item_success:
        response = dynamodb_client.put_item(
                        TableName = DDB_TABLE, 
                        Item={ 'Records': {'S':message_id}
                        }
                    )
    return True

# processor function iterating through messages in the event.
# @input dict Records
#
# @param Records to be processed
# @return Boolean
def iterate_records(Records):
    
    batch_item_failures = []
    batch_item_success = []
    
    for record in Records:
        
        message_id = record["messageId"]
        print("Message: " + message_id)
        if is_duplicate_message(message_id):   
            print("Message duplicate: " + message_id)
            continue
        
        try:
            process_message(record["body"])
            batch_item_success.append(message_id) 
        except:
            batch_item_failures.append({"itemIdentifier": message_id}) 
    
    push_to_dynamoDB(batch_item_success)
    return batch_item_failures
    
def lambda_handler(event, context):
    
    return iterate_records(event["Records"])

Informações relacionadas

Por que a função Lambda está tentando reenviar mensagens válidas do Amazon SQS e colocando-as na fila de mensagens não enviadas?


AWS OFICIAL
AWS OFICIALAtualizada há um ano