Watcher — asyncio 源码剖析(8)

源码基于 Python 3.6.3 的 asyncio commit sha 为 2c5fed86e0cbba5a4e34792b0083128ce659909d

在 Linux 平台上 asyncio 提供了一组 watcher 来监控子进程。具体作用是接收子进程的信号,然后调度对应的 callback

默认情况下,当我们调用 asyncio.create_subprocess_exec()asyncio.create_subprocess_shell() 会自动创建 watcher

# subprocess.py
@coroutine
def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None,  
                            loop=None, limit=streams._DEFAULT_LIMIT, **kwds):
    if loop is None:
        loop = events.get_event_loop()
    protocol_factory = lambda: SubprocessStreamProtocol(limit=limit,
                                                        loop=loop)
    transport, protocol = yield from loop.subprocess_shell(
                                            protocol_factory,
                                            cmd, stdin=stdin, stdout=stdout,
                                            stderr=stderr, **kwds)
    return Process(transport, protocol, loop)

loop.subprocess_shell 位于基类 BaseEventLoop

# base_events.py
class BaseEventLoop(events.AbstractEventLoop):  
    @coroutine
    def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                         universal_newlines=False, shell=True, bufsize=0,
                         **kwargs):
        # 有省略
        protocol = protocol_factory()
        transport = yield from self._make_subprocess_transport(
            protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
        return transport, protocol
# unix_events.py
class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):  
    @coroutine
    def _make_subprocess_transport(self, protocol, args, shell,
                                   stdin, stdout, stderr, bufsize,
                                   extra=None, **kwargs):
        with events.get_child_watcher() as watcher:
            waiter = self.create_future()
            transp = _UnixSubprocessTransport(self, protocol, args, shell,
                                              stdin, stdout, stderr, bufsize,
                                              waiter=waiter, extra=extra,
                                              **kwargs)

            watcher.add_child_handler(transp.get_pid(),
                                      self._child_watcher_callback, transp)
            try:
                # _UnixSubprocessTransport 中创建了一个 _connect_pipes 的 task
                # 当此 task 执行完后 waiter 会被 set_result()
                yield from waiter
            except Exception as exc:
                # Workaround CPython bug #23353: using yield/yield-from in an
                # except block of a generator doesn't clear properly
                # sys.exc_info()
                err = exc
            else:
                err = None

            if err is not None:
                transp.close()
                yield from transp._wait()
                raise err
        return transp
# events.py
def get_child_watcher():  
    return get_event_loop_policy().get_child_watcher()

使用了策略模式,调用相应平台的 get_child_watcher() 实现。Linux 下为 _UnixDefaultEventLoopPolicy

# unix_events.py
class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):  
    def _init_watcher(self):
        with events._lock:
            if self._watcher is None:
                self._watcher = SafeChildWatcher()
                if isinstance(threading.current_thread(),
                              threading._MainThread):
                    self._watcher.attach_loop(self._local._loop)

    def get_child_watcher(self):
        # 默认情况下为 None,_init_watcher() 会初始化一个 SafeChildWatcher 实例
        # 可以通过 set_child_watcher 进行 watcher 的设置
        # 一个 Policy 实例只会有一个 watcher 实例
        if self._watcher is None:
            self._init_watcher()
        return self._watcher

下面看一下 watcher 的具体实现,位于 unix_events.py

# watcher hierarchy
AbstractChildWatcher  
 +-- BaseChildWatcher
     +-- SafeChildWatcher
     +-- FastChildWatcher

AbstractChildWatcher 中都是抽象方法,所以先来看一下 BaseChildWatcher 的实现

# unix_events.py
class BaseChildWatcher(AbstractChildWatcher):  
    def __init__(self):
        self._loop = None
        self._callbacks = {}

    def attach_loop(self, loop):
        if self._loop is not None and loop is None and self._callbacks:
            warnings.warn(
                'A loop is being detached '
                'from a child watcher with pending handlers',
                RuntimeWarning)
        # 如果之前绑定过 loop 则取消已经注册的 handler,然后绑定新的 loop
        if self._loop is not None:
            self._loop.remove_signal_handler(signal.SIGCHLD)
        self._loop = loop
        if loop is not None:
            loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)
            # Prevent a race condition in case a child terminated
            # during the switch.
            self._do_waitpid_all()

    def _sig_chld(self):
        try:
            self._do_waitpid_all()
        except Exception as exc:
            # self._loop should always be available here
            # as '_sig_chld' is added as a signal handler
            # in 'attach_loop'
            self._loop.call_exception_handler({
                'message': 'Unknown exception in SIGCHLD handler',
                'exception': exc,
            })
    # 省略了几个方法

这里调用了 loop.add_signal_handlersignal.SIGCHLD 信号注册了对应的 handler _sig_chld。其工作原理在 Thread — asyncio 源码剖析(3) 中已经提及。就是通过一对 socketpair,一端在 select 中进行注册,监听事件;另一端由 signal.set_wakeup_fd 绑定。当有信号来临时,会写入对应的 signum,然后经由 select 调度对应的 handler(具体步骤为 loop._read_from_self() -> loop._process_self_data() -> loop._handle_signal())。signumhandler 以字典的形式存储在 loop._signal_handlers 属性中

_do_waitpid_all() 由子类进行实现

