建立多线程环境 python只有在运行module thread的start_new_thread
方法后会启动多线程模式,即创建GIL锁。start_new_thread
对应的实现是threadmdule.c里的thread_PyThread_start_new_thread
,这个函数首先完成下面操作:
1 2 3 4 5 6 7 8 9 boot = PyMem_NEW(struct bootstate, 1 ); boot->interp = PyThreadState_GET()->interp; boot->func = func; boot->args = args; boot->keyw = keyw; PyEval_InitThreads(); ident = PyThread_start_new_thread(t_bootstrap, (void *) boot);
初始化bootstate结构,这个结构保存线程信息
初始化python多线程环境,PyEval_InitThreads()
使用bootstate结构作为参数,创建系统原生线程,PyThread_start_new_thread
在PyEval_InitThreads()
会查看是否已经新建GIL,如果没有,使用PyThread_allocate_lock
新建,然后主线程请求锁PyThread_acquire_lock
这些函数的实现与操作系统相关。同时专门会有一个main_thread变量存放主线程pid。
子线程创建 在创建多线程环境的最后一行调用PyThread_start_new_thread
来创建线程,这个函数根据不同系统有着不同的实现,下面是pthread的实现。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 long PyThread_start_new_thread (void (*func)(void *), void *arg) { pthread_t th; int status; pthread_attr_t attrs; size_t tss; if (pthread_attr_init(&attrs) != 0 ) return -1 ; tss = (_pythread_stacksize != 0 ) ? _pythread_stacksize : THREAD_STACK_SIZE; if (tss != 0 ) { if (pthread_attr_setstacksize(&attrs, tss) != 0 ) { pthread_attr_destroy(&attrs); return -1 ; } } pthread_attr_setscope(&attrs, PTHREAD_SCOPE_SYSTEM); status = pthread_create(&th, &attrs, (pthread_attr_t *)NULL , (void * (*)(void *))func, (void *)arg ); pthread_attr_destroy(&attrs); if (status != 0 ) return -1 ; pthread_detach(th); return (long ) th; }
关键在pthread_create
上调用传入的func以及传入arg,这两个参数分别是t_bootstrap
和bootstate
结构的boot,boot里面包括要执行的python函数、解释器。
子线程运行 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 static void t_bootstrap (void *boot_raw) { struct bootstate *boot = (struct bootstate *) boot_raw; PyThreadState *tstate; PyObject *res; tstate = PyThreadState_New(boot->interp); PyEval_AcquireThread(tstate); res = PyEval_CallObjectWithKeywords(boot->func, boot->args, boot->keyw); Py_DECREF(boot->func); Py_DECREF(boot->args); Py_XDECREF(boot->keyw); PyMem_DEL(boot_raw); PyThreadState_Clear(tstate); PyThreadState_DeleteCurrent(); PyThread_exit_thread(); }
在t_bootstrap中分别执行:
PyEval_AcquireThread()
, 请求GIL锁,这个锁目前是在main_thread
中。
将current thread state交互为当前子线程的state
使用PyEval_VallObjectWithKeywords
来执行python函数。
这个函数内部执行PyObject_call
PyObject_call
会调用
1 2 3 4 5 6 7 8 PyObject * PyObject_Call (PyObject *func, PyObject *arg, PyObject *kw) { ternaryfunc call; call = func->ob_type->tp_call PyObject *result = (*call)(func, arg, kw); return result; }
执行完成后,释放GIL。
线程运行环境 在t_bootstrap
中会新建PyThreadState
,这个结构体保存frame、interpreter、thread_id、next_state等信息。
Python内部维护了一个State对象链表,当切换当前state时,会到这个链表上寻找。每个链表元素会保存thread_id、key、以及指向ThreadState的指针。
1 2 3 4 5 6 7 8 9 struct key { struct key *next ; long id; int key; void *value; };
同时提供set_key_value、get_key_value、delete_key这几个api来操作这个链表。在子进程t_bootstrap
创建函数中使用PyThreadState_New
来创建state,同时放入key链表中。
子线程在获取GIL后,赶紧把当前state切换成自己的。之后进入PyEval_CallObjectWithKeywords
,最后进入虚拟机的执行。
线程调度 python多线程使用系统原生的线程。python字节码解释器会按照指令的顺序一条条执行,python内部维护一个时钟值,每运行时钟值个指令后会启动线程调度(同时用于检测是否有异步事件发生)。
1 2 3 >>> import sys>>> sys.getcheckinterval() 100
在进入虚拟机后,主要靠PyEval_EvalFrameEx
来执行信息调度。
在Python/ceval.c/PyEval_EvalFrameEx函数中有个大循环:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 for (;;){ if (--_Py_Ticker < 0 ) { if (*next_instr == SETUP_FINALLY) { goto fast_next_opcode; } _Py_Ticker = _Py_CheckInterval; tstate->tick_counter++; #ifdef WITH_THREAD if (interpreter_lock) { if (PyThreadState_Swap(NULL ) != tstate) Py_FatalError("ceval: tstate mix-up" ); PyThread_release_lock(interpreter_lock); PyThread_acquire_lock(interpreter_lock, 1 ); if (PyThreadState_Swap(tstate) != NULL ) Py_FatalError("ceval: orphan tstate" ); if (tstate->async_exc != NULL ) { x = tstate->async_exc; tstate->async_exc = NULL ; PyErr_SetNone(x); Py_DECREF(x); why = WHY_EXCEPTION; goto on_error; } } }
对于主线程创建子线程的python脚本,它们的执行顺序如下:
一开始只有主线程,它正在执行PyEval_EvalFrameEx
,然后新建子线程,此时新建GIL锁,同时主线程获取GIL锁。
然后子线程执行t_bootstrap
,在PyEval_AcquireThread()
中阻塞,等待GIL锁。
主线程继续执行,同时_Py_Ticker不断减少,直到小于0时,主线程会将当前thread_state设置为空,然后释放GIL。
之前等待GIL的子线程会被操作系统唤醒。然后父线程再次请求GIL,会处于等待。
之后子线程的执行也会增加_PyTicker,也会经历小于0时的释放过程。
阻塞调用 对于某些调用比如time.sleep(), fp.read(), flock(fd, operation)
等,都会将线程阻塞,此时也需要进行线程调度。
比如在flock函数,因为flock在不同的系统里面实现不一样,下面是linux的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 static PyObject *fcntl_flock (PyObject *self, PyObject *args) { int fd; int code; int ret; if (!PyArg_ParseTuple(args, "O&i:flock" , conv_descriptor, &fd, &code)) return NULL ; #ifdef HAVE_FLOCK Py_BEGIN_ALLOW_THREADS ret = flock(fd, code); Py_END_ALLOW_THREADS #else #endif if (ret < 0 ) { PyErr_SetFromErrno(PyExc_IOError); return NULL ; } Py_INCREF(Py_None); return Py_None; }
在linux中如果这个锁被其他线程拿到,flock
会阻塞线程。在ret = flock(fd, code);
上下分别有两个宏,这两个宏展开是:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 #define Py_BEGIN_ALLOW_THREADS { \ PyThreadState *_save; \ _save = PyEval_SaveThread(); #define Py_END_ALLOW_THREADS PyEval_RestoreThread(_save); \ } PyThreadState * PyEval_SaveThread (void ) { PyThreadState *tstate = PyThreadState_Swap(NULL ); if (interpreter_lock) PyThread_release_lock(interpreter_lock); return tstate; } void PyEval_RestoreThread (PyThreadState *tstate) { if (interpreter_lock) { PyThread_acquire_lock(interpreter_lock, 1 ); } PyThreadState_Swap(tstate); }
在Py_BEGIN_ALLOW_THREADS
中主要释放了GIL,而Py_END_ALLOW_THREADS
中子线程申请GIL
线程销毁 在t_bootstrap
中的PyThreadState_Clear(tstate)
, PyThreadState_DeleteCurrent()
, PyThread_exit_thread()
函数用于线程销毁。
PyThreadState_Clear(tstate)
:这个函数清空PyThreadState里面的变量。
PyThreadState_DeleteCurrent
:删除当前线程对象,然后释放GIL。
PyThread_exit_thread()
:在pthread中是直接exit(0)退出线程。
线程锁与同步 虽然在python中GIL锁住了系统api提供的资源,比如fwrite这种,但是对于python自己的变量来说,需要python用户态的锁来保持一致性。
新建锁 在python中分配一个锁使用thread.allocate_lock()
对应的c api是thread_PyThread_allocate_lock
,再往里具体实现是newlockobject()
1 2 3 4 5 6 7 8 static lockobject *newlockobject (void ) { lockobject *self; self = PyObject_New(lockobject, &Locktype); self->lock_lock = PyThread_allocate_lock(); return self; }
这里面的PyThread_allocate_lock()
与平台相关,下面使用pthread来实现
1 2 3 4 5 6 7 8 9 10 11 PyThread_type_lock PyThread_allocate_lock (void ) { sem_t *lock; int status, error = 0 ; lock = (sem_t *)malloc (sizeof (sem_t )); status = sem_init(lock,0 ,1 ); return (PyThread_type_lock)lock; }
获取锁 lock.acquire()
由lock_PyThread_acquire_lock
实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 static PyObject *lock_PyThread_acquire_lock (lockobject *self, PyObject *args) { int i = 1 ; if (!PyArg_ParseTuple(args, "|i:acquire" , &i)) return NULL ; Py_BEGIN_ALLOW_THREADS i = PyThread_acquire_lock(self->lock_lock, i); Py_END_ALLOW_THREADS return PyBool_FromLong ((long )i) ;}
因为获取锁会被阻塞,所以需要在执行前释放GIL,之后再获取。
释放锁 1 2 3 4 5 6 7 8 9 10 11 12 13 14 static PyObject *lock_PyThread_release_lock (lockobject *self) { if (PyThread_acquire_lock(self->lock_lock, 0 )) { PyThread_release_lock(self->lock_lock); PyErr_SetString(ThreadError, "release unlocked lock" ); return NULL ; } PyThread_release_lock(self->lock_lock); Py_INCREF(Py_None); return Py_None; }
在释放前需要确保当前线程获取了锁。
总结 通过上面的分析,我觉得python的多线程更多的被GIL制约,只有阻塞的系统调用,和请求锁的时候GIL才会被释放,这导致同一时间只有一个线程处于执行这会导致python在多线程计算时没有效果。