SQS polling rate


I want to poll only 50 messages per minute. Is it possible? Or should I publish only 50 messages per minute to the queue?

Asked 3 years ago, 958 views
1 Answer
Accepted Answer

Yes, you can control the polling rate in the consumer with a token vending mechanism. The main application starts two workers (threads or, as in the example below, processes): one produces tokens at the desired rate (50 tokens per minute), and the other polls only when a token is available. If you set MaxNumberOfMessages in the ReceiveMessage API call to 1 (the default value), each poll returns at most one message, so the token vending mechanism caps consumption at 50 polls, and therefore at most 50 messages, per minute.
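Before the full multi-process example, the core idea can be sketched in a single process. This is only an illustration under stated assumptions: the class name TokenVendor, the method try_acquire and the injectable clock parameter are hypothetical names chosen here, and the bucket is refilled once per fixed period, like the refill-once-a-minute loop in the example that follows.

```python
import time


class TokenVendor:
    """Single-process token vendor: grants at most `rate` tokens per `period` seconds."""

    def __init__(self, rate, period=60.0, clock=time.monotonic):
        self.rate = rate
        self.period = period
        self.clock = clock          # injectable so the logic can be tested without sleeping
        self.tokens = rate          # start with a full bucket
        self.window_start = clock()

    def try_acquire(self):
        """Spend one token and return True if one is available, else return False."""
        now = self.clock()
        if now - self.window_start >= self.period:
            # A new period has begun: refill the bucket without carrying over leftovers.
            self.tokens = self.rate
            self.window_start = now
        if self.tokens > 0:
            self.tokens -= 1
            return True
        return False


if __name__ == "__main__":
    vendor = TokenVendor(rate=50, period=60.0)
    granted = sum(vendor.try_acquire() for _ in range(200))
    print(granted, "of 200 poll attempts were allowed")
```

A consumer would call try_acquire() before each ReceiveMessage call and sleep briefly whenever it returns False. This version is not safe to share between threads or processes; it shows only the counting logic.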

Below is an example in Python that demonstrates the concept. The QoSCounter and the qosRefillThread together form the token vending machine. The actual work is done by the SQSWorker, which works only when a token is available from the token vending machine.

import time
import boto3
import multiprocessing

"""
QoSCounter is a LeakyBucket QoS algorithm. A sub-process cannot do any polling
unless the QoSCounter is greater than 0. The main process needs to start a separate
process that calls the refill() method every minute to refill the LeakyBucket.
"""

class QoSCounter(object):
    def __init__(self, value=0):
        """
        Use RawValue rather than Value: both fields are guarded by our own
        Lock below, so the values do not need locks of their own.
        """
        self.capacity   = multiprocessing.RawValue('i', value)
        self.refillRate = multiprocessing.RawValue('i', value)
        self.lock       = multiprocessing.Lock()

    def consume(self, value=0):
        with self.lock:
            self.capacity.value -= value

    def refill(self):
        """
        The capacity is capped at refillRate so that unused tokens do not
        accumulate and cause burst traffic.
        """
        with self.lock:
            self.capacity.value = min(self.capacity.value + self.refillRate.value,
                                      self.refillRate.value)

    def value(self):
        with self.lock:
            return self.capacity.value

"""
This function runs in its own process and refills the QoSCounter once every minute.
"""
def qosRefillThread(counter):
    while True:
        counter.refill()
        time.sleep(60)


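"""
SQSWorker runs as a separate process, and multiple workers share the same
QoSCounter for rate limiting.
"""
def SQSWorker(workerId, counter):
    # session = boto3.session.Session()
    # client = session.client('sqs', region_name='us-east-1')
    while True:
        # Check for a token and take it under a single lock acquisition, so
        # that two workers cannot both spend the last token.
        with counter.lock:
            gotToken = counter.capacity.value > 0
            if gotToken:
                counter.capacity.value -= 1
        if not gotToken:
            time.sleep(1)
            continue
        print('Do a polling')
        # do the work here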


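"""
The main program starts here. The __main__ guard keeps child processes from
re-running this block on platforms where multiprocessing uses the spawn start method.
"""
if __name__ == '__main__':
    counter = QoSCounter(50)
    qos = multiprocessing.Process(target=qosRefillThread, args=(counter, ))
    qos.start()
    worker = multiprocessing.Process(target=SQSWorker, args=('Test', counter))
    worker.start()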

Because each SQSWorker runs as a separate process, you can start several of them in parallel and pass them the same QoSCounter. All workers then draw tokens from one shared budget, so together they stay within the desired rate limit.
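The "do the work here" step is left open in the example. A minimal sketch of what one rate-limited poll might look like is shown here. The function name poll_once and its handle_message callback are hypothetical; receive_message and delete_message are the boto3 SQS client calls, and the queue URL is a placeholder you would supply.

```python
def poll_once(client, queue_url, handle_message, wait_seconds=20):
    """Receive at most one message, handle it, delete it. Returns 0 or 1."""
    response = client.receive_message(
        QueueUrl=queue_url,
        MaxNumberOfMessages=1,         # one token buys at most one message
        WaitTimeSeconds=wait_seconds,  # long polling; an empty poll still costs a token
    )
    # The "Messages" key is absent when the queue returned nothing.
    messages = response.get("Messages", [])
    for message in messages:
        handle_message(message["Body"])
        # Delete only after handling succeeds; if handling raises, the message
        # becomes visible again once its visibility timeout expires.
        client.delete_message(
            QueueUrl=queue_url,
            ReceiptHandle=message["ReceiptHandle"],
        )
    return len(messages)


# Hypothetical usage inside SQSWorker, after a token has been consumed:
#   client = boto3.client("sqs", region_name="us-east-1")
#   poll_once(client, QUEUE_URL, print)   # QUEUE_URL stands for your queue's URL
```

Passing the client in as a parameter lets each worker process create its own boto3 client and makes the function easy to exercise with a stub.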

AWS
answered 3 years ago
EXPERT
reviewed 3 days ago
