异步的魔法 — asyncio 源码剖析(1)

源码基于 Python 3.6.3 的 asyncio commit sha 为 2c5fed86e0cbba5a4e34792b0083128ce659909d

asyncio 主要做了一件事:协程的调度。协程是由 Python 提供的支持。调度则是通过 select/epoll 等系统调用结合自身的队列进行实现

本系列文章到此结束(笑)

asyncio 的代码实现有些复杂,很多实例都将自己委身于函数或者方法中,比如 loop。每个 TaskFuture 的实例都携带了 loop,然后又将自己想要执行的任务放到 loop 的队列中。还有 Future 实例注册 callback,然后将自己作为 callback 参数传入的情况。这种实现比较乱,而且有一个严重的后果,那就是循环引用。asyncio 代码中有些地方使用了 self = None 来对此进行规避。鉴于本文不是用来吐槽的,所以回归正题。先来形象地理解一下 asyncio

请想象面前有一条流水线,流水线旁有一个名为 select 的搬运工,负责向流水线上扔 coroutine。coroutine 是一个神奇的东西,你只需要对其进行充电,它便能进行某些内部操作,适当的情况下还会向外部发出信号(有几个指示灯)。更高级地是,它断电不会丢失工作状态,充电后便能恢复作业。由于目前的贫穷现状,只有一个电源放在了流水线的尽头。首先你将一个 coroutine (称为 tosaka 吧)放到上面,然后启动流水线,这个 coroutine 便移动到了流水线的尽头,流水线上的其他 coroutine 不被受理。突然 coroutine 的指示灯亮了,显示其需要等待外部的某个资源,请你先对其他的 coroutine 进行充电吧。coroutine 同时也通知了搬运工 select,如果有 xxx 出现,就将我再次放到流水线上去。整个流水线原则上是不允许插队的,但是因为某些 coroutine 的需求很紧急,那么它们可以插队。接着流水线便一个接一个的充电。没有 coroutine 怎么办,搬运工 select 会主动帮你停下流水线,为公司着想的青年。平稳的日常在搬运工 select 发现了 xxx 事件后被打破,它将 tosaka 重新放在流水线上,然后排队进行充电。tosaka 恢复执行,并且结束时要求我还需要执行一些东西,它叫 callback。真是一个任性的 coroutine

上面我模糊了一些概念,如 FutureTaskHandle 等。而且在下文中也不会具体的涉及具体实现,毕竟这是第一篇。接下来的内容便十分的枯燥了,下面是摘自 文档 的一个例子

import asyncio

async def compute(x, y):  
    print("Compute %s + %s ..." % (x, y))
    await asyncio.sleep(1.0)
    return x + y

async def print_sum(x, y):  
    result = await compute(x, y)
    print("%s + %s = %s" % (x, y, result))

loop = asyncio.get_event_loop()  
loop.run_until_complete(print_sum(1, 2))  
loop.close()  

如果这是一个同步的代码,那么只要这样即可(迷之突起)

print_sum  
  |
   \   compute
     \
      |
     /
   /
  |

但是在 asyncio 的异步下,执行流程如下图(两个迷之突起)

首先来看 asyncio.get_event_loop() 发生了什么

# events.py

# A TLS for the running event loop, used by _get_running_loop.
class _RunningLoop(threading.local):  
    loop_pid = (None, None)

_running_loop = _RunningLoop()

def _get_running_loop():  
    running_loop, pid = _running_loop.loop_pid
    if running_loop is not None and pid == os.getpid():
        return running_loop

def get_event_loop_policy():  
    # 模块级变量 _event_loop_policy 的默认值为 None
    # 可通过 set_event_loop_policy() 指定其值,比如你可以使用 uvloop
    if _event_loop_policy is None:
        # 初始化 _event_loop_policy,根据不同的 platform 有不同的值
        # 比如 Linux 下则是位于 unix_events.py 的 _UnixDefaultEventLoopPolicy 实例
        _init_event_loop_policy()
    return _event_loop_policy

