SQS轮询可以限制速率吗?

0

【以下的问题经过翻译处理】 我想每分钟只轮询50个消息可以吗?或者我应该每分钟只发布50个消息到队列中?

profile picture
EXPERTE
gefragt vor 5 Monaten10 Aufrufe
1 Antwort
0

【以下的回答经过翻译处理】 需要在消费者中控制轮询速率。通过令牌发放机制来实现。也就是说,在主应用程序中启动两个线程:一个线程产生令牌(每分钟50个令牌),另一个线程只在令牌可用时执行轮询。如果您在ReceiveMessage API调用中将MaxNumberOfMessages设置为1(这是默认值),则此令牌发放机制可确保您在某段时间内最多执行所需数量的轮询。

以下是一个用Python演示概念的示例。这里QoSCounterqosRefillThread的组合形成了一个令牌发放机。SQSWorker执行实际工作,只有在从令牌发放机中获取到令牌时才执行工作。

import time
import boto3
import multiprocessing

"""
QoSCounter is a LeakyBucket QoS algorithm. Each sub-process can not do any polling
unless the QoSCounter is greater than 0. The main process needs to start a separate
process to call the refill() method every minute to refill the LeakyBucket.
"""

class QoSCounter(object):
    def __init__(self, value=0):
        """
        RawValue because we don't need it to create a Lock:
        """
        self.capacity   = multiprocessing.RawValue('i', value)
        self.refillRate = multiprocessing.RawValue('i', value)
        self.lock       = multiprocessing.Lock()

    def consume(self, value=0):
        with self.lock:
            self.capacity.value -= value

    def refill(self):
        """
        Here we assume limit capacity for the LeakyBucket to avoid burst traffic.
        """ 
        with self.lock:
            self.capacity.value += self.refillRate.value
            if self.capacity.value > self.refillRate.value:
              self.capacity.value = self.refillRate.value

    def value(self):
        with self.lock:
            return self.capacity.value

"""
This is a thread to refill the QoSCounter once every munite.
"""
def qosRefillThread(counter):
  while True:
      counter.refill()
      time.sleep(60)


"""
SQSWorker is a thread, and multiple thread shares the same QoSCounter for rate
limit.
"""
def SQSWorker(workerId, counter):
#  session = boto3.session.Session()
#  client = session.resource('sqs', region_name = 'us-east-1')
  while True:
    while counter.value() <= 0:
      time.sleep(1)
    counter.consume(1)
    print('Do a polling')
#   do the work here


"""
The main program starts here.
"""
counter = QoSCounter(50)
qos = multiprocessing.Process(target=qosRefillThread, args=(counter, ))
qos.start()
worker = multiprocessing.Process(target=SQSWorker, args=('Test', counter))
worker.start()
profile picture
EXPERTE
beantwortet vor 5 Monaten

Du bist nicht angemeldet. Anmelden um eine Antwort zu veröffentlichen.

Eine gute Antwort beantwortet die Frage klar, gibt konstruktives Feedback und fördert die berufliche Weiterentwicklung des Fragenstellers.

Richtlinien für die Beantwortung von Fragen