SQS polling rate

I want to poll only 50 messages per minute. Is that possible, or should I publish only 50 messages per minute to the queue?

Asked 3 years ago, viewed 958 times
1 answer
Accepted answer

It is possible to control the polling rate in the consumer with a token vending mechanism. In the main application you start two threads: one produces tokens (50 tokens per minute), and the other polls only when a token is available. If you set MaxNumberOfMessages in the ReceiveMessage API call to 1 (which is the default value), each poll returns at most one message, so the token vending mechanism caps both the number of polls and the number of messages received in each period.
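The two-thread description above can be sketched with the standard threading module. This is a minimal illustration, not the answer's own code: the names TokenBucket, refill_loop and poll_loop are hypothetical, and the stop event is an added assumption so that the loops can be shut down cleanly.

```python
import threading


class TokenBucket:
    """Thread-safe token counter (illustrative name, not an AWS API)."""

    def __init__(self, tokens_per_period):
        self._limit = tokens_per_period
        self._tokens = tokens_per_period
        self._lock = threading.Lock()

    def try_consume(self):
        """Take one token if available; check and decrement share one lock."""
        with self._lock:
            if self._tokens <= 0:
                return False
            self._tokens -= 1
            return True

    def refill(self):
        """Reset to the limit so unused tokens never accumulate into a burst."""
        with self._lock:
            self._tokens = self._limit


def refill_loop(bucket, stop, period_seconds=60):
    """Producer thread: refill the bucket once per period until stopped."""
    # Event.wait returns False on timeout, True once stop is set.
    while not stop.wait(period_seconds):
        bucket.refill()


def poll_loop(bucket, poll, stop):
    """Consumer thread: call poll() only after a token has been taken."""
    while not stop.is_set():
        if bucket.try_consume():
            poll()
        else:
            # No token left in this period; check again in a second.
            stop.wait(1)
```

To run it, create a TokenBucket(50) and a threading.Event, then start one thread on refill_loop and one on poll_loop, passing a poll callable that performs the actual ReceiveMessage call.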

Below is an example in Python that demonstrates the concept. It uses the multiprocessing module, so each "thread" here is actually a separate process. The QoSCounter and the qosRefillThread together form the token vending machine. The actual work is done by the SQSWorker, which only performs work when a token is available from the token vending machine.

import time
import boto3
import multiprocessing

"""
QoSCounter implements a token bucket style QoS algorithm: a bucket of tokens that
is refilled at a fixed rate. A sub-process cannot do any polling unless the
QoSCounter is greater than 0. The main process needs to start a separate process
that calls the refill() method every minute to refill the bucket.
"""

class QoSCounter(object):
    def __init__(self, value=0):
        """
        RawValue because we don't need it to create a Lock:
        """
        self.capacity   = multiprocessing.RawValue('i', value)
        self.refillRate = multiprocessing.RawValue('i', value)
        self.lock       = multiprocessing.Lock()

    def consume(self, value=1):
        """
        Take `value` tokens if that many are available. The check and the
        decrement happen under one lock, so parallel workers can never
        spend the same token. Returns True on success, False otherwise.
        """
        with self.lock:
            if self.capacity.value < value:
                return False
            self.capacity.value -= value
            return True

    def refill(self):
        """
        Cap the bucket at refillRate so that unused tokens do not accumulate
        and cause burst traffic.
        """
        with self.lock:
            self.capacity.value = min(self.capacity.value + self.refillRate.value,
                                      self.refillRate.value)

    def value(self):
        with self.lock:
            return self.capacity.value

"""
This runs in its own process and refills the QoSCounter once every minute.
"""
def qosRefillThread(counter):
    while True:
        counter.refill()
        time.sleep(60)


"""
SQSWorker runs in its own process. Multiple workers share the same QoSCounter
for rate limiting.
"""
def SQSWorker(workerId, counter):
#   session = boto3.session.Session()
#   client = session.client('sqs', region_name = 'us-east-1')
    while True:
        # Wait until a token can be taken from the shared bucket.
        while not counter.consume(1):
            time.sleep(1)
        print('Do a polling')
#       do the work here


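"""
The main program starts here. The __main__ guard is needed on platforms where
multiprocessing starts each child in a fresh interpreter (Windows, macOS).
"""
if __name__ == '__main__':
    counter = QoSCounter(50)
    qos = multiprocessing.Process(target=qosRefillThread, args=(counter, ))
    qos.start()
    worker = multiprocessing.Process(target=SQSWorker, args=('Test', counter))
    worker.start()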

Since each SQSWorker runs as its own process, you can have multiple SQSWorkers working in parallel, all sharing the same QoSCounter. As long as a token is checked and consumed atomically, this ensures that you do not exceed the desired rate limit even when the work is spread across multiple sub-processes.
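The "do the work here" step is left open in the example. One way it might look with boto3 is sketched below, under some assumptions: poll_once and take_token are hypothetical names, sqs_client is expected to behave like boto3.client('sqs'), and the 20-second long-polling wait is a choice made for this sketch rather than something the answer specifies.

```python
def poll_once(sqs_client, queue_url, take_token):
    """Receive at most one message, and only if a token is available.

    sqs_client is assumed to behave like boto3.client('sqs').
    take_token is any callable that returns True when a token was taken.
    """
    if not take_token():
        return None

    response = sqs_client.receive_message(
        QueueUrl=queue_url,
        MaxNumberOfMessages=1,  # one message per token
        WaitTimeSeconds=20,     # long polling; an assumption of this sketch
    )

    # The "Messages" key is absent when the queue returned nothing.
    messages = response.get("Messages", [])
    if not messages:
        return None

    message = messages[0]
    # ... process message["Body"] here ...

    # Delete after processing so the message is not delivered again.
    sqs_client.delete_message(
        QueueUrl=queue_url,
        ReceiptHandle=message["ReceiptHandle"],
    )
    return message
```

Note that a token is spent on every poll, including polls that return no message, so the limit applies to polls per minute and the number of messages received can be lower.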

AWS
Answered 3 years ago
Expert
Reviewed 3 days ago
