SQS long polling: messages delayed

Hi,

I use SQS standard queues with long polling via a Python client. Recently I've been seeing delays between when a message is successfully sent to the queue and when it is received by the consumer. The delay is approximately equal to the visibility timeout (5 seconds in the example). Please see the simple example below, keeping in mind the following:
• There are no other consumers for this queue in this example
• When a message is ultimately received with a delay, ApproximateReceiveCount=2 and ApproximateFirstReceiveTimestamp ≈ SentTimestamp
• Debug-level logs in boto3 do not indicate that the delayed message was ever received in any way other than in the ultimate delayed receipt observed by my code
• A delay is observed in ~30% of runs, and always for one of the first few messages sent (I have never seen it later than the 5th message, even after running for a long time)

#!/usr/bin/env python3

import os
import sys
import time
import boto3
import logging
import threading

# Logging setup omitted

times = {}
prefix = os.urandom(4).hex() + "_"

def sender(client, url):
    # Send a message every 2 seconds, recording the send time per message
    global times
    i = 0
    while True:
        body = prefix + str(i)
        times[body] = time.monotonic()
        client.send_message(QueueUrl=url, MessageBody=body)
        i += 1
        time.sleep(2)

def recver(client, url):
    # Loop over receive_message forever on a long poll
    global times
    while True:
        resp = client.receive_message(QueueUrl=url,
                                      MaxNumberOfMessages=10,
                                      VisibilityTimeout=5,
                                      WaitTimeSeconds=20,
                                      AttributeNames=["All"])
        print("Got resp")
        # An empty long poll returns a response without a "Messages" key
        for msg in resp.get("Messages", []):
            # Delete the received messages
            print(f"Msg attrs: {msg.get('Attributes')}")
            handle = msg["ReceiptHandle"]
            client.delete_message(QueueUrl=url, ReceiptHandle=handle)

            # See how long it took using the time set in the sender
            body = msg["Body"]
            try:
                start = times.pop(body)
            except KeyError:
                print(f"ERROR: key missing: {body}")
                continue
            delta = time.monotonic() - start
            print(f"{body} took {delta}")
            if delta > 1:
                print(f"LONG DELAY: body={body} delay={delta}")
                os._exit(1)

if __name__ == "__main__":
    # Start up a sender and receiver thread
    cli = boto3.client("sqs", region_name='us-east-1')
    url = cli.get_queue_url(QueueName="test_py")["QueueUrl"]
    threading.Thread(target=recver, args=(cli, url), daemon=True).start()
    threading.Thread(target=sender, args=(cli, url), daemon=True).start()
    time.sleep(24 * 60 * 60)

Please let me know if I can provide any more info. Thanks in advance for your help!

asked 5 years ago, 1379 views
2 Answers

How long do you wait between two runs of this test code?

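Your test code uses the same queue name for each run, and on shutdown it does not wait for in-flight receive calls to finish. When one run ends and you immediately start another, long-poll receive requests from the previous run can still be outstanding on the SQS side: the client stopped waiting for them, but SQS had already accepted them. The first few messages you send in the new run are then delivered to those orphaned requests. Nothing reads those responses, so each such message stays invisible until its visibility timeout expires and is then delivered again to your new consumer, which is consistent with the ApproximateReceiveCount=2 you observe.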
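
One way to spot this from the consumer side is to look at the message attributes you already request. The following is only a rough sketch: the function name `received_elsewhere_first` and the `slack_ms` threshold are made up for illustration, and it assumes the receive call asked for `AttributeNames=["All"]` so that the timestamps (epoch milliseconds, as strings) are present.

```python
def received_elsewhere_first(msg, slack_ms=1000):
    """Heuristic: True if this delivery is a redelivery of a message that
    some earlier receive request picked up almost immediately after it was sent.

    `msg` is one entry of the "Messages" list returned by receive_message.
    `slack_ms` is an arbitrary tolerance chosen for this sketch.
    """
    attrs = msg.get("Attributes", {})
    try:
        count = int(attrs["ApproximateReceiveCount"])
        sent = int(attrs["SentTimestamp"])
        first = int(attrs["ApproximateFirstReceiveTimestamp"])
    except (KeyError, ValueError):
        # Attributes were not requested or are malformed: cannot tell
        return False
    # Delivered more than once, and the first delivery happened right after sending
    return count > 1 and (first - sent) <= slack_ms
```

If this returns True for a message that your own code has only seen once, some other receive request got it first, which points to a leftover long poll rather than to a delay inside SQS.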

To avoid the old long-poll receives from returning messages from new sends, consider either using a unique queue name for each test run or waiting at least 20s between runs so that all old long polls complete as empty.
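
A third option is to shut the receiver down gracefully, so that no long poll is left outstanding when the process exits. Below is a minimal sketch of that idea, not a drop-in replacement for your script: `recv_loop`, `start_receiver`, `shutdown`, and the `handle` callback are hypothetical names, and only the `receive_message` and `delete_message` calls are the boto3 SQS client's own.

```python
import threading

def recv_loop(client, url, stop, handle):
    # Poll until `stop` is set. The flag is only checked between calls,
    # so a long poll that is already in flight is allowed to complete.
    while not stop.is_set():
        resp = client.receive_message(QueueUrl=url,
                                      MaxNumberOfMessages=10,
                                      WaitTimeSeconds=20)
        # An empty long poll returns a response without a "Messages" key
        for msg in resp.get("Messages", []):
            handle(msg)
            client.delete_message(QueueUrl=url,
                                  ReceiptHandle=msg["ReceiptHandle"])

def start_receiver(client, url, handle):
    # Non-daemon thread, so the interpreter does not exit mid-poll
    stop = threading.Event()
    thread = threading.Thread(target=recv_loop,
                              args=(client, url, stop, handle))
    thread.start()
    return stop, thread

def shutdown(stop, thread):
    # Ask the loop to stop, then wait for the current receive call to
    # return. This can block for up to WaitTimeSeconds (20s here).
    stop.set()
    thread.join()
```

With this shape, a test run would call `shutdown(stop, thread)` before exiting instead of `os._exit(1)`. Any messages returned by the final poll are still handled and deleted, so they do not sit invisible waiting for the visibility timeout.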

Kuba
answered 5 years ago

You're right, that was the exact problem. Thanks for the help.

answered 5 years ago
