Python3异步asyncio问题
官方文档:
https://docs.python.org/zh-cn/3/library/asyncio-task.html#asyncio.run
看了一大堆相关的资料和教程,针对的Python版本不同,写法也各不一致,翻了翻官方的文档,发现其实越高版本的Python对异步进行封装的越方便,官方说法叫高层级API,甚至都不用去理解什么Future\task\loop之类的概念了,我现在用的是Python 3.7.5,可以这样很简单的实现阻塞等待\异步并发:如果没有复杂需求的话,用高层级API就可以了。
如果涉及到回调的话貌似还得用低层级的API,后面单独记录。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
import asyncio
async def first():
print('first函数调用开始,下面将会等待3秒模拟函数运行完毕')
await asyncio.sleep(3)
print('first函数执行完毕')
async def last():
print('last函数调用开始')
await asyncio.sleep(2)
print('last函数执行完毕')
async def func(delay):
print('开始异步同时执行的函数+延迟: ' + str(delay))
await asyncio.sleep(delay)
print('--异步函数执行完毕+延迟: ' + str(delay))
async def main():
await first() # 这里先调用first()函数,并且等它执行完了才会开始
await asyncio.gather(
func(1),
func(2),
func(3)
)
await last()
asyncio.run(main())
|
上面代码实际执行的过程是:
- 开始执行first()函数
- 等first()执行完毕后开始并发执行下面gather中加入的
- 三个函数(任务)三个函数全部并发执行完毕后执行last()
官方文档中给的建议是只创建一个主入口的main()函数(当然这个函数名可以自定义的),将要调用的其他函数都放在这个函数中,然后再使用asyncio.run()启动,理想情况下应当只被调用一次.
上图:
更新
上面所谓的高阶层API用法最后一行asyncio.run(main())和下面使用低阶层API实现效果是一样的:
1
2
3
|
loop = asyncio.get_event_loop()
task = loop.create_task(main())
loop.run_until_complete(task)
|
下面是学习过程中记录的偏低层实现的资料
最基本的定义和应用
1
2
3
4
5
6
7
8
9
10
11
|
import asyncio
# 定义一个可以异步调用的函数,其类型为coroutine
async def func1():
pass
if __name__ == '__main__':
loop = asyncio.get_event_loop() # 定义一个用来循环异步函数的loop对象
task = asyncio.ensure_future(func1()) # 创建一个调用异步函数的任务,task类型为future
# task = loop.create_task(func1()) # 使用loop的.create_task()创建任务,效果一样
loop.run_until_complete(task) # 开始在loop循环中执行异步函数,直到该函数运行完毕
|
asyncio.ensure_future(coroutine) 和 loop.create_task(coroutine)都可以创建一个task,run_until_complete的参数是一个futrue对象。
当传入一个协程,其内部会自动封装成task,task是Future的子类。
isinstance(task, asyncio.Future)将会输出True。
future类型的任务可以在loop.run_until_complete中执行,也可以直接用await+任务变量名阻塞?调用
什么时候使用异步
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
import asyncio
# 这是一个耗时很少的异步函数
async def msg(text):
await asyncio.sleep(0.1)
print(text)
# 这是一个耗时较长的异步函数
async def long_operation():
print('long_operation started')
await asyncio.sleep(3)
print('long_operation finished')
# 主函数部分,同样需要声明为async异步类型
async def main():
await msg('first')
# 现在需要调用一个耗时较长的函数操作,不希望阻塞的等待它执行完毕
# 希望long_operation在开始执行后,立即调用msg,这里就可以将long_operation封装到task任务中
task = asyncio.ensure_future(long_operation())
await msg('second')
# 开始task中的long_operation函数
await task
# task执行完毕后会继续下面的代码
print('All done.')
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
|
运行结果:
并发和并行
并发和并行一直是容易混淆的概念。并发通常指有多个任务需要同时进行,并行则是同一时刻有多个任务执行。
用上课来举例就是,并发情况下是一个老师在同一时间段辅助不同的人功课。
并行则是好几个老师分别同时辅助多个学生功课。
简而言之就是一个人同时吃三个馒头还是三个人同时分别吃一个的情况,吃一个馒头算一个任务。
asyncio实现并发,就需要多个协程来完成任务,每当有任务阻塞的时候就await,然后其他协程继续工作。
创建多个协程的列表,然后将这些协程注册到事件循环中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
import asyncio
import time
now = lambda: time.time()
async def do_some_work(x):
print('Waiting: ', x)
await asyncio.sleep(x)
return 'Done after {}s'.format(x)
start = now()
coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)
tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
for task in tasks:
print('Task ret: ', task.result())
print('TIME: ', now() - start)
|
结果如下
Waiting: 1
Waiting: 2
Waiting: 4
Task ret: Done after 1s
Task ret: Done after 2s
Task ret: Done after 4s
TIME: 4.003541946411133
总时间为4s左右。4s的阻塞时间,足够前面两个协程执行完毕。如果是同步顺序的任务,那么至少需要7s。此时我们使用了aysncio实现了并发。
asyncio.wait(tasks) 也可以使用 asyncio.gather(*tasks) ,前者接受一个task列表,后者接收一堆task。
异步结果回调
找了个别人写的例子,大致理解了下实现过程:
- 定义async异步函数
- 定义普通函数用于处理回调
- 获取异步函数的coroutine协程对象(其实就是不带await修饰直接执行异步函数返回的那个对象)
- 获取loop循环对象
- 使用低阶层API手动创建task任务
- 为task任务对象注册回调函数
- 启动loop循环调用异步函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
import time
import asyncio
now = lambda : time.time()
async def do_some_work(x):
print('Waiting: ', x)
return 'Done after {}s'.format(x)
def callback(future):
print('Callback: ', future.result())
start = now()
coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
task = loop.create_task(coroutine)
#task = asyncio.ensure_future(coroutine) # 貌似和上面用loop创建任务效果一样
task.add_done_callback(callback)
loop.run_until_complete(task)
print('TIME: ', now() - start)
|
这里需要注意,在使用低层级API手动创建异步任务的时候,不能同时使用高层级API的简单操作了,比如这里创建task任务对象的时候,就不能用asyncio.create_task(),否则会找不到loop对象,返回下面的错误
RuntimeError: no running event loop
翻了一下asyncio\tasks.py源代码,原来所谓的高层级API就是这么简单的封装啊…
调用的时候在函数内部又定义了一遍loop
1
2
3
4
5
6
7
|
def create_task(coro):
"""Schedule the execution of a coroutine object in a spawn task.
Return a Task object.
"""
loop = events.get_running_loop()
return loop.create_task(coro)
|
我自己写的例子
创建4个异步任务同时开始执行,每个任务执行完成后将结果追加到result_list数组变量中.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
import asyncio
result_list = []
async def fun(var):
return var + 1
def callbackFun(future):
result_list.append(future.result())
task_list = []
for i in range(1, 5):
cor = fun(i)
task = asyncio.ensure_future(cor)
task.add_done_callback(callbackFun)
task_list.append(task)
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(task_list))
print(result_list)
|
当然,如果不需要使用回调函数,而是等所有提交的异步任务都执行完成后获取它们的结果,可以这样写:
1
2
3
4
5
|
# 前面省略
loop.run_until_complete(asyncio.wait(task_list))
# task_list中的每个任务执行完成后可以调用它的.result()方法来获取结果
for task in task_list:
print('每个task执行完的结果:', task.result())
|