Gunicorn 源码分析 II - Worker

Gunicorn 19.7.1 版本源码,commit sha 为 328e509260ae70de6c04c5ba885ee17960b3ced5

先来看 worker 是如何产生的

# gunicorn/arbiter.py
class Arbiter(object):
    def spawn_worker(self):
        """Fork one new worker process.

        In the parent: register the worker under its pid and return the pid.
        In the child: run the worker main loop via ``init_process()`` and
        never return normally — the child always terminates via ``sys.exit``.
        """
        self.worker_age += 1
        worker = self.worker_class(self.worker_age, self.pid, self.LISTENERS,
                                   self.app, self.timeout / 2.0,
                                   self.cfg, self.log)
        self.cfg.pre_fork(self, worker)  # pre_fork server hook
        pid = os.fork()
        if pid != 0:
            # Parent process: track the child and hand its pid back.
            worker.pid = pid
            self.WORKERS[pid] = worker
            return pid

        # Process Child
        worker.pid = os.getpid()
        try:
            util._setproctitle("worker [%s]" % self.proc_name)
            self.log.info("Booting worker with pid: %s", worker.pid)
            self.cfg.post_fork(self, worker)  # post_fork server hook
            worker.init_process()
            sys.exit(0)  # never reached: init_process() runs the loop and exits
        except SystemExit:
            raise
        except AppImportError as e:
            self.log.debug("Exception while loading the application",
                           exc_info=True)
            print("%s" % e, file=sys.stderr)
            sys.stderr.flush()
            sys.exit(self.APP_LOAD_ERROR)
        except:
            # FIX: the original statement ended with a stray comma, which
            # wrapped it in a one-element tuple expression.
            # Bare except is deliberate here: any failure in the child must
            # be logged and converted into an exit code.
            self.log.exception("Exception in worker process")
            if not worker.booted:
                sys.exit(self.WORKER_BOOT_ERROR)
            sys.exit(-1)
        finally:
            self.log.info("Worker exiting (pid: %s)", worker.pid)
            try:
                worker.tmp.close()
                self.cfg.worker_exit(self, worker)
            except:
                # Best-effort cleanup; never let exit-path errors escape.
                self.log.warning("Exception during worker exit:\n%s",
                                  traceback.format_exc())

Gunicorn 支持多种类型的 worker,默认为 gunicorn.workers.sync.SyncWorker,这里便以此为例

Worker  
 +-- AiohttpWorker
 +-- TornadoWorker
 +-- ThreadWorker
 +-- SyncWorker
 +-- AsyncWorker
      +-- EventletWorker
      +-- GeventWorker

Worker 的定义如下

# gunicorn/workers/base.py
class Worker(object):

    # Signals whose handlers the worker (re)installs in init_signals().
    SIGNALS = [getattr(signal, "SIG" + name)
               for name in "ABRT HUP QUIT INT TERM USR1 USR2 WINCH CHLD".split()]

    PIPE = []

    def __init__(self, age, ppid, sockets, app, timeout, cfg, log):
        """\
        Runs in the master before fork, so it must not touch the current
        process; any process-wide change belongs in ``init_process()``.
        """
        self.age = age
        self.ppid = ppid
        self.sockets = sockets
        self.app = app
        self.timeout = timeout
        self.cfg = cfg
        self.log = log

        # The real pid is only known after the fork.
        self.pid = "[booting]"
        self.booted = False
        self.aborted = False
        self.alive = True
        self.reloader = None

        # nr counts handled requests; the worker retires once it reaches
        # max_requests. Random jitter spreads the restarts so all workers
        # do not recycle at the same moment.
        self.nr = 0
        extra = randint(0, cfg.max_requests_jitter)
        self.max_requests = (cfg.max_requests + extra) or MAXSIZE
        self.tmp = WorkerTmp(cfg)

nr 属性记录当前处理的 req 数目,如果超过了 max_requests 则会将 alive 置为 False,worker 便会主动退出。这部分逻辑在 handle_request() 方法中

