SQS polling rate

0

I want to poll only 50 messages per minute. Is it possible? or should I publish only 50 messages/minute in the queue?

已提问 3 年前1039 查看次数
1 回答
0
已接受的回答

It is possible to control the polling rate in the consumer. This can be easily implemented with a token vending mechanism. That is, in the main application you start up two threads, one thread produces tokens (50 tokens per minute), and the other thread performs polling only when tokens are available. If you set the MaxNumberOfMessages in the ReceiveMessage API call to 1 (which is the default value), this token vending mechanism ensures that you can perform up to the desired number of polling during a certain period of time.

Below is an example in Python that demonstrate the concept. Here the combination of the QoSCounter and the qosRefillThread forms a token vending machine. The actual work is done by the SQSWorker, and it only performs work when token is available from the token vending machine.

import time
import boto3
import multiprocessing

"""
QoSCounter is a LeakyBucket QoS algorithm. Each sub-process can not do any polling
unless the QoSCounter is greater than 0. The main process needs to start a separate
process to call the refill() method every minute to refill the LeakyBucket.
"""

class QoSCounter(object):
    def __init__(self, value=0):
        """
        RawValue because we don't need it to create a Lock:
        """
        self.capacity   = multiprocessing.RawValue('i', value)
        self.refillRate = multiprocessing.RawValue('i', value)
        self.lock       = multiprocessing.Lock()

    def consume(self, value=0):
        with self.lock:
            self.capacity.value -= value

    def refill(self):
        """
        Here we assume limit capacity for the LeakyBucket to avoid burst traffic.
        """ 
        with self.lock:
            self.capacity.value += self.refillRate.value
            if self.capacity.value > self.refillRate.value:
              self.capacity.value = self.refillRate.value

    def value(self):
        with self.lock:
            return self.capacity.value

"""
This is a thread to refill the QoSCounter once every munite.
"""
def qosRefillThread(counter):
  while True:
      counter.refill()
      time.sleep(60)


"""
SQSWorker is a thread, and multiple thread shares the same QoSCounter for rate
limit.
"""
def SQSWorker(workerId, counter):
#  session = boto3.session.Session()
#  client = session.resource('sqs', region_name = 'us-east-1')
  while True:
    while counter.value() <= 0:
      time.sleep(1)
    counter.consume(1)
    print('Do a polling')
#   do the work here


"""
The main program starts here.
"""
counter = QoSCounter(50)
qos = multiprocessing.Process(target=qosRefillThread, args=(counter, ))
qos.start()
worker = multiprocessing.Process(target=SQSWorker, args=('Test', counter))
worker.start()

Since SQSWorker is a thread, you can have multiple SQSWorker working in parallel, and these workers will share the same QoSCounter. This ensures that you do not exceed the desired rate limit when you do the work with multiple sub-processes.

AWS
已回答 3 年前
profile picture
专家
已审核 1 个月前

您未登录。 登录 发布回答。

一个好的回答可以清楚地解答问题和提供建设性反馈,并能促进提问者的职业发展。

回答问题的准则