Gunicorn 19.7.1 版本源码,commit sha 为 328e509260ae70de6c04c5ba885ee17960b3ced5
先来看 worker
是如何产生的
# gunicorn/arbiter.py
class Arbiter(object):
    def spawn_worker(self):
        """Fork one worker process.

        In the parent (arbiter) the child's pid is recorded in
        ``self.WORKERS`` and returned.  The child never returns from this
        method: it runs ``worker.init_process()`` and always leaves via
        ``sys.exit`` (mapped to an exit code the arbiter inspects).
        """
        self.worker_age += 1
        # Build the worker object *before* forking so the pre_fork hook can
        # see it; half the arbiter timeout is passed as the worker timeout.
        worker = self.worker_class(self.worker_age, self.pid, self.LISTENERS,
                                   self.app, self.timeout / 2.0,
                                   self.cfg, self.log)
        self.cfg.pre_fork(self, worker)  # call hook
        pid = os.fork()
        if pid != 0:
            # Parent: register the child and hand its pid back.
            worker.pid = pid
            self.WORKERS[pid] = worker
            return pid

        # Process Child
        worker.pid = os.getpid()
        try:
            util._setproctitle("worker [%s]" % self.proc_name)
            self.log.info("Booting worker with pid: %s", worker.pid)
            self.cfg.post_fork(self, worker)
            worker.init_process()
            sys.exit(0)  # never execute here: init_process() runs the loop
        except SystemExit:
            raise
        except AppImportError as e:
            # The WSGI app could not be imported: report and exit with the
            # dedicated code so the arbiter can distinguish this failure.
            self.log.debug("Exception while loading the application",
                           exc_info=True)
            print("%s" % e, file=sys.stderr)
            sys.stderr.flush()
            sys.exit(self.APP_LOAD_ERROR)
        except:
            # FIX: removed a stray trailing comma that turned this call into
            # a pointless one-element tuple expression.
            self.log.exception("Exception in worker process")
            if not worker.booted:
                sys.exit(self.WORKER_BOOT_ERROR)
            sys.exit(-1)
        finally:
            # Runs in the child on every exit path (after sys.exit raises
            # SystemExit): best-effort cleanup plus the worker_exit hook.
            self.log.info("Worker exiting (pid: %s)", worker.pid)
            try:
                worker.tmp.close()
                self.cfg.worker_exit(self, worker)
            except:
                self.log.warning("Exception during worker exit:\n%s",
                                 traceback.format_exc())
Gunicorn 支持多种类型的 worker
,默认为 gunicorn.workers.sync.SyncWorker
,这里便以此为例
Worker
+-- AiohttpWorker
+-- TornadoWorker
+-- ThreadWorker
+-- SyncWorker
+-- AsyncWorker
    +-- EventletWorker
    +-- GeventWorker
Worker
的定义如下
# gunicorn/workers/base.py
class Worker(object):

    # Signals whose handlers every worker re-registers in init_signals().
    SIGNALS = [getattr(signal, "SIG%s" % x)
               for x in "ABRT HUP QUIT INT TERM USR1 USR2 WINCH CHLD".split()]

    # Self-pipe used to wake the worker out of select(); replaced with a
    # real per-process pipe in init_process().
    PIPE = []

    def __init__(self, age, ppid, sockets, app, timeout, cfg, log):
        """\
        This is called pre-fork so it shouldn't do anything to the
        current process. If there's a need to make process wide
        changes you'll want to do that in ``self.init_process()``.
        """
        self.age = age
        self.pid = "[booting]"  # placeholder; the real pid is set after fork
        self.ppid = ppid
        self.sockets = sockets
        self.app = app
        self.timeout = timeout
        self.cfg = cfg
        self.booted = False
        self.aborted = False
        self.reloader = None
        self.nr = 0  # count of requests handled so far
        # avoid all workers restarting at the same time
        jitter = randint(0, cfg.max_requests_jitter)
        # 0 would mean "restart immediately", so fall back to MAXSIZE
        # (effectively "never restart") when max_requests is unset.
        self.max_requests = (cfg.max_requests + jitter) or MAXSIZE
        self.alive = True
        self.log = log
        # NOTE(review): WorkerTmp appears to be the heartbeat tmp file the
        # arbiter watches — confirm in gunicorn/workers/workertmp.py.
        self.tmp = WorkerTmp(cfg)