def get_event_loop():  
    current_loop = _get_running_loop()
    if current_loop is not None:
        return current_loop
    return get_event_loop_policy().get_event_loop()  # 01

这部分代码主要是用于获取当前进程/线程中的 event_loop。当第一次调用时,会调用对应 policy 的 get_event_loop() 方法(即 01 处)。反正就是为了平台兼容做了一个策略模式,饶了一个弯

BaseDefaultEventLoopPolicy 位于 events.py  
|--------- _UnixDefaultEventLoopPolicy 位于 unix_events.py
|--------- _WindowsDefaultEventLoopPolicy 位于 windows_events.py

policy 实例的 get_event_loop() 方法实现位于基类 BaseDefaultEventLoopPolicy

# events.py
class BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy):  
    """Default policy implementation for accessing the event loop.
    In this policy, each thread has its own event loop. However, we
    only automatically create an event loop by default for the main
    thread; other threads by default have no event loop.
    Other policies may have different rules (e.g. a single global
    event loop, or automatically creating an event loop per thread, or
    using some other notion of context to which an event loop is
    associated).
    """
    _loop_factory = None

    class _Local(threading.local):
        _loop = None
        _set_called = False

    def __init__(self):
        self._local = self._Local()

    def get_event_loop(self):
        if (self._local._loop is None and
            not self._local._set_called and
            isinstance(threading.current_thread(), threading._MainThread)):  # 01
            self.set_event_loop(self.new_event_loop())
        if self._local._loop is None:
            raise RuntimeError('There is no current event loop in thread %r.'
                               % threading.current_thread().name)
        return self._local._loop

    def set_event_loop(self, loop):
        self._local._set_called = True
        assert loop is None or isinstance(loop, AbstractEventLoop)
        self._local._loop = loop

    def new_event_loop(self):
        return self._loop_factory()  # 02

01 处约束了当前线程为主线程的情况下才会设置 event_loop。注意这里的 set_event_loop() 等方法子类进行了 override,添加了 watcher。_loop_factory 则由子类进行实现

# unix_events.py
class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):  
    _loop_factory = _UnixSelectorEventLoop
    # 省略

所以说 _UnixSelectorEventLoop 是 Linux 下 event_loop 的真正实现。因为考虑文章篇幅,所以这里不在展开

既然得到了 loop 对象,那么根据示例代码,接下来是调用了 loop 对象的 run_until_complete() 方法。这里会涉及 asyncio 的核心部分——事件循环。run_until_complete 的实现位于其基类 BaseSelectorEventLoop 的基类 BaseEventLoop

# base_events.py
class BaseEventLoop(events.AbstractEventLoop):

    def run_until_complete(self, future):
        # 如果 event_loop 已经 close 则会抛出 RuntimeError
        self._check_closed()

        # isfuture 的实现位于 base_futures.py 中
        # 是否为 future 取决于对象具有 _asyncio_future_blocking 且值非 None
        # 而不是简单的使用 isinstance,主要为了 duck typing
        new_task = not futures.isfuture(future)
        future = tasks.ensure_future(future, loop=self)  # 01
        if new_task:
            # An exception is raised if the future didn't complete, so there
            # is no need to log the "destroy pending task" message
            future._log_destroy_pending = False

        future.add_done_callback(_run_until_complete_cb)  # 02
        # 这里忽略了异常处理
        self.run_forever()
        if not future.done():
            raise RuntimeError('Event loop stopped before Future completed.')
        return future.result()

01 处的代码实现

# tasks.py
def ensure_future(coro_or_future, *, loop=None):  
    # 已经是 future 的情况下不能再绑定不同的 loop
    if futures.isfuture(coro_or_future):
        if loop is not None and loop is not coro_or_future._loop:
            raise ValueError('loop argument must agree with Future')
        return coro_or_future
    elif coroutines.iscoroutine(coro_or_future):
        if loop is None:
            loop = events.get_event_loop()
        task = loop.create_task(coro_or_future)
        if task._source_traceback:
            del task._source_traceback[-1]
        return task
    elif compat.PY35 and inspect.isawaitable(coro_or_future):
        return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)
    else:
        raise TypeError('An asyncio.Future, a coroutine or an awaitable is required')

