建立多线程环境

python只有在运行module thread的start_new_thread方法后会启动多线程模式,即创建GIL锁。start_new_thread对应的实现是threadmdule.c里的thread_PyThread_start_new_thread,这个函数首先完成下面操作:

1
2
3
4
5
6
7
8
9
boot = PyMem_NEW(struct bootstate, 1);
boot->interp = PyThreadState_GET()->interp;
boot->func = func;
boot->args = args;
boot->keyw = keyw;

// 多线程环境
PyEval_InitThreads(); /* Start the interpreter's thread-awareness */
ident = PyThread_start_new_thread(t_bootstrap, (void*) boot);
  1. 初始化bootstate结构,这个结构保存线程信息
  2. 初始化python多线程环境,PyEval_InitThreads()
  3. 使用bootstate结构作为参数,创建系统原生线程,PyThread_start_new_thread

PyEval_InitThreads()会查看是否已经新建GIL,如果没有,使用PyThread_allocate_lock新建,然后主线程请求锁PyThread_acquire_lock这些函数的实现与操作系统相关。同时专门会有一个main_thread变量存放主线程pid。

子线程创建

在创建多线程环境的最后一行调用PyThread_start_new_thread来创建线程,这个函数根据不同系统有着不同的实现,下面是pthread的实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
long
PyThread_start_new_thread(void (*func)(void *), void *arg)
{
pthread_t th;
int status;
pthread_attr_t attrs;
size_t tss;
if (pthread_attr_init(&attrs) != 0)
return -1;
tss = (_pythread_stacksize != 0) ? _pythread_stacksize
: THREAD_STACK_SIZE;
if (tss != 0) {
if (pthread_attr_setstacksize(&attrs, tss) != 0) {
pthread_attr_destroy(&attrs);
return -1;
}
}
pthread_attr_setscope(&attrs, PTHREAD_SCOPE_SYSTEM);

status = pthread_create(&th,
&attrs,
(pthread_attr_t*)NULL,
(void* (*)(void *))func,
(void *)arg
);
pthread_attr_destroy(&attrs);
if (status != 0)
return -1;
pthread_detach(th);

return (long) th;
}

关键在pthread_create上调用传入的func以及传入arg,这两个参数分别是t_bootstrapbootstate结构的boot,boot里面包括要执行的python函数、解释器。

子线程运行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
static void
t_bootstrap(void *boot_raw)
{
struct bootstate *boot = (struct bootstate *) boot_raw;
PyThreadState *tstate;
PyObject *res;

tstate = PyThreadState_New(boot->interp);
PyEval_AcquireThread(tstate); // 请求GIL锁,子进程自己将自己挂起
res = PyEval_CallObjectWithKeywords(boot->func, boot->args, boot->keyw);
Py_DECREF(boot->func);
Py_DECREF(boot->args);
Py_XDECREF(boot->keyw);
PyMem_DEL(boot_raw);
PyThreadState_Clear(tstate);
PyThreadState_DeleteCurrent();
PyThread_exit_thread();
}

在t_bootstrap中分别执行:

  1. PyEval_AcquireThread(), 请求GIL锁,这个锁目前是在main_thread中。

    1. 将current thread state交互为当前子线程的state
  2. 使用PyEval_VallObjectWithKeywords来执行python函数。

    1. 这个函数内部执行PyObject_call
    2. PyObject_call会调用
    1
    2
    3
    4
    5
    6
    7
    8
    PyObject *
    PyObject_Call(PyObject *func, PyObject *arg, PyObject *kw)
    {
    ternaryfunc call;
    call = func->ob_type->tp_call
    PyObject *result = (*call)(func, arg, kw);
    return result;
    }
  3. 执行完成后,释放GIL。

线程运行环境

t_bootstrap中会新建PyThreadState,这个结构体保存frame、interpreter、thread_id、next_state等信息。

Python内部维护了一个State对象链表,当切换当前state时,会到这个链表上寻找。每个链表元素会保存thread_id、key、以及指向ThreadState的指针。

1
2
3
4
5
6
7
8
9
struct key {
/* Next record in the list, or NULL if this is the last record. */
struct key *next;
/* The thread id, according to PyThread_get_thread_ident(). */
long id;
/* The key and its associated value. */
int key;
void *value;
};

同时提供set_key_value、get_key_value、delete_key这几个api来操作这个链表。在子进程t_bootstrap创建函数中使用PyThreadState_New来创建state,同时放入key链表中。

子线程在获取GIL后,赶紧把当前state切换成自己的。之后进入PyEval_CallObjectWithKeywords,最后进入虚拟机的执行。

线程调度

python多线程使用系统原生的线程。python字节码解释器会按照指令的顺序一条条执行,python内部维护一个时钟值,每运行时钟值个指令后会启动线程调度(同时用于检测是否有异步事件发生)。

1
2
3
>>> import sys
>>> sys.getcheckinterval() # 查看内部时钟值
100

在进入虚拟机后,主要靠PyEval_EvalFrameEx来执行信息调度。

