大师兄的Python源码学习笔记(三十九): Python的多线程机制(一)
大师兄的Python源码学习笔记(四十一): Python的多线程机制(三)
二、关于_thread包和threading
- Python提供的最基础的多线程机制接口是_thread,用C实现。
- threading包提供更高层的多线程机制接口,用Python实现。
- 我们先从更底层的_thread接口开始:
Modules\_threadmodule.c
/* Method table of the _thread module.  Each entry lists the Python-visible
   name, the C function implementing it, the argument-passing flag
   (METH_VARARGS / METH_NOARGS) and the docstring.  Several C functions are
   registered under two names: start_new_thread/start_new,
   allocate_lock/allocate and exit_thread/exit. */
static PyMethodDef thread_methods[] = {
{"start_new_thread", (PyCFunction)thread_PyThread_start_new_thread,
METH_VARARGS, start_new_doc},
{"start_new", (PyCFunction)thread_PyThread_start_new_thread,
METH_VARARGS, start_new_doc},
{"allocate_lock", (PyCFunction)thread_PyThread_allocate_lock,
METH_NOARGS, allocate_doc},
{"allocate", (PyCFunction)thread_PyThread_allocate_lock,
METH_NOARGS, allocate_doc},
{"exit_thread", (PyCFunction)thread_PyThread_exit_thread,
METH_NOARGS, exit_doc},
{"exit", (PyCFunction)thread_PyThread_exit_thread,
METH_NOARGS, exit_doc},
{"interrupt_main", (PyCFunction)thread_PyThread_interrupt_main,
METH_NOARGS, interrupt_doc},
{"get_ident", (PyCFunction)thread_get_ident,
METH_NOARGS, get_ident_doc},
{"_count", (PyCFunction)thread__count,
METH_NOARGS, _count_doc},
{"stack_size", (PyCFunction)thread_stack_size,
METH_VARARGS, stack_size_doc},
{"_set_sentinel", (PyCFunction)thread__set_sentinel,
METH_NOARGS, _set_sentinel_doc},
{NULL, NULL} /* sentinel */
};
- 可以发现,_threadmodule中有的接口以不同的名字出现了两次,比如allocate_lock和allocate,但实际上它们对应的都是同一个函数thread_PyThread_allocate_lock。
三、Python线程的创建
- 观察thread_methods,可以发现其中创建线程的接口对应的是thread_PyThread_start_new_thread函数:
Modules\_threadmodule.c
/* Implementation of _thread.start_new_thread / _thread.start_new.
   Packs everything the new thread needs into a heap-allocated bootstate,
   makes sure the GIL machinery exists, starts a native thread running
   t_bootstrap on that bootstate, and returns the thread identifier as a
   Python int.
   NOTE: this listing is an excerpt; the lines shown as "..." are elided.
   func/args/keyw are presumably unpacked from fargs in the elided part. */