@coroutine
def _wrap_awaitable(awaitable):  
    return (yield from awaitable.__await__())

我们可以发现使用 async def 定义的函数最终也要被 coroutine 装饰器装饰。另外,ensure_future() 函数对于参数为 futurecoroutine 表现的行为不同,换句话说这个函数有可能返回 Future 对象也有可能返回 Task 对象,不过这没有关系因为 TaskFuture 的子类。下面是 loop.create_task 的实现,调用的是基类的方法

# base_events.py
class BaseEventLoop(events.AbstractEventLoop):  
    def create_task(self, coro):
        self._check_closed()
        if self._task_factory is None:
            task = tasks.Task(coro, loop=self)
            if task._source_traceback:
                del task._source_traceback[-1]
        else:
            task = self._task_factory(self, coro)
        return task

可以看到一个 Task 包含了 loop 自身。这里就不展开 Future/Task 的实现了。不过你需要知道可以通过 task_obj._loop 来获取其所绑定的 loop

Task__init__() 方法中,调用了 loop 对象的 call_soon() 方法,传入了自身了 _step() 方法。loop.call_soon() 则会将其包裹成了 handle 对象,然后加入了 loop._ready 中。loop._ready 实际上是一个双端队列 deque。这里不展开了

02 处的 add_done_callback() 所注册的回调函数 _run_until_complete_cb 依赖了 _loop,完成了 loop 的 stop。根据函数签名我们也可以看出回调可以以 future 作为参数

# base_events.py
def _run_until_complete_cb(fut):  
    # 省略
    fut._loop.stop()  # loop_obj._stopping = True

有了这一点我们也能够明白为什么 run_until_complete 实际上调用的是 run_forever

# base_events.py
class BaseEventLoop(events.AbstractEventLoop):  
    def run_forever(self):
        # 省略大量代码
        events._set_running_loop(self)
        while True:
            self._run_once()
            if self._stopping:
                break
        events._set_running_loop(None)

run_forever 方法设置了 _running_loop 并且一次一次的调用 _run_once() 方法。_run_once() 完成了核心的任务调度,即通过不断的循环调用 select/epoll 等系统调用,得出被唤醒的 fd 然后将对应的 handler 放入队列中。那么你肯定会问,我们的示例代码中并没有 fd 啊。没有 fd 便会返回一个空的 event_list。也可以通过其他方式进行入队,比如之前的 call_soon。下面是 _run_once() 方法的实现:

# base_events.py
class BaseEventLoop(events.AbstractEventLoop):  
    def _run_once(self):
        """Run one full iteration of the event loop.
        This calls all currently ready callbacks, polls for I/O,
        schedules the resulting callbacks, and finally schedules
        'call_later' callbacks.
        """
        # 代码有删改
        sched_count = len(self._scheduled)
        if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
            self._timer_cancelled_count / sched_count >
                _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
            # Remove delayed calls that were cancelled if their number is too high
            new_scheduled = []
            for handle in self._scheduled:
                if handle._cancelled:
                    handle._scheduled = False
                else:
                    new_scheduled.append(handle)

            heapq.heapify(new_scheduled)
            self._scheduled = new_scheduled
            self._timer_cancelled_count = 0
        else:
            # Remove delayed calls that were cancelled from head of queue.
            while self._scheduled and self._scheduled[0]._cancelled:
                self._timer_cancelled_count -= 1
                handle = heapq.heappop(self._scheduled)
                handle._scheduled = False

        timeout = None
        if self._ready or self._stopping:
            timeout = 0
        elif self._scheduled:
            # Compute the desired timeout.
            when = self._scheduled[0]._when
            timeout = max(0, when - self.time())

        event_list = self._selector.select(timeout)
        # 会将任务添加到 _ready 队列中
        self._process_events(event_list)

        # Handle 'later' callbacks that are ready.
        end_time = self.time() + self._clock_resolution
        while self._scheduled:
            handle = self._scheduled[0]
            if handle._when >= end_time:
                break
            handle = heapq.heappop(self._scheduled)
            handle._scheduled = False
            self._ready.append(handle)

        # This is the only place where callbacks are actually *called*.
        # All other places just add them to ready.
        ntodo = len(self._ready)
        for i in range(ntodo):
            handle = self._ready.popleft()
            if handle._cancelled:
                continue
            handle._run()
        handle = None  # Needed to break cycles when an exception occurs.