在Python/ceval.c/PyEval_EvalFrameEx函数中有个大循环:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
for(;;){
if (--_Py_Ticker < 0) { // 每_Py_CheckInterval个时间片段,进行一些需要周期执行的事情
if (*next_instr == SETUP_FINALLY) {
/* Make the last opcode before
a try: finally: block uninterruptable. */
goto fast_next_opcode;
}
_Py_Ticker = _Py_CheckInterval;
tstate->tick_counter++;

#ifdef WITH_THREAD
if (interpreter_lock) {
/* Give another thread a chance */

if (PyThreadState_Swap(NULL) != tstate) // 释放当前thread state
Py_FatalError("ceval: tstate mix-up");
PyThread_release_lock(interpreter_lock); // 当前线程释放GIL

/* Other threads may run now */

PyThread_acquire_lock(interpreter_lock, 1); // 其他线程或父线程获取GIL
if (PyThreadState_Swap(tstate) != NULL)
Py_FatalError("ceval: orphan tstate");

/* Check for thread interrupts */

if (tstate->async_exc != NULL) {
x = tstate->async_exc;
tstate->async_exc = NULL;
PyErr_SetNone(x);
Py_DECREF(x);
why = WHY_EXCEPTION;
goto on_error;
}
}
}

对于主线程创建子线程的python脚本,它们的执行顺序如下:

  1. 一开始只有主线程,它正在执行PyEval_EvalFrameEx,然后新建子线程,此时新建GIL锁,同时主线程获取GIL锁。
  2. 然后子线程执行t_bootstrap,在PyEval_AcquireThread()中阻塞,等待GIL锁。
  3. 主线程继续执行,同时_Py_Ticker不断减少,直到小于0时,主线程会将当前thread_state设置为空,然后释放GIL。
  4. 之前等待GIL的子线程会被操作系统唤醒。然后父线程再次请求GIL,会处于等待。
  5. 之后子线程的执行也会增加_PyTicker,也会经历小于0时的释放过程。

阻塞调用

对于某些调用比如time.sleep(), fp.read(), flock(fd, operation)等,都会将线程阻塞,此时也需要进行线程调度。

比如在flock函数,因为flock在不同的系统里面实现不一样,下面是linux的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
static PyObject *
fcntl_flock(PyObject *self, PyObject *args)
{
int fd;
int code;
int ret;

if (!PyArg_ParseTuple(args, "O&i:flock",
conv_descriptor, &fd, &code))
return NULL;

#ifdef HAVE_FLOCK
Py_BEGIN_ALLOW_THREADS // 关键
ret = flock(fd, code);
Py_END_ALLOW_THREADS
#else
// 这里是没有flock()函数的平台代码
#endif /* HAVE_FLOCK */
if (ret < 0) {
PyErr_SetFromErrno(PyExc_IOError);
return NULL;
}
Py_INCREF(Py_None);
return Py_None;
}

在linux中如果这个锁被其他线程拿到,flock会阻塞线程。在ret = flock(fd, code);上下分别有两个宏,这两个宏展开是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#define Py_BEGIN_ALLOW_THREADS { \
PyThreadState *_save; \
_save = PyEval_SaveThread();
#define Py_END_ALLOW_THREADS PyEval_RestoreThread(_save); \
}


PyThreadState *
PyEval_SaveThread(void)
{
PyThreadState *tstate = PyThreadState_Swap(NULL);
if (interpreter_lock)
PyThread_release_lock(interpreter_lock);
return tstate;
}

void
PyEval_RestoreThread(PyThreadState *tstate)
{
if (interpreter_lock) {
PyThread_acquire_lock(interpreter_lock, 1);
}
PyThreadState_Swap(tstate);
}

Py_BEGIN_ALLOW_THREADS中主要释放了GIL,而Py_END_ALLOW_THREADS中子线程申请GIL

线程销毁

t_bootstrap中的PyThreadState_Clear(tstate), PyThreadState_DeleteCurrent(), PyThread_exit_thread()函数用于线程销毁。

  • PyThreadState_Clear(tstate):这个函数清空PyThreadState里面的变量。
  • PyThreadState_DeleteCurrent:删除当前线程对象,然后释放GIL。
  • PyThread_exit_thread():在pthread中是直接exit(0)退出线程。

线程锁与同步

虽然在python中GIL锁住了系统api提供的资源,比如fwrite这种,但是对于python自己的变量来说,需要python用户态的锁来保持一致性。

新建锁

在python中分配一个锁使用thread.allocate_lock()对应的c api是thread_PyThread_allocate_lock,再往里具体实现是newlockobject()

1
2
3
4
5
6
7
8
static lockobject *
newlockobject(void)
{
lockobject *self;
self = PyObject_New(lockobject, &Locktype);
self->lock_lock = PyThread_allocate_lock();
return self;
}

这里面的PyThread_allocate_lock()与平台相关,下面使用pthread来实现

1
2
3
4
5
6
7
8
9
10
11
PyThread_type_lock 
PyThread_allocate_lock(void)
{
sem_t *lock;
int status, error = 0;

lock = (sem_t *)malloc(sizeof(sem_t));
status = sem_init(lock,0,1);

return (PyThread_type_lock)lock;
}

获取锁

lock.acquire()lock_PyThread_acquire_lock实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
static PyObject *
lock_PyThread_acquire_lock(lockobject *self, PyObject *args)
{
int i = 1;

if (!PyArg_ParseTuple(args, "|i:acquire", &i))
return NULL;

Py_BEGIN_ALLOW_THREADS
i = PyThread_acquire_lock(self->lock_lock, i);
Py_END_ALLOW_THREADS

return PyBool_FromLong((long)i);
}

因为获取锁会被阻塞,所以需要在执行前释放GIL,之后再获取。

释放锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
static PyObject *
lock_PyThread_release_lock(lockobject *self)
{
/* Sanity check: the lock must be locked */
if (PyThread_acquire_lock(self->lock_lock, 0)) {
PyThread_release_lock(self->lock_lock);
PyErr_SetString(ThreadError, "release unlocked lock");
return NULL;
}

PyThread_release_lock(self->lock_lock);
Py_INCREF(Py_None);
return Py_None;
}

在释放前需要确保当前线程获取了锁。

总结

通过上面的分析,我觉得python的多线程更多的被GIL制约,只有阻塞的系统调用,和请求锁的时候GIL才会被释放,这导致同一时间只有一个线程处于执行这会导致python在多线程计算时没有效果。