SQS轮询可以限制速率吗?

0

【以下的问题经过翻译处理】 我想每分钟只轮询50个消息可以吗?或者我应该每分钟只发布50个消息到队列中?

profile picture
专家
已提问 5 个月前8 查看次数
1 回答
0

【以下的回答经过翻译处理】 需要在消费者中控制轮询速率。通过令牌发放机制来实现。也就是说,在主应用程序中启动两个线程:一个线程产生令牌(每分钟50个令牌),另一个线程只在令牌可用时执行轮询。如果您在ReceiveMessage API调用中将MaxNumberOfMessages设置为1(这是默认值),则此令牌发放机制可确保您在某段时间内最多执行所需数量的轮询。

以下是一个用Python演示概念的示例。这里QoSCounterqosRefillThread的组合形成了一个令牌发放机。SQSWorker执行实际工作,只有在从令牌发放机中获取到令牌时才执行工作。

import time
import boto3
import multiprocessing

"""
QoSCounter is a LeakyBucket QoS algorithm. Each sub-process can not do any polling
unless the QoSCounter is greater than 0. The main process needs to start a separate
process to call the refill() method every minute to refill the LeakyBucket.
"""

class QoSCounter(object):
    def __init__(self, value=0):
        """
        RawValue because we don't need it to create a Lock:
        """
        self.capacity   = multiprocessing.RawValue('i', value)
        self.refillRate = multiprocessing.RawValue('i', value)
        self.lock       = multiprocessing.Lock()

    def consume(self, value=0):
        with self.lock:
            self.capacity.value -= value

    def refill(self):
        """
        Here we assume limit capacity for the LeakyBucket to avoid burst traffic.
        """ 
        with self.lock:
            self.capacity.value += self.refillRate.value
            if self.capacity.value > self.refillRate.value:
              self.capacity.value = self.refillRate.value

    def value(self):
        with self.lock:
            return self.capacity.value

"""
This is a thread to refill the QoSCounter once every munite.
"""
def qosRefillThread(counter):
  while True:
      counter.refill()
      time.sleep(60)


"""
SQSWorker is a thread, and multiple thread shares the same QoSCounter for rate
limit.
"""
def SQSWorker(workerId, counter):
#  session = boto3.session.Session()
#  client = session.resource('sqs', region_name = 'us-east-1')
  while True:
    while counter.value() <= 0:
      time.sleep(1)
    counter.consume(1)
    print('Do a polling')
#   do the work here


"""
The main program starts here.
"""
counter = QoSCounter(50)
qos = multiprocessing.Process(target=qosRefillThread, args=(counter, ))
qos.start()
worker = multiprocessing.Process(target=SQSWorker, args=('Test', counter))
worker.start()
profile picture
专家
已回答 5 个月前

您未登录。 登录 发布回答。

一个好的回答可以清楚地解答问题和提供建设性反馈,并能促进提问者的职业发展。

回答问题的准则