如何防止 Amazon SQS 消息多次调用我的 Lambda 函数?

2 分钟阅读
0

我配置了 AWS Lambda 函数以处理 Amazon Simple Queue Service (Amazon SQS) 队列中的消息。现在,我的一些有效 Amazon SQS 消息在达到 maxReceiveCount 之前会被多次接收。如何停止对同一 Amazon SQS 消息的 Lambda 函数重复调用?

简短描述

Lambda 支持至少一次消息传递。在某些情况下,重试机制可能会发送相同消息的副本。然后,Amazon SQS 会将消息发送到死信队列(如果您已经配置了死信队列)。

对同一 Amazon SQS 消息重复调用 Lambda 可能由于以下原因之一:

  • 函数返回错误或超时。
  • Lambda 服务成功完成批处理后,无法在可见性超时到期之前从 Amazon SQS 队列中删除消息。
  • Lambda 服务发送了事件,但未能收到函数的确认。
  • 间歇性的问题导致 Amazon SQS 返回相同的消息,Lambda 服务再次轮询该消息。
  • 批处理窗口和函数持续时间之和长于 Amazon SQS 队列可见性超时。SQS 可见性超时必须至少是函数超时和批处理窗口超时总和的六倍。

要解决这些问题,请在 Lambda 函数的 SQS 触发器中启用报告批处理项目故障。然后,创建模块化函数代码,在批处理、处理和删除成功消息和重复消息中进行迭代。该函数将成功消息的 messageID 存储在 Amazon DynamoDB 表中,然后验证消息是否已在前面处理。

**重要提示:**以下解决方案会针对每条传入消息向 DynamoDB 发送多个请求,从而减缓处理时间。这也导致 API 调用成本的升高。因此,请务必考虑项目采用此解决方案所花费的时间和成本。如果重复 Lambda 调用的错误率很低,则此解决方案所花费的时间和成本可能会超过其所带来的优点。

解决方案

首先,通过检查消息 ID 确认您多次收到相同的消息。如果由于前面列出的任一原因而收到同一个消息的多个副本,则这些消息具有相同的 ID。在这种情况下,请按照以下步骤操作。如果您收到多条内容相同但消息 ID 不同的消息,则该消息会被多次发送到队列。在这种情况下,请检查您的消息创建器。

以下步骤仅适用于 Lambda 函数的 SQS 触发器。它们不适用于手动拉取请求。

创建 DynamoDB 表

以下 DynamoDB 表保存您的消息 ID,以便 Lambda 函数比较它们是否重复。

  1. 打开 DynamoDB 控制台
  2. 选择 Create table(创建表)。
  3. Create DynamoDB table(创建 DynamoDB 表)屏幕上,设置以下值:
    对于 Table(表),输入 ProcessedRecords
    Primary key(主键)下,对于 Partition key(分区键),输入 Records
    将数据类型设置为 String(字符串)
  4. 根据您使用案例的需要输入其他设置。然后,选择 Create (创建)

创建 Lambda 函数

重要提示:Lambda 函数代码必须是幂等的。有关幂等性最佳实践和示例函数逻辑,请参阅如何让我的 Lambda 函数幂等?

创建 DynamoDB 表后,创建 Lambda 函数。此函数将传入的消息与之前成功后保存在 DynamoDB 表中的消息进行比较。如果之前成功发送过消息,则该函数不允许处理重复消息。如果新的唯一消息处理成功,则会将它们发送到表中以供将来比较。失败的消息会经过重试,直到成功处理,或者直到消息的 ReceiveCount 超过 maxReceiveCount 为止。

在您的函数中:

在下面的示例中,函数逻辑会将消息正文转换为大写。该函数使用 process_message(...) 方法编写:

import boto3

dynamodb_client = boto3.client('dynamodb')

DDB_TABLE = 'ProcessedRecords'

# Validates if the message is already processed in previous invokes.
# @input string message_id
#
# @param message_id used to query the message from DynamoDB
# @return Boolean
def is_duplicate_message(message_id):
    return dynamodb_client.query(
        TableName='ProcessedRecords',
        Select='COUNT',
        KeyConditionExpression='Records = :Records',
        ExpressionAttributeValues={
            ':Records': {'S': message_id}
        }
    )["Count"] != 0
    
# Processes the message body to upper case.
# @input string body
#
# @param body to be processed
# @return uppercase body
def process_message(body):
    return body.upper()

# Put the message to the DynamoDB Table.
# @input string batch_item_success
#
# @param batch_item_success of the message to put.
# @return Boolean
def push_to_dynamoDB(batch_item_success):
    
    for message_id in batch_item_success:
        response = dynamodb_client.put_item(
                        TableName = DDB_TABLE, 
                        Item={ 'Records': {'S':message_id}
                        }
                    )
    return True

# processor function iterating through messages in the event.
# @input dict Records
#
# @param Records to be processed
# @return Boolean
def iterate_records(Records):
    
    batch_item_failures = []
    batch_item_success = []
    
    for record in Records:
        
        message_id = record["messageId"]
        print("Message: " + message_id)
        if is_duplicate_message(message_id):   
            print("Message duplicate: " + message_id)
            continue
        
        try:
            process_message(record["body"])
            batch_item_success.append(message_id) 
        except:
            batch_item_failures.append({"itemIdentifier": message_id}) 
    
    push_to_dynamoDB(batch_item_success)
    return batch_item_failures
    
def lambda_handler(event, context):
    
    return iterate_records(event["Records"])

相关信息

为什么我的 Lambda 函数重试有效的 Amazon SQS 消息并将其放入死信队列中?


AWS 官方
AWS 官方已更新 1 年前