在大部分的高级语言中都有回调函数,这里我们看下asyncio中的的函数回调。
成功回调
可以给Task(Future)添加回调函数,等Task完成后就会自动调用这个(些)回调:
async def a():
await asyncio.sleep(1)
return 'A'
In : loop = asyncio.get_event_loop()
In : task = loop.create_task(a())
In : def callback(future):
...: print(f'Result: {future.result()}')
...:
In : task.add_done_callback(callback)
In : await task
Result: A
Out: 'A'
可以看到在任务完成后执行了callback函数。我这里顺便解释一个问题,不知道有没有人注意到。
为什么之前一直推荐大家用asyncio.create_task,但是很多例子却用了loop.create_task?
这是因为在IPython里面支持方便的使用await执行协程,但如果直接用asyncio.create_task会报「no running event loop」:
In : asyncio.create_task(a())
---------------------------------------------------------------------------
RuntimeError Traceback (most recent call last)
<ipython-input-2-2a742a8da161> in <module>
----> 1 asyncio.create_task(a())
/usr/local/lib/python3.7/asyncio/tasks.py in create_task(coro)
322 Return a Task object.
323 """
--> 324 loop = events.get_running_loop()
325 return loop.create_task(coro)
326
RuntimeError: no running event loop
Eventloop是在单进程里面的单线程中的,在IPython里面await的时候会把协程注册到一个线程的Eventloop上,但是REPL环境是另外一个线程,不是一个线程,所以会提示这个错误,即便asyncio.events._set_running_loop(loop)设置了loop,任务可以创建倒是不能await:因为task是在线程X的Eventloop上注册的,但是await时却到线程Y的Eventloop上去执行。这部分是C实现的,可以看延伸阅读链接1。
所以现在你就会看到很多loop.create_task的代码片段,别担心,在代码项目里面都是用asyncio.create_task的,如果你非常想要在IPython里面使用asyncio.create_task也不是没有办法,可以这样做:
In : loop = asyncio.get_event_loop()
In : def loop_runner(coro):
...: asyncio.events._set_running_loop(None)
...: loop.run_until_complete(coro)
...: asyncio.events._set_running_loop(loop)
...:
In : %autoawait loop_runner
In : asyncio.events._set_running_loop(loop)
In : task = asyncio.create_task(a())
In : await task
Out: 'A'
这样就可以啦。我解释下为什么:
IPython里面能运行await是由于loop_runner函数,这个函数能运行协程(延伸阅读链接2),默认的效果大概是asyncio.get_event_loop().run_until_complete(coro)。为了让asyncio.create_task正常运行我定义了新的loop_runner
通过autoawait这个magic函数就可以重新设置loop_runner
上面的报错是「no running event loop」,所以通过events._set_running_loop(loop)设置一个正在运行的loop,但是在默认的loop_runner中也无法运行,会报「Cannot run the event loop while another loop is running」,所以重置await里面那个running的loop,运行结束再设置回去。
如果你觉得有必要,可以在IPython配置文件中设置这个loop_runner到c.InteractiveShell.loop_runner上~
好,我们说回来,add_done_callback方法也是支持参数的,但是需要用到functools.partial:
def callback2(future, n):
print(f'Result: {future.result()}, N: {n}')
In : task = loop.create_task(a())
In : task.add_done_callback(partial(callback2, n=1))
In : await task
Result: A, N: 1
Out: 'A'
调度回调
asyncio提供了3个按需回调的方法,都在Eventloop对象上,而且也支持参数:
call_soon
在下一次事件循环中被回调,回调是按其注册顺序被调用的:
def mark_done(future, result):
print(f'Set to: {result}')
future.set_result(result)
async def b1():
loop = asyncio.get_event_loop()
fut = asyncio.Future()
loop.call_soon(mark_done, fut, 'the result')
loop.call_soon(partial(print, 'Hello', flush=True))
loop.call_soon(partial(print, 'Greeting', flush=True))
print(f'Done: {fut.done()}')
await asyncio.sleep(0)
print(f'Done: {fut.done()}, Result: {fut.result()}')
In : await b1()
Done: False
Set to: the result
Hello
Greeting
Done: True, Result: the result
这个例子输出的比较复杂,我挨个分析:
call_soon可以用来设置任务的结果: 在mark_done里面设置
通过2个print可以感受到call_soon支持参数。
最重要的就是输出部分了,首先fut.done()的结果是False,因为还没到下个事件循环,sleep(0)就可以切到下次循环,这样就会调用三个call_soon回调,最后再看fut.done()的结果就是True,而且fut.result()可以拿到之前在mark_done设置的值了
call_later
安排回调在给定的时间(单位秒)后执行:
async def b2():
loop = asyncio.get_event_loop()
fut = asyncio.Future()
loop.call_later(2, mark_done, fut, 'the result')
loop.call_later(1, partial(print, 'Hello'))
loop.call_later(1, partial(print, 'Greeting'))
print(f'Done: {fut.done()}')
await asyncio.sleep(2)
print(f'Done: {fut.done()}, Result: {fut.result()}')
In : await b2()
Done: False
Hello
Greeting
Set to: the result
Done: True, Result: the result
这次要注意3个回调的延迟时间时间要<=sleep的,要不然还没来及的回调程序就结束了
call_at
安排回调在给定的时间执行,注意这个时间要基于 loop.time() 获取当前时间:
async def b3():
loop = asyncio.get_event_loop()
now = loop.time()
fut = asyncio.Future()
loop.call_at(now + 2, mark_done, fut, 'the result')
loop.call_at(now + 1, partial(print, 'Hello', flush=True))
loop.call_at(now + 1, partial(print, 'Greeting', flush=True))
print(f'Done: {fut.done()}')
await asyncio.sleep(2)
print(f'Done: {fut.done()}, Result: {fut.result()}')
In : await b3()
Done: False
Hello
Greeting
Set to: the result
Done: True, Result: the result
下一节:深究Python中的asyncio库-线程同步