Skip to main content

CPython 源码阅读 - Signal

·1951 words·10 mins
Table of Contents

本文代码取自 CPython 3.6.3, commit sha 为 2c5fed86e0cbba5a4e34792b0083128ce659909d

About Signal
#

On Unix systems, there are several ways to send signals to processes—with a kill command, with a keyboard sequence (like control-C), or through your own program (e.g., using a kill command in C). Signals are also generated by hardware exceptions such as segmentation faults and illegal instructions, timers and child process termination.
When a signal is sent, the operating system interrupts the target process’ normal flow of execution to deliver the signal. Execution can be interrupted during any non-atomic instruction. If the process has previously registered a signal handler, that routine is executed. Otherwise, the default signal handler is executed.

Linux 下可以通过 /proc 目录来确定进程对信号的处理方式,下面是一个普通 Python 进程的采样

$ cat /proc/<pid>/status | grep Sig
SigQ:	  0/15318
SigPnd:	0000000000000000
ShdPnd:	0000000000000000
SigBlk:	0000000000000000
SigIgn:	0000000001001000
SigCgt:	0000000180000002
  • SigQ Current/max. queued signals (since 2.6.12)
  • SigPnd Signals pending for thread (线程级未决信号)
  • ShdPnd Signals pending for process (since 2.6)
  • SigBlk Signals being blocked
  • SigIgn Signals being ignored
  • SigCgt Signals being caught

值均以十六进制表示,是一个 bitmap,所以总共有 64(4*16) 种信号存在

以上例中的 SigCgt 为例(转换成二级制后)