static PyObject *
thread_PyThread_start_new_thread(PyObject *self, PyObject *fargs)
{
PyObject *func, *args, *keyw = NULL;
struct bootstate *boot;
unsigned long ident;
... ...
/* Allocate the record that is handed to the new thread. */
boot = PyMem_NEW(struct bootstate, 1);
if (boot == NULL)
return PyErr_NoMemory();
/* The new thread shares the interpreter of the calling thread. */
boot->interp = PyThreadState_GET()->interp;
boot->func = func;
boot->args = args;
boot->keyw = keyw;
/* Thread state for the new thread is allocated here, in the caller. */
boot->tstate = _PyThreadState_Prealloc(boot->interp);
if (boot->tstate == NULL) {
/* Undo the bootstate allocation before reporting the failure. */
PyMem_DEL(boot);
return PyErr_NoMemory();
}
/* boot now refers to these objects, so take references to them
   (keyw may be NULL, hence Py_XINCREF). */
Py_INCREF(func);
Py_INCREF(args);
Py_XINCREF(keyw);
PyEval_InitThreads(); /* Start the interpreter's thread-awareness */
/* Start the native thread; it runs t_bootstrap with boot as argument. */
ident = PyThread_start_new_thread(t_bootstrap, (void*) boot);
...
return PyLong_FromUnsignedLong(ident);
}
- 在thread_PyThread_start_new_thread中,虚拟机通过三个主要动作来完成线程的创建:
1. 创建并初始化boot,其中保存着线程的所有信息:
boot->interp = PyThreadState_GET()->interp; boot->func = func; boot->args = args; boot->keyw = keyw; boot->tstate = _PyThreadState_Prealloc(boot->interp);
2. 初始化Python的多线程环境:
PyEval_InitThreads(); /* Start the interpreter's thread-awareness */
3. 以boot为参数,创建操作系统的原生线程:
ident = PyThread_start_new_thread(t_bootstrap, (void*) boot); ... return PyLong_FromUnsignedLong(ident);
- 可以看到boot->interp中保存了Python的PyInterpreterState对象,这个对象中携带了module pool这样的全局信息,所有的线程会共享这些全局信息。
1. 建立多线程环境
- 建立多线程环境的核心是建立GIL,GIL是一个_gil_runtime_state结构体:
/* Runtime state of the GIL: the switch interval, bookkeeping about who
   holds or last held the lock, and the condition variable / mutex pairs
   that waiting threads block on. */
struct _gil_runtime_state {
/* microseconds (the Python API uses seconds, though) */
unsigned long interval;
/* Last PyThreadState holding / having held the GIL. This helps us
know whether anyone else was scheduled after we dropped the GIL. */
_Py_atomic_address last_holder;
/* Whether the GIL is already taken (-1 if uninitialized). This is
atomic because it can be read without any lock taken in ceval.c. */
_Py_atomic_int locked;
/* Number of GIL switches since the beginning. */
unsigned long switch_number;
/* This condition variable allows one or several threads to wait
until the GIL is released. In addition, the mutex also protects
the above variables. */
PyCOND_T cond;
PyMUTEX_T mutex;
/* The following pair only exists when FORCE_SWITCHING is defined. */
#ifdef FORCE_SWITCHING
/* This condition variable helps the GIL-releasing thread wait for
a GIL-awaiting thread to be scheduled and take the GIL. */
PyCOND_T switch_cond;
PyMUTEX_T switch_mutex;
#endif
};
值 | 含义 |
---|---|
interval | 线程请求切换GIL的时间间隔,单位是微秒,默认是5000微秒(即5毫秒) |
last_holder | 最后一个持有(或曾经持有)GIL的PyThreadState(线程),用于判断释放GIL之后是否有其他线程被调度 |
locked | GIL是否已被获取:未初始化时为-1,未被占用时为0,被占用时为1;它是原子类型,因为在ceval.c中不需要持有任何锁就能够读取它 |
switch_number | 从GIL创建之后,总共切换的次数 |
cond | 允许一个或多个线程等待,直到GIL被释放 |
mutex | 负责保护上面的变量 |
- Python在初始化解释器时,会创建一把未初始化的GIL:
ceval.c
/* Prepare the eval-loop runtime state: seed the global and the per-runtime
   recursion limit with the default value, then hand the embedded GIL state
   to _gil_initialize(). */
void
_PyEval_Initialize(struct _ceval_runtime_state *state)
{
    /* The two limits are independent stores of the same default. */
    _Py_CheckRecursionLimit = Py_DEFAULT_RECURSION_LIMIT;
    state->recursion_limit = Py_DEFAULT_RECURSION_LIMIT;

    _gil_initialize(&state->gil);
}
Python\ceval_gil.h
/* Default value stored into _gil_runtime_state.interval. */
#define DEFAULT_INTERVAL 5000

/* Put a GIL state structure into its initial form: the default interval,
   and locked == -1. */