nr
属性记录当前处理的 req 数目,如果超过了 max_requests
则会将 alive
置为 False
,worker
便会主动退出。这部分逻辑在 handle_request()
方法中
class Worker(object):
    def init_process(self):
        """Post-fork, per-process setup; finishes by entering the run loop.

        Runs only in the child process: exports configured env vars, drops
        privileges, creates the wake-up pipe, marks fds close-on-exec,
        registers signal handlers, optionally starts the reloader, loads
        the WSGI app and finally calls ``self.run()``.
        """
        # set environment' variables
        if self.cfg.env:
            for k, v in self.cfg.env.items():
                os.environ[k] = v

        # change uid/gid of the worker process (drop privileges)
        util.set_owner_process(self.cfg.uid, self.cfg.gid,
                               initgroups=self.cfg.initgroups)

        # Reseed the random number generator
        util.seed()

        # For waking ourselves up
        self.PIPE = os.pipe()
        for p in self.PIPE:
            util.set_non_blocking(p)
            util.close_on_exec(p)

        # Prevent fd inheritance
        [util.close_on_exec(s) for s in self.sockets]
        util.close_on_exec(self.tmp.fileno())

        # include pipe
        self.wait_fds = self.sockets + [self.PIPE[0]]

        self.log.close_on_exec()

        self.init_signals()

        # start the reloader
        if self.cfg.reload:
            def changed(fname):
                # Called by the reloader when a watched file changes: stop
                # the loop, run the worker_int hook and exit so the arbiter
                # respawns a fresh worker with the new code.
                self.log.info("Worker reloading: %s modified", fname)
                self.alive = False
                self.cfg.worker_int(self)
                time.sleep(0.1)
                sys.exit(0)

            reloader_cls = reloader_engines[self.cfg.reload_engine]
            self.reloader = reloader_cls(callback=changed)
            self.reloader.start()

        self.load_wsgi()
        self.cfg.post_worker_init(self)

        # Enter main run loop
        self.booted = True
        self.run()
init_signals()
方法重置了信号处理事件,并为 SIGQUIT
、SIGTERM
、SIGINT
、SIGWINCH
、SIGUSR1
、SIGABRT
这些信号重新注册了 handler
load_wsgi()
方法调用了 app.wsgi()
加载了我们的 wsgi app
# gunicorn/app/base.py
class BaseApplication(object):
    def wsgi(self):
        """Return the WSGI callable, loading it lazily on first access."""
        app = self.callable
        if app is None:
            # First access: import/build the application and memoize it.
            app = self.load()
            self.callable = app
        return app
# gunicorn/app/wsgiapp.py
class WSGIApplication(Application):
    def load_wsgiapp(self):
        """Import and return the WSGI app named by ``self.app_uri``."""
        self.chdir()
        # load the app
        return util.import_app(self.app_uri)

    def load(self):
        """Return the application: PasteDeploy when configured, else WSGI."""
        if self.cfg.paste is None:
            return self.load_wsgiapp()
        return self.load_pasteapp()
# gunicorn/util.py
def import_app(module):
    """Import ``"module[:obj]"`` and return the WSGI application object.

    ``module`` is a dotted module path optionally followed by ``:`` and an
    expression evaluated inside that module (default ``application``).
    Raises ImportError for a bad module path and AppImportError when the
    object is missing, None, or not callable.
    """
    name, sep, obj = module.partition(":")
    if not sep:
        obj = "application"
    module = name

    try:
        __import__(module)
    except ImportError:
        # Common mistake: a file path was given instead of module:callable.
        if module.endswith(".py") and os.path.exists(module):
            msg = "Failed to find application, did you mean '%s:%s'?"
            raise ImportError(msg % (module.rsplit(".", 1)[0], obj))
        raise

    mod = sys.modules[module]
    is_debug = logging.root.level == logging.DEBUG
    try:
        # specify context
        app = eval(obj, vars(mod))
    except NameError:
        if is_debug:
            traceback.print_exception(*sys.exc_info())
        raise AppImportError("Failed to find application: %r" % module)

    if app is None:
        raise AppImportError("Failed to find application object: %r" % obj)
    if not callable(app):
        raise AppImportError("Application object must be callable.")
    return app
