AWS re:Postを使用することにより、以下に同意したことになります AWS re:Post 利用規約

Amazon SQS メッセージが Lambda 関数を複数回呼び出さないようにするにはどうすればよいですか?

所要時間2分
0

Amazon Simple Queue Service (Amazon SQS) キューのメッセージを処理するために AWS Lambda 関数を設定しました。現在、有効な Amazon SQS メッセージの一部は maxReceiveCount まで複数回受信され、デッドレターキューに入ってしまいます。同じ Amazon SQS メッセージに対する Lambda 関数の重複呼び出しを止めるにはどうすればよいですか?

簡単な説明

Lambda は少なくとも 1 回のメッセージ配信をサポートしています。場合によっては、再試行メカニズムによって同じメッセージが重複して送信されることがあります。Amazon SQS は、デッドレターキューを設定していれば、メッセージをデッドレターキューに送信します。

同じ Amazon SQS メッセージに対する Lambda 呼び出しの重複は、次のいずれかの理由で発生する可能性があります。

  • 関数がエラーを返すか、タイムアウトしました。
  • Lambda サービスは、バッチが成功した後、可視性タイムアウトが期限切れになる前に Amazon SQS キューからメッセージを削除できません。
  • Lambda サービスはイベントをに送信しましたが、関数からの承認を受け取れませんでした。
  • 断続的な問題により、Amazon SQS から同じメッセージが返され、Lambda サービスによって再度ポーリングされました。
  • バッチウィンドウと関数期間の合計がAmazon SQS キューの可視性タイムアウトより長くなっています。SQS 可視性タイムアウトは、関数タイムアウトとバッチウィンドウタイムアウトの合計の 6 倍以上にする必要があります。

これらの問題を解決するには、Lambda 関数の SQS トリガーで [バッチ項目の失敗を報告] をオンにします。次に、バッチを反復処理し、成功したメッセージと重複したメッセージを処理、削除するモジュラー関数コードを作成します。この関数は、成功したメッセージの messageID を Amazon DynamoDB テーブルに保存し、メッセージが以前に処理されたことを確認します。

重要: 次の解決策では、受信メッセージごとに複数のリクエストを DynamoDB に送信するため、処理時間が長くなります。この結果、API の呼び出しによるコスト も高くなります。したがって、プロジェクトにおけるこの解決策によって生じる時間とコストを必ず考慮するようにしてください。Lambda 呼び出しの重複によるエラー率が低い場合は、この解決策による時間とコストがメリットを上回る可能性があります。

解決方法

まず、メッセージ ID を確認して、同じメッセージが複数回受信されていることを確認します。前述のいずれかの理由で同じメッセージのコピーを複数回受信する場合、メッセージの ID は同じです。この場合は、以下の手順に従ってください。同じ内容でメッセージ ID が異なる複数のメッセージを受信する場合、メッセージはキューに複数回送信されています。この場合は、メッセージプロデューサーを確認してください。

次の手順は Lambda 関数の SQS トリガーにのみ適用されます。手動のプルリクエストでは機能しません。

DynamoDB テーブルを作成する

次の DynamoDB テーブルにはメッセージ ID が格納されているため、Lambda 関数が重複を確認できます。

  1. [DynamoDB コンソール] を開きます。
  2. テーブルを作成 を選択します。
  3. [Create DynamoDB table] 画面で、次の操作を実行します。
    [テーブル] に「処理済みレコード」と入力します
    [プライマリキー] の [パーティションキー] に「Records
    データ型を String に設定します。
  4. ユースケースに応じて、必要に応じて他の設定を入力します。その後、[Create] を選択します。

Lambda 関数の作成

重要:Lambda 関数コードはべき等である必要があります。冪等性のあるベストプラクティスと関数ロジックの例については、「Lambda 関数を冪等にするにはどうすればよいですか?」を参照してください。

DynamoDB テーブルを作成してから、Lambda 関数を作成します。この関数は、受信メッセージを、以前に成功して DynamoDB テーブルに保持されたメッセージと比較します。メッセージが以前に成功していた場合、この関数は重複の処理を許可しません。新しい一意のメッセージが成功すると、それらは後で比較できるようにテーブルに送信されます。失敗したメッセージは、正常に処理されるまで、またはメッセージの ReceiveCountmaxReceiveCount を超えるまで再試行されます。

ユーザーの関数では次のようになります。

次の例では、関数ロジックがメッセージ本文を大文字に変換します。process_message (...) メソッドで書かれています。

import boto3

dynamodb_client = boto3.client('dynamodb')

DDB_TABLE = 'ProcessedRecords'

# Validates if the message is already processed in previous invokes.
# @input string message_id
#
# @param message_id used to query the message from DynamoDB
# @return Boolean
def is_duplicate_message(message_id):
    return dynamodb_client.query(
        TableName='ProcessedRecords',
        Select='COUNT',
        KeyConditionExpression='Records = :Records',
        ExpressionAttributeValues={
            ':Records': {'S': message_id}
        }
    )["Count"] != 0
    
# Processes the message body to upper case.
# @input string body
#
# @param body to be processed
# @return uppercase body
def process_message(body):
    return body.upper()

# Put the message to the DynamoDB Table.
# @input string batch_item_success
#
# @param batch_item_success of the message to put.
# @return Boolean
def push_to_dynamoDB(batch_item_success):
    
    for message_id in batch_item_success:
        response = dynamodb_client.put_item(
                        TableName = DDB_TABLE, 
                        Item={ 'Records': {'S':message_id}
                        }
                    )
    return True

# processor function iterating through messages in the event.
# @input dict Records
#
# @param Records to be processed
# @return Boolean
def iterate_records(Records):
    
    batch_item_failures = []
    batch_item_success = []
    
    for record in Records:
        
        message_id = record["messageId"]
        print("Message: " + message_id)
        if is_duplicate_message(message_id):   
            print("Message duplicate: " + message_id)
            continue
        
        try:
            process_message(record["body"])
            batch_item_success.append(message_id) 
        except:
            batch_item_failures.append({"itemIdentifier": message_id}) 
    
    push_to_dynamoDB(batch_item_success)
    return batch_item_failures
    
def lambda_handler(event, context):
    
    return iterate_records(event["Records"])

関連情報

Lambda 関数が有効な Amazon SQS メッセージを再試行して、デッドレターキューに入れるのはなぜですか?


AWS公式
AWS公式更新しました 2年前