static void _gil_initialize(struct _gil_runtime_state *state)
{
    _Py_atomic_int not_created = {-1};

    state->interval = DEFAULT_INTERVAL;
    state->locked = not_created;
}
- 在激活多线程机制之前,除了上面提到的未初始化的GIL,Python实际上并没有建立多线程环境,这是因为Python选择了由用户激活多线程机制的策略:只有当用户调用_thread.start_new_thread时,才会真正开始初始化多线程环境。
- 而我们知道,start_new_thread会对应调用thread_PyThread_start_new_thread函数,并在其中调用PyEval_InitThreads初始化多线程环境:
ceval.c
/* Set up the multi-threading environment on first use.  When the GIL has
   already been created this is a no-op; otherwise the GIL is created and
   taken by the calling thread, the caller is recorded as the main thread
   for pending calls, and the pending-calls lock is allocated if missing. */
void
PyEval_InitThreads(void)
{
    if (!gil_created()) {
        create_gil();
        take_gil(PyThreadState_GET());

        _PyRuntime.ceval.pending.main_thread = PyThread_get_thread_ident();
        if (!_PyRuntime.ceval.pending.lock) {
            _PyRuntime.ceval.pending.lock = PyThread_allocate_lock();
        }
    }
}
- PyEval_InitThreads会通过gil_created以原子读操作检查GIL在线程中是否已经初始化:
ceval.c
/* Report whether the GIL has been created: returns 1 when the atomically
   loaded `locked` field is >= 0, and 0 otherwise. */
static int gil_created(void)
{
    int locked_state = _Py_atomic_load_explicit(&_PyRuntime.ceval.gil.locked,
                                                _Py_memory_order_acquire);

    return locked_state >= 0;
}
- 如果没有初始化,则通过create_gil初始化GIL:
ceval.c
/* Initialise the synchronisation objects of the GIL and mark it as
   created but not held.  The final store of 0 into `locked` is what makes
   gil_created() (which tests locked >= 0) start returning true. */
static void create_gil(void)
{
/* Mutex protecting the GIL state. */
MUTEX_INIT(_PyRuntime.ceval.gil.mutex);
#ifdef FORCE_SWITCHING
MUTEX_INIT(_PyRuntime.ceval.gil.switch_mutex);
#endif
/* Condition variable that threads waiting for the GIL block on. */
COND_INIT(_PyRuntime.ceval.gil.cond);
#ifdef FORCE_SWITCHING
COND_INIT(_PyRuntime.ceval.gil.switch_cond);
#endif
/* Nobody has held the GIL yet. */
_Py_atomic_store_relaxed(&_PyRuntime.ceval.gil.last_holder, 0);
_Py_ANNOTATE_RWLOCK_CREATE(&_PyRuntime.ceval.gil.locked);
/* Done last, with release ordering: locked goes from -1 to 0. */
_Py_atomic_store_explicit(&_PyRuntime.ceval.gil.locked, 0,
_Py_memory_order_release);
}
- 在初始化GIL后,会通过take_gil函数获取GIL:
ceval.c
/* Acquire the GIL on behalf of thread state `tstate`.
   A NULL tstate is a fatal error.  The function blocks until the GIL is
   free, periodically asking the current holder to drop it, then records
   the caller as holder.  errno is saved on entry and restored on exit. */