class Worker(object):  
    def init_process(self):
        """Post-fork initialization, then enter the worker main loop.

        Runs in the child process only: exports configured environment
        variables, drops privileges, sets up the self-pipe and signal
        handlers, optionally starts the reloader, loads the WSGI app and
        finally calls ``self.run()`` (which does not return while the
        worker is alive).
        """
        # Export environment variables from the configuration.
        if self.cfg.env:
            for k, v in self.cfg.env.items():
                os.environ[k] = v

        # Drop privileges: switch uid/gid if configured.
        util.set_owner_process(self.cfg.uid, self.cfg.gid,
                               initgroups=self.cfg.initgroups)

        # Reseed the random number generator
        util.seed()

        # Self-pipe used for waking ourselves up out of select().
        self.PIPE = os.pipe()
        for p in self.PIPE:
            util.set_non_blocking(p)
            util.close_on_exec(p)

        # Prevent fd inheritance.
        # FIX: plain loop instead of a side-effect-only list comprehension.
        for s in self.sockets:
            util.close_on_exec(s)
        util.close_on_exec(self.tmp.fileno())
        # fds we will select() on: all listeners plus the pipe's read end
        self.wait_fds = self.sockets + [self.PIPE[0]]

        self.log.close_on_exec()

        self.init_signals()

        # start the reloader
        if self.cfg.reload:
            def changed(fname):
                # A watched file changed: stop the loop and exit so the
                # arbiter respawns us with fresh code.
                self.log.info("Worker reloading: %s modified", fname)
                self.alive = False
                self.cfg.worker_int(self)
                time.sleep(0.1)
                sys.exit(0)

            reloader_cls = reloader_engines[self.cfg.reload_engine]
            self.reloader = reloader_cls(callback=changed)
            self.reloader.start()

        self.load_wsgi()
        self.cfg.post_worker_init(self)

        # Enter main run loop
        self.booted = True
        self.run()

init_signals() 方法重置了信号处理事件,并为 SIGQUIT、SIGTERM、SIGINT、SIGWINCH、SIGUSR1、SIGABRT 这些信号重新注册了 handler

load_wsgi() 方法调用了 app.wsgi() 加载了我们的 wsgi app

# gunicorn/app/base.py
class BaseApplication(object):  
    def wsgi(self):
        """Return the WSGI callable, loading it lazily on first access."""
        app = self.callable
        if app is None:
            app = self.load()
            self.callable = app
        return app

# gunicorn/app/wsgiapp.py
class WSGIApplication(Application):  
    def load_wsgiapp(self):
        """Import and return the app named by ``app_uri``, after chdir."""
        self.chdir()
        return util.import_app(self.app_uri)

    def load(self):
        """Return the application, preferring paste when configured."""
        if self.cfg.paste is None:
            return self.load_wsgiapp()
        return self.load_pasteapp()

# gunicorn/util.py
def import_app(module):  
    """Import the application referenced by *module*.

    *module* is a dotted module path, optionally followed by
    ``:object`` naming the attribute to load (default ``application``).
    Raises AppImportError when the object is missing or not callable.
    """
    head, sep, tail = module.partition(":")
    module = head
    obj = tail if sep else "application"

    try:
        __import__(module)
    except ImportError:
        # A path like "app.py" was probably meant as module "app".
        if module.endswith(".py") and os.path.exists(module):
            msg = "Failed to find application, did you mean '%s:%s'?"
            raise ImportError(msg % (module.rsplit(".", 1)[0], obj))
        raise

    mod = sys.modules[module]

    debugging = logging.root.level == logging.DEBUG
    try:
        # Evaluate obj in the imported module's namespace, so dotted
        # expressions also resolve.
        app = eval(obj, vars(mod))
    except NameError:
        if debugging:
            traceback.print_exception(*sys.exc_info())
        raise AppImportError("Failed to find application: %r" % module)

    if app is None:
        raise AppImportError("Failed to find application object: %r" % obj)

    if not callable(app):
        raise AppImportError("Application object must be callable.")
    return app

接下来看 run() 方法的实现,这是由对应的子类实现的

# gunicorn/app/sync.py
class SyncWorker(base.Worker):  
    def run(self):
        """Dispatch to the single- or multi-socket accept loop."""
        # A missing timeout would make the loop spin and burn CPU for
        # nothing; enforce a small floor.
        timeout = self.timeout or 0.5

        # The sockets can come back blocking after the arbiter's fork;
        # force them non-blocking again here.
        for sock in self.sockets:
            sock.setblocking(0)

        if len(self.sockets) > 1:
            self.run_for_multiple(timeout)
            return
        self.run_for_one(timeout)

根据 socket 的数量使用了不同策略,以仅有一个 socket 的情况为例

