日本黄色一级经典视频|伊人久久精品视频|亚洲黄色色周成人视频九九九|av免费网址黄色小短片|黄色Av无码亚洲成年人|亚洲1区2区3区无码|真人黄片免费观看|无码一级小说欧美日免费三级|日韩中文字幕91在线看|精品久久久无码中文字幕边打电话

當(dāng)前位置:首頁 > 單片機(jī) > 程序喵大人

開源項(xiàng)目Workflow中有個重要的基礎(chǔ)模塊:
代碼僅300行的C語言線程池。

本文會伴隨源碼分析,而邏輯完備、對稱無差別的特點(diǎn)于第3部分開始
歡迎跳閱, 或直接到Github主頁上圍觀代碼?

https://github.com/sogou/workflow/blob/master/src/kernel/thrdpool.c

? 0 - Workflow的thrdpool

作為目前Github上炙手可熱的異步調(diào)度引擎
Workflow有一個大招是:
計(jì)算通信融為一體。
而計(jì)算的核心:Executor調(diào)度器,
就是基于這個線程池實(shí)現(xiàn)的。
可以說,一個通用而高效的線程池,
是我們寫C/C++代碼時離不開的基礎(chǔ)模塊。

thrdpool代碼位置在src/kernel/
不僅可以直接拿來使用,
同時也適合閱讀學(xué)習(xí)。

而更重要的,秉承Workflow項(xiàng)目本身
一貫的嚴(yán)謹(jǐn)極簡的作風(fēng),
這個thrdpool代碼極致簡潔,
實(shí)現(xiàn)邏輯上亦非常完備,
結(jié)構(gòu)精巧,處處嚴(yán)謹(jǐn),
復(fù)雜的并發(fā)處理依然可以對稱無差別,
不得不讓我驚嘆:妙?。。?!?

你可能會很好奇,
線程池還能寫出什么別致的新思路嗎?
先列出一些,你們細(xì)品:

  • 特點(diǎn)1:創(chuàng)建完線程池后,無需記錄任何線程id或?qū)ο?,線程池可以通過一個等一個的方式優(yōu)雅地去結(jié)束所有線程;? 也就是說,每一個線程都是對等的
  • 特點(diǎn)2:線程任務(wù)可以由另一個線程任務(wù)調(diào)起;甚至線程池正在被銷毀時也可以提交下一個任務(wù);(這很重要,因?yàn)榫€程本身很可能是不知道線程池的狀態(tài)的;? 即,每一個任務(wù)也是對等的
  • 特點(diǎn)3:同理,線程任務(wù)也可以銷毀這個線程池;(非常完整~? 每一種行為也是對等的,包括destroy

我真的迫不及待為大家深層解讀一下,
這個我愿稱之為“邏輯完備”的線程池。

? 1 - 前置知識

第一部分我先梳理一些基本內(nèi)容梳理,
有基礎(chǔ)的小伙伴可以直接跳過。
如果有不準(zhǔn)確的地方,歡迎大家指正交流~

Question: 為什么需要線程池?
其實(shí)思路不僅對線程池,
對任何有限資源的調(diào)度管理都是類似的。

我們知道,
通過pthread或者std::thread創(chuàng)建線程,
就可以實(shí)現(xiàn)多線程并發(fā)執(zhí)行我們的代碼。

但是CPU的核數(shù)是固定的,
所以真正并行執(zhí)行的最大值也是固定的,
過多的線程除了頻繁創(chuàng)建產(chǎn)生overhead以外,
還會導(dǎo)致對系統(tǒng)資源進(jìn)行爭搶
,
這些都是不必要的浪費(fèi)。

因此我們可以管理有限個線程,
循環(huán)且合理地利用它們。??

那么線程池一般包含哪些內(nèi)容呢?

  • 首先是管理若干個工具人線程;
  • 其次是管理交給線程去執(zhí)行的任務(wù),這個一般會有一個隊(duì)列;
  • 再然后線程之間需要一些同步機(jī)制,比如mutex、condition等;
  • 最后就是各線程池實(shí)現(xiàn)上自身需要的其他內(nèi)容了;

接下來我們看看Workflowthrdpool是怎么做的。

? 2 - 代碼概覽

以下共7步常用思路,
足以讓我們把代碼飛快過一遍。

第1步:先看頭文件,有什么接口。

我們打開thrdpool.h,只需關(guān)注這三個:

// 創(chuàng)建線程池 thrdpool_t *thrdpool_create(size_t nthreads, size_t stacksize); // 把任務(wù)交給線程池的入口 int thrdpool_schedule(const struct thrdpool_task *task, thrdpool_t *pool); // 銷毀線程池 void thrdpool_destroy(void (*pending)(const struct thrdpool_task *), thrdpool_t *pool);

第2步:接口上有什么數(shù)據(jù)結(jié)構(gòu)。

即,我們?nèi)绾蚊枋鲆粋€交給線程池的任務(wù)。