接下来看 run()
方法的实现,这是由对应的子类实现的
# gunicorn/app/sync.py
class SyncWorker(base.Worker):
    def run(self):
        """Dispatch to the single- or multi-listener accept loop."""
        # Without a timeout the worker would spin on select() and burn CPU;
        # keep a small floor so it always sleeps a little between wakeups.
        wait_timeout = self.timeout or 0.5
        # The listening sockets come back blocking after the arbiter's
        # fork; force them non-blocking again before entering the loop.
        for sock in self.sockets:
            sock.setblocking(0)
        if len(self.sockets) == 1:
            self.run_for_one(wait_timeout)
        else:
            self.run_for_multiple(wait_timeout)
根据 socket
的数量使用了不同策略,以仅有一个 socket
的情况为例
# gunicorn/app/sync.py
class SyncWorker(base.Worker):
    def accept(self, listener):
        # Raises EAGAIN/EWOULDBLOCK when no connection is pending because
        # the listener is non-blocking; run_for_one() catches that.
        client, addr = listener.accept()
        # The accepted client socket is handled in blocking mode.
        client.setblocking(1)
        util.close_on_exec(client)
        # parse http req, call wsgi app, make resp
        self.handle(listener, client, addr)

    def run_for_one(self, timeout):
        """Accept loop for the single-listener case; runs until not alive."""
        listener = self.sockets[0]
        while self.alive:
            # Heartbeat so the arbiter knows this worker is not hung.
            self.notify()

            # Accept a connection. If we get an error telling us
            # that no connection is waiting we fall down to the
            # select which is where we'll wait for a bit for new
            # workers to come give us some love.
            try:
                self.accept(listener)
                # Keep processing clients until no one is waiting. This
                # prevents the need to select() for every client that we
                # process.
                continue
            except EnvironmentError as e:
                # EnvironmentError also covers socket.error / OSError on
                # older Pythons (they are all OSError from 3.3 on).
                if e.errno not in (errno.EAGAIN, errno.ECONNABORTED, errno.EWOULDBLOCK):
                    raise

            # If the master died we were re-parented (ppid changed): quit.
            if not self.is_parent_alive():
                return

            try:
                self.wait(timeout)
            except StopWaiting:
                return
需要注意的是我们的 socket
在先前被置为 nonblocking,所以这里会抛出异常 socket.error: [Errno 11] Resource temporarily unavailable
。而这里使用 EnvironmentError
则是为了版本兼容,可以参考 Python 官方文档,在 Python 3.3 后 EnvironmentError
和 IOError
都是 OSError
的 Alias
接下来的 is_parent_alive()
方法通过检查目前的 ppid
和 worker
诞生时的 ppid
是否相同来判断父进程是否还活着。因为如果父进程挂掉,子进程会被 init 进程收留,这时 ppid
会变成 1
wait()
中则是通过 select
(并没有使用 epoll
) 来等待 socket
或者 PIPE
就绪
# gunicorn/app/sync.py
class SyncWorker(base.Worker):
    def wait(self, timeout):
        """Wait up to ``timeout`` seconds for a readable fd.

        Returns the list of ready fds (or the sockets on benign errors);
        raises StopWaiting when the worker should stop its accept loop.
        """
        try:
            self.notify()
            # wait_fds include PIPE
            ret = select.select(self.wait_fds, [], [], timeout)
            if ret[0]:
                # Drain one wake-up byte from the self-pipe if it fired.
                if self.PIPE[0] in ret[0]:
                    os.read(self.PIPE[0], 1)
                return ret[0]
        except select.error as e:
            if e.args[0] == errno.EINTR:
                # Interrupted by a signal: report the sockets so the caller
                # simply goes around the loop again.
                return self.sockets
            if e.args[0] == errno.EBADF:
                # A watched fd was closed under us.
                # NOTE(review): unclear from this excerpt when nr can be
                # negative — confirm against gunicorn/workers/sync.py.
                if self.nr < 0:
                    return self.sockets
                else:
                    raise StopWaiting
            raise
之前提到过 worker
也重新注册了部分 signal
的 handler,因为 Arbiter
和 worker
间也通过信号进行通信
# gunicorn/arbiter.py
class Arbiter(object):
    def kill_worker(self, pid, sig):
        """Send ``sig`` to worker ``pid``; reap bookkeeping if it is gone."""
        try:
            os.kill(pid, sig)
            return
        except OSError as e:
            # Anything other than "no such process" is a real error.
            if e.errno != errno.ESRCH:
                raise
        # The process no longer exists: drop it from our table, close its
        # tmp file and run the exit hook, swallowing secondary failures.
        try:
            worker = self.WORKERS.pop(pid)
            worker.tmp.close()
            # server hook
            self.cfg.worker_exit(self, worker)
        except (KeyError, OSError):
            pass