# gunicorn/app/sync.py
class SyncWorker(base.Worker):  
    def accept(self, listener):
        """Accept one pending connection and handle it to completion."""
        client, addr = listener.accept()
        # The listener is non-blocking, but the client socket should block.
        client.setblocking(1)
        util.close_on_exec(client)
        # parse http req, call wsgi app, make resp
        self.handle(listener, client, addr)

    def run_for_one(self, timeout):
        """Accept loop for the single-listener case; runs until not alive."""
        listener = self.sockets[0]
        while self.alive:
            # Heartbeat to the master — presumably via the tmp file so the
            # arbiter sees progress; confirm in Worker.notify/WorkerTmp.
            self.notify()
            # Accept a connection. If we get an error telling us
            # that no connection is waiting we fall down to the
            # select which is where we'll wait for a bit for new
            # workers to come give us some love.
            try:
                self.accept(listener)
                # Keep processing clients until no one is waiting. This
                # prevents the need to select() for every client that we
                # process.
                continue
            except EnvironmentError as e:
                # EAGAIN/EWOULDBLOCK: nothing pending on the non-blocking
                # listener; ECONNABORTED: client vanished mid-accept.
                # Anything else is a real error.
                if e.errno not in (errno.EAGAIN, errno.ECONNABORTED, errno.EWOULDBLOCK):
                    raise

            # If we were reparented (ppid changed) the master is gone; exit.
            if not self.is_parent_alive():
                return

            try:
                self.wait(timeout)
            except StopWaiting:
                return
需要注意的是我们的 socket 在先前被置为 nonblocking,所以这里 accept 会抛出异常 socket.error: [Errno 11] Resource temporarily unavailable。而这里使用 EnvironmentError 则是为了版本兼容,可以参考 文档,在 Python 3.3 后 EnvironmentError 和 IOError 都是 OSError 的 Alias

接下来的 is_parent_alive() 方法通过检查目前的 ppid 与 worker 诞生时的 ppid 是否相同来判断父进程是否还活着。因为如果父进程挂掉,子进程会被 init 进程收留,这时 ppid 会变成 1

wait() 中则是通过 select(并没有使用 epoll)来等待 socket 或者 PIPE 就绪

# gunicorn/app/sync.py
class SyncWorker(base.Worker):  
    def wait(self, timeout):
        """Block in select() until a listener or the self-pipe is readable.

        Returns the ready fds (or None when select() times out); raises
        StopWaiting to tell the caller to give up on these fds.
        """
        try:
            self.notify()
            # wait_fds include PIPE
            ret = select.select(self.wait_fds, [], [], timeout)
            if ret[0]:
                if self.PIPE[0] in ret[0]:
                    # Drain the wake-up byte from the self-pipe.
                    os.read(self.PIPE[0], 1)
                return ret[0]
        except select.error as e:
            if e.args[0] == errno.EINTR:
                # Interrupted by a signal: report the listeners so the
                # caller loops around and re-checks state.
                return self.sockets
            if e.args[0] == errno.EBADF:
                # NOTE(review): the nr < 0 special case presumably marks a
                # freshly-reset request counter — confirm against the base
                # Worker before relying on it.
                if self.nr < 0:
                    return self.sockets
                else:
                    raise StopWaiting
            raise

之前提到过 worker 也重新注册了部分 signal 的 handler,因为 Arbiter 与 worker 间也通过信号进行通信

# gunicorn/arbiter.py
class Arbiter(object):  
    def kill_worker(self, pid, sig):
        """Send *sig* to worker *pid*; if it is already gone, reap its
        bookkeeping (tmp file, WORKERS entry, worker_exit hook)."""
        try:
            os.kill(pid, sig)
            return
        except OSError as e:
            if e.errno != errno.ESRCH:
                raise
        # The process no longer exists: drop it from the registry and run
        # the exit hook, ignoring races where it was already cleaned up.
        try:
            worker = self.WORKERS.pop(pid)
            worker.tmp.close()
            # server hook
            self.cfg.worker_exit(self, worker)
        except (KeyError, OSError):
            return

类似下图表示

                        ----------
                        | worker | (wait connection)
           (manage)     ----------
          -----------  /
signal -> | Arbiter |  
          -----------  \
                        ----------
                        | worker | (wait connection)
                        ----------

这里拿 reload 操作为例。通过向 Arbiter 进程发送 HUP 信号可以 reload 我们的 worker,而且是平滑的 reload。不会导致现有的服务中断(自称)

