SQS long polling: messages delayed


Hi,

I use SQS standard queues with long polling via a Python client. Recently I've been seeing delays between when the message is successfully sent to the queue and when the message is received by the consumer, where the delay is approximately equal to the visibility timeout. Please see the simple example below, keeping in mind the following:
• There are no other consumers for this queue in this example
• When a message is ultimately received with a delay, ApproximateReceiveCount=2 and ApproximateFirstReceiveTimestamp ≈ SentTimestamp
• boto3 debug-level logs show no receipt of the delayed message other than the final, delayed one observed by my code
• A delay occurs in ~30% of runs and always affects one of the first few messages sent (even after running for a long time, I have never seen it later than the 5th message)

#!/usr/bin/env python3

import os
import sys
import time
import boto3
import logging
import threading

# Logging setup omitted

times = {}
prefix = os.urandom(4).hex() + "_"

def sender(client, url):
    # Send a message every 2 seconds, recording the send time per message
    global times
    i = 0
    while True:
        body = prefix + str(i)
        times[body] = time.monotonic()
        client.send_message(QueueUrl=url, MessageBody=body)
        i += 1
        time.sleep(2)

def recver(client, url):
    # Loop over receive_message forever on a long poll
    global times
    while True:
        resp = client.receive_message(QueueUrl=url,
                                      MaxNumberOfMessages=10,
                                      VisibilityTimeout=5,
                                      WaitTimeSeconds=20,
                                      AttributeNames=["All"])
        print("Got resp")
        # An empty long poll returns no "Messages" key, so default to an empty list
        for msg in resp.get("Messages", []):
            # Delete the received messages
            print(f"Msg attrs: {msg.get('Attributes')}")
            handle = msg["ReceiptHandle"]
            client.delete_message(QueueUrl=url, ReceiptHandle=handle)

            # See how long it took using the time set in the sender
            body = msg["Body"]
            try:
                start = times.pop(body)
            except KeyError:
                print(f"ERROR: key missing: {body}")
                continue
            delta = time.monotonic() - start
            print(f"{body} took {delta}")
            if delta > 1:
                print(f"LONG DELAY: body={body} delay={delta}")
                os._exit(1)

if __name__ == "__main__":
    # Start up a sender and receiver thread
    cli = boto3.client("sqs", region_name='us-east-1')
    url = cli.get_queue_url(QueueName="test_py")["QueueUrl"]
    threading.Thread(target=recver, args=(cli, url), daemon=True).start()
    threading.Thread(target=sender, args=(cli, url), daemon=True).start()
    time.sleep(24 * 60 * 60)
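The attribute pattern described in the bullets (receive count 2, first receive at about the send time) can also be checked in code. A minimal sketch, assuming the `Attributes` dict that `receive_message` returns with `AttributeNames=["All"]`, where SQS reports these values as strings and the timestamps as epoch milliseconds; `describe_receive` is a hypothetical helper, not part of boto3:

```python
from typing import Mapping


def describe_receive(attrs: Mapping[str, str], now_ms: int) -> dict:
    """Summarize SQS system attributes for one received message.

    Hypothetical helper: SQS returns these attributes as strings, and the
    timestamps are milliseconds since the Unix epoch.
    """
    sent = int(attrs["SentTimestamp"])
    first_recv = int(attrs["ApproximateFirstReceiveTimestamp"])
    count = int(attrs["ApproximateReceiveCount"])
    return {
        "receive_count": count,
        # How soon after sending SQS first handed the message to *any* receive call
        "first_receive_lag_ms": first_recv - sent,
        # How long after sending this particular receipt happened
        "total_lag_ms": now_ms - sent,
        # More than one receive means an earlier receipt never deleted the message
        "redelivered": count > 1,
    }
```

In `recver` it could be called as `describe_receive(msg["Attributes"], int(time.time() * 1000))`; this needs wall-clock time, not the `time.monotonic()` values the example uses for its own timing. A redelivered message with a near-zero first receive lag was handed to some receive call almost immediately but never deleted by it.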

Please let me know if I can provide any more info. Thanks in advance for your help!

Asked 5 years ago, 1433 views
2 answers

How long do you wait between two runs of this same test code?

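Your test code uses the same queue name for each run, and on shutdown it does not wait for in-flight receive calls to finish. So when one test run ends and you immediately start another, the first few messages you send into the queue are returned to long-poll receive requests from the previous run: your process stopped waiting for them, but they had already been issued to SQS. Because nothing deletes those messages, each one becomes visible again only after the 5-second visibility timeout expires, which is why your new receiver gets it late with ApproximateReceiveCount=2 and an ApproximateFirstReceiveTimestamp close to SentTimestamp.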
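One way to make the test shut down gracefully is to replace the daemon receiver thread with one that checks a stop flag between polls, and to join it before exiting, so the final long poll is answered (and any messages it returns are handled and deleted) before the process ends. A minimal sketch restructuring the question's `recver` loop; `receiver`, `run_until`, and the `handle` callback are hypothetical names, and `client` is assumed to be a boto3 SQS client:

```python
import threading
import time


def receiver(client, url, stop: threading.Event, handle) -> None:
    """Long-poll until `stop` is set; return only after the in-flight poll completes."""
    while not stop.is_set():
        resp = client.receive_message(
            QueueUrl=url,
            MaxNumberOfMessages=10,
            VisibilityTimeout=5,
            WaitTimeSeconds=20,
            AttributeNames=["All"],
        )
        # An empty long poll returns no "Messages" key at all
        for msg in resp.get("Messages", []):
            handle(msg)
            client.delete_message(QueueUrl=url, ReceiptHandle=msg["ReceiptHandle"])


def run_until(client, url, handle, duration_s: float) -> None:
    stop = threading.Event()
    # Non-daemon thread, so it is not killed in the middle of a poll
    t = threading.Thread(target=receiver, args=(client, url, stop, handle))
    t.start()
    try:
        time.sleep(duration_s)
    finally:
        stop.set()
        # Wait for the last receive_message call to return (at most WaitTimeSeconds),
        # so no poll from this run is left open on the SQS side
        t.join()
```

Used in place of the final `time.sleep(24 * 60 * 60)`, this means an interrupted run can take up to 20 seconds to exit; that wait is the point, since it lets the open poll finish instead of abandoning it.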

To keep old long-poll receives from returning newly sent messages, either use a unique queue name for each test run or wait at least 20 seconds (your WaitTimeSeconds) between runs so that all old long polls complete empty.
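The first option could look like the following minimal sketch: a context manager (the name `temporary_queue` is hypothetical) that creates a uniquely named queue with `create_queue`, yields its URL, and removes it with `delete_queue` when the run ends. `client` is assumed to be a boto3 SQS client:

```python
import contextlib
import uuid


@contextlib.contextmanager
def temporary_queue(client, base_name: str = "test_py"):
    """Create a uniquely named SQS queue for one test run and delete it afterwards."""
    # Queue names allow up to 80 characters: alphanumerics, hyphens, underscores
    name = f"{base_name}_{uuid.uuid4().hex[:12]}"
    url = client.create_queue(QueueName=name)["QueueUrl"]
    try:
        yield url
    finally:
        # Clean up so per-run test queues do not accumulate in the account
        client.delete_queue(QueueUrl=url)
```

In the question's `__main__`, `with temporary_queue(cli) as url:` would replace the `get_queue_url` call and wrap the thread start-up and the sleep. A long poll abandoned by an earlier run then targets a different queue, so it cannot claim this run's messages.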

Kuba
Answered 5 years ago

You're right, that was the exact problem. Thanks for the help.

Answered 5 years ago