static void take_gil(PyThreadState *tstate)
{
int err;
if (tstate == NULL)
Py_FatalError("take_gil: NULL tstate");
/* Remember errno; it is written back just before returning. */
err = errno;
MUTEX_LOCK(_PyRuntime.ceval.gil.mutex);
/* Fast path: the GIL is free, skip the wait loop. */
if (!_Py_atomic_load_relaxed(&_PyRuntime.ceval.gil.locked))
goto _ready;
/* Slow path: wait on the condition variable until the GIL is free.
   The condition is re-tested after every wake-up. */
while (_Py_atomic_load_relaxed(&_PyRuntime.ceval.gil.locked)) {
int timed_out = 0;
unsigned long saved_switchnum;
/* Snapshot the switch counter to detect whether the GIL changed
   hands while we were waiting. */
saved_switchnum = _PyRuntime.ceval.gil.switch_number;
/* INTERVAL is defined outside this excerpt; presumably derived from
   gil.interval -- confirm. */
COND_TIMED_WAIT(_PyRuntime.ceval.gil.cond, _PyRuntime.ceval.gil.mutex,
INTERVAL, timed_out);
/* If we timed out and no switch occurred in the meantime, it is time
to ask the GIL-holding thread to drop it. */
if (timed_out &&
_Py_atomic_load_relaxed(&_PyRuntime.ceval.gil.locked) &&
_PyRuntime.ceval.gil.switch_number == saved_switchnum) {
SET_GIL_DROP_REQUEST();
}
}
_ready:
#ifdef FORCE_SWITCHING
/* This mutex must be taken before modifying
_PyRuntime.ceval.gil.last_holder (see drop_gil()). */
MUTEX_LOCK(_PyRuntime.ceval.gil.switch_mutex);
#endif
/* We now hold the GIL */
_Py_atomic_store_relaxed(&_PyRuntime.ceval.gil.locked, 1);
_Py_ANNOTATE_RWLOCK_ACQUIRED(&_PyRuntime.ceval.gil.locked, /*is_write=*/1);
/* Only a change of holder counts as a switch: last_holder and
   switch_number are left untouched when the same thread re-takes it. */
if (tstate != (PyThreadState*)_Py_atomic_load_relaxed(
&_PyRuntime.ceval.gil.last_holder))
{
_Py_atomic_store_relaxed(&_PyRuntime.ceval.gil.last_holder,
(uintptr_t)tstate);
++_PyRuntime.ceval.gil.switch_number;
}
#ifdef FORCE_SWITCHING
COND_SIGNAL(_PyRuntime.ceval.gil.switch_cond);
MUTEX_UNLOCK(_PyRuntime.ceval.gil.switch_mutex);
#endif
/* Clear a drop request that is still set now that the GIL changed hands. */
if (_Py_atomic_load_relaxed(&_PyRuntime.ceval.gil_drop_request)) {
RESET_GIL_DROP_REQUEST();
}
/* An asynchronous exception is pending for this thread state; the call
   below presumably makes the eval loop notice it -- confirm. */
if (tstate->async_exc != NULL) {
_PyEval_SignalAsyncExc();
}
MUTEX_UNLOCK(_PyRuntime.ceval.gil.mutex);
errno = err;
}
- 在take_gil时,首先获取互斥锁(mutex),并检查是否有线程持有GIL。
- 如果GIL被占用,则在条件变量(cond)上等待:线程被挂起,同时互斥锁(mutex)被释放。
- 当线程从条件变量(cond)的等待中被唤醒时,互斥锁(mutex)会被自动重新加锁。
- 大多数情况下通过条件满足唤醒,while循环用于避免意外唤醒时条件不足。
- 如果等待超时并且期间没有发生线程切换,则通过SET_GIL_DROP_REQUEST请求last_holder释放GIL。
- 当GIL没有被占用(或者等待结束)时,将locked设置为1占用GIL;如果当前线程状态对象不是last_holder,则将它保存到last_holder,并将切换次数(switch_number)加1。
- 获取GIL后,会通过PyThread_get_thread_ident函数获取线程id,PyThread_get_thread_ident会根据编译器和平台来确认初始化动作:
Python\thread_nt.h
/* Windows flavour: return the identifier of the calling thread, running
   PyThread_init_thread() first if the `initialized` flag is not set. */
unsigned long
PyThread_get_thread_ident(void)
{
    if (!initialized) {
        PyThread_init_thread();
    }

    return GetCurrentThreadId();
}
Python\thread_pthread.h
/* pthread flavour: return pthread_self() converted to unsigned long,
   running PyThread_init_thread() first if the `initialized` flag is not
   set. */
