生產(chǎn)者消費者模型已經(jīng)很古老了吧,最近寫了個OpenMP版的此模型之實現(xiàn),來分享下。
先說一下模型的大致做法是:
1、生產(chǎn)者需要取任務(wù),生產(chǎn)產(chǎn)品。
2、消費者需要取產(chǎn)品,消費產(chǎn)品。
生產(chǎn)者在生產(chǎn)某個產(chǎn)品之后,要告知消費者此產(chǎn)品已經(jīng)可以使用了。消費者通過獲得可以使用這個信號來取得產(chǎn)品,進一步消費產(chǎn)品。
比如,我們有N個圖像需要對每一個圖像作濾波或者變換等處理,并且把處理后的結(jié)果存到硬盤上。
那么生產(chǎn)者可以將N個圖像看成N個任務(wù),每個任務(wù)都是獨立的,每個任務(wù)的計算結(jié)果可以看成是產(chǎn)品,消費者就是取這個產(chǎn)品來寫入硬盤。
先貼出一個實例代碼再作解釋。
#include#include#include#include#include#define?jobs?1000
#define?sz?102000
#if?defined(_WIN32)?&&?defined(_MSC_VER)
#includedouble?abtic()?{
__int64?freq;
__int64?clock;
QueryPerformanceFrequency(?(LARGE_INTEGER?*)&freq?);
QueryPerformanceCounter(?(LARGE_INTEGER?*)&clock?);
return?(double)clock/freq*1000*1000;
}
#else
#include#includedouble?abtic()?{
double?result?=?0.0;
struct?timeval?tv;
gettimeofday(?&tv,?NULL?);
result?=?tv.tv_sec*1000*1000?+?tv.tv_usec;
return?result;
}
#endif?/*?_WIN32?*/
#if?1
double?timer;
#define?ABTMS?timer=abtic();fprintf(stdout,"%4d??",__LINE__)
#define?ABTME?fprintf(stdout,"%4d??%8.8fmsn",__LINE__,(abtic()-timer)/1000.0f)
#else
#define?ABTMS?
#define?ABTME?
#endif
int?main()
{
??char?*jbNotReady;
??double?*a;
??double?*as;
??double?*pa;
??int?j,?k;
char?jbnr;
??a?=?(double*)malloc(sz*jobs*sizeof(double));
??as?=?(double*)malloc(jobs*sizeof(double));
??jbNotReady?=?(char*)malloc(jobs*sizeof(char));
??for?(j?=?0;?j?<?jobs;?j++)
??{
????jbNotReady[j]?=?1;
????
??}
??memset(a,?0,?sz*jobs*sizeof(double));
??memset(as,?0,?jobs*sizeof(double));
??ABTMS;
#pragma?omp?parallel?sections?private(j,k,pa)?shared(jbNotReady,as,a)
??{
????//?producer
#pragma?omp?section
????{
??????for?(j?=?0;?j?<?jobs;?j++)
??????{
????????pa?=?a+j*sz;
????????for?(k?=?0;?k?<?sz;?k++)
????????{
??????????pa[k]?=?1.0;
????????}
????????jbNotReady[j]?=?0;
#pragma?omp?flush
??????}
????}
????//?consumer
#pragma?omp?section
????{
??????for?(j?=?0;?j?<?jobs;?j++)
??????{
#pragma?omp?flush
????????while?(jbNotReady[j]){
#pragma?omp?flush
}
????????as[j]?=?0.0;
????????pa?=?a+j*sz;
????????for?(k?=?0;?k?<?sz;?k++)
????????{
??????????as[j]?+=?pa[k];
????????}
????????if?((int)(as[j])!=sz)fprintf(stdout,?"job?id?%3d?:%fn",?j,?as[j]);
??????}
????}
??}
??ABTME;
??free(a);
??free(as);
??free(jbNotReady);
??return?0;
}
源代碼中,第一個section創(chuàng)建的線程扮演的就是生產(chǎn)者的角色,第二個section扮演消費者角色。j這個變量模擬的是任務(wù)編號,第一個section中的循環(huán)模擬產(chǎn)生產(chǎn)品。第二個section每次取一個任務(wù),而且是順序取,通過驗證任務(wù)是否已經(jīng)準備好來獲得正確的產(chǎn)品。
使用flush制導語句是為了將每個線程的緩存和內(nèi)存強制保持一致,注意生產(chǎn)者向jbNotReady里寫,而消費者只是讀數(shù)據(jù),不會出現(xiàn)內(nèi)存中的數(shù)據(jù)寫后讀,讀后寫的問題,每個線程獲得的數(shù)據(jù)都是安全的。
以上代碼支持Windows和Linux,GCC4.4以后的版本都可以執(zhí)行,Windows下只要支持OpenMP的編譯器,都可行。





