1 回答
- 最新
- 投票最多
- 评论最多
0
【以下的回答经过翻译处理】 需要在消费者中控制轮询速率。通过令牌发放机制来实现。也就是说,在主应用程序中启动两个线程:一个线程产生令牌(每分钟50个令牌),另一个线程只在令牌可用时执行轮询。如果您在ReceiveMessage API调用中将MaxNumberOfMessages设置为1(这是默认值),则此令牌发放机制可确保您在某段时间内最多执行所需数量的轮询。
以下是一个用Python演示概念的示例。这里QoSCounter和qosRefillThread的组合形成了一个令牌发放机。SQSWorker执行实际工作,只有在从令牌发放机中获取到令牌时才执行工作。
import time
import boto3
import multiprocessing
"""
QoSCounter is a LeakyBucket QoS algorithm. Each sub-process can not do any polling
unless the QoSCounter is greater than 0. The main process needs to start a separate
process to call the refill() method every minute to refill the LeakyBucket.
"""
class QoSCounter(object):
def __init__(self, value=0):
"""
RawValue because we don't need it to create a Lock:
"""
self.capacity = multiprocessing.RawValue('i', value)
self.refillRate = multiprocessing.RawValue('i', value)
self.lock = multiprocessing.Lock()
def consume(self, value=0):
with self.lock:
self.capacity.value -= value
def refill(self):
"""
Here we assume limit capacity for the LeakyBucket to avoid burst traffic.
"""
with self.lock:
self.capacity.value += self.refillRate.value
if self.capacity.value > self.refillRate.value:
self.capacity.value = self.refillRate.value
def value(self):
with self.lock:
return self.capacity.value
"""
This is a thread to refill the QoSCounter once every munite.
"""
def qosRefillThread(counter):
while True:
counter.refill()
time.sleep(60)
"""
SQSWorker is a thread, and multiple thread shares the same QoSCounter for rate
limit.
"""
def SQSWorker(workerId, counter):
# session = boto3.session.Session()
# client = session.resource('sqs', region_name = 'us-east-1')
while True:
while counter.value() <= 0:
time.sleep(1)
counter.consume(1)
print('Do a polling')
# do the work here
"""
The main program starts here.
"""
counter = QoSCounter(50)
qos = multiprocessing.Process(target=qosRefillThread, args=(counter, ))
qos.start()
worker = multiprocessing.Process(target=SQSWorker, args=('Test', counter))
worker.start()