110000000000000000000000000000010
||                             `->  2=SIGINT
|`-------------------------------> 32=SIGRTMIN-2
`--------------------------------> 33=SIGRTMIN-1

如果我们在脚本中注册新的信号处理函数 signal.signal(signal.SIGHUP, signal.default_int_handler),那么此时 SigCgt 的值会变成 0000000180000003

SIGINT 是我们常用到的一个信号,即键盘敲下 所发送的信号。Python 在启动时默认注册了一个信号处理函数,它会抛出 KeyboardInterrupt

Python Implementation
#

文档上对于 Python 的信号处理做了大致的描述

A Python signal handler does not get executed inside the low-level (C) signal handler. Instead, the low-level signal handler sets a flag which tells the virtual machine to execute the corresponding Python signal handler at a later point(for example at the next bytecode instruction). This has consequences:

  • It makes little sense to catch synchronous errors like SIGFPE or SIGSEGV that are caused by an invalid operation in C code. Python will return from the signal handler to the C code, which is likely to raise the same signal again, causing Python to apparently hang. From Python 3.3 onwards, you can use the faulthandler module to report on synchronous errors.
  • A long-running calculation implemented purely in C (such as regular expression matching on a large body of text) may run uninterrupted for an arbitrary amount of time, regardless of any signals received. The Python signal handlers will be called when the calculation finishes.

Python 的入口在 Modules/main.c

// Modules/main.c
int
Py_Main(int argc, wchar_t **argv)
{
   // ...
   Py_Initialize();
   // ...
}

Py_Initialize 初始化 Python 环境,在 Py_ Initialize 中仅有一个函数被调用,即函数 Py_InitializeEx

// Python/pylifecycle.c
void
Py_InitializeEx(int install_sigs)
{
    _Py_InitializeEx_Private(install_sigs, 1);
}

void
Py_Initialize(void)
{
    Py_InitializeEx(1);
}

_Py_InitializeEx_Private 函数挺长的,这里只摘取我们所关心的 signal 部分

// Python/pylifecycle.c
void
_Py_InitializeEx_Private(int install_sigs, int install_importlib)
{   
    // ...
    if (install_sigs)
        initsigs(); /* Signal handling stuff, including initintr() */  
    // ...
}

initsigs 的实现如下

// Python/pylifecycle.c
static void
initsigs(void)
{
#ifdef SIGPIPE
    PyOS_setsig(SIGPIPE, SIG_IGN);  // 忽略 SIGPIPE
#endif
#ifdef SIGXFZ
    PyOS_setsig(SIGXFZ, SIG_IGN);  // 忽略 SIGXFZ
#endif
#ifdef SIGXFSZ
    PyOS_setsig(SIGXFSZ, SIG_IGN);  // 忽略 SIGXFSZ
#endif
    PyOS_InitInterrupts(); /* May imply initsignal() */
    if (PyErr_Occurred()) {
        Py_FatalError("Py_Initialize: can't import signal");
    }
}

PyOS_setsig 是对 OS 层级 signal 的封装。根据这里可以知道为什么本文最开始示例中 SigIgn 为什么是 0X1001000

  • SIGPIPE: 当进程试图写入数据到管道、FIFO、Socket,但却没有相应的读取进程,会触发这个信号。这个信号通常是由于读取进程关闭了 IPC 通道的文件描述符而产生
  • SIGXFZ: 这个 man 手册里没有,不清楚用途
  • SIGXFSZ: 如果进程试图(使用 write()truncate())增加文件的大小,但却超出了进程的文件大小资源限制(RLIMIT_FSIZE),会发送这个信号给该进程
// Modules/signalmodule.c
void
PyOS_InitInterrupts(void)
{
    PyObject *m = PyImport_ImportModule("_signal");
    if (m) {
        Py_DECREF(m);
    }
}

这里通过 PyImport_ImportModule 导入了 _signal 模块,其实现如下

// Modules/signalmodule.c
static struct PyModuleDef signalmodule = {
    PyModuleDef_HEAD_INIT,
    "_signal",
    module_doc,
    -1,
    signal_methods,
    NULL,
    NULL,
    NULL,
    NULL
};

模块初始化

// Modules/signalmodule.c
PyMODINIT_FUNC
PyInit__signal(void)
{
    PyObject *m, *d, *x;
    int i;

#ifdef WITH_THREAD
    main_thread = PyThread_get_thread_ident();
    main_pid = getpid();
#endif

    /* Create the module and add the functions */
    m = PyModule_Create(&signalmodule);
    if (m == NULL)
        return NULL;

#if defined(HAVE_SIGWAITINFO) || defined(HAVE_SIGTIMEDWAIT)
    if (!initialized) {
        if (PyStructSequence_InitType2(&SiginfoType, &struct_siginfo_desc) < 0)
            return NULL;
    }
    Py_INCREF((PyObject*) &SiginfoType);
    PyModule_AddObject(m, "struct_siginfo", (PyObject*) &SiginfoType);
    initialized = 1;
#endif

    /* Add some symbolic constants to the module */
    d = PyModule_GetDict(m);

    x = DefaultHandler = PyLong_FromVoidPtr((void *)SIG_DFL);
    // 相当于 d["SIG_DFL"] = x
    if (!x || PyDict_SetItemString(d, "SIG_DFL", x) < 0)
        goto finally;  // 失败

    x = IgnoreHandler = PyLong_FromVoidPtr((void *)SIG_IGN);
    if (!x || PyDict_SetItemString(d, "SIG_IGN", x) < 0)
        goto finally;
    // NSIG 为定义的 signal 数量(64),但由于从 1 起始,所以 NSIG 的值为 65
    x = PyLong_FromLong((long)NSIG);
    if (!x || PyDict_SetItemString(d, "NSIG", x) < 0)
        goto finally;
    Py_DECREF(x);

#ifdef SIG_BLOCK
    if (PyModule_AddIntMacro(m, SIG_BLOCK))
         goto finally;
#endif
#ifdef SIG_UNBLOCK
    if (PyModule_AddIntMacro(m, SIG_UNBLOCK))
         goto finally;
#endif
#ifdef SIG_SETMASK
    if (PyModule_AddIntMacro(m, SIG_SETMASK))
         goto finally;
#endif

    // 中断信号(SIGINT)的默认处理函数
    x = IntHandler = PyDict_GetItemString(d, "default_int_handler");
    if (!x)
        goto finally;
    Py_INCREF(IntHandler);

    _Py_atomic_store_relaxed(&Handlers[0].tripped, 0);
    // 对 Handlers 进行初始化
    for (i = 1; i < NSIG; i++) {
        void (*t)(int);
        t = PyOS_getsig(i);
        _Py_atomic_store_relaxed(&Handlers[i].tripped, 0);
        if (t == SIG_DFL)
            Handlers[i].func = DefaultHandler;
        else if (t == SIG_IGN)
            Handlers[i].func = IgnoreHandler;
        else
            Handlers[i].func = Py_None; /* None of our business */
        Py_INCREF(Handlers[i].func);
    }
    // 将 SIGINT 默认的信号处理函数替换为 Python 自定义的通用处理函数 signal_handler
    // 并在 Handlers 数组中注册 IntHandler
    if (Handlers[SIGINT].func == DefaultHandler) {
        /* Install default int handler */
        Py_INCREF(IntHandler);
        Py_SETREF(Handlers[SIGINT].func, IntHandler);
        old_siginthandler = PyOS_setsig(SIGINT, signal_handler);
    }

// 为模块添加 SIGHUP 整型常量(signal.SIGHUP 的定义)
#ifdef SIGHUP
    if (PyModule_AddIntMacro(m, SIGHUP))
         goto finally;
#endif
#ifdef SIGINT
    if (PyModule_AddIntMacro(m, SIGINT))
         goto finally;
#endif
// ... 以此类推,这里省略
    if (PyErr_Occurred()) {
        Py_DECREF(m);
        m = NULL;
    }

    finally:
        return m;
}

Handlers 是一个结构体数组,它由一个是否发生该信号的标志 tripped 和用户自定义的信号处理函数 func 组成,定义如下

// Modules/signalmodule.c
static volatile struct {
    _Py_atomic_int tripped;
    PyObject *func;
} Handlers[NSIG];

default_int_handler 的定义如下

// Modules/signalmodule.c
static PyObject *
signal_default_int_handler(PyObject *self, PyObject *args)
{
    // 抛出 KeyboardInterrupt
    PyErr_SetNone(PyExc_KeyboardInterrupt);
    return NULL;
}

接下里看一下 Python 中用于注册信号处理函数的 signal.signal 的实现

// Modules/signalmodule.c
static PyObject *
signal_signal_impl(PyObject *module, int signalnum, PyObject *handler)
{
    PyObject *old_handler;
    void (*func)(int);
#ifdef WITH_THREAD
    // 检测当前线程是否为主线程
    if (PyThread_get_thread_ident() != main_thread) {
        PyErr_SetString(PyExc_ValueError,
                        "signal only works in main thread");
        return NULL;
    }
#endif
    if (signalnum < 1 || signalnum >= NSIG) {
        PyErr_SetString(PyExc_ValueError,
                        "signal number out of range");
        return NULL;
    }
    if (handler == IgnoreHandler)
        func = SIG_IGN;
    else if (handler == DefaultHandler)
        func = SIG_DFL;
    else if (!PyCallable_Check(handler)) {
        PyErr_SetString(PyExc_TypeError,
"signal handler must be signal.SIG_IGN, signal.SIG_DFL, or a callable object");
                return NULL;
    }
    else
        func = signal_handler;  // [1]
    if (PyOS_setsig(signalnum, func) == SIG_ERR) {
        PyErr_SetFromErrno(PyExc_OSError);
        return NULL;
    }
    old_handler = Handlers[signalnum].func;
    _Py_atomic_store_relaxed(&Handlers[signalnum].tripped, 0);
    Py_INCREF(handler);
    Handlers[signalnum].func = handler;  // 将用户自定义 handler 记录在数组中,建立映射关系
    if (old_handler != NULL)
        return old_handler;
    else
        Py_RETURN_NONE;
}

根据 [1] 处可以知道对于传入的所有自定义信号处理函数,实际上在系统上注册的都是 signal_handler,其实现如下

// Modules/signalmodule.c
static void
signal_handler(int sig_num)
{
    int save_errno = errno;

#ifdef WITH_THREAD
    /* See NOTES section above */
    if (getpid() == main_pid)
#endif
    {
        trip_signal(sig_num);
    }
    /* Issue #10311: asynchronously executing signal handlers should not
       mutate errno under the feet of unsuspecting C code. */
    errno = save_errno;
}

trip_signal 的实现如下

// Modules/signalmodule.c
#define INVALID_FD (-1)
static volatile sig_atomic_t wakeup_fd = -1;

static void
trip_signal(int sig_num)
{
    unsigned char byte;
    int fd;
    Py_ssize_t rc;

    // 标志位置 1,代表信号发生
    _Py_atomic_store_relaxed(&Handlers[sig_num].tripped, 1);

    /* Set is_tripped after setting .tripped, as it gets
       cleared in PyErr_CheckSignals() before .tripped. */
    _Py_atomic_store(&is_tripped, 1);

    /* Notify ceval.c */
    _PyEval_SignalReceived();

    /* And then write to the wakeup fd *after* setting all the globals and
       doing the _PyEval_SignalReceived. We used to write to the wakeup fd
       and then set the flag, but this allowed the following sequence of events
       (especially on windows, where trip_signal may run in a new thread):
       - main thread blocks on select([wakeup_fd], ...)
       - signal arrives
       - trip_signal writes to the wakeup fd
       - the main thread wakes up
       - the main thread checks the signal flags, sees that they're unset
       - the main thread empties the wakeup fd
       - the main thread goes back to sleep
       - trip_signal sets the flags to request the Python-level signal handler
         be run
       - the main thread doesn't notice, because it's asleep
       See bpo-30038 for more details.
    */
    // 先置 flag 后写入 wakeup_fd,否则会引起 bug

    fd = wakeup_fd;

    if (fd != INVALID_FD) {
        byte = (unsigned char)sig_num;
        /* _Py_write_noraise() retries write() if write() is interrupted by
           a signal (fails with EINTR). */
        rc = _Py_write_noraise(fd, &byte, 1);  // 写入信号对应的整型表示

        if (rc < 0) {
            /* Py_AddPendingCall() isn't signal-safe, but we
               still use it for this exceptional case. */
            Py_AddPendingCall(report_wakeup_write_error,
                              (void *)(intptr_t)errno);
        }
    }
}

核心部分位于 _PyEval_SignalReceived

wakeup_fd 默认为 INVALID_FD,用户可以通过 signal.set_wakeup_fd 对其进行设置

// Python/ceval.c
void
_PyEval_SignalReceived(void)
{
    /* bpo-30703: Function called when the C signal handler of Python gets a
       signal. We cannot queue a callback using Py_AddPendingCall() since
       that function is not async-signal-safe. */
    // 这里之前是直接调用的 Py_AddPendingCall 但是存在不可重入问题
    // 参考 https://github.com/python/cpython/pull/2408
    SIGNAL_PENDING_CALLS();
}
// Python/ceval.c
/* Pending calls are only modified under pending_lock */
#define SIGNAL_PENDING_CALLS() \
    do { \
        _Py_atomic_store_relaxed(&pendingcalls_to_do, 1); \
        _Py_atomic_store_relaxed(&eval_breaker, 1); \
    } while (0)

这一系列的操作最终影响下一次解释器循环

// Python/ceval.c
PyObject *
_PyEval_EvalFrameDefault(PyFrameObject *f, int throwflag)
{
    for (;;) {
        assert(stack_pointer >= f->f_valuestack); /* else underflow */
        assert(STACK_LEVEL() <= co->co_stacksize);  /* else overflow */
        assert(!PyErr_Occurred());

        /* Do periodic things.  Doing this every time through
           the loop would add too much overhead, so we do it
           only every Nth instruction.  We also do it if
           ``pendingcalls_to_do'' is set, i.e. when an asynchronous
           event needs attention (e.g. a signal handler or
           async I/O handler); see Py_AddPendingCall() and
           Py_MakePendingCalls() above. */

        if (_Py_atomic_load_relaxed(&eval_breaker)) {
            if (_Py_OPCODE(*next_instr) == SETUP_FINALLY ||
                _Py_OPCODE(*next_instr) == YIELD_FROM) {
                /* Two cases where we skip running signal handlers and other
                   pending calls:
                   - If we're about to enter the try: of a try/finally (not
                     *very* useful, but might help in some cases and it's
                     traditional)
                   - If we're resuming a chain of nested 'yield from' or
                     'await' calls, then each frame is parked with YIELD_FROM
                     as its next opcode. If the user hit control-C we want to
                     wait until we've reached the innermost frame before
                     running the signal handler and raising KeyboardInterrupt
                     (see bpo-30039).
                */
                goto fast_next_opcode;
            }
            if (_Py_atomic_load_relaxed(&pendingcalls_to_do)) {
                if (Py_MakePendingCalls() < 0)
                    goto error;
            }
        }
    }
}

Python 中维持了一个 pendingcalls 数组,可以将需要在下一次解释器循环(ceval.c 中 那个超长 for (;;))中调用的函数放在里面

// Python/ceval.c
#define NPENDINGCALLS 32
static struct {
    int (*func)(void *);
    void *arg;
} pendingcalls[NPENDINGCALLS]

Py_MakePendingCalls 是用于调度所有存放在 pendingcalls 中的等候函数,同时它也负责处理信号

// Python/ceval.c
int
Py_MakePendingCalls(void)
{
    // 这里省略掉调度 pendingcall 的部分

    /* unsignal before starting to call callbacks, so that any callback
       added in-between re-signals */
    UNSIGNAL_PENDING_CALLS();  // 与 SIGNAL_PENDING_CALLS 相反,清除标志位

    /* Python signal handler doesn't really queue a callback: it only signals
       that a signal was received, see _PyEval_SignalReceived(). */
    if (PyErr_CheckSignals() < 0) {
        goto error;
    }

error:
    SIGNAL_PENDING_CALLS(); /* We're not done yet */
    return -1;
}

PyErr_CheckSignals 中对用户注册的信号处理函数进行调度

// Modules/signalmodule.c
int
PyErr_CheckSignals(void)
{
    int i;
    PyObject *f;

    // 没有收到信号,直接返回
    if (!_Py_atomic_load(&is_tripped))
        return 0;

#ifdef WITH_THREAD
    if (PyThread_get_thread_ident() != main_thread)
        return 0;
#endif

    /*
     * The is_tripped variable is meant to speed up the calls to
     * PyErr_CheckSignals (both directly or via pending calls) when no
     * signal has arrived. This variable is set to 1 when a signal arrives
     * and it is set to 0 here, when we know some signals arrived. This way
     * we can run the registered handlers with no signals blocked.
     *
     * NOTE: with this approach we can have a situation where is_tripped is
     *       1 but we have no more signals to handle (Handlers[i].tripped
     *       is 0 for every signal i). This won't do us any harm (except
     *       we're gonna spent some cycles for nothing). This happens when
     *       we receive a signal i after we zero is_tripped and before we
     *       check Handlers[i].tripped.
     */
    _Py_atomic_store(&is_tripped, 0);

    if (!(f = (PyObject *)PyEval_GetFrame()))
        f = Py_None;

    // 遍历 Handlers,调用所有处于触发态信号的处理函数
    for (i = 1; i < NSIG; i++) {
        if (_Py_atomic_load_relaxed(&Handlers[i].tripped)) {
            PyObject *result = NULL;
            PyObject *arglist = Py_BuildValue("(iO)", i, f);
            _Py_atomic_store_relaxed(&Handlers[i].tripped, 0);

            if (arglist) {
                // will call PyEval_CallObjectWithKeywords
                result = PyEval_CallObject(Handlers[i].func,
                                           arglist);
                Py_DECREF(arglist);
            }
            if (!result) {
                _Py_atomic_store(&is_tripped, 1);
                return -1;
            }

            Py_DECREF(result);
        }
    }
    return 0;
}

注意即使我们在执行这一段代码时甚至是信号处理函数时,也可能有信号到来。比如在收到 SIGINT 信号到调用信号处理函数这段时间内,也可以再次收到 SIGINT 信号,这种情况下信号处理函数是只会调用一次的。同样的在循环 Handlers 时,如果在循环至某个信号之前接收到了此信号,则会调用对应的信号处理函数,但是此时 is_tripped 被置为 1,接下来会造成一次不会处理任何信号的循环

Summary
#

  1. Python 解释器初始化(Py_InitializeEx)时会为 SIGINT 信号注册 default_int_handler
  2. 当接收到信号时会保存当前上下文,然后调用注册的信号处理函数 trip_signal。此时通过设置 Handlers 数组中对应信号的标志位来标记信号被触发,并且通过 SIGNAL_PENDING_CALLS() 更改解释器的状态变量,然后返回恢复进程上下文。解释器在执行下一条 opcode 时会检测状态变量,遍历 Handlers 执行所有已触发信号的处理函数。这种处理方式和操作系统对于中断的处理相似,分为上半部和下半部两部分执行
  3. Python signal handlers are always executed in the main Python thread, even if the signal was received in another thread. Besides, only the main thread is allowed to set a new signal handler.
  4. 如果发送多次信号可能只会调用一次信号处理函数
  5. 在我们恢复一个 generators/coroutines 链时,除非到达最内层,否则不会调用信号处理函数。 Resuming a ‘yield from’ stack is broken if a signal arrives in the middle

Reference
#

Linux/UNIX系统编程手册
Unix: Dealing with signals