struct thrdpool_task { void (*routine)(void *); // 函數(shù)指針 void *context; // 上下文 };

第3步:再看實(shí)現(xiàn).c,內(nèi)部數(shù)據(jù)結(jié)構(gòu)。

struct __thrdpool { struct list_head task_queue;// 任務(wù)隊(duì)列 size_t nthreads; // 線程個數(shù) size_t stacksize; // 構(gòu)造線程時的參數(shù) pthread_t tid; // 運(yùn)行期間記錄的是個zero值 pthread_mutex_t mutex; pthread_cond_t cond; pthread_key_t key; pthread_cond_t *terminate;
};

沒有一個多余,每一個成員都很到位:

  1. tid:線程id,整個線程池只有一個,它不會奇怪地去記錄任何一個線程的id,這樣就不完美了?,它平時運(yùn)行的時候是空值,退出的時候,它是用來實(shí)現(xiàn)鏈?zhǔn)降却年P(guān)鍵。
  2. mutexcond是常見的線程間同步的工具,其中這個cond是用來給生產(chǎn)者和消費(fèi)者去操作任務(wù)隊(duì)列用的。
  3. key:是線程池的key,然后會賦予給每個由線程池創(chuàng)建的線程作為他們的thread local,用于區(qū)分這個線程是否是線程池創(chuàng)建的。
  4. 一個pthread_cond_t *terminate,這有兩個用途:不僅是退出時的標(biāo)記位 ,而且還是調(diào)用退出的那個人要等待的condition。

以上各個成員的用途,
好像說了,又好像沒說,?
是因?yàn)?strong>幾乎每一個成員都值得深挖一下,
所以我們記住它們,
后面看代碼的時候就會豁然開朗!?

第4步:接口都調(diào)用了什么核心函數(shù)。