unsigned long
PyThread_get_thread_ident(void)
{
    if (!initialized) {
        PyThread_init_thread();
    }

    /* Kept volatile, exactly as in the original listing. */
    volatile pthread_t self_id = pthread_self();
    return (unsigned long) self_id;
}
- 在获取主线程id之前,会先检查
initialized
,也就是标记底层平台的线程支持是否已经初始化的标志;如果尚未初始化,则通过PyThread_init_thread完成初始化:
Python\thread.c
void
PyThread_init_thread(void)
{
#ifdef Py_DEBUG
const char *p = Py_GETENV("PYTHONTHREADDEBUG");
if (p) {
if (*p)
thread_debug = atoi(p);
else
thread_debug = 1;
}
#endif /* Py_DEBUG */
if (initialized)
return;
initialized = 1;
dprintf(("PyThread_init_thread called\n"));
PyThread__init_thread();
}
- 最后,PyEval_InitThreads会通过PyThread_allocate_lock创建pending.lock这把锁(GIL自身的互斥锁已经在create_gil中通过MUTEX_INIT创建),PyThread_allocate_lock会根据平台来确认动作:
Python\thread_nt.h
PyThread_type_lock
PyThread_allocate_lock(void)
{
PNRMUTEX aLock;
dprintf(("PyThread_allocate_lock called\n"));
if (!initialized)
PyThread_init_thread();
aLock = AllocNonRecursiveMutex() ;
dprintf(("%lu: PyThread_allocate_lock() -> %p\n", PyThread_get_thread_ident(), aLock));
return (PyThread_type_lock) aLock;
}
Python\thread_pthread.h
/* pthread flavour: allocate and initialise a pthread_lock (a mutex plus a
   condition variable plus a `locked` flag) and return it as an opaque
   PyThread_type_lock.  Returns 0 (a null lock) when the raw allocation
   fails or when `error` is set after initialisation. */
PyThread_type_lock
PyThread_allocate_lock(void)
{
pthread_lock *lock;
int status, error = 0;
dprintf(("PyThread_allocate_lock called\n"));
if (!initialized)
PyThread_init_thread();
lock = (pthread_lock *) PyMem_RawMalloc(sizeof(pthread_lock));
if (lock) {
/* Start from an all-zero structure; the lock begins unlocked. */
memset((void *)lock, '\0', sizeof(pthread_lock));
lock->locked = 0;
status = pthread_mutex_init(&lock->mut,
pthread_mutexattr_default);
/* NOTE(review): CHECK_STATUS_PTHREAD is defined outside this excerpt;
   presumably it inspects `status` and sets `error` on failure, since
   `error` is not assigned anywhere else here -- confirm. */
CHECK_STATUS_PTHREAD("pthread_mutex_init");
/* Mark the pthread mutex underlying a Python mutex as
pure happens-before. We can't simply mark the
Python-level mutex as a mutex because it can be
acquired and released in different threads, which
will cause errors. */
_Py_ANNOTATE_PURE_HAPPENS_BEFORE_MUTEX(&lock->mut);
status = pthread_cond_init(&lock->lock_released,
pthread_condattr_default);
CHECK_STATUS_PTHREAD("pthread_cond_init");
/* On error, release the memory and report failure with a null lock. */
if (error) {
PyMem_RawFree((void *)lock);
lock = 0;
}
}
dprintf(("PyThread_allocate_lock() -> %p\n", lock));
return (PyThread_type_lock) lock;
}
- 创建的锁结构体也因平台而异:
Python\thread_nt.h
typedef struct _NRMUTEX {
PyMUTEX_T cs;
PyCOND_T cv;
int locked;
} NRMUTEX;
typedef NRMUTEX *PNRMUTEX;
Python\thread_pthread.h
typedef struct {
char locked; /* 0=unlocked, 1=locked */
/* a <cond, mutex> pair to handle an acquire of a locked lock */
pthread_cond_t lock_released;
pthread_mutex_t mut;
} pthread_lock;