How can I prevent an Amazon SQS message from invoking my Lambda function more than once?


I configured my AWS Lambda function to process messages in an Amazon Simple Queue Service (Amazon SQS) queue. Now, some of my valid Amazon SQS messages are received multiple times, up to the maxReceiveCount. How do I stop duplicate Lambda function invocations for the same Amazon SQS message?

AWS EXPERT
Asked 2 years ago · 5,651 views

3 Answers

There may be several reasons for this:

First, SQS standard queues provide "at least once delivery", so the same message may be delivered more than once. It is important to make your processing idempotent so that repeated invocations will not have an adverse effect on the system.

Another possible cause is a short visibility timeout. If the visibility timeout is too short, the message may become visible again before the function finishes handling it. The recommended visibility timeout is at least six times the function timeout.
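
For example, you can raise the queue's visibility timeout through the SetQueueAttributes API. Here is a minimal boto3 sketch; the queue URL and the 900-second value (six times an assumed 150-second function timeout) are placeholders for illustration:

import boto3

sqs = boto3.client('sqs')

# Placeholder queue URL; replace with your own queue's URL.
queue_url = 'https://sqs.us-east-1.amazonaws.com/123456789012/my-queue'

# Set the visibility timeout to six times the function timeout
# (an assumed 150-second function timeout -> 900 seconds).
sqs.set_queue_attributes(
    QueueUrl=queue_url,
    Attributes={'VisibilityTimeout': '900'}
)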

It may also be that your function exited with an exception. In that case, the messages are not deleted from the queue and will be delivered again.

AWS EXPERT
Uri
Answered 2 years ago

Short description

Lambda supports at-least-once message delivery. In some cases, the retry mechanism might deliver duplicates of the same message. Amazon SQS then sends messages that exceed the maxReceiveCount to your dead-letter queue, if you've configured one.

Duplicate Lambda invokes for the same Amazon SQS message can happen for one of the following reasons:

  • Your function returns an error or times out.
  • The Lambda service fails to delete a message from the Amazon SQS queue after a successful batch before the visibility timeout expires.
  • The Lambda service sent the event to the function but failed to receive an acknowledgement.
  • An intermittent issue caused Amazon SQS to return the same message, and the Lambda service polled it again.
  • The sum of the batch window and the function duration is longer than your Amazon SQS queue visibility timeout. The SQS visibility timeout must be at least six times the total of the function timeout and the batch window timeout; see the worked example after this list.
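
For example, with a 30-second function timeout and a 20-second batch window, the visibility timeout should be at least 6 × (30 + 20) = 300 seconds.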

To resolve these issues, turn on Report batch item failures in your Lambda function's SQS trigger. Then, write modular function code that iterates through the batch, processes each message, and reports failures so that successful and duplicate messages are deleted. The function stores the messageId of each successfully processed message in a DynamoDB table and uses it to verify whether a message was processed earlier.

Note: Be sure to consider the cost for making API calls to Amazon DynamoDB.

Resolution

Create a DynamoDB table

  1. Open the DynamoDB console.
  2. Choose Create table.
  3. In the Create DynamoDB table screen, set the following values:
    • For Table name, enter ProcessedRecords
    • Under Primary key, for Partition key, enter Records
    • Set the data type to String
  4. Enter other settings as needed for your use case. Then, choose Create.
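
If you prefer to create the table programmatically, the following boto3 sketch creates the same table; the on-demand (PAY_PER_REQUEST) billing mode is an assumption, so choose the capacity mode that fits your workload:

import boto3

dynamodb = boto3.client('dynamodb')

# Create the ProcessedRecords table with the string partition key "Records".
# PAY_PER_REQUEST (on-demand) billing is an assumption in this sketch.
dynamodb.create_table(
    TableName='ProcessedRecords',
    AttributeDefinitions=[{'AttributeName': 'Records', 'AttributeType': 'S'}],
    KeySchema=[{'AttributeName': 'Records', 'KeyType': 'HASH'}],
    BillingMode='PAY_PER_REQUEST'
)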

Create a Lambda function

Important: The Lambda function code must be idempotent. For idempotency best practices and example function logic, see How do I make my Lambda function idempotent?

After you have created a DynamoDB table, create a Lambda function.

In your function:

  • Add an execution role that allows the dynamodb:Query and dynamodb:PutItem actions.
  • Turn on Report batch item failures (see Using Lambda with Amazon SQS) in the SQS trigger to identify and skip duplicate messages in the batch; a configuration sketch follows this list.
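
If you create the SQS trigger programmatically, you can turn on partial batch responses on the event source mapping. A minimal boto3 sketch, assuming a hypothetical queue ARN and function name:

import boto3

lambda_client = boto3.client('lambda')

# Hypothetical queue ARN and function name for illustration.
lambda_client.create_event_source_mapping(
    EventSourceArn='arn:aws:sqs:us-east-1:123456789012:my-queue',
    FunctionName='my-dedup-function',
    BatchSize=10,
    # Turns on "Report batch item failures" so the function can return
    # only the failed messageIds for retry.
    FunctionResponseTypes=['ReportBatchItemFailures']
)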

In the following example, the function logic converts the message body to uppercase; it is implemented in the process_message(...) function:

import boto3

dynamodb_client = boto3.client('dynamodb')
DDB_TABLE = 'ProcessedRecords'

# Checks whether the message was already processed in a previous invoke.
# @param message_id used to query the message from DynamoDB
# @return Boolean
def is_duplicate_message(message_id):
    return dynamodb_client.query(
        TableName=DDB_TABLE,
        Select='COUNT',
        KeyConditionExpression='Records = :Records',
        ExpressionAttributeValues={
            ':Records': {'S': message_id}
        }
    )["Count"] != 0

# Processes the message body to upper case.
# @param body to be processed
# @return uppercase body
def process_message(body):
    return body.upper()

# Puts the successfully processed message IDs into the DynamoDB table.
# @param batch_item_success list of message IDs to put
# @return Boolean
def push_to_dynamoDB(batch_item_success):
    for message_id in batch_item_success:
        dynamodb_client.put_item(
            TableName=DDB_TABLE,
            Item={'Records': {'S': message_id}}
        )
    return True

# Processor function iterating through the messages in the event.
# @param Records to be processed
# @return list of failed messages in the partial batch response format
def iterate_records(Records):
    batch_item_failures = []
    batch_item_success = []

    for record in Records:
        message_id = record["messageId"]
        print("Message: " + message_id)
        if is_duplicate_message(message_id):
            print("Message duplicate: " + message_id)
            continue

        try:
            process_message(record["body"])
            batch_item_success.append(message_id)
        except Exception:
            batch_item_failures.append({"itemIdentifier": message_id})

    push_to_dynamoDB(batch_item_success)
    return batch_item_failures

def lambda_handler(event, context):
    # Return the partial batch response that the SQS trigger expects when
    # "Report batch item failures" is turned on, so that only the failed
    # messages become visible again.
    return {"batchItemFailures": iterate_records(event["Records"])}
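
For a quick local sanity check, you can call the handler with a minimal SQS-shaped event. This sketch assumes the ProcessedRecords table already exists and AWS credentials are configured; the messageId and body values are made up:

if __name__ == '__main__':
    # Hypothetical SQS-shaped test event.
    test_event = {
        "Records": [
            {"messageId": "059f36b4-87a3-44ab-83d2-661975830a7d", "body": "hello"}
        ]
    }
    # Prints {'batchItemFailures': []} when the message processes successfully.
    print(lambda_handler(test_event, None))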

Related information

Why is my Lambda function retrying valid Amazon SQS messages and placing them in my dead-letter queue?

AWS EXPERT
Answered 2 years ago
Reviewed 2 years ago by an AWS EXPERT

Hi Christine,

Assuming you're not hitting the limits of an SQS FIFO queue, that would be the easiest solution. One limit is that the FIFO version only supports up to 300 API calls per second, per API method (SendMessage, ReceiveMessage, or DeleteMessage) [https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/quotas-messages.html]. If you're able to use batch operations, you can exceed those limits.

If you do eventually hit those limits, you need to do the deduplication manually [https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/using-messagededuplicationid-property.html], but that requires some implementation effort.
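
For example, when you send to a FIFO queue, you can set an explicit MessageDeduplicationId so that SQS drops any duplicate sent within the 5-minute deduplication interval. A minimal boto3 sketch; the queue URL, group ID, and deduplication ID are placeholders:

import boto3

sqs = boto3.client('sqs')

# Placeholder FIFO queue URL for illustration.
queue_url = 'https://sqs.us-east-1.amazonaws.com/123456789012/my-queue.fifo'

sqs.send_message(
    QueueUrl=queue_url,
    MessageBody='example payload',
    MessageGroupId='orders',              # required for FIFO queues
    MessageDeduplicationId='order-12345'  # duplicates with the same ID are
                                          # dropped within the 5-minute
                                          # deduplication interval
)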

Kind regards

Till
Answered 2 years ago