thrdpool_t *thrdpool_create(size_t nthreads, size_t stacksize) { thrdpool_t *pool;
    ret = pthread_key_create(&pool->key, NULL); if (ret == 0)
    { // 去掉了其他代碼,但是注意到剛才的tid和terminate的賦值 memset(&pool->tid, 0, sizeof (pthread_t));
        pool->terminate = NULL; if (__thrdpool_create_threads(nthreads, pool) >= 0) return pool;
        ...

這里可以看到
__thrdpool_create_threads()里邊
最關(guān)鍵的,就是循環(huán)創(chuàng)建nthreads個線程。

while (pool->nthreads < nthreads) { ret = pthread_create(&tid, &attr, __thrdpool_routine, pool); ...

第5步:略讀核心函數(shù)的功能。

所以我們在上一步知道了,
每個線程執(zhí)行的是__thrdpool_routine()
不難想象,它會不停從隊(duì)列拿任務(wù)出來執(zhí)行

static void *__thrdpool_routine(void *arg)                                      
{                                                                               
    ... while (1)                                                                   
    { // 1. 從隊(duì)列里拿一個任務(wù)出來,沒有就等待 pthread_mutex_lock(&pool->mutex); while (!pool->terminate && list_empty(&pool->task_queue))
            pthread_cond_wait(&pool->cond, &pool->mutex); // 2. 線程池結(jié)束的標(biāo)志位,記住它,先跳過 if (pool->terminate) break; // 3. 如果能走到這里,恭喜你,拿到了任務(wù)~  entry = list_entry(*pos, struct __thrdpool_task_entry, list);
        list_del(*pos); // 4. 先解鎖 pthread_mutex_unlock(&pool->mutex); 

        task_routine = entry->task.routine;
        task_context = entry->task.context; free(entry); // 5. 再執(zhí)行 task_routine(task_context); // 6. 這里也先記住它,意思是線程池里的線程可以銷毀線程池 if (pool->nthreads == 0) 
        { /* Thread pool was destroyed by the task. */ free(pool); return NULL;
        }                                                                       
    }
    ... // 后面還有魔法,留下一章解讀~~~ 

第6步:把函數(shù)之間的關(guān)系聯(lián)系起來。

剛才看到的__thrdpool_routine()
就是線程的核心函數(shù)了,
它可以和誰關(guān)聯(lián)起來呢?
可以和接口thrdpool_schedule()關(guān)聯(lián)上

我們說過,線程池上有個隊(duì)列管理任務(wù):

  • 每個執(zhí)行routine的線程,都是消費(fèi)者
  • 每個發(fā)起schedule的線程,都是生產(chǎn)者

我們已經(jīng)看過消費(fèi)者了,來看看生產(chǎn)者的代碼:

inline void __thrdpool_schedule(const struct thrdpool_task *task, void *buf, thrdpool_t *pool)
{ struct __thrdpool_task_entry *entry = (struct __thrdpool_task_entry *)buf; entry->task = *task;
    pthread_mutex_lock(&pool->mutex); // 添加到隊(duì)列里 list_add_tail(&entry->list, &pool->task_queue); // 叫醒在等待的線程 pthread_cond_signal(&pool->cond);
    pthread_mutex_unlock(&pool->mutex);
}

說到這里,特點(diǎn)2就非常清晰了:

開篇說的特點(diǎn)2是說,
線程任務(wù)可以由另一個線程任務(wù)調(diào)起”。

只要對隊(duì)列的管理做得好,
顯然消費(fèi)者所執(zhí)行的函數(shù)里也可以生產(chǎn)

第7步:看其他情況的處理

對于線程池來說就是比如銷毀的情況。

接口thrdpool_destroy()實(shí)現(xiàn)非常簡單:

void thrdpool_destroy(void (*pending)(const struct thrdpool_task *), thrdpool_t *pool) {
    ... // 1. 內(nèi)部會設(shè)置pool->terminate,并叫醒所有等在隊(duì)列拿任務(wù)的線程 __thrdpool_terminate(in_pool, pool); // 2. 把隊(duì)列里還沒有執(zhí)行的任務(wù)都拿出來,通過pending返回給用戶 list_for_each_safe(pos, tmp, &pool->task_queue)                             
    {
        entry = list_entry(pos, struct __thrdpool_task_entry, list);            
        list_del(pos); if (pending)                                                            
            pending(&entry->task);                                              
        ... // 后面就是銷毀各種內(nèi)存,同樣有魔法~ 

在退出的時候,
我們那些已經(jīng)提交但是還沒有被執(zhí)行的任務(wù)
是絕對不能就這么扔掉了的,
于是我們可以傳入一個pending()函數(shù),
上層可以做自己的回收、回調(diào)、
或任何保證上層邏輯完備的事情

設(shè)計(jì)的完整性,無處不在。

接下來我們就可以跟著我們的核心問題,
針對性地看看每個特點(diǎn)都是怎么實(shí)現(xiàn)的。

? 3 - 特點(diǎn)1: 一個等待一個的優(yōu)雅退出

這里提出一個問題:
線程池要退出,如何結(jié)束所有線程?

一般線程池的實(shí)現(xiàn)都是
需要記錄下所有的線程id,
或者thread對象,
以便于我們?nèi)ビ?strong>join方法等待它們結(jié)束。

不嚴(yán)格地用join收拾干凈會有什么問題?
最直觀的,模塊退出時很可能會報(bào)內(nèi)存泄漏

但是我們剛才看,
pool里并沒有記錄所有的tid呀?
正如開篇說的,
pool上只有一個tid,而且還是個空的值。

而特點(diǎn)1正是Workflow thrdpool的答案:

無需記錄所有線程,我可以讓線程挨個自動退出、且一個等待下一個,最終達(dá)到調(diào)用完thrdpool_destroy()后內(nèi)存回收干凈的目的。

這里先給一個簡單的圖,
假設(shè)發(fā)起destroy的人是main線程,
我們?nèi)绾巫龅揭粋€等一個退出:

外部線程,比如main,發(fā)起destroy

步驟如下:

  1. 線程的退出,由thrdpool_destroy()設(shè)置pool->terminate開始。
  2. 我們每個線程,在while(1) 里會第一時間發(fā)現(xiàn)terminate,線程池要退出了,然后會break出這個while循環(huán)。
  3. 注意這個時候,還持有著mutex鎖,我們拿出pool上唯一的那個tid,放到我的臨時變量,我會根據(jù)拿出來的值做不同的處理。且我會把我自己的tid放上去,然后再解mutex鎖。
  4. 那么很顯然,第一個從pool上拿tid的人,會發(fā)現(xiàn)這是個0值,就可以直接結(jié)束了,不用負(fù)責(zé)等待任何其他人,但我在完全結(jié)束之前需要有人負(fù)責(zé)等待我的結(jié)束,所以我會把我的id放上去。
  5. 而如果發(fā)現(xiàn)自己從pool里拿到的tid不是0值,說明我要負(fù)責(zé)join上一個人,并且把我的tid放上去,讓下一個人負(fù)責(zé)我。
  6. 最后的那個人,是那個發(fā)現(xiàn)pool->nthreads為0的人,那么我就可以通過這個terminate(它本身是個condition)去通知發(fā)起destroy的人。
  7. 最后發(fā)起者就可以退了。?

是不是非常有意思!!!
非常優(yōu)雅的做法!??!?

所以我們會發(fā)現(xiàn),
其實(shí)大家不太需要知道太多信息,
只需要知道我要負(fù)責(zé)的上一個人
。

當(dāng)然每一步都是非常嚴(yán)謹(jǐn)?shù)模?br /> 結(jié)合剛才跳過的第一段魔法?感受一下:

static void *__thrdpool_routine(void *arg)                                         
{ while (1)
    { // 1.注意這里還持有鎖 pthread_mutex_lock(&pool->mutex); 
        ... // 等著隊(duì)列拿任務(wù)出來 // 2. 這既是標(biāo)識位,也是發(fā)起銷毀的那個人所等待的condition if (pool->terminate) break;
        ... // 執(zhí)行拿到的任務(wù) } /* One thread joins another. Don't need to keep all thread IDs. */ // 3. 把線程池上記錄的那個tid拿下來,我來負(fù)責(zé)上一人 tid = pool->tid; // 4. 把我自己記錄到線程池上,下一個人來負(fù)責(zé)我 pool->tid = pthread_self(); // 5. 每個人都減1,最后一個人負(fù)責(zé)叫醒發(fā)起detroy的人 if (--pool->nthreads == 0) 
        pthread_cond_signal(pool->terminate); // 6. 這里可以解鎖進(jìn)行等待了 pthread_mutex_unlock(&pool->mutex); // 7. 只有第一個人拿到0值 if (memcmp(&tid, &__zero_tid, sizeof (pthread_t)) != 0) // 8. 只要不0值,我就要負(fù)責(zé)等上一個結(jié)束才能退 pthread_join(tid, NULL); return NULL; // 9. 退出,干干凈凈~ }

? 4 - 特點(diǎn)2:線程任務(wù)可以由另一個線程任務(wù)調(diào)起

在第二部分我們看過源碼,
只要隊(duì)列管理得好,
線程任務(wù)里提交下一個任務(wù)是完全OK的。

這很合理。?

那么問題來了,
特點(diǎn)1又說,我們每個線程,
是不需要知道太多線程池的狀態(tài)和信息的。
而線程池的銷毀是個過程,
如果在這個過程間提交任務(wù)會怎么樣呢?

因此特點(diǎn)2的一個重要解讀是:
線程池被銷毀時也可以提交下一個任務(wù)。
而且剛才提過,
還沒有被執(zhí)行的任務(wù),
可以通過我們傳入的pending()函數(shù)拿回來。

簡單看看銷毀時的嚴(yán)謹(jǐn)做法:

static void __thrdpool_terminate(int in_pool, thrdpool_t *pool)                 
{ pthread_cond_t term = PTHREAD_COND_INITIALIZER;
    pthread_mutex_lock(&pool->mutex); // 1. 加鎖設(shè)置標(biāo)志位,之后的添加任務(wù)不會被執(zhí)行,但可以pending拿到 pool->terminate = &term; // 2. 廣播所有等待的消費(fèi)者 pthread_cond_broadcast(&pool->cond); if (in_pool) // 3. 這里的魔法等下講>_<~ { /* Thread pool destroyed in a pool thread is legal. */ pthread_detach(pthread_self());                                         
        pool->nthreads--;                                                       
    } // 4. 如果還有線程沒有退完,我會等,注意這里是while while (pool->nthreads > 0) 
        pthread_cond_wait(&term, &pool->mutex);                                 

    pthread_mutex_unlock(&pool->mutex); // 5.同樣地等待打算退出的上一個人  if (memcmp(&pool->tid, &__zero_tid, sizeof (pthread_t)) != 0)               
        pthread_join(pool->tid, NULL); 
}

? 5 - 特點(diǎn)3:同樣可以在線程任務(wù)里銷毀這個線程池

既然線程任務(wù)可以做任何事情,
理論上,線程任務(wù)也可以銷毀線程池?

作為一個邏輯完備的線程池,
大膽一點(diǎn),
我們把問號去掉。

而且,銷毀并不會結(jié)束當(dāng)前任務(wù),
它會等這個任務(wù)執(zhí)行完
。

想象一下,剛才的__thrdpool_routine(),
while(1)里拿出來的那個任務(wù),
做的事情竟然是發(fā)起thrdpool_destroy()...

把上面的圖大膽改一下:?

我們讓一個routine來destroy線程池

如果發(fā)起銷毀的人,
是我們自己內(nèi)部的線程,
那么我們就不是等n個,而是等n-1
少了一個外部線程等待我們。
如何實(shí)現(xiàn)才能讓這些邏輯都完美融合呢?
我們把剛才跳過的三段魔法串起來看看。

? 第一段魔法,銷毀的發(fā)起者。

如果發(fā)起銷毀的人是線程池內(nèi)部的線程,
那么它具有較強(qiáng)的自我管理意識

(因?yàn)榍懊嬲f了,會等它這個任務(wù)執(zhí)行完)
而我們可以放心大膽地pthread_detach,
無需任何人join它等待它結(jié)束。

static void __thrdpool_terminate(int in_pool, thrdpool_t *pool)                 
{ 
    ... // 每個由線程池創(chuàng)建的線程都設(shè)置了一個key,由此判斷是否是in_pool if (in_pool) 
    { /* Thread pool destroyed in a pool thread is legal. */ pthread_detach(pthread_self());
        pool->nthreads--;
    }

? 第二段魔法:線程池誰來free?

一定是發(fā)起銷毀的那個人。
所以這里用in_pool來判斷是否是外部的人:

void thrdpool_destroy(void (*pending)(const struct thrdpool_task *), thrdpool_t *pool) { // 已經(jīng)調(diào)用完第一段,且挨個pending(未執(zhí)行的task)了 // 銷毀其他內(nèi)部分配的內(nèi)存 ... // 如果不是內(nèi)部線程發(fā)起的銷毀,要負(fù)責(zé)回收線程池內(nèi)存 if (!in_pool) free(pool);
}

那現(xiàn)在不是main線程發(fā)起的銷毀呢
發(fā)起的銷毀的那個內(nèi)部線程,
怎么能保證我可以在最后關(guān)頭
把所有資源回收干凈、調(diào)free(pool)、
最后功成身退呢

在前面閱讀源碼第5步,其實(shí)我們看過,
__thrdpool_routine()里有free的地方。

于是現(xiàn)在三段魔法終于串起來了。

? 第三段魔法:嚴(yán)謹(jǐn)?shù)牟l(fā)。

static void *__thrdpool_routine(void *arg)
{ while (1) 
    {  // ... task_routine(task_context); // 如果routine里做的事情,是銷毀線程池... // 注意這個時候,其他內(nèi)存都已經(jīng)被destroy里清掉了,萬萬不可以再用什么mutex、cond if (pool->nthreads == 0) 
        { /* Thread pool was destroyed by the task. */ free(pool); return NULL;                                            
        }
        ...

非常重要的一點(diǎn),
由于并發(fā),我們是不知道誰先操作的。
假設(shè)我們稍微改一改這個順序,
就又是另一番邏輯

比如我作為一個內(nèi)部線程,
routine()里調(diào)用destroy()期間,
發(fā)現(xiàn)還有線程沒有執(zhí)行完,
我就要等在我的terminate上,
待最后看到nthreads==0的那個人叫醒我。

然后,我的代碼繼續(xù)執(zhí)行,
函數(shù)棧就會從destroy()回到routine(),
也就是上面那幾行,
再然后就可以free(pool);
由于這時候我已經(jīng)放飛自我detach了,
于是一切順利結(jié)束。

你看,無論如何都可以完美地銷毀線程池:

并發(fā)是復(fù)雜多變的,代碼是簡潔統(tǒng)一的

是不是太妙了!
我寫到這里已經(jīng)要感動哭了!?

? 6 - 簡單的用法

這個線程池只有兩個文件:
thrdpool.h和thrdpool.c,
而且只依賴內(nèi)核的數(shù)據(jù)結(jié)構(gòu)list.h。
我們把它拿出來玩,自己寫一段代碼:

void my_routine(void *context) { // 我們要執(zhí)行的函數(shù)  printf("task-%llu start.\n", reinterpret_cast<unsigned long long>(context); );
} void my_pending(const struct thrdpool_task *task) { // 線程池銷毀后,沒執(zhí)行的任務(wù)會到這里 printf("pending task-%llu.\n", reinterpret_cast<unsigned long long>(task->context););                                    
} int main() { thrdpool_t *thrd_pool = thrdpool_create(3, 1024); // 創(chuàng)建 struct thrdpool_task task; unsigned long long i; for (i = 0; i < 5; i++)
    {
        task.routine = &my_routine;                                             
        task.context = reinterpret_cast<void *>(i);                             
        thrdpool_schedule(&task, thrd_pool); // 調(diào)用 }
    getchar(); // 卡住主線程,按回車?yán)^續(xù) thrdpool_destroy(&my_pending, thrd_pool); // 結(jié)束 return 0;                                                                   
}

再打印幾行l(wèi)og,直接編譯就可以跑起來:

媽媽再也不用擔(dān)心我的C語言作業(yè)

簡單程度堪比大一上學(xué)期C語言作業(yè)。?

? 7 - 并發(fā)與結(jié)構(gòu)之美

最后談?wù)劯惺堋?

看完之后我很后悔為什么沒有早點(diǎn)看
為什么不早點(diǎn)就可以獲得知識的感覺,
并且在淺層看懂之際,
我知道自己肯定沒有完全理解到里邊的精髓,
畢竟我不能深刻地理解到
設(shè)計(jì)者當(dāng)時對并發(fā)的構(gòu)思和模型上的選擇
。

我只能說,
沒有十多年頂級的系統(tǒng)調(diào)用和并發(fā)編程的功底
寫不出這樣的代碼,
沒有極致的審美與對品控的偏執(zhí)
也寫不出這樣的代碼。

并發(fā)編程有很多說道,
就正如退出這個這么簡單的事情,
想要做到退出時回收干凈卻很難。
如果說你寫業(yè)務(wù)邏輯自己管線程,
退出什么的sleep(1)都無所謂,
但如果說做框架的人
不能把自己的框架做得完美無暇邏輯自洽
就難免讓人感覺差點(diǎn)意思
。

而這個thrdpool,它作為一個線程池,
是如此的邏輯完備,
用最對稱簡潔的方式去面對復(fù)雜的并發(fā)

再次讓我深深地感到震撼:
我們身邊那些原始的、底層的、基礎(chǔ)的代碼,
還有很多新思路,
還可以寫得如此美。

Workflow項(xiàng)目源碼地址
https://github.com/sogou/workflow


本站聲明: 本文章由作者或相關(guān)機(jī)構(gòu)授權(quán)發(fā)布,目的在于傳遞更多信息,并不代表本站贊同其觀點(diǎn),本站亦不保證或承諾內(nèi)容真實(shí)性等。需要轉(zhuǎn)載請聯(lián)系該專欄作者,如若文章內(nèi)容侵犯您的權(quán)益,請及時聯(lián)系本站刪除。
關(guān)閉