How to check whether a variable shared with and edited by another scheduled thread has changed, without polling it in a while loop
Problem description
My API receives a user's text and, within 900 ms, sends it to a model that computes its length (just for a simple demo). I have already implemented this, but in an ugly way: I start a new background scheduled thread. The API receives a query on the main thread and puts it into a queue shared between the main thread and the new thread. The new thread periodically fetches all texts from the queue and sends them to the model. After the model has processed them, the results are stored in a shared dict. On the main thread, the get_response method uses a while loop to poll the shared dict for the result. My question is: how do I get rid of the while loop in get_response? I would like an elegant approach. Thanks!
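As a minimal illustration of the kind of loop-free wait being asked about (a sketch with my own names, not code from the post): each task can be paired with an asyncio.Event that the worker thread sets through loop.call_soon_threadsafe, so the coroutine simply awaits the event instead of sleeping and re-checking.

```python
import asyncio
import threading

events = {}   # task_id -> asyncio.Event
results = {}  # task_id -> result

def worker(loop, task_id, text):
    results[task_id] = len(text)  # stand-in for the real model call
    # Event.set is not thread-safe on its own; hand it back to the event loop
    loop.call_soon_threadsafe(events[task_id].set)

async def get_response(task_id, text):
    events[task_id] = asyncio.Event()
    loop = asyncio.get_running_loop()
    threading.Thread(target=worker, args=(loop, task_id, text), daemon=True).start()
    await events[task_id].wait()  # no polling, no sleep
    events.pop(task_id)
    return results.pop(task_id)

print(asyncio.run(get_response("t1", "hello")))  # → 5
```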
Here is the server code; the sleep-based wait in get_response needs to go, because it is ugly:
import asyncio
import uuid
from typing import Union, List
import threading
from queue import Queue
from fastapi import FastAPI, Request, Body, APIRouter
from fastapi_utils.tasks import repeat_every
import uvicorn
import time
import logging
import datetime

logger = logging.getLogger(__name__)
app = APIRouter()

def feed_data_into_model(queue, shared_dict, lock):
    if queue.qsize() != 0:
        data = []
        ids = []
        while queue.qsize() != 0:
            task = queue.get()
            task_id = task[0]
            ids.append(task_id)
            text = task[1]
            data.append(text)
        result = model_work(data)
        # print("model result:", result)
        for index, task_id in enumerate(ids):
            value = result[index]
            handle_dict(task_id, value, action="put", lock=lock, shared_dict=shared_dict)

class TestThreading(object):
    def __init__(self, interval, queue, shared_dict, lock):
        self.interval = interval
        thread = threading.Thread(target=self.run, args=(queue, shared_dict, lock))
        thread.daemon = True
        thread.start()

    def run(self, queue, shared_dict, lock):
        while True:
            # More statements come here
            # print(datetime.datetime.now().__str__() + ' : Start task in the background')
            feed_data_into_model(queue, shared_dict, lock)
            time.sleep(self.interval)

if __name__ != "__main__":
    # uvicorn inits and reloads this file, so __name__ is not "__main__"; init the shared state here,
    # otherwise we end up with 2 background threads (one of them idle), which is hard to debug
    global queue, shared_dict, lock
    queue = Queue(maxsize=64)
    shared_dict = {}  # model results are saved here!
    lock = threading.Lock()
    tr = TestThreading(0.9, queue, shared_dict, lock)
def handle_dict(key, value=None, action="put", lock=None, shared_dict=None):
    lock.acquire()
    try:
        if action == "put":
            shared_dict[key] = value
        elif action == "delete":
            del shared_dict[key]
        elif action == "get":
            value = shared_dict[key]
        elif action == "exist":
            value = key in shared_dict
        else:
            pass
    finally:
        # Always called, even if an exception is raised in the try block
        lock.release()
    return value

def model_work(x: Union[str, List[str]]):
    time.sleep(3)
    if isinstance(x, str):
        result = [len(x)]
    else:
        result = [len(_) for _ in x]
    return result

async def get_response(task_id, lock, shared_dict):
    not_exist_flag = True
    while not_exist_flag:
        not_exist_flag = handle_dict(task_id, None, action="exist", lock=lock, shared_dict=shared_dict) is False
        await asyncio.sleep(0.02)
    value = handle_dict(task_id, None, action="get", lock=lock, shared_dict=shared_dict)
    handle_dict(task_id, None, action="delete", lock=lock, shared_dict=shared_dict)
    return value

@app.get("/{text}")
async def demo(text: str):
    global queue, shared_dict, lock
    task_id = str(uuid.uuid4())
    logger.info(task_id)
    state = "pending"
    item = [task_id, text, state, ""]
    queue.put(item)
    # TODO: await query_from_answer_dict; needs changing, since busy-waiting for the answer in a while loop is ugly
    value = await get_response(task_id, lock, shared_dict)
    return 1

if __name__ == "__main__":
    # what I want to do:
    # a single worker runs every 900ms; if the queue is not empty, pop the items out to the model,
    # and the model saves its results in a thread-safe dict keyed by task id
    uvicorn.run("api:app", host="0.0.0.0", port=5555)
Client code:
for n in {1..5}; do curl http://localhost:5555/a & done
Solution
The usual way to run a blocking task in asyncio code is with asyncio's built-in run_in_executor, which handles it for you. You can set up an executor yourself, or let asyncio use its default one:
import asyncio
from time import sleep

def proc(t):
    print("in thread")
    sleep(t)
    return f"Slept for {t} seconds"

async def submit_task(t):
    print("submitting:", t)
    res = await loop.run_in_executor(None, proc, t)
    print("got:", res)

async def other_task():
    for _ in range(4):
        print("poll!")
        await asyncio.sleep(1)

loop = asyncio.new_event_loop()
loop.create_task(other_task())
loop.run_until_complete(submit_task(3))
Note that if loop is not defined globally, you can get it inside a function with asyncio.get_event_loop(). I deliberately used a simple example without FastAPI/uvicorn to illustrate the point, but the idea is the same: FastAPI (etc.) just runs in an event loop, which is why you define coroutines for your endpoints.
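Applied to an endpoint-style coroutine, that looks like the following sketch (names are my own; inside a running coroutine, asyncio.get_running_loop() is the usual way to obtain the loop):

```python
import asyncio
import time

def blocking_work(t):
    time.sleep(t)  # stands in for a slow model call
    return t * 2

async def endpoint_style(t):
    # Inside a coroutine (e.g. a FastAPI endpoint), fetch the running loop
    # instead of relying on a module-level global
    loop = asyncio.get_running_loop()
    # The coroutine simply awaits the result; the loop stays free meanwhile
    return await loop.run_in_executor(None, blocking_work, t)

print(asyncio.run(endpoint_style(0.05)))  # → 0.1
```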
The benefit of this is that we can await the response directly, rather than laboriously waiting on an event and then fetching the result by some other means (a shared dict guarded by a mutex, pipes, queues, and so on). That keeps the code clean and readable, and it is likely much faster too. If for some reason we want to make sure it runs in a process rather than a thread, we can create our own executor:
from concurrent.futures import ProcessPoolExecutor
e = ProcessPoolExecutor()
...
res = await loop.run_in_executor(e, proc, t)
See the docs for more details.
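As a side note not in the original answer: on Python 3.9+, asyncio.to_thread is a convenient shorthand for the default-executor pattern shown above.

```python
import asyncio
import time

def blocking(t):
    time.sleep(t)
    return f"slept {t}"

async def main():
    # Equivalent to loop.run_in_executor(None, blocking, 0.1)
    return await asyncio.to_thread(blocking, 0.1)

print(asyncio.run(main()))  # → slept 0.1
```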
Another option would be to run the tasks with multiprocessing.Pool and apply_async. But you cannot directly await multiprocessing futures. There is a library, aiomultiprocessing, to make the two play together, but I have no experience with it and see no reason to prefer it over the built-in executor in this case (running a background task per coroutine call).
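To make the "cannot await it directly" point concrete, here is one way (a sketch, not from the original answer) to bridge a Pool's AsyncResult back into asyncio: offload its blocking .get call to a thread via run_in_executor.

```python
import asyncio
from multiprocessing import Pool

def square(x):
    return x * x

async def main():
    with Pool(2) as pool:
        async_result = pool.apply_async(square, (7,))
        loop = asyncio.get_running_loop()
        # AsyncResult is not awaitable and .get() blocks, so run it in a thread
        return await loop.run_in_executor(None, async_result.get)

if __name__ == "__main__":
    print(asyncio.run(main()))  # → 49
```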
Finally, note that the main reason to avoid a polling while loop is not that it is ugly (though it is), but that it performs nowhere near as well as almost any other solution.