我們使用線(xiàn)程的時(shí)候就去創(chuàng)建一個(gè)線(xiàn)程,這樣實(shí)現(xiàn)起來(lái)非常簡(jiǎn)便,但是就會(huì)有一個(gè)問(wèn)題:如果并發(fā)的線(xiàn)程數(shù)量很多,并且每個(gè)線(xiàn)程都是執(zhí)行一個(gè)時(shí)間很短的任務(wù)就結(jié)束了,這樣頻繁創(chuàng)建線(xiàn)程就會(huì)大大降低系統(tǒng)的效率,因?yàn)轭l繁創(chuàng)建線(xiàn)程和銷(xiāo)毀線(xiàn)程需要時(shí)間。那么有沒(méi)有一種辦法使得線(xiàn)程可以復(fù)用,就是執(zhí)行完一個(gè)任務(wù),并不被銷(xiāo)毀,而是可以繼續(xù)執(zhí)行其他的任務(wù)呢?線(xiàn)程池是一種多線(xiàn)程處理形式,處理過(guò)程中將任務(wù)添加到隊(duì)列,然后在創(chuàng)建線(xiàn)程后自動(dòng)啟動(dòng)這些任務(wù)。線(xiàn)程池線(xiàn)程都是后臺(tái)線(xiàn)程。每個(gè)線(xiàn)程都使用默認(rèn)的堆棧大小,以默認(rèn)的優(yōu)先級(jí)運(yùn)行,并處于多線(xiàn)程單元中。如果某個(gè)線(xiàn)程在托管代碼中空閑(如正在等待某個(gè)事件), 則線(xiàn)程池將插入另一個(gè)輔助線(xiàn)程來(lái)使所有處理器保持繁忙。如果所有線(xiàn)程池線(xiàn)程都始終保持繁忙,但隊(duì)列中包含掛起的工作,則線(xiàn)程池將在一段時(shí)間后創(chuàng)建另一個(gè)輔助線(xiàn)程但線(xiàn)程的數(shù)目永遠(yuǎn)不會(huì)超過(guò)最大值。超過(guò)最大值的線(xiàn)程可以排隊(duì),但他們要等到其他線(xiàn)程完成后才啟動(dòng)。在各個(gè)編程語(yǔ)言的語(yǔ)種中都有線(xiàn)程池的概念,并且很多語(yǔ)言中直接提供了線(xiàn)程池,作為程序猿直接使用就可以了,下面給大家介紹一下線(xiàn)程池的實(shí)現(xiàn)原理:
線(xiàn)程池的組成主要分為 3 個(gè)部分,這三部分配合工作就可以得到一個(gè)完整的線(xiàn)程池: 任務(wù)隊(duì)列,存儲(chǔ)需要處理的任務(wù),由工作的線(xiàn)程來(lái)處理這些任務(wù) 通過(guò)線(xiàn)程池提供的 API 函數(shù),將一個(gè)待處理的任務(wù)添加到任務(wù)隊(duì)列,或者從任務(wù)隊(duì)列中刪除 已處理的任務(wù)會(huì)被從任務(wù)隊(duì)列中刪除 線(xiàn)程池的使用者,也就是調(diào)用線(xiàn)程池函數(shù)往任務(wù)隊(duì)列中添加任務(wù)的線(xiàn)程就是生產(chǎn)者線(xiàn)程 工作的線(xiàn)程(任務(wù)隊(duì)列任務(wù)的消費(fèi)者) ,N個(gè) 線(xiàn)程池中維護(hù)了一定數(shù)量的工作線(xiàn)程,他們的作用是是不停的讀任務(wù)隊(duì)列,從里邊取出任務(wù)并處理 工作的線(xiàn)程相當(dāng)于是任務(wù)隊(duì)列的消費(fèi)者角色, 如果任務(wù)隊(duì)列為空,工作的線(xiàn)程將會(huì)被阻塞 (使用條件變量 / 信號(hào)量阻塞) 如果阻塞之后有了新的任務(wù),由生產(chǎn)者將阻塞解除,工作線(xiàn)程開(kāi)始工作 管理者線(xiàn)程(不處理任務(wù)隊(duì)列中的任務(wù)),1個(gè) 它的任務(wù)是周期性的對(duì)任務(wù)隊(duì)列中的任務(wù)數(shù)量以及處于忙狀態(tài)的工作線(xiàn)程個(gè)數(shù)進(jìn)行檢測(cè) 當(dāng)任務(wù)過(guò)多的時(shí)候,可以適當(dāng)?shù)膭?chuàng)建一些新的工作線(xiàn)程 當(dāng)任務(wù)過(guò)少的時(shí)候,可以適當(dāng)?shù)匿N(xiāo)毀一些工作的線(xiàn)程 2. 任務(wù)隊(duì)列//?任務(wù)結(jié)構(gòu)體 typedef?struct?Task { ????void?(*function )(void*?arg); ????void*?arg; }Task; 3. 線(xiàn)程池定義//?線(xiàn)程池結(jié)構(gòu)體 struct?ThreadPool { ????//?任務(wù)隊(duì)列 ????Task*?taskQ; ????int?queueCapacity;??//?容量 ????int?queueSize;??????//?當(dāng)前任務(wù)個(gè)數(shù) ????int?queueFront;?????//?隊(duì)頭?->?取數(shù)據(jù) ????int?queueRear;??????//?隊(duì)尾?->?放數(shù)據(jù) ????pthread_t?managerID;????//?管理者線(xiàn)程ID ????pthread_t?*threadIDs;???//?工作的線(xiàn)程ID ????int?minNum;?????????????//?最小線(xiàn)程數(shù)量 ????int?maxNum;?????????????//?最大線(xiàn)程數(shù)量 ????int?busyNum;????????????//?忙的線(xiàn)程的個(gè)數(shù) ????int?liveNum;????????????//?存活的線(xiàn)程的個(gè)數(shù) ????int?exitNum;????????????//?要銷(xiāo)毀的線(xiàn)程個(gè)數(shù) ????pthread_mutex_t?mutexPool;??//?鎖整個(gè)的線(xiàn)程池 ????pthread_mutex_t?mutexBusy;??//?鎖busyNum變量 ????pthread_cond_t?notFull;?????//?任務(wù)隊(duì)列是不是滿(mǎn)了 ????pthread_cond_t?notEmpty;????//?任務(wù)隊(duì)列是不是空了 ????int?shutdown;???????????//?是不是要銷(xiāo)毀線(xiàn)程池,?銷(xiāo)毀為1,?不銷(xiāo)毀為0 }; 4. 頭文件聲明#ifndef?_THREADPOOL_H #define?_THREADPOOL_H typedef?struct?ThreadPool?ThreadPool; //?創(chuàng)建線(xiàn)程池并初始化 ThreadPool?*threadPoolCreate(int?min,?int?max,?int?queueSize); //?銷(xiāo)毀線(xiàn)程池 int?threadPoolDestroy(ThreadPool*?pool); //?給線(xiàn)程池添加任務(wù) void?threadPoolAdd(ThreadPool*?pool,?void(*func)(void*),?void*?arg); //?獲取線(xiàn)程池中工作的線(xiàn)程的個(gè)數(shù) int?threadPoolBusyNum(ThreadPool*?pool); //?獲取線(xiàn)程池 中活著的線(xiàn)程的個(gè)數(shù) int?threadPoolAliveNum(ThreadPool*?pool); ////////////////////// //?工作的線(xiàn)程(消費(fèi)者線(xiàn)程)任務(wù)函數(shù) void*?worker(void*?arg); //?管理者線(xiàn)程任務(wù)函數(shù) void*?manager(void*?arg); //?單個(gè)線(xiàn)程退出 void?threadExit(ThreadPool*?pool);#endif??//?_THREADPOOL_H 5. 源文件定義ThreadPool*?threadPoolCreate(int?min,?int?max,?int?queueSize) { ????ThreadPool*?pool?=?(ThreadPool*)malloc(sizeof(ThreadPool)); ????do ? ????{ ????????if ?(pool?==?NULL) ????????{ ????????????printf ("malloc?threadpool?fail...\n" ); ????????????break ; ????????} ????????pool->threadIDs?=?(pthread_t*)malloc(sizeof(pthread_t)?*?max); ????????if ?(pool->threadIDs?==?NULL) ????????{ ????????????printf ("malloc?threadIDs?fail...\n" ); ????????????break ; ????????} ????????memset(pool->threadIDs,?0,?sizeof(pthread_t)?*?max); ????????pool->minNum?=?min; ????????pool->maxNum?=?max; ????????pool->busyNum?=?0; ????????pool->liveNum?=?min;????//?和最小個(gè)數(shù)相等 ????????pool->exitNum?=?0; ????????if ?(pthread_mutex_init(