一個邏輯完備的線程池
掃描二維碼
隨時隨地手機(jī)看文章
開源項(xiàng)目Workflow中有個重要的基礎(chǔ)模塊:
代碼僅300行的C語言線程池。
本文會伴隨源碼分析,而邏輯完備、對稱無差別的特點(diǎn)于第3部分開始
歡迎跳閱, 或直接到Github主頁上圍觀代碼?
https://github.com/sogou/workflow/blob/master/src/kernel/thrdpool.c
? 0 - Workflow的thrdpool
作為目前Github上炙手可熱的異步調(diào)度引擎
Workflow有一個大招是:
計(jì)算通信融為一體。
而計(jì)算的核心:Executor調(diào)度器,
就是基于這個線程池實(shí)現(xiàn)的。
可以說,一個通用而高效的線程池,
是我們寫C/C++代碼時離不開的基礎(chǔ)模塊。
thrdpool代碼位置在src/kernel/,
不僅可以直接拿來使用,
同時也適合閱讀學(xué)習(xí)。
而更重要的,秉承Workflow項(xiàng)目本身
一貫的嚴(yán)謹(jǐn)極簡的作風(fēng),
這個thrdpool代碼極致簡潔,
實(shí)現(xiàn)邏輯上亦非常完備,
結(jié)構(gòu)精巧,處處嚴(yán)謹(jǐn),
復(fù)雜的并發(fā)處理依然可以對稱無差別,
不得不讓我驚嘆:妙?。。?!?
你可能會很好奇,
線程池還能寫出什么別致的新思路嗎?
先列出一些,你們細(xì)品:
-
特點(diǎn)1:創(chuàng)建完線程池后,無需記錄任何線程id或?qū)ο?,線程池可以通過一個等一個的方式優(yōu)雅地去結(jié)束所有線程;? 也就是說,每一個線程都是對等的
-
特點(diǎn)2:線程任務(wù)可以由另一個線程任務(wù)調(diào)起;甚至線程池正在被銷毀時也可以提交下一個任務(wù);(這很重要,因?yàn)榫€程本身很可能是不知道線程池的狀態(tài)的;? 即,每一個任務(wù)也是對等的
-
特點(diǎn)3:同理,線程任務(wù)也可以銷毀這個線程池;(非常完整~? 每一種行為也是對等的,包括destroy
我真的迫不及待為大家深層解讀一下,
這個我愿稱之為“邏輯完備”的線程池。
? 1 - 前置知識
第一部分我先梳理一些基本內(nèi)容梳理,
有基礎(chǔ)的小伙伴可以直接跳過。
如果有不準(zhǔn)確的地方,歡迎大家指正交流~
Question: 為什么需要線程池?
其實(shí)思路不僅對線程池,
對任何有限資源的調(diào)度管理都是類似的。
我們知道,
通過pthread或者std::thread創(chuàng)建線程,
就可以實(shí)現(xiàn)多線程并發(fā)執(zhí)行我們的代碼。
但是CPU的核數(shù)是固定的,
所以真正并行執(zhí)行的最大值也是固定的,
過多的線程除了頻繁創(chuàng)建產(chǎn)生overhead以外,
還會導(dǎo)致對系統(tǒng)資源進(jìn)行爭搶,
這些都是不必要的浪費(fèi)。
因此我們可以管理有限個線程,
循環(huán)且合理地利用它們。??
那么線程池一般包含哪些內(nèi)容呢?
- 首先是管理若干個工具人線程;
- 其次是管理交給線程去執(zhí)行的任務(wù),這個一般會有一個隊(duì)列;
- 再然后線程之間需要一些同步機(jī)制,比如mutex、condition等;
- 最后就是各線程池實(shí)現(xiàn)上自身需要的其他內(nèi)容了;
接下來我們看看Workflow的thrdpool是怎么做的。
? 2 - 代碼概覽
以下共7步常用思路,
足以讓我們把代碼飛快過一遍。
第1步:先看頭文件,有什么接口。
我們打開thrdpool.h,只需關(guān)注這三個:
// 創(chuàng)建線程池 thrdpool_t *thrdpool_create(size_t nthreads, size_t stacksize); // 把任務(wù)交給線程池的入口 int thrdpool_schedule(const struct thrdpool_task *task, thrdpool_t *pool); // 銷毀線程池 void thrdpool_destroy(void (*pending)(const struct thrdpool_task *), thrdpool_t *pool);
第2步:接口上有什么數(shù)據(jù)結(jié)構(gòu)。
即,我們?nèi)绾蚊枋鲆粋€交給線程池的任務(wù)。
struct thrdpool_task { void (*routine)(void *); // 函數(shù)指針 void *context; // 上下文 };
第3步:再看實(shí)現(xiàn).c,內(nèi)部數(shù)據(jù)結(jié)構(gòu)。
struct __thrdpool { struct list_head task_queue;// 任務(wù)隊(duì)列 size_t nthreads; // 線程個數(shù) size_t stacksize; // 構(gòu)造線程時的參數(shù) pthread_t tid; // 運(yùn)行期間記錄的是個zero值 pthread_mutex_t mutex; pthread_cond_t cond; pthread_key_t key; pthread_cond_t *terminate; };
沒有一個多余,每一個成員都很到位:
- tid:線程id,整個線程池只有一個,它不會奇怪地去記錄任何一個線程的id,這樣就不完美了?,它平時運(yùn)行的時候是空值,退出的時候,它是用來實(shí)現(xiàn)鏈?zhǔn)降却年P(guān)鍵。
- mutex 和 cond是常見的線程間同步的工具,其中這個cond是用來給生產(chǎn)者和消費(fèi)者去操作任務(wù)隊(duì)列用的。
- key:是線程池的key,然后會賦予給每個由線程池創(chuàng)建的線程作為他們的thread local,用于區(qū)分這個線程是否是線程池創(chuàng)建的。
- 一個pthread_cond_t *terminate,這有兩個用途:不僅是退出時的標(biāo)記位 ,而且還是調(diào)用退出的那個人要等待的condition。
以上各個成員的用途,
好像說了,又好像沒說,?
是因?yàn)?strong>幾乎每一個成員都值得深挖一下,
所以我們記住它們,
后面看代碼的時候就會豁然開朗!?
第4步:接口都調(diào)用了什么核心函數(shù)。
thrdpool_t *thrdpool_create(size_t nthreads, size_t stacksize) { thrdpool_t *pool; ret = pthread_key_create(&pool->key, NULL); if (ret == 0) { // 去掉了其他代碼,但是注意到剛才的tid和terminate的賦值 memset(&pool->tid, 0, sizeof (pthread_t)); pool->terminate = NULL; if (__thrdpool_create_threads(nthreads, pool) >= 0) return pool; ...
這里可以看到
__thrdpool_create_threads()里邊
最關(guān)鍵的,就是循環(huán)創(chuàng)建nthreads個線程。
while (pool->nthreads < nthreads) { ret = pthread_create(&tid, &attr, __thrdpool_routine, pool); ...
第5步:略讀核心函數(shù)的功能。
所以我們在上一步知道了,
每個線程執(zhí)行的是__thrdpool_routine()
不難想象,它會不停從隊(duì)列拿任務(wù)出來執(zhí)行:
static void *__thrdpool_routine(void *arg) { ... while (1) { // 1. 從隊(duì)列里拿一個任務(wù)出來,沒有就等待 pthread_mutex_lock(&pool->mutex); while (!pool->terminate && list_empty(&pool->task_queue)) pthread_cond_wait(&pool->cond, &pool->mutex); // 2. 線程池結(jié)束的標(biāo)志位,記住它,先跳過 if (pool->terminate) break; // 3. 如果能走到這里,恭喜你,拿到了任務(wù)~ entry = list_entry(*pos, struct __thrdpool_task_entry, list); list_del(*pos); // 4. 先解鎖 pthread_mutex_unlock(&pool->mutex); task_routine = entry->task.routine; task_context = entry->task.context; free(entry); // 5. 再執(zhí)行 task_routine(task_context); // 6. 這里也先記住它,意思是線程池里的線程可以銷毀線程池 if (pool->nthreads == 0) { /* Thread pool was destroyed by the task. */ free(pool); return NULL; } } ... // 后面還有魔法,留下一章解讀~~~
第6步:把函數(shù)之間的關(guān)系聯(lián)系起來。
剛才看到的__thrdpool_routine()
就是線程的核心函數(shù)了,
它可以和誰關(guān)聯(lián)起來呢?
可以和接口thrdpool_schedule()關(guān)聯(lián)上
我們說過,線程池上有個隊(duì)列管理任務(wù):
- 每個執(zhí)行routine的線程,都是消費(fèi)者
- 每個發(fā)起schedule的線程,都是生產(chǎn)者
我們已經(jīng)看過消費(fèi)者了,來看看生產(chǎn)者的代碼:
inline void __thrdpool_schedule(const struct thrdpool_task *task, void *buf, thrdpool_t *pool) { struct __thrdpool_task_entry *entry = (struct __thrdpool_task_entry *)buf; entry->task = *task; pthread_mutex_lock(&pool->mutex); // 添加到隊(duì)列里 list_add_tail(&entry->list, &pool->task_queue); // 叫醒在等待的線程 pthread_cond_signal(&pool->cond); pthread_mutex_unlock(&pool->mutex); }
說到這里,特點(diǎn)2就非常清晰了:
開篇說的特點(diǎn)2是說,
”線程任務(wù)可以由另一個線程任務(wù)調(diào)起”。
只要對隊(duì)列的管理做得好,
顯然消費(fèi)者所執(zhí)行的函數(shù)里也可以生產(chǎn)
第7步:看其他情況的處理
對于線程池來說就是比如銷毀的情況。
接口thrdpool_destroy()實(shí)現(xiàn)非常簡單:
void thrdpool_destroy(void (*pending)(const struct thrdpool_task *), thrdpool_t *pool) { ... // 1. 內(nèi)部會設(shè)置pool->terminate,并叫醒所有等在隊(duì)列拿任務(wù)的線程 __thrdpool_terminate(in_pool, pool); // 2. 把隊(duì)列里還沒有執(zhí)行的任務(wù)都拿出來,通過pending返回給用戶 list_for_each_safe(pos, tmp, &pool->task_queue) { entry = list_entry(pos, struct __thrdpool_task_entry, list); list_del(pos); if (pending) pending(&entry->task); ... // 后面就是銷毀各種內(nèi)存,同樣有魔法~
在退出的時候,
我們那些已經(jīng)提交但是還沒有被執(zhí)行的任務(wù)
是絕對不能就這么扔掉了的,
于是我們可以傳入一個pending()函數(shù),
上層可以做自己的回收、回調(diào)、
或任何保證上層邏輯完備的事情。
設(shè)計(jì)的完整性,無處不在。
接下來我們就可以跟著我們的核心問題,
針對性地看看每個特點(diǎn)都是怎么實(shí)現(xiàn)的。
? 3 - 特點(diǎn)1: 一個等待一個的優(yōu)雅退出
這里提出一個問題:
線程池要退出,如何結(jié)束所有線程?
一般線程池的實(shí)現(xiàn)都是
需要記錄下所有的線程id,
或者thread對象,
以便于我們?nèi)ビ?strong>join方法等待它們結(jié)束。
不嚴(yán)格地用join收拾干凈會有什么問題?
最直觀的,模塊退出時很可能會報(bào)內(nèi)存泄漏
但是我們剛才看,
pool里并沒有記錄所有的tid呀?
正如開篇說的,
pool上只有一個tid,而且還是個空的值。
而特點(diǎn)1正是Workflow thrdpool的答案:
無需記錄所有線程,我可以讓線程挨個自動退出、且一個等待下一個,最終達(dá)到調(diào)用完thrdpool_destroy()后內(nèi)存回收干凈的目的。
這里先給一個簡單的圖,
假設(shè)發(fā)起destroy的人是main線程,
我們?nèi)绾巫龅揭粋€等一個退出:
步驟如下:
- 線程的退出,由thrdpool_destroy()設(shè)置pool->terminate開始。
- 我們每個線程,在while(1) 里會第一時間發(fā)現(xiàn)terminate,線程池要退出了,然后會break出這個while循環(huán)。
- 注意這個時候,還持有著mutex鎖,我們拿出pool上唯一的那個tid,放到我的臨時變量,我會根據(jù)拿出來的值做不同的處理。且我會把我自己的tid放上去,然后再解mutex鎖。
- 那么很顯然,第一個從pool上拿tid的人,會發(fā)現(xiàn)這是個0值,就可以直接結(jié)束了,不用負(fù)責(zé)等待任何其他人,但我在完全結(jié)束之前需要有人負(fù)責(zé)等待我的結(jié)束,所以我會把我的id放上去。
- 而如果發(fā)現(xiàn)自己從pool里拿到的tid不是0值,說明我要負(fù)責(zé)join上一個人,并且把我的tid放上去,讓下一個人負(fù)責(zé)我。
- 最后的那個人,是那個發(fā)現(xiàn)pool->nthreads為0的人,那么我就可以通過這個terminate(它本身是個condition)去通知發(fā)起destroy的人。
- 最后發(fā)起者就可以退了。?
是不是非常有意思!!!
非常優(yōu)雅的做法!??!?
所以我們會發(fā)現(xiàn),
其實(shí)大家不太需要知道太多信息,
只需要知道我要負(fù)責(zé)的上一個人。
當(dāng)然每一步都是非常嚴(yán)謹(jǐn)?shù)模?br /> 結(jié)合剛才跳過的第一段魔法?感受一下:
static void *__thrdpool_routine(void *arg) { while (1) { // 1.注意這里還持有鎖 pthread_mutex_lock(&pool->mutex); ... // 等著隊(duì)列拿任務(wù)出來 // 2. 這既是標(biāo)識位,也是發(fā)起銷毀的那個人所等待的condition if (pool->terminate) break; ... // 執(zhí)行拿到的任務(wù) } /* One thread joins another. Don't need to keep all thread IDs. */ // 3. 把線程池上記錄的那個tid拿下來,我來負(fù)責(zé)上一人 tid = pool->tid; // 4. 把我自己記錄到線程池上,下一個人來負(fù)責(zé)我 pool->tid = pthread_self(); // 5. 每個人都減1,最后一個人負(fù)責(zé)叫醒發(fā)起detroy的人 if (--pool->nthreads == 0) pthread_cond_signal(pool->terminate); // 6. 這里可以解鎖進(jìn)行等待了 pthread_mutex_unlock(&pool->mutex); // 7. 只有第一個人拿到0值 if (memcmp(&tid, &__zero_tid, sizeof (pthread_t)) != 0) // 8. 只要不0值,我就要負(fù)責(zé)等上一個結(jié)束才能退 pthread_join(tid, NULL); return NULL; // 9. 退出,干干凈凈~ }
? 4 - 特點(diǎn)2:線程任務(wù)可以由另一個線程任務(wù)調(diào)起
在第二部分我們看過源碼,
只要隊(duì)列管理得好,
線程任務(wù)里提交下一個任務(wù)是完全OK的。
這很合理。?
那么問題來了,
特點(diǎn)1又說,我們每個線程,
是不需要知道太多線程池的狀態(tài)和信息的。
而線程池的銷毀是個過程,
如果在這個過程間提交任務(wù)會怎么樣呢?
因此特點(diǎn)2的一個重要解讀是:
線程池被銷毀時也可以提交下一個任務(wù)。
而且剛才提過,
還沒有被執(zhí)行的任務(wù),
可以通過我們傳入的pending()函數(shù)拿回來。
簡單看看銷毀時的嚴(yán)謹(jǐn)做法:
static void __thrdpool_terminate(int in_pool, thrdpool_t *pool) { pthread_cond_t term = PTHREAD_COND_INITIALIZER; pthread_mutex_lock(&pool->mutex); // 1. 加鎖設(shè)置標(biāo)志位,之后的添加任務(wù)不會被執(zhí)行,但可以pending拿到 pool->terminate = &term; // 2. 廣播所有等待的消費(fèi)者 pthread_cond_broadcast(&pool->cond); if (in_pool) // 3. 這里的魔法等下講>_<~ { /* Thread pool destroyed in a pool thread is legal. */ pthread_detach(pthread_self()); pool->nthreads--; } // 4. 如果還有線程沒有退完,我會等,注意這里是while while (pool->nthreads > 0) pthread_cond_wait(&term, &pool->mutex); pthread_mutex_unlock(&pool->mutex); // 5.同樣地等待打算退出的上一個人 if (memcmp(&pool->tid, &__zero_tid, sizeof (pthread_t)) != 0) pthread_join(pool->tid, NULL); }
? 5 - 特點(diǎn)3:同樣可以在線程任務(wù)里銷毀這個線程池
既然線程任務(wù)可以做任何事情,
理論上,線程任務(wù)也可以銷毀線程池?
作為一個邏輯完備的線程池,
大膽一點(diǎn),
我們把問號去掉。
而且,銷毀并不會結(jié)束當(dāng)前任務(wù),
它會等這個任務(wù)執(zhí)行完。
想象一下,剛才的__thrdpool_routine(),
while(1)里拿出來的那個任務(wù),
做的事情竟然是發(fā)起thrdpool_destroy()...
把上面的圖大膽改一下:?
如果發(fā)起銷毀的人,
是我們自己內(nèi)部的線程,
那么我們就不是等n個,而是等n-1,
少了一個外部線程等待我們。
如何實(shí)現(xiàn)才能讓這些邏輯都完美融合呢?
我們把剛才跳過的三段魔法串起來看看。
? 第一段魔法,銷毀的發(fā)起者。
如果發(fā)起銷毀的人是線程池內(nèi)部的線程,
那么它具有較強(qiáng)的自我管理意識
(因?yàn)榍懊嬲f了,會等它這個任務(wù)執(zhí)行完)
而我們可以放心大膽地pthread_detach,
無需任何人join它等待它結(jié)束。
static void __thrdpool_terminate(int in_pool, thrdpool_t *pool) { ... // 每個由線程池創(chuàng)建的線程都設(shè)置了一個key,由此判斷是否是in_pool if (in_pool) { /* Thread pool destroyed in a pool thread is legal. */ pthread_detach(pthread_self()); pool->nthreads--; }
? 第二段魔法:線程池誰來free?
一定是發(fā)起銷毀的那個人。
所以這里用in_pool來判斷是否是外部的人:
void thrdpool_destroy(void (*pending)(const struct thrdpool_task *), thrdpool_t *pool) { // 已經(jīng)調(diào)用完第一段,且挨個pending(未執(zhí)行的task)了 // 銷毀其他內(nèi)部分配的內(nèi)存 ... // 如果不是內(nèi)部線程發(fā)起的銷毀,要負(fù)責(zé)回收線程池內(nèi)存 if (!in_pool) free(pool); }
那現(xiàn)在不是main線程發(fā)起的銷毀呢?
發(fā)起的銷毀的那個內(nèi)部線程,
怎么能保證我可以在最后關(guān)頭
把所有資源回收干凈、調(diào)free(pool)、
最后功成身退呢?
在前面閱讀源碼第5步,其實(shí)我們看過,
__thrdpool_routine()里有free的地方。
于是現(xiàn)在三段魔法終于串起來了。
? 第三段魔法:嚴(yán)謹(jǐn)?shù)牟l(fā)。
static void *__thrdpool_routine(void *arg) { while (1) { // ... task_routine(task_context); // 如果routine里做的事情,是銷毀線程池... // 注意這個時候,其他內(nèi)存都已經(jīng)被destroy里清掉了,萬萬不可以再用什么mutex、cond if (pool->nthreads == 0) { /* Thread pool was destroyed by the task. */ free(pool); return NULL; } ...
非常重要的一點(diǎn),
由于并發(fā),我們是不知道誰先操作的。
假設(shè)我們稍微改一改這個順序,
就又是另一番邏輯。
比如我作為一個內(nèi)部線程,
在routine()里調(diào)用destroy()期間,
發(fā)現(xiàn)還有線程沒有執(zhí)行完,
我就要等在我的terminate上,
待最后看到nthreads==0的那個人叫醒我。
然后,我的代碼繼續(xù)執(zhí)行,
函數(shù)棧就會從destroy()回到routine(),
也就是上面那幾行,
再然后就可以free(pool);
由于這時候我已經(jīng)放飛自我detach了,
于是一切順利結(jié)束。
你看,無論如何都可以完美地銷毀線程池:
并發(fā)是復(fù)雜多變的,代碼是簡潔統(tǒng)一的
是不是太妙了!
我寫到這里已經(jīng)要感動哭了!?
? 6 - 簡單的用法
這個線程池只有兩個文件:
thrdpool.h和thrdpool.c,
而且只依賴內(nèi)核的數(shù)據(jù)結(jié)構(gòu)list.h。
我們把它拿出來玩,自己寫一段代碼:
void my_routine(void *context) { // 我們要執(zhí)行的函數(shù) printf("task-%llu start.\n", reinterpret_cast<unsigned long long>(context); ); } void my_pending(const struct thrdpool_task *task) { // 線程池銷毀后,沒執(zhí)行的任務(wù)會到這里 printf("pending task-%llu.\n", reinterpret_cast<unsigned long long>(task->context);); } int main() { thrdpool_t *thrd_pool = thrdpool_create(3, 1024); // 創(chuàng)建 struct thrdpool_task task; unsigned long long i; for (i = 0; i < 5; i++) { task.routine = &my_routine; task.context = reinterpret_cast<void *>(i); thrdpool_schedule(&task, thrd_pool); // 調(diào)用 } getchar(); // 卡住主線程,按回車?yán)^續(xù) thrdpool_destroy(&my_pending, thrd_pool); // 結(jié)束 return 0; }
再打印幾行l(wèi)og,直接編譯就可以跑起來:
簡單程度堪比大一上學(xué)期C語言作業(yè)。?
? 7 - 并發(fā)與結(jié)構(gòu)之美
最后談?wù)劯惺堋?
看完之后我很后悔為什么沒有早點(diǎn)看
為什么不早點(diǎn)就可以獲得知識的感覺,
并且在淺層看懂之際,
我知道自己肯定沒有完全理解到里邊的精髓,
畢竟我不能深刻地理解到
設(shè)計(jì)者當(dāng)時對并發(fā)的構(gòu)思和模型上的選擇。
我只能說,
沒有十多年頂級的系統(tǒng)調(diào)用和并發(fā)編程的功底
寫不出這樣的代碼,
沒有極致的審美與對品控的偏執(zhí)
也寫不出這樣的代碼。
并發(fā)編程有很多說道,
就正如退出這個這么簡單的事情,
想要做到退出時回收干凈卻很難。
如果說你寫業(yè)務(wù)邏輯自己管線程,
退出什么的sleep(1)都無所謂,
但如果說做框架的人
不能把自己的框架做得完美無暇邏輯自洽
就難免讓人感覺差點(diǎn)意思。
而這個thrdpool,它作為一個線程池,
是如此的邏輯完備,
用最對稱簡潔的方式去面對復(fù)雜的并發(fā)。
再次讓我深深地感到震撼:
我們身邊那些原始的、底層的、基礎(chǔ)的代碼,
還有很多新思路,
還可以寫得如此美。
Workflow項(xiàng)目源碼地址:
https://github.com/sogou/workflow





