并行调用具有QPS限制的API的一点经验

最近做了个小实体装置,通过调用某厂的API实现人脸检测,目标是在6s内检测60张照片。由于免费版API的QPS(Query Per Second,每秒询问数)限制为1,因此想要达到10以上的QPS,只能通过并行调用多个API来实现。此处整理一些具体的做法:

并行

python3最好引入 threading 模块来处理并行。这里可以用一个比较万金油的模块来并行:

1
2
3
4
5
6
7
8
import threading
class paraThread (threading.Thread):
def __init__(self, func, args):
threading.Thread.__init__(self)
self.func = func
self.args = args
def run(self):
self.func(*self.args)

然后假设你有一个需要并行的函数 myfunc(a, b, c) ,那么只需要

1
2
3
4
5
6
7
8
thread_num = 12 # 并行的线程数,如果是并行函数是CPU密集型,设为CPU线程数最好
thread_list = []
for i in range(thread_num):
thread_list.append(myThread(myfunc, (a, b, c)))
for i in range(thread_num):
thread_list[i].start() # 启动线程
for i in range(thread_num):
thread_list[i].join() # 同步操作,等待所有线程结束

限制QPS

我的实现里是每个线程绑定一个API,因为API具有QPS限制,如果这个线程两次请求API间隔小于1s,服务端不会响应第二次API请求,就出错了因此,需要在我们调用时就做好调用的QPS限制。

限制QPS用了个很直接的方法,就是如果两次调用的时间小于1s,就 time.sleep() 到足够1s:

1
2
3
4
5
6
7
import time
TIME = 0
for i in range(10):
if time.time() - TIME < 1:
time.sleep(max(0, 1 - (time.time() - TIME)))
TIME = time.time()
response = requests.post(url_add, data = data[i]) # 发送数据,调用API

但是有时侯还是会遇到调用失败,所以需要额外加一个失败重传:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import time
TIME = 0
for i in range(10):
while True:
try:
if time.time() - TIME < 1:
time.sleep(max(0, 1 - (time.time() - TIME)))
TIME = time.time()
response = requests.post(url_add, data = data[i]) # 发送数据,调用API
req_con = response.content.decode('utf-8') # 答复数据
req_dict = json.JSONDecoder().decode(req_con)
result = req_dict['result']
except KeyError: # 如果调用失败,答复数据为空,字典req_dict按键取值会出现KeyError
print('retrying...')
continue # 如果失败,重新开始
break # 如果成功,跳出循环

以上就是全部内容。