根据上一篇的分析,Arbiter 进程接收到信号后,经过入队、出队后会执行对应的 handler handle_hup()。此方法会调用 reload() 方法

# gunicorn/arbiter.py
class Arbiter(object):  
    def reload(self):
        """Handle SIGHUP: re-read configuration and replace the workers.

        New workers are spawned with the fresh configuration before
        ``manage_workers()`` retires the old ones, so old and new workers
        briefly serve traffic side by side (graceful reload).
        """
        old_address = self.cfg.address

        # Restore the pre-gunicorn environment before cfg is re-applied.
        for k in self.cfg.env:
            if k in self.cfg.env_orig:
                # reset the key to the value it had before
                # we launched gunicorn
                os.environ[k] = self.cfg.env_orig[k]
            else:
                # delete the value set by gunicorn
                try:
                    del os.environ[k]
                except KeyError:
                    pass

        # reload conf
        self.app.reload()
        self.setup(self.app)

        # reopen log files
        self.log.reopen_files()

        # Recreate the listeners only when the bind address changed.
        if old_address != self.cfg.address:
            # FIX: close the old listeners with a plain loop instead of a
            # side-effect-only list comprehension.
            for l in self.LISTENERS:
                l.close()
            # init new listeners
            self.LISTENERS = sock.create_sockets(self.cfg, self.log)
            listeners_str = ",".join([str(l) for l in self.LISTENERS])
            self.log.info("Listening at: %s", listeners_str)

        # call server hook
        self.cfg.on_reload(self)

        # unlink pidfile
        if self.pidfile is not None:
            self.pidfile.unlink()

        # create new pidfile
        if self.cfg.pidfile is not None:
            self.pidfile = Pidfile(self.cfg.pidfile)
            self.pidfile.create(self.pid)

        # set new proc_name
        util._setproctitle("master [%s]" % self.proc_name)

        # spawn workers with the new configuration
        for i in range(self.cfg.workers):
            self.spawn_worker()

        # manage_workers() will retire the surplus (old) workers
        self.manage_workers()

可以看到首先是通过 app.reload() 实现配置文件的 reload,然后调用 setup() 方法覆盖了之前的一些属性比如 worker_class、worker_num 等。如果需要更换 ADDR 则关闭先前的 LISTENERS 然后重新创建

接着是创建指定数量的 worker。此时 Gunicorn 是新旧 worker 共同存在的状态,二者同时接入流量。调用 manage_workers() 方法,显然当前的 worker 数量已经超出,所以 kill 掉那些历史残留

# gunicorn/arbiter.py
class Arbiter(object):  
    def manage_workers(self):
        """Kill the oldest workers until only ``num_workers`` remain.

        Workers are sorted by age (ascending), so the longest-lived ones
        are terminated first with SIGTERM.
        """
        workers = self.WORKERS.items()
        workers = sorted(workers, key=lambda w: w[1].age)
        while len(workers) > self.num_workers:
            (pid, _) = workers.pop(0)
            # FIX: this call was dedented outside the loop in the original,
            # so only the last surplus worker was ever killed (and `pid`
            # was unbound when there was no surplus at all).
            self.kill_worker(pid, signal.SIGTERM)

worker 会接收到 SIGTERM 信号,调用 handle_exit() 方法

# gunicorn/workers/base.py
class Worker(object):  
    def handle_exit(self, sig, frame):
        # SIGTERM handler: clear the alive flag so the main loop (e.g.
        # run_for_one) falls out of `while self.alive` and exits gracefully.
        self.alive = False

此方法就是简单的将 FLAG 置为 False。参考 run_for_one(),worker 会主动退出。之后 master 进程会收到 SIGCHLD 信号去 wait 已经退出的子进程,避免出现僵尸进程。如此一般,便实现了 worker 的新旧交替

Gunicorn 的基本工作流程就这样了。现在提一个问题,在使用 SyncWorker、worker 数量为 9 且均空闲的情况下,如果有连接到来会有几个进程被唤醒?答案是所有进程。不过如果你添加一个 print 进行 debug 时,可能只会看到 1~9 个输出。全部唤醒是由于 Gunicorn 没有处理 epoll 惊群,但由于 kernel 层面处理了 accept 惊群所以虽然有多个进程被唤醒,不过最终还是只有一个进程 accept 成功。而 debug 时不能每次复现所有进程被唤醒的情形是因为已经有进程 accept 成功了,事件被清除,其余进程不会唤醒