提起 subprocess 执行 shell 命令,最大的坑就是不去 wait;或者 buf 满的时候父进程没有及时读取数据而去 wait,子进程又想继续写入,从而造成死锁,此问题 文档 中已经写明:建议使用 communicate。但是它是一下子读取完的,数据量大的时候效率不高。所以可以用个 select 自己撸一遍,做一个流式处理
communicate 的内部实现其实也是使用了 IO Multiplexing 去轮询多个管道。嗯,我指的是 *nix 下;Windows 下是开了线程的
本文仅以 Linux 部分举例,重点放在父子进程间的数据交互,代码基于 CPython 3.6.3, commit sha 为 2c5fed86e0cbba5a4e34792b0083128ce659909d
子进程创建#
subprocess.Popen 是一个类,当我们实例化它时会创建一个对应的进程。而这个实例是此进程的对象表示
class Popen(object):
_child_created = False # Set here since __del__ checks it
def __init__(self, args, bufsize=-1, executable=None,
stdin=None, stdout=None, stderr=None,
preexec_fn=None, close_fds=_PLATFORM_DEFAULT_CLOSE_FDS,
shell=False, cwd=None, env=None, universal_newlines=False,
startupinfo=None, creationflags=0,
restore_signals=True, start_new_session=False,
pass_fds=(), *, encoding=None, errors=None):
_cleanup() # 清理遗留问题
# Held while anything is calling waitpid before returncode has been
# updated to prevent clobbering returncode if wait() or poll() are
# called from multiple threads at once. After acquiring the lock,
# code must re-check self.returncode to see if another thread just
# finished a waitpid() call.
self._waitpid_lock = threading.Lock()
self._input = None
self._communication_started = False
if bufsize is None:
bufsize = -1 # Restore default
if not isinstance(bufsize, int):
raise TypeError("bufsize must be an integer")
if close_fds is _PLATFORM_DEFAULT_CLOSE_FDS:
close_fds = True
if pass_fds and not close_fds:
warnings.warn("pass_fds overriding close_fds.", RuntimeWarning)
close_fds = True
if startupinfo is not None:
raise ValueError("startupinfo is only supported on Windows "
"platforms")
if creationflags != 0:
raise ValueError("creationflags is only supported on Windows "
"platforms")
self.args = args
self.stdin = None
self.stdout = None
self.stderr = None
self.pid = None
self.returncode = None
self.universal_newlines = universal_newlines
self.encoding = encoding
self.errors = errors
(p2cread, p2cwrite,
c2pread, c2pwrite,
errread, errwrite) = self._get_handles(stdin, stdout, stderr)
text_mode = encoding or errors or universal_newlines
self._closed_child_pipe_fds = False
# 从 fd 转换为文件对象
try:
if p2cwrite != -1:
self.stdin = io.open(p2cwrite, 'wb', bufsize)
if text_mode:
self.stdin = io.TextIOWrapper(self.stdin, write_through=True,
line_buffering=(bufsize == 1),
encoding=encoding, errors=errors)
if c2pread != -1:
self.stdout = io.open(c2pread, 'rb', bufsize)
if text_mode:
self.stdout = io.TextIOWrapper(self.stdout,
encoding=encoding, errors=errors)
if errread != -1:
self.stderr = io.open(errread, 'rb', bufsize)
if text_mode:
self.stderr = io.TextIOWrapper(self.stderr,
encoding=encoding, errors=errors)
self._execute_child(args, executable, preexec_fn, close_fds,
pass_fds, cwd, env,
startupinfo, creationflags, shell,
p2cread, p2cwrite,
c2pread, c2pwrite,
errread, errwrite,
restore_signals, start_new_session)
except:
# Cleanup if the child failed starting.
for f in filter(None, (self.stdin, self.stdout, self.stderr)):
try:
f.close()
except OSError:
pass # Ignore EBADF or other errors.
if not self._closed_child_pipe_fds:
to_close = []
# 这里仅关闭我们内部创建的那些管道,不能将用户传入的管道关闭掉
if stdin == PIPE:
to_close.append(p2cread)
if stdout == PIPE:
to_close.append(c2pwrite)
if stderr == PIPE:
to_close.append(errwrite)
if hasattr(self, '_devnull'):
to_close.append(self._devnull)
for fd in to_close:
try:
os.close(fd)
except OSError:
pass
raise
_cleanup 感觉像是在结束时调用的,这里却是在最开始的时候调用。那么来看一下它做了哪些清理的工作
# This lists holds Popen instances for which the underlying process had not
# exited at the time its __del__ method got called: those processes are wait()ed
# for synchronously from _cleanup() when a new Popen object is created, to avoid
# zombie processes.
_active = []
def _cleanup():
for inst in _active[:]:
res = inst._internal_poll(_deadstate=sys.maxsize)
if res is not None:
try:
_active.remove(inst)
except ValueError:
# This can happen if two threads create a new Popen instance.
# It's harmless that it was already removed, so ignore.
pass
请注意代码中写的是 for inst in _active[:] 而不是 for inst in _active。这是因为考虑到了在迭代过程中,其他线程有可能去修改 _active,所以这里是获取整个 slice
如果你不明白可以体会一下下面的示例
l = list(range(10))
for i in l:
print(i)
l.append(i)
# 对比
l = list(range(10))
for i in l[:]:
print(i)
l.append(i)
_active 中存放的是被销毁时底层对应进程还未结束的 Popen 实例。当我们再一次创建 Popen 实例时,会尝试去 wait 它们以免产生 zombie process
关于 _internal_poll 我们稍后再说
_get_handles 方法按需初始化了一堆管道,用于和子进程进行通信
class Popen(object):
def _get_handles(self, stdin, stdout, stderr):
p2cread, p2cwrite = -1, -1
c2pread, c2pwrite = -1, -1
errread, errwrite = -1, -1
# 下面的代码分别初始化了 stdin, stdout, stderr 三个管道
if stdin is None: # 默认情况,即 subprocess.Popen 时未指定 stdin
pass
elif stdin == PIPE: # 即 subprocess.PIPE,自动创建管道
p2cread, p2cwrite = os.pipe()
elif stdin == DEVNULL:
p2cread = self._get_devnull() # 子进程从 /dev/null 读取
elif isinstance(stdin, int): # 用户传入自定义管道,如 os.pipe()
p2cread = stdin
else:
# Assuming file-like object
p2cread = stdin.fileno() # 当自定义管道非 fd 时,如 socket.socketpair()
if stdout is None:
pass
elif stdout == PIPE:
c2pread, c2pwrite = os.pipe()
elif stdout == DEVNULL:
c2pwrite = self._get_devnull() # 子进程向 /dev/null 写入
elif isinstance(stdout, int):
c2pwrite = stdout
else:
# Assuming file-like object
c2pwrite = stdout.fileno()
if stderr is None:
pass
elif stderr == PIPE:
errread, errwrite = os.pipe()
elif stderr == STDOUT: # 将子进程的 stderr 定向到 stdout 中
if c2pwrite != -1:
errwrite = c2pwrite
else: # child's stdout is not set, use parent's stdout
errwrite = sys.__stdout__.fileno()
elif stderr == DEVNULL:
errwrite = self._get_devnull()
elif isinstance(stderr, int):
errwrite = stderr
else:
# Assuming file-like object
errwrite = stderr.fileno()
return (p2cread, p2cwrite,
c2pread, c2pwrite,
errread, errwrite)
即建立了如下的管道
# Parent Child
# ------ -----
# p2cwrite ---stdin---> p2cread
# c2pread <--stdout--- c2pwrite
# errread <--stderr--- errwrite
随后打开这些 fd,封装一层 io.TextIOWrapper。接下来便是 fork-exec
class Popen(object):
def _execute_child(self, args, executable, preexec_fn, close_fds,
pass_fds, cwd, env,
startupinfo, creationflags, shell,
p2cread, p2cwrite,
c2pread, c2pwrite,
errread, errwrite,
restore_signals, start_new_session):
if isinstance(args, (str, bytes)):
args = [args]
else:
args = list(args)
if shell:
args = ["/bin/sh", "-c"] + args # execute through the shell
if executable:
args[0] = executable
if executable is None:
executable = args[0]
orig_executable = executable
# For transferring possible exec failure from child to parent.
# Data format: "exception name:hex errno:description"
# Pickle is not used; it is complex and involves memory allocation.
errpipe_read, errpipe_write = os.pipe()
# errpipe_write must not be in the standard io 0, 1, or 2 fd range.
low_fds_to_close = []
while errpipe_write < 3: # 此处参考 https://bugs.python.org/issue15798
low_fds_to_close.append(errpipe_write)
errpipe_write = os.dup(errpipe_write)
for low_fd in low_fds_to_close:
os.close(low_fd)
try:
try:
# We must avoid complex work that could involve
# malloc or free in the child process to avoid
# potential deadlocks, thus we do all this here.
# and pass it to fork_exec()
# 处理环境变量
if env is not None:
env_list = []
for k, v in env.items():
k = os.fsencode(k)
if b'=' in k:
raise ValueError("illegal environment variable name")
env_list.append(k + b'=' + os.fsencode(v))
else:
env_list = None # Use execv instead of execve.
executable = os.fsencode(executable)
if os.path.dirname(executable):
executable_list = (executable,)
else:
# This matches the behavior of os._execvpe().
executable_list = tuple(
os.path.join(os.fsencode(dir), executable)
for dir in os.get_exec_path(env))
fds_to_keep = set(pass_fds)
fds_to_keep.add(errpipe_write)
# 调用 C 实现
self.pid = _posixsubprocess.fork_exec(
args, executable_list,
close_fds, tuple(sorted(map(int, fds_to_keep))),
cwd, env_list,
p2cread, p2cwrite, c2pread, c2pwrite,
errread, errwrite,
errpipe_read, errpipe_write,
restore_signals, start_new_session, preexec_fn)
self._child_created = True
finally:
# be sure the FD is closed no matter what
os.close(errpipe_write)
# self._devnull is not always defined.
devnull_fd = getattr(self, '_devnull', None)
# 这里关闭了管道中不需要使用的那一端
if p2cread != -1 and p2cwrite != -1 and p2cread != devnull_fd:
os.close(p2cread)
if c2pwrite != -1 and c2pread != -1 and c2pwrite != devnull_fd:
os.close(c2pwrite)
if errwrite != -1 and errread != -1 and errwrite != devnull_fd:
os.close(errwrite)
if devnull_fd is not None:
os.close(devnull_fd)
# Prevent a double close of these fds from __init__ on error.
self._closed_child_pipe_fds = True
# Wait for exec to fail or succeed; possibly raising an
# exception (limited in size)
errpipe_data = bytearray()
while True:
part = os.read(errpipe_read, 50000)
errpipe_data += part
if not part or len(errpipe_data) > 50000:
break
finally:
# be sure the FD is closed no matter what
os.close(errpipe_read)
# exec 失败
if errpipe_data:
try:
pid, sts = os.waitpid(self.pid, 0)
if pid == self.pid:
self._handle_exitstatus(sts)
else:
self.returncode = sys.maxsize
except ChildProcessError:
pass
try:
exception_name, hex_errno, err_msg = (
errpipe_data.split(b':', 2))
# The encoding here should match the encoding
# written in by the subprocess implementations
# like _posixsubprocess
err_msg = err_msg.decode()
except ValueError:
exception_name = b'SubprocessError'
hex_errno = b'0'
err_msg = 'Bad exception data from child: {!r}'.format(
bytes(errpipe_data))
child_exception_type = getattr(
builtins, exception_name.decode('ascii'),
SubprocessError)
if issubclass(child_exception_type, OSError) and hex_errno:
errno_num = int(hex_errno, 16)
child_exec_never_called = (err_msg == "noexec")
if child_exec_never_called:
err_msg = ""
# The error must be from chdir(cwd).
err_filename = cwd
else:
err_filename = orig_executable
if errno_num != 0:
err_msg = os.strerror(errno_num)
if errno_num == errno.ENOENT:
err_msg += ': ' + repr(err_filename)
raise child_exception_type(errno_num, err_msg, err_filename)
raise child_exception_type(err_msg)
errpipe_read 和 errpipe_write 这条管道区别于 errread 和 errwrite,用于传递 exec 时的错误
_handle_exitstatus 针对不同的退出情况做了不同的处理
class Popen(object):
def _handle_exitstatus(self, sts, _WIFSIGNALED=os.WIFSIGNALED,
_WTERMSIG=os.WTERMSIG, _WIFEXITED=os.WIFEXITED,
_WEXITSTATUS=os.WEXITSTATUS, _WIFSTOPPED=os.WIFSTOPPED,
_WSTOPSIG=os.WSTOPSIG):
"""All callers to this function MUST hold self._waitpid_lock."""
# This method is called (indirectly) by __del__, so it cannot
# refer to anything outside of its local scope.
if _WIFSIGNALED(sts): # if the process exited due to a signal
# return the signal which caused the process to exit
self.returncode = -_WTERMSIG(sts)
elif _WIFEXITED(sts): # if the process exited using the exit(2) system call
# return the integer parameter to the exit(2) system call
self.returncode = _WEXITSTATUS(sts)
elif _WIFSTOPPED(sts): # if the process has been stopped
# return the signal which caused the process to stop.
self.returncode = -_WSTOPSIG(sts)
else:
# Should never happen
raise SubprocessError("Unknown child exit status!")
之前从来没想过 returncode 的获取是如此繁琐,于是翻了一遍 wait(2),发现自己的理解还是不到位的
首先 wait, waitpid, waitid 这一组系统调用是用于等待进程状态改变(wait for process to change state),而不是先前理解的结束
All of these system calls are used to wait for state changes in a child of the calling process, and obtain information about the child whose state has changed. A state change is considered to be: the child terminated; the child was stopped by a signal; or the child was resumed by a signal. In the case of a terminated child, performing a wait allows the system to release the resources associated with the child; if a wait is not performed, then the terminated child remains in a “zombie” state. The
waitpid()system call suspends execution of the calling process until a child specified by pid argument has changed state. By default,waitpid()waits only for terminated children, but this behavior is modifiable via the options argument, as described below.
比如可以检测子进程的 stop 和 resume
while True:
_, sts = os.waitpid(pid, os.WUNTRACED | os.WCONTINUED)
if os.WIFSTOPPED(sts):
print('stop')
elif os.WIFCONTINUED(sts):
print('continue')
else:
break
communicate#
communicate 的实现如下
class Popen(object):
def communicate(self, input=None, timeout=None):
if self._communication_started and input:
raise ValueError("Cannot send input after starting communication")
# Optimization: If we are not worried about timeouts, we haven't
# started communicating, and we have one or zero pipes, using select()
# or threads is unnecessary.
if (timeout is None and not self._communication_started and
[self.stdin, self.stdout, self.stderr].count(None) >= 2):
stdout = None
stderr = None
if self.stdin:
self._stdin_write(input)
elif self.stdout:
stdout = self.stdout.read()
self.stdout.close()
elif self.stderr:
stderr = self.stderr.read()
self.stderr.close()
self.wait()
else: # 需要同时处理多个管道
if timeout is not None:
endtime = _time() + timeout
else:
endtime = None
try:
stdout, stderr = self._communicate(input, endtime, timeout)
finally:
self._communication_started = True
sts = self.wait(timeout=self._remaining_time(endtime))
return (stdout, stderr)
这里有一个优化:当我们仅需要处理一个管道,或者根本对这些不关心时,直接读取数据然后 wait 即可。但是当我们需要处理多个管道且需要做限制时间时便需要通过 _communicate 来完成
# poll/select have the advantage of not requiring any extra file
# descriptor, contrarily to epoll/kqueue (also, they require a single
# syscall).
if hasattr(selectors, 'PollSelector'):
_PopenSelector = selectors.PollSelector
else:
_PopenSelector = selectors.SelectSelector
class Popen(object):
def _communicate(self, input, endtime, orig_timeout):
if self.stdin and not self._communication_started:
# Flush stdio buffer. This might block, if the user has
# been writing to .stdin in an uncontrolled fashion.
try:
self.stdin.flush()
except BrokenPipeError:
pass # communicate() must ignore BrokenPipeError.
if not input:
try:
self.stdin.close()
except BrokenPipeError:
pass # communicate() must ignore BrokenPipeError.
stdout = None
stderr = None
# Only create this mapping if we haven't already.
if not self._communication_started:
self._fileobj2output = {}
if self.stdout:
self._fileobj2output[self.stdout] = []
if self.stderr:
self._fileobj2output[self.stderr] = []
if self.stdout:
stdout = self._fileobj2output[self.stdout]
if self.stderr:
stderr = self._fileobj2output[self.stderr]
self._save_input(input)
if self._input:
input_view = memoryview(self._input)
with _PopenSelector() as selector:
# 注册
if self.stdin and input:
selector.register(self.stdin, selectors.EVENT_WRITE)
if self.stdout:
selector.register(self.stdout, selectors.EVENT_READ)
if self.stderr:
selector.register(self.stderr, selectors.EVENT_READ)
while selector.get_map(): # 当前还存在需要监听的时间
timeout = self._remaining_time(endtime) # endtime - time.monotonic()
if timeout is not None and timeout < 0:
raise TimeoutExpired(self.args, orig_timeout)
ready = selector.select(timeout)
self._check_timeout(endtime, orig_timeout) # 二次检测,是否超时
for key, events in ready:
if key.fileobj is self.stdin:
chunk = input_view[self._input_offset :
self._input_offset + _PIPE_BUF]
try:
self._input_offset += os.write(key.fd, chunk)
except BrokenPipeError:
selector.unregister(key.fileobj)
key.fileobj.close()
else:
if self._input_offset >= len(self._input):
selector.unregister(key.fileobj)
key.fileobj.close()
elif key.fileobj in (self.stdout, self.stderr):
# 32KB 优化,参考 https://bugs.python.org/issue19929
data = os.read(key.fd, 32768)
if not data:
selector.unregister(key.fileobj)
key.fileobj.close()
self._fileobj2output[key.fileobj].append(data)
self.wait(timeout=self._remaining_time(endtime))
# All data exchanged. Translate lists into strings.
if stdout is not None:
stdout = b''.join(stdout)
if stderr is not None:
stderr = b''.join(stderr)
# Translate newlines, if requested.
# This also turns bytes into strings.
if self.encoding or self.errors or self.universal_newlines:
if stdout is not None:
stdout = self._translate_newlines(stdout,
self.stdout.encoding,
self.stdout.errors)
if stderr is not None:
stderr = self._translate_newlines(stderr,
self.stderr.encoding,
self.stderr.errors)
return (stdout, stderr)
_communicate 使用了 IO Multiplexing 来处理 stdin, stdout, stderr。因为至多会处理两个,所以用不着 epoll(需要三个系统调用)
wait#
wait 的实现
class Popen(object):
def wait(self, timeout=None, endtime=None):
if self.returncode is not None:
return self.returncode
if endtime is not None:
warnings.warn(
"'endtime' argument is deprecated; use 'timeout'.",
DeprecationWarning,
stacklevel=2)
if endtime is not None or timeout is not None:
if endtime is None:
endtime = _time() + timeout
elif timeout is None:
timeout = self._remaining_time(endtime)
if endtime is not None:
# Enter a busy loop if we have a timeout. This busy loop was
# cribbed from Lib/threading.py in Thread.wait() at r71065.
delay = 0.0005 # 500 us -> initial delay of 1 ms
while True:
if self._waitpid_lock.acquire(False): # 非阻塞
try:
if self.returncode is not None:
break # Another thread waited.
(pid, sts) = self._try_wait(os.WNOHANG)
assert pid == self.pid or pid == 0
if pid == self.pid:
self._handle_exitstatus(sts)
break
finally:
self._waitpid_lock.release()
remaining = self._remaining_time(endtime)
if remaining <= 0:
raise TimeoutExpired(self.args, timeout)
delay = min(delay * 2, remaining, .05)
time.sleep(delay)
else:
while self.returncode is None:
with self._waitpid_lock:
if self.returncode is not None:
break # Another thread waited.
(pid, sts) = self._try_wait(0)
# Check the pid and loop as waitpid has been known to
# return 0 even without WNOHANG in odd situations.
# http://bugs.python.org/issue14396.
if pid == self.pid:
self._handle_exitstatus(sts)
return self.returncode
wait 通过 _waitpid_lock 保证了同一时刻只有一个线程在去 wait 子进程。wait 的核心部分在 _try_wait 中,实际上就是对 os.waitpid 的封装。对于设置超时的情况,我们采取非阻塞的方式去获取锁,然后不断校对是否超时。os.WNOHANG 即 wait no hange,如果子进程没有结束则返回 (0, 0)
_try_wait 的实现如下
class Popen(object):
def _try_wait(self, wait_flags):
try:
(pid, sts) = os.waitpid(self.pid, wait_flags)
except ChildProcessError:
# This happens if SIGCLD is set to be ignored or waiting
# for child processes has otherwise been disabled for our
# process. This child is dead, we can't get the status.
pid = self.pid
sts = 0
return (pid, sts)
poll 可以去检查子进程是否结束,它就是调用的 _internal_poll。_internal_poll 的实现如下
class Popen(object):
def _internal_poll(self, _deadstate=None, _waitpid=os.waitpid,
_WNOHANG=os.WNOHANG, _ECHILD=errno.ECHILD):
"""Check if child process has terminated. Returns returncode
attribute.
This method is called by __del__, so it cannot reference anything
outside of the local scope (nor can any methods it calls).
"""
if self.returncode is None:
if not self._waitpid_lock.acquire(False):
# Something else is busy calling waitpid. Don't allow two
# at once. We know nothing yet.
return None
try:
if self.returncode is not None:
return self.returncode # Another thread waited.
pid, sts = _waitpid(self.pid, _WNOHANG)
if pid == self.pid:
self._handle_exitstatus(sts)
except OSError as e:
if _deadstate is not None:
self.returncode = _deadstate
elif e.errno == _ECHILD:
# This happens if SIGCLD is set to be ignored or
# waiting for child processes has otherwise been
# disabled for our process. This child is dead, we
# can't get the status.
# http://bugs.python.org/issue15756
self.returncode = 0
finally:
self._waitpid_lock.release()
return self.returncode
当然如果一个 Popen 实例的引用计数归零,也会尝试去 wait。若失败则添加到 _active 中
class Popen(object):
def __del__(self, _maxsize=sys.maxsize, _warn=warnings.warn):
if not self._child_created:
# We didn't get to successfully create a child process.
return
if self.returncode is None:
# Not reading subprocess exit status creates a zombie process which
# is only destroyed at the parent python process exit
_warn("subprocess %s is still running" % self.pid,
ResourceWarning, source=self)
# In case the child hasn't been waited on, check if it's done.
self._internal_poll(_deadstate=_maxsize)
if self.returncode is None and _active is not None:
# Child is still running, keep us alive until we can wait on it.
_active.append(self)
执行 _active.append(self) 后,我们对此实例进行了新的引用,所以并不会被销毁,这种手段被称为 object resurrection。CPython 中当此实例被再次销毁时,并不会再次调用 __del__,参考 Python DataModel
It is implementation-dependent whether
__del__()is called a second time when a resurrected object is about to be destroyed; the current CPython implementation only calls it once.
另外需要注意 __del__ 并不是一定会被调用的,参考 Python DataModel
It is not guaranteed that
__del__()methods are called for objects that still exist when the interpreter exits.
此时未结束的子进程会被 init 接管