# unix_events.py
class SafeChildWatcher(BaseChildWatcher):  
    def close(self):
        self._callbacks.clear()
        super().close()

    def __enter__(self):
        return self

    def __exit__(self, a, b, c):
        pass

    def add_child_handler(self, pid, callback, *args):
        if self._loop is None:
            raise RuntimeError("Cannot add child handler, "
                "the child watcher does not have a loop attached")

        self._callbacks[pid] = (callback, args)

        # Prevent a race condition in case the child is already terminated.
        self._do_waitpid(pid)

    def remove_child_handler(self, pid):
        try:
            del self._callbacks[pid]
            return True
        except KeyError:
            return False

    def _do_waitpid_all(self):
        for pid in list(self._callbacks):
            self._do_waitpid(pid)

    def _do_waitpid(self, expected_pid):
        try:
            # 等待 pid 的进程结束,指定 os.WNOHANG 则不会产生阻塞
            pid, status = os.waitpid(expected_pid, os.WNOHANG)
        except ChildProcessError:
            # The child process is already reaped
            # (may happen if waitpid() is called elsewhere).
            pid = expected_pid
            returncode = 255
            logger.warning(
                "Unknown child process pid %d, will report returncode 255", pid)
        else:
            if pid == 0:
                # The child process is still alive.
                return

            returncode = self._compute_returncode(status)
            if self._loop.get_debug():
                logger.debug('process %s exited with returncode %s',
                             expected_pid, returncode)

        try:
            callback, args = self._callbacks.pop(pid)
        except KeyError:
            # May happen if .remove_child_handler() is called
            # after os.waitpid() returns.
            if self._loop.get_debug():
                logger.warning("Child watcher got an unexpected pid: %r",
                               pid, exc_info=True)
        else:
            callback(pid, returncode, *args)

再来看一下先前注册的 callback

# unix_events.py
class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):  
    def _child_watcher_callback(self, pid, returncode, transp):
        self.call_soon_threadsafe(transp._process_exited, returncode)

# base_subprocess.py
class BaseSubprocessTransport(transports.SubprocessTransport):  
    def _process_exited(self, returncode):
        # 此处有省略
        self._returncode = returncode
        if self._proc.returncode is None:
            # asyncio uses a child watcher: copy the status into the Popen
            # object. On Python 3.6, it is required to avoid a ResourceWarning.
            self._proc.returncode = returncode
        self._call(self._protocol.process_exited)
        self._try_finish()

        # wake up futures waiting for wait()
        for waiter in self._exit_waiters:
            if not waiter.cancelled():
                waiter.set_result(returncode)
        self._exit_waiters = None

之所以这样是因为 asyncio.Process.wait 并不是一个真正的阻塞操作。当我们调用 wait() 仅是去创建了一个 future,如果此 future 完成则说明子进程已经结束,才会再度调度这个 coroutine。也就说不被调度即视为阻塞。

# subprocess.py
class Process:  
    @coroutine
    def wait(self):
        """Wait until the process exit and return the process return code.
        This method is a coroutine."""
        return (yield from self._transport._wait())
# base_subprocess.py
class BaseSubprocessTransport(transports.SubprocessTransport):  
    @coroutine
    def _wait(self):
        """Wait until the process exit and return the process return code.
        This method is a coroutine."""
        if self._returncode is not None:
            return self._returncode

        waiter = self._loop.create_future()
        self._exit_waiters.append(waiter)
        return (yield from waiter)

另外还有一个 FastChildWatcher

class FastChildWatcher(BaseChildWatcher):  
    def __init__(self):
        super().__init__()
        self._lock = threading.Lock()
        self._zombies = {}
        self._forks = 0

    def close(self):
        self._callbacks.clear()
        self._zombies.clear()
        super().close()

    def __enter__(self):
        with self._lock:
            self._forks += 1

            return self

    def __exit__(self, a, b, c):
        with self._lock:
            self._forks -= 1

            if self._forks or not self._zombies:
                return

            collateral_victims = str(self._zombies)
            self._zombies.clear()

        logger.warning(
            "Caught subprocesses termination from unknown pids: %s",
            collateral_victims)

    def add_child_handler(self, pid, callback, *args):
        assert self._forks, "Must use the context manager"

        if self._loop is None:
            raise RuntimeError(
                "Cannot add child handler, "
                "the child watcher does not have a loop attached")

        with self._lock:
            try:
                returncode = self._zombies.pop(pid)
            except KeyError:
                # The child is running.
                self._callbacks[pid] = callback, args
                return

        # The child is dead already. We can fire the callback.
        callback(pid, returncode, *args)

    def remove_child_handler(self, pid):
        try:
            del self._callbacks[pid]
            return True
        except KeyError:
            return False

    def _do_waitpid_all(self):
        # Because of signal coalescing, we must keep calling waitpid() as
        # long as we're able to reap a child.
        while True:
            try:
                pid, status = os.waitpid(-1, os.WNOHANG)
            except ChildProcessError:
                # No more child processes exist.
                return
            else:
                if pid == 0:
                    # A child process is still alive.
                    return

                returncode = self._compute_returncode(status)

            with self._lock:
                try:
                    callback, args = self._callbacks.pop(pid)
                except KeyError:
                    # unknown child
                    if self._forks:
                        # It may not be registered yet.
                        self._zombies[pid] = returncode
                        if self._loop.get_debug():
                            logger.debug('unknown process %s exited '
                                         'with returncode %s',
                                         pid, returncode)
                        continue
                    callback = None
                else:
                    if self._loop.get_debug():
                        logger.debug('process %s exited with returncode %s',
                                     pid, returncode)

            if callback is None:
                logger.warning(
                    "Caught subprocess termination from unknown pid: "
                    "%d -> %d", pid, returncode)
            else:
                callback(pid, returncode, *args)

FastChildWatcher 相较于 SafeChildWatcher 使用了一个循环去收割所有已经退出的子进程