类似下图表示
----------
| worker | (wait connection)
(manage) ----------
----------- /
signal -> | Arbiter |
----------- \
----------
| worker | (wait connection)
----------
这里拿 reload
操作为例。通过向 Arbiter
进程发送 HUP
信号可以 reload
我们的 worker
,而且是平滑的 reload
。不会导致现有的服务中断(自称)
根据上一篇的分析,Arbiter
进程接收到信号后,经过入队、出队后会执行对应的 handler handle_hup()
。此方法去调用 reload()
方法
# gunicorn/arbiter.py
class Arbiter(object):
    def reload(self):
        """Handle SIGHUP: re-read the config and replace workers gracefully."""
        old_address = self.cfg.address

        # reset old environment
        for k in self.cfg.env:
            if k in self.cfg.env_orig:
                # reset the key to the value it had before
                # we launched gunicorn
                os.environ[k] = self.cfg.env_orig[k]
            else:
                # delete the value set by gunicorn
                try:
                    del os.environ[k]
                except KeyError:
                    pass

        # reload conf
        self.app.reload()
        self.setup(self.app)

        # reopen log files
        self.log.reopen_files()

        # do we need to change listener ?
        if old_address != self.cfg.address:
            # close all listeners
            [l.close() for l in self.LISTENERS]
            # init new listeners
            self.LISTENERS = sock.create_sockets(self.cfg, self.log)
            listeners_str = ",".join([str(l) for l in self.LISTENERS])
            self.log.info("Listening at: %s", listeners_str)

        # call server hook
        self.cfg.on_reload(self)

        # unlink pidfile
        if self.pidfile is not None:
            self.pidfile.unlink()

        # create new pidfile
        if self.cfg.pidfile is not None:
            self.pidfile = Pidfile(self.cfg.pidfile)
            self.pidfile.create(self.pid)

        # set new proc_name
        util._setproctitle("master [%s]" % self.proc_name)

        # spawn new workers (old and new coexist briefly, both accepting)
        for i in range(self.cfg.workers):
            self.spawn_worker()

        # manage workers: the surplus (oldest) workers are then SIGTERMed
        self.manage_workers()
可以看到首先是通过 app.reload()
实现配置文件的 reload
,然后调用 setup()
方法覆盖了之前的一些属性比如 worker_class
、worker_num
等。如果需要更换 ADDR
则关闭先前的 LISTENERS
然后重新创建
接着是创建指定数量的 worker
。此时 Gunicorn 是新旧 worker
共同存在的状态,二者同时接入流量。调用 manage_workers()
方法,显然当前的 worker
数量已经超出,所以 kill
掉那些历史残留
# gunicorn/arbiter.py
class Arbiter(object):
    def manage_workers(self):
        """Terminate the oldest workers until only ``num_workers`` remain."""
        by_age = sorted(self.WORKERS.items(), key=lambda item: item[1].age)
        # Everything beyond the configured count, oldest first, gets SIGTERM.
        excess = len(by_age) - self.num_workers
        for pid, _worker in by_age[:max(excess, 0)]:
            self.kill_worker(pid, signal.SIGTERM)
worker
会接收到 SIGTERM
信号,调用 handle_exit()
方法
# gunicorn/workers/base.py
class Worker(object):
    def handle_exit(self, sig, frame):
        """SIGTERM handler: ask the run loop to stop at its next check."""
        # The accept loop tests ``self.alive`` on every iteration, so the
        # worker drains and exits gracefully rather than dying mid-request.
        self.alive = False
此方法就是简单的将 FLAG
置为 False
。参考 run_for_one()
,worker
会主动退出。之后 master 进程会收到 SIGCHLD
信号去 wait
已经退出的子进程,避免出现僵尸进程。如此一般,便实现了 worker
的新旧交替
Gunicorn 的基本工作流程就这样了。现在提一个问题,在使用 SyncWorker
,worker
数量为 9 且均空闲的情况下,如果有连接到来会有几个进程被唤醒?答案是所有进程。不过如果你添加一个 print
debug 时,可能只会看到 1~9 个输出。全部唤醒是由于 Gunicorn 没有处理 epoll 惊群,但由于 kernel 层面处理了 accept 惊群,所以虽然有多个进程被唤醒,最终还是只有一个进程 accept 成功。而 debug 时不能每次复现所有进程被唤醒的情形,是因为已经有进程 accept 成功,事件被清除,其余进程不会被唤醒