_ready 是一个很重要的队列,所有的得到调度(将要执行)的 Task 都会被放到这里,然后依次执行。对于我们的示例代码说,第一次 _ready 队列中只有一个任务,那便是包裹了 Task._step() 方法的 Handle 对象实例。handle._run() 会去调用 Task._step() 方法。其主要是调用了 result = coro.send(None)coro 对象即为我们的被装饰为 coroutineprint_sum

# tasks.py
class Task(futures.Future):  
    def _step(self, exc=None):
        coro = self._coro
        try:
            if exc is None:
                # We use the `send` method directly, because coroutines
                # don't have `__iter__` and `__next__` methods.
                result = coro.send(None)
            else:
                result = coro.throw(exc)
        except StopIteration as exc:
            if self._must_cancel:
                # Task is cancelled right before coro stops.
                self._must_cancel = False
                self.set_exception(futures.CancelledError())
            else:
                self.set_result(exc.value)
        # 省略大量代码

那么 result 是什么呢?需要查看一下 sleep() 的源代码

# tasks.py
@coroutine
def sleep(delay, result=None, *, loop=None):  
    """Coroutine that completes after a given time (in seconds)."""
    if delay == 0:
        yield
        return result

    if loop is None:
        loop = events.get_event_loop()
    future = loop.create_future()
    h = future._loop.call_later(delay, futures._set_result_unless_cancelled,
                                future, result)
    try:
        return (yield from future)
    finally:
        h.cancel()

它生成了一个 Future 对象,并调用了 loopcall_later 方法,在 1 秒之后调用 futures._set_result_unless_cancelled。它会为 future 赋值 resultcall_later 内部调用了 loopcall_at() 方法,会在 loop_scheduled 队列中放置一个包裹了回调函数 futures._set_result_unless_cancelled 的特殊 Handle——TimerHandle 对象。注意这里的 yield from future 会调用 future 对象的 __iter__ 方法

# futures.py
class Future:  
    def __iter__(self):
        if not self.done():
            self._asyncio_future_blocking = True
            yield self  # This tells Task to wait for completion.
        return self.result() # May raise too.

可以看到如果状态非 DONE 则会 yield 自身。_step() 方法会对得到的这个 future 添加 self._wakeup 这个回调函数(上面代码略去了这一部分)

接着便再次回到事件循环 _run_once 当中,此时将要执行的便是 TimerHandle_run() 方法,触发回调 futures._set_result_unless_cancelledfuture 设置结果并将其状态置为 _FINISHED。这里会对此 future 的所有 callback 进行调度

# futures.py
def _set_result_unless_cancelled(fut, result):  
    """Helper setting the result only if the future was not cancelled."""
    if fut.cancelled():
        return
    fut.set_result(result)

class Future:  
    def _schedule_callbacks(self):
        callbacks = self._callbacks[:]
        if not callbacks:
            return
        self._callbacks[:] = []
        for callback in callbacks:
            self._loop.call_soon(callback, self)  # 将 future 自身传入

具体而言是将我们刚才在 _step() 中注册的 _wakeup() 方法通过 call_soon() 进行调度。所以当我们再次回到事件循环时我们会执行 Task._wakup(),可以简单地理解这个方法再度调用了 _step()。此时再次激活了我们的 coroutine print_sum。对于示例代码 coroutine 而言,现在已经 exhausted,会抛出 StopIteration 异常。_step() 方法捕捉到此一样后会执行 self.set_result(exc.value),进而执行 _schedule_callbacks() 方法将自身的 callback _run_until_complete_cb() 进行调度。正如我们之前所分析的,loop 对象的 close() 被调用,run_forever 被结束,接着程序结束