版權(quán)說(shuō)明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)
文檔簡(jiǎn)介
第epoll封裝reactor原理剖析示例詳解目錄reactor是什么?reactor模型三個(gè)重要組件與流程分析組件流程將epoll封裝成reactor事件驅(qū)動(dòng)封裝每一個(gè)連接sockfd變成ntyevent封裝epfd和ntyevent變成ntyreactor封裝讀、寫、接收連接等事件對(duì)應(yīng)的操作變成callback給每個(gè)客戶端的ntyevent設(shè)置屬性將ntyevent加入到epoll中由內(nèi)核監(jiān)聽(tīng)將ntyevent從epoll中去除讀事件回調(diào)函數(shù)寫事件回調(diào)函數(shù)接受新連接事件回調(diào)函數(shù)reactor運(yùn)行reactor簡(jiǎn)單版代碼與測(cè)試reactor優(yōu)點(diǎn)reactor多種模型單reactor+單線程模型單reactor+線程池(ThreadPool)模型多reactor+多線程模型多reactor+線程池(ThreadPool)模型注意點(diǎn)reactor完善版代碼
reactor是什么?
本文將由淺入深的介紹reactor,深入淺出的封裝epoll,一步步變成reactor模型,并在文末介紹reactor的四種模型。
reactor是一種高并發(fā)服務(wù)器模型,是一種框架,一個(gè)概念,所以reactor沒(méi)有一個(gè)固定的代碼,可以有很多變種,后續(xù)會(huì)介紹到。
組成:?阻塞的IO(如果是阻塞IO,發(fā)送緩沖區(qū)滿了怎么辦,就阻塞了)+io多路復(fù)?;特征:基于事件循環(huán),以事件驅(qū)動(dòng)或者事件回調(diào)的?式來(lái)實(shí)現(xiàn)業(yè)務(wù)邏輯。
reactor中的IO使用的是select,poll,epoll這些IO多路復(fù)用,使用IO多路復(fù)用系統(tǒng)不必創(chuàng)建維護(hù)大量線程,只使用一個(gè)線程、一個(gè)選擇器就可同時(shí)處理成千上萬(wàn)連接,大大減少了系統(tǒng)開(kāi)銷。
reactor中文譯為反應(yīng)堆,將epoll中的IO變成事件驅(qū)動(dòng),比如讀事件,寫事件。來(lái)了個(gè)讀事件,立馬進(jìn)行反應(yīng),執(zhí)行提前注冊(cè)好的事件回調(diào)函數(shù)。
回想一下普通函數(shù)調(diào)用的機(jī)制:程序調(diào)用某函數(shù),函數(shù)執(zhí)行,程序等待,函數(shù)將結(jié)果和控制權(quán)返回給程序,程序繼續(xù)處理。reactor反應(yīng)堆,是一種事件驅(qū)動(dòng)機(jī)制,和普通函數(shù)調(diào)用的不同之處在于:應(yīng)用程序不是主動(dòng)的調(diào)用某個(gè)API完成處理,而是恰恰相反,reactor逆置了事件處理流程,應(yīng)用程序需要提供相應(yīng)的接口并注冊(cè)到reactor上,如果相應(yīng)的事件發(fā)生,reactor將主動(dòng)調(diào)用應(yīng)用程序注冊(cè)的接口,這些接口又稱為回調(diào)函數(shù)。
說(shuō)白了,reactor就是對(duì)epoll進(jìn)行封裝,進(jìn)行網(wǎng)絡(luò)IO與業(yè)務(wù)的解耦,將epoll管理IO變成管理事件,整個(gè)程序由事件進(jìn)行驅(qū)動(dòng)執(zhí)行。就像下圖一樣,有就緒事件返回,reactor:由事件驅(qū)動(dòng)執(zhí)行對(duì)應(yīng)的回調(diào)函數(shù);epoll:需要自己判斷。
reactor模型三個(gè)重要組件與流程分析
reactor是處理并發(fā)I/O比較常見(jiàn)的一種模式,用于同步I/O,中心思想是將所有要處理的I/O事件注冊(cè)到一個(gè)中心I/O多路復(fù)用器(epoll)上,同時(shí)主線程/進(jìn)程阻塞在多路復(fù)用器上;
一旦有I/O事件到來(lái)或是準(zhǔn)備就緒(文件描述符或socket可讀、寫),多路復(fù)用器返回并將事先注冊(cè)的相應(yīng)I/O事件分發(fā)到對(duì)應(yīng)的處理器中。
組件
reactor模型有三個(gè)重要的組件
多路復(fù)用器:由操作系統(tǒng)提供,在linux上一般是select,poll,epoll等系統(tǒng)調(diào)用。
事件分發(fā)器:將多路復(fù)用器中返回的就緒事件分到對(duì)應(yīng)的處理函數(shù)中。
事件處理器:負(fù)責(zé)處理特定事件的處理函數(shù)。
流程
具體流程:
注冊(cè)相應(yīng)的事件處理器(剛開(kāi)始listenfd注冊(cè)都就緒事件)多路復(fù)用器等待事件事件到來(lái),激活事件分發(fā)器,分發(fā)器調(diào)用事件到對(duì)應(yīng)的處理器事件處理器處理事件,然后注冊(cè)新的事件(比如讀事件,完成讀操作后,根據(jù)業(yè)務(wù)處理數(shù)據(jù),注冊(cè)寫事件,寫事件根據(jù)業(yè)務(wù)響應(yīng)請(qǐng)求;比如listen讀事件,肯定要給新的連接注冊(cè)讀事件)
將epoll封裝成reactor事件驅(qū)動(dòng)
封裝每一個(gè)連接sockfd變成ntyevent
我們知道一個(gè)連接對(duì)應(yīng)一個(gè)文件描述符fd,對(duì)于這個(gè)連接(fd)來(lái)說(shuō),它有自己的事件(讀,寫)。我們將fd都設(shè)置成非阻塞的,所以這里我們需要添加兩個(gè)buffer,至于大小就是看業(yè)務(wù)需求了。
structntyevent{
intfd;//socketfd
intevents;//事件
charsbuffer[BUFFER_LENGTH];//寫緩沖buffer
intslength;
charrbuffer[BUFFER_LENGTH];//讀緩沖buffer
intrlength;
//typedefint(*NtyCallBack)(int,int,void*);
NtyCallBackcallback;//回調(diào)函數(shù)
void*arg;
intstatus;//1MOD0null
封裝epfd和ntyevent變成ntyreactor
我們知道socketfd已經(jīng)被封裝成了ntyevent,那么有多少個(gè)ntyevent呢?這里demo初始化reactor的時(shí)候其實(shí)是將*events指向了一個(gè)1024的ntyevent數(shù)組(按照道理來(lái)說(shuō)客戶端連接可以一直連,不止1024個(gè)客戶端,后續(xù)文章有解決方案,這里從簡(jiǎn))。epfd肯定要封裝進(jìn)行,不用多說(shuō)。
structntyreactor{
intepfd;
structntyevent*events;
//structntyeventevents[1024];
封裝讀、寫、接收連接等事件對(duì)應(yīng)的操作變成callback
前面已經(jīng)說(shuō)了,把事件寫成回調(diào)函數(shù),這里的參數(shù)fd肯定要知道自己的哪個(gè)連接,events是什么事件的意思,arg傳的是ntyreactor(考慮到后續(xù)多線程多進(jìn)程,如果將ntyreactor設(shè)為全局感覺(jué)不太好)
typedefint(*NtyCallBack)(int,int,void*);
intrecv_cb(intfd,intevents,void*arg);
intsend_cb(intfd,intevents,void*arg);
intaccept_cb(intfd,intevents,void*arg);
給每個(gè)客戶端的ntyevent設(shè)置屬性
具兩個(gè)例子,我們知道第一個(gè)socket一定是listenfd,用來(lái)監(jiān)聽(tīng)用的,那么首先肯定是設(shè)置ntyevent的各項(xiàng)屬性。本來(lái)是讀事件,讀完后要改成寫事件,那么必然要把原來(lái)的讀回調(diào)函數(shù)設(shè)置成寫事件回調(diào)。
voidnty_event_set(structntyevent*ev,intfd,NtyCallBackcallback,void*arg){
ev-fd=fd;
ev-callback=callback;
ev-events=0;
ev-arg=arg;
將ntyevent加入到epoll中由內(nèi)核監(jiān)聽(tīng)
intnty_event_add(intepfd,intevents,structntyevent*ntyev){
structepoll_eventev={0,{0}};
ev.data.ptr=ntyev;
ev.events=ntyev-events=events;
intop;
if(ntyev-status==1){
op=EPOLL_CTL_MOD;
else{
op=EPOLL_CTL_ADD;
ntyev-status=1;
if(epoll_ctl(epfd,op,ntyev-fd,ev)0){
printf("eventaddfailed[fd=%d],events[%d],err:%s,err:%d\n",ntyev-fd,events,strerror(errno),errno);
return-1;
return0;
將ntyevent從epoll中去除
intnty_event_del(intepfd,structntyevent*ev){
structepoll_eventep_ev={0,{0}};
if(ev-status!=1){
return-1;
ep_ev.data.ptr=ev;
ev-status=0;
epoll_ctl(epfd,EPOLL_CTL_DEL,ev-fd,ep_ev);
//epoll_ctl(epfd,EPOLL_CTL_DEL,ev-fd,NULL);
return0;
讀事件回調(diào)函數(shù)
這里就是被觸發(fā)的回調(diào)函數(shù),具體代碼要與業(yè)務(wù)結(jié)合,這里的參考意義不大(這里就是讀一次,改成寫事件)
intrecv_cb(intfd,intevents,void*arg){
structntyreactor*reactor=(structntyreactor*)arg;
structntyevent*ntyev=reactor-events[fd];
intlen=recv(fd,ntyev-buffer,BUFFER_LENGTH,0);
nty_event_del(reactor-epfd,ntyev);
if(len0){
ntyev-length=len;
ntyev-buffer[len]='\0';
printf("C[%d]:%s\n",fd,ntyev-buffer);
nty_event_set(ntyev,fd,send_cb,reactor);
nty_event_add(reactor-epfd,EPOLLOUT,ntyev);
elseif(len==0){
close(ntyev-
printf("[fd=%d]pos[%ld],closed\n",fd,ntyev-reactor-events);
else{
close(ntyev-
printf("recv[fd=%d]error[%d]:%s\n",fd,errno,strerror(errno));
returnlen;
寫事件回調(diào)函數(shù)
這里就是被觸發(fā)的回調(diào)函數(shù),具體代碼要與業(yè)務(wù)結(jié)合,這里的參考意義不大(將讀事件讀的數(shù)據(jù)寫回,再改成讀事件,相當(dāng)于echo)
intsend_cb(intfd,intevents,void*arg){
structntyreactor*reactor=(structntyreactor*)arg;
structntyevent*ntyev=reactor-events[fd];
intlen=send(fd,ntyev-buffer,ntyev-length,0);
if(len0){
printf("send[fd=%d],[%d]%s\n",fd,len,ntyev-buffer);
nty_event_del(reactor-epfd,ntyev);
nty_event_set(ntyev,fd,recv_cb,reactor);
nty_event_add(reactor-epfd,EPOLLIN,ntyev);
else{
close(ntyev-
nty_event_del(reactor-epfd,ntyev);
printf("send[fd=%d]error%s\n",fd,strerror(errno));
returnlen;
接受新連接事件回調(diào)函數(shù)
本質(zhì)上就是accept,然后將其加入到epoll監(jiān)聽(tīng)
intaccept_cb(intfd,intevents,void*arg){
structntyreactor*reactor=(structntyreactor*)arg;
if(reactor==NULL)return-1;
structsockaddr_inclient_addr;
socklen_tlen=sizeof(client_addr);
intclientfd;
if((clientfd=accept(fd,(structsockaddr*)client_addr,len))==-1){
printf("accept:%s\n",strerror(errno));
return-1;
printf("clientfd=%d\n",clientfd);
if((fcntl(clientfd,F_SETFL,O_NONBLOCK))0){
printf("%s:fcntlnonblockingfailed,%d\n",__func__,MAX_EPOLL_EVENTS);
return-1;
nty_event_set(reactor-events[clientfd],clientfd,recv_cb,reactor);
nty_event_add(reactor-epfd,EPOLLIN,reactor-events[clientfd]);
printf("newconnect[%s:%d][time:%ld],pos[%d]\n",inet_ntoa(client_addr.sin_addr),ntohs(client_addr.sin_port),
reactor-events[clientfd].last_active,clientfd);
return0;
reactor運(yùn)行
就是將原來(lái)的epoll_wait從main函數(shù)中封裝到ntyreactor_run函數(shù)中
intntyreactor_run(structntyreactor*reactor){
if(reactor==NULL)return-1;
if(reactor-epfd0)return-1;
if(reactor-events==NULL)return-1;
structepoll_eventevents[MAX_EPOLL_EVENTS];
intcheckpos=0,i;
while(1){
intnready=epoll_wait(reactor-epfd,events,MAX_EPOLL_EVENTS,1000);
if(nready0){
printf("epoll_waiterror,exit\n");
continue;
for(i=0;inready;i++){
structntyevent*ev=(structntyevent*)events[i].data.ptr;
ev-callback(ev-fd,events[i].events,ev-arg);
reactor簡(jiǎn)單版代碼與測(cè)試
后續(xù)會(huì)出一篇測(cè)試百萬(wàn)連接數(shù)量的文章
#includestdio.h
#includestdlib.h
#includestring.h
#includesys/socket.h
#includesys/epoll.h
#includearpa/inet.h
#includefcntl.h
#includeunistd.h
#includeerrno.h
#includetime.h
#defineBUFFER_LENGTH4096
#defineMAX_EPOLL_EVENTS1024
#defineSERVER_PORT8082
typedefint(*NtyCallBack)(int,int,void*);
structntyevent{
intfd;
intevents;
void*arg;
NtyCallBackcallback;
intstatus;//1MOD0null
charbuffer[BUFFER_LENGTH];
intlength;
longlast_active;
structntyreactor{
intepfd;
structntyevent*events;
intrecv_cb(intfd,intevents,void*arg);
intsend_cb(intfd,intevents,void*arg);
intaccept_cb(intfd,intevents,void*arg);
voidnty_event_set(structntyevent*ev,intfd,NtyCallBackcallback,void*arg){
ev-fd=fd;
ev-callback=callback;
ev-events=0;
ev-arg=arg;
ev-last_active=time(NULL);
intnty_event_add(intepfd,intevents,structntyevent*ntyev){
structepoll_eventev={0,{0}};
ev.data.ptr=ntyev;
ev.events=ntyev-events=events;
intop;
if(ntyev-status==1){
op=EPOLL_CTL_MOD;
else{
op=EPOLL_CTL_ADD;
ntyev-status=1;
if(epoll_ctl(epfd,op,ntyev-fd,ev)0){
printf("eventaddfailed[fd=%d],events[%d],err:%s,err:%d\n",ntyev-fd,events,strerror(errno),errno);
return-1;
return0;
intnty_event_del(intepfd,structntyevent*ev){
structepoll_eventep_ev={0,{0}};
if(ev-status!=1){
return-1;
ep_ev.data.ptr=ev;
ev-status=0;
epoll_ctl(epfd,EPOLL_CTL_DEL,ev-fd,ep_ev);
//epoll_ctl(epfd,EPOLL_CTL_DEL,ev-fd,NULL);
return0;
intrecv_cb(intfd,intevents,void*arg){
structntyreactor*reactor=(structntyreactor*)arg;
structntyevent*ntyev=reactor-events[fd];
intlen=recv(fd,ntyev-buffer,BUFFER_LENGTH,0);
nty_event_del(reactor-epfd,ntyev);
if(len0){
ntyev-length=len;
ntyev-buffer[len]='\0';
printf("C[%d]:%s\n",fd,ntyev-buffer);
nty_event_set(ntyev,fd,send_cb,reactor);
nty_event_add(reactor-epfd,EPOLLOUT,ntyev);
elseif(len==0){
close(ntyev-
printf("[fd=%d]pos[%ld],closed\n",fd,ntyev-reactor-events);
else{
close(ntyev-
printf("recv[fd=%d]error[%d]:%s\n",fd,errno,strerror(errno));
returnlen;
intsend_cb(intfd,intevents,void*arg){
structntyreactor*reactor=(structntyreactor*)arg;
structntyevent*ntyev=reactor-events[fd];
intlen=send(fd,ntyev-buffer,ntyev-length,0);
if(len0){
printf("send[fd=%d],[%d]%s\n",fd,len,ntyev-buffer);
nty_event_del(reactor-epfd,ntyev);
nty_event_set(ntyev,fd,recv_cb,reactor);
nty_event_add(reactor-epfd,EPOLLIN,ntyev);
else{
close(ntyev-
nty_event_del(reactor-epfd,ntyev);
printf("send[fd=%d]error%s\n",fd,strerror(errno));
returnlen;
intaccept_cb(intfd,intevents,void*arg){
structntyreactor*reactor=(structntyreactor*)arg;
if(reactor==NULL)return-1;
structsockaddr_inclient_addr;
socklen_tlen=sizeof(client_addr);
intclientfd;
if((clientfd=accept(fd,(structsockaddr*)client_addr,len))==-1){
printf("accept:%s\n",strerror(errno));
return-1;
printf("clientfd=%d\n",clientfd);
if((fcntl(clientfd,F_SETFL,O_NONBLOCK))0){
printf("%s:fcntlnonblockingfailed,%d\n",__func__,MAX_EPOLL_EVENTS);
return-1;
nty_event_set(reactor-events[clientfd],clientfd,recv_cb,reactor);
nty_event_add(reactor-epfd,EPOLLIN,reactor-events[clientfd]);
printf("newconnect[%s:%d][time:%ld],pos[%d]\n",inet_ntoa(client_addr.sin_addr),ntohs(client_addr.sin_port),
reactor-events[clientfd].last_active,clientfd);
return0;
intinit_sock(shortport){
intfd=socket(AF_INET,SOCK_STREAM,0);
structsockaddr_inserver_addr;
memset(server_addr,0,sizeof(server_addr));
server_addr.sin_family=AF_INET;
server_addr.sin_addr.s_addr=htonl(INADDR_ANY);
server_addr.sin_port=htons(port);
bind(fd,(structsockaddr*)server_addr,sizeof(server_addr));
if(listen(fd,20)0){
printf("listenfailed:%s\n",strerror(errno));
returnfd;
intntyreactor_init(structntyreactor*reactor){
if(reactor==NULL)return-1;
memset(reactor,0,sizeof(structntyreactor));
reactor-epfd=epoll_create(1);
if(reactor-epfd=0){
printf("createepfdin%serr%s\n",__func__,strerror(errno));
return-2;
reactor-events=(structntyevent*)malloc((MAX_EPOLL_EVENTS)*sizeof(structntyevent));
memset(reactor-events,0,(MAX_EPOLL_EVENTS)*sizeof(structntyevent));
if(reactor-events==NULL){
printf("createeplleventsin%serr%s\n",__func__,strerror(errno));
close(reactor-epfd);
return-3;
return0;
intntyreactor_destory(structntyreactor*reactor){
close(reactor-epfd);
free(reactor-events);
intntyreactor_addlistener(structntyreactor*reactor,intsockfd,NtyCallBackacceptor){
if(reactor==NULL)return-1;
if(reactor-events==NULL)return-1;
nty_event_set(reactor-events[sockfd],sockfd,acceptor,reactor);
nty_event_add(reactor-epfd,EPOLLIN,reactor-events[sockfd]);
return0;
_Noreturnintntyreactor_run(structntyreactor*reactor){
if(reactor==NULL)return-1;
if(reactor-epfd0)return-1;
if(reactor-events==NULL)return-1;
structepoll_eventevents[MAX_EPOLL_EVENTS];
intcheckpos=0,i;
while(1){
//心跳包60s超時(shí)則斷開(kāi)連接
longnow=time(NULL);
for(i=0;i100;i++,checkpos++){
if(checkpos==MAX_EPOLL_EVENTS){
checkpos=0;
if(reactor-events[checkpos].status!=1||checkpos==3){
continue;
longduration=now-reactor-events[checkpos].last_active;
if(duration=60){
close(reactor-events[checkpos].fd);
printf("[fd=%d]timeout\n",reactor-events[checkpos].fd);
nty_event_del(reactor-epfd,reactor-events[checkpos]);
intnready=epoll_wait(reactor-epfd,events,MAX_EPOLL_EVENTS,1000);
if(nready0){
printf("epoll_waiterror,exit\n");
continue;
for(i=0;inready;i++){
structntyevent*ev=(structntyevent*)events[i].data.ptr;
ev-callback(ev-fd,events[i].events,ev-arg);
intmain(intargc,char*argv[]){
intsockfd=init_sock(SERVER_PORT);
structntyreactor*reactor=(structntyreactor*)malloc(sizeof(structntyreactor));
if(ntyreactor_init(reactor)!=0){
return-1;
ntyreactor_addlistener(reactor,sockfd,accept_cb);
ntyreactor_run(reactor);
ntyreactor_destory(reactor);
close(sockfd);
return0;
reactor優(yōu)點(diǎn)
reactor模式是編寫高性能網(wǎng)絡(luò)服務(wù)器的必備技術(shù)之一,它具有如下優(yōu)點(diǎn):
響應(yīng)快,不必為單個(gè)同步時(shí)間所阻塞,雖然reactor本身依然是同步的編程相對(duì)簡(jiǎn)單,可以最大程度的避免復(fù)雜的多線程及同步問(wèn)題,并且避免了多線程/進(jìn)程的切換開(kāi)銷可擴(kuò)展性,可以方便的通過(guò)增加reactor實(shí)例個(gè)數(shù)來(lái)充分利用CPU資源可復(fù)用性,reactor框架本身與具體事件處理邏輯無(wú)關(guān),具有很高的復(fù)用性
reactor模型開(kāi)發(fā)效率上比起直接使用IO復(fù)用要高,它通常是單線程的,設(shè)計(jì)目標(biāo)是希望單線程使用一顆CPU的全部資源,但也有附帶優(yōu)點(diǎn),即每個(gè)事件處理中很多時(shí)候可以不考慮共享資源的互斥訪問(wèn)??墒侨秉c(diǎn)也是明顯的,現(xiàn)在的硬件發(fā)展,已經(jīng)不再遵循摩爾定律,CPU的頻率受制于材料的限制不再有大的提升,而改為是從核數(shù)的增加上提升能力,當(dāng)程序需要使用多核資源時(shí),reactor模型就會(huì)悲劇。
如果程序業(yè)務(wù)很簡(jiǎn)單,例如只是簡(jiǎn)單的訪問(wèn)一些提供了并發(fā)訪問(wèn)的服務(wù),就可以直接開(kāi)啟多個(gè)反應(yīng)堆,每個(gè)反應(yīng)堆對(duì)應(yīng)一顆CPU核心,這些反應(yīng)堆上跑的請(qǐng)求互不相關(guān),這是完全可以利用多核的。例如Nginx這樣的http靜態(tài)服務(wù)器。
reactor多種模型
單reactor+單線程模型
單reactor單線程模型,指的是所有的IO操作(讀,寫,建立連接)都在同一個(gè)線程上面完成
缺點(diǎn):
由于只有一個(gè)線程,因此事件是順序處理的,一個(gè)線程同時(shí)只能做一件事情,事件的優(yōu)先級(jí)得不到保證不能充分利用多核CPU
單reactor+線程池(ThreadPool)模型
相比于單reactor單線程模型,此模型中收到請(qǐng)求后,不在reactor線程計(jì)算,而是使用線程池來(lái)計(jì)算,這會(huì)充分的利用多核CPU。
采用此模式時(shí)有可能存在多個(gè)線程同時(shí)計(jì)算同一個(gè)連接上的多個(gè)請(qǐng)求,算出的結(jié)果的次序是不確定的,所以需要網(wǎng)絡(luò)框架在設(shè)計(jì)協(xié)議時(shí)帶一個(gè)id標(biāo)示,以便以便讓客戶端區(qū)分response對(duì)應(yīng)的是哪個(gè)request。
多reactor+多線程模型
此模式的特點(diǎn)是每個(gè)線程一個(gè)循環(huán),有一個(gè)mainreactor負(fù)責(zé)accept連接,然后把該連接掛在某個(gè)subreactor中,這樣該連接的所有操作都在那個(gè)subreactor所處的線程中完成。
多個(gè)連接可能被分配到多個(gè)線程中,充分利用CPU。在應(yīng)用場(chǎng)景中,reactor的個(gè)數(shù)可以采用固定的個(gè)數(shù),比如跟CPU數(shù)目一致。
此模型與單reactor多線程模型相比,減少了進(jìn)出threadpool兩次上下文切換,小規(guī)模的計(jì)算可以在當(dāng)前IO線程完成并且返回結(jié)果,降低響應(yīng)的延遲。
并可以有效防止當(dāng)IO壓力過(guò)大時(shí)一個(gè)reactor處理能力飽和問(wèn)題。
多reactor+線程池(ThreadPool)模型
此模型是上面兩個(gè)的混合體,它既使用多個(gè)reactors來(lái)處理IO,又使用線程池來(lái)處理計(jì)算。此模式適適合既有突發(fā)IO(利用MultipleReactor分擔(dān)),又有突發(fā)計(jì)算的應(yīng)用(利用線程池把一個(gè)連接上的計(jì)算任務(wù)分配給多個(gè)線程)。
注意點(diǎn)
注意:
前面介紹的四種reactor模式在具體實(shí)現(xiàn)時(shí)為了簡(jiǎn)應(yīng)該遵循的原則是:每個(gè)文件描述符只由一個(gè)線程操作。
這樣可以輕輕松松解決消息收發(fā)的順序性問(wèn)題,也避免了關(guān)閉文件描述符的各種racecondition。一個(gè)線程可以操作多個(gè)文件描述符,但是一個(gè)線程不能操作別的線程擁有的文件描述符。
這一點(diǎn)不難做到。epoll也遵循了相同的原則。Linux文檔中并沒(méi)有說(shuō)明,當(dāng)一個(gè)線程證阻塞在epoll_wait時(shí),另一個(gè)線程往epollfd添加一個(gè)新的監(jiān)控fd會(huì)發(fā)生什么。
新fd上的事件會(huì)不會(huì)在此次epoll_wait調(diào)用中返回?為了穩(wěn)妥起見(jiàn),我們應(yīng)該吧對(duì)同一個(gè)epollfd的操作(添加、刪除、修改等等)都放到同一個(gè)線程中執(zhí)行。
reactor完善版代碼
由于fd的數(shù)量未知,這里設(shè)計(jì)ntyreactor里面包含eventblock,eventblock包含1024個(gè)fd。每個(gè)fd通過(guò)fd/1024定位到在第幾個(gè)eventblock,通過(guò)fd%1024定位到在eventblock第幾個(gè)位置。
#includestdio.h
#includestdlib.h
#includestring.h
#includesys/socket.h
#includesys/epoll.h
#includearpa/inet.h
#includefcntl.h
#includeunistd.h
#includeerrno.h
#defineBUFFER_LENGTH4096
#defineMAX_EPOLL_EVENTS1024
#defineSERVER_PORT8081
#definePORT_COUNT100
typedefint(*NCALLBACK)(int,int,void*);
structntyevent{
intfd;
intevents;
void*arg;
NCALLBACKcallback;
intstatus;
charbuffer[BUFFER_LENGTH];
intlength;
structeventblock{
structeventblock*next;
structntyevent*events;
structntyreactor{
intepfd;
intblkcnt;
structeventblock*evblk;
intrecv_cb(intfd,intevents,void*arg);
intsend_cb(intfd,intevents,void*arg);
structntyevent*ntyreactor_find_event_idx(structntyreactor*reactor,intsockfd);
voidnty_event_set(structntyevent*ev,intfd,NCALLBACK*callback,void*arg){
ev-fd=fd;
ev-callback=callback;
ev-events=0;
ev-arg=arg;
intnty_event_add(intepfd,intevents,structntyevent*ev){
structepoll_eventep_ev={0,{0}};
ep_ev.data.ptr=ev;
ep_ev.events=ev-events=events;
intop;
if(ev-status==1){
op=EPOLL_CTL_MOD;
else{
op=EPOLL_CTL_ADD;
ev-status=1;
if(epoll_ctl(epfd,op,ev-fd,ep_ev)0){
printf("eventaddfailed[fd=%d],events[%d]\n",ev-fd,events);
return-1;
return0;
intnty_event_del(intepfd,structntyevent*ev){
structepoll_eventep_ev={0,{0}};
if(ev-status!=1){
return-1;
ep_ev.data.ptr=ev;
ev-status=0;
epoll_ctl(epfd,EPOLL_CTL_DEL,ev-fd,ep_ev);
return0;
intrecv_cb(intfd,intevents,void*arg){
structntyreactor*reactor=(structntyreactor*)arg;
structntyevent*ev=ntyreactor_find_event_idx(reactor,fd);
intlen=recv(fd,ev-buffer,BUFFER_LENGTH,0);//
nty_event_del(reactor-epfd,ev);
if(len0){
ev-length=len;
ev-buffer[len]='\0';
//printf("recv[%d]:%s\n",fd,ev-buffer);
printf("recvfd=[%d\n",fd);
nty_event_set(ev,fd,send_cb,reactor);
nty_event_add(reactor-epfd,EPOLLOUT,ev);
elseif(len==0){
close(ev-
//printf("[fd=%d]pos[%ld],closed\n",fd,ev-reactor-events);
else{
close(ev-
//printf("recv[fd=%d]error[%d]:%s\n",fd,errno,strerror(errno));
returnlen;
intsend_cb(intfd,intevents,void*arg){
structntyreactor*reactor=(structntyreactor*)arg;
structntyevent*ev=ntyreactor_find_event_idx(reactor,fd);
intlen=send(fd,ev-buffer,ev-length,0);
if(len0){
//printf("send[fd=%d],[%d]%s\n",fd,len,ev-buffer);
printf("sendfd=[%d\n]",fd);
nty_event_del(reactor-epfd,ev);
nty_event_set(ev,fd,recv_cb,reactor);
nty_event_add(reactor-epfd,EPOLLIN,ev);
else{
nty_event_del(reactor-epfd,ev);
close(ev-
printf("send[fd=%d]error%s\n",fd,strerror(errno));
returnlen;
intaccept_cb(intfd,intevents,void*arg){//非阻塞
structntyreactor*reactor=(structntyreactor*)arg;
if(reactor==NULL)return-1;
structsockaddr_inclient_addr;
socklen_tlen=sizeof(client_addr);
intclientfd;
if((clientfd=accept(fd,(structsockaddr*)client_addr,len))==-1){
printf("accept:%s\n",strerror(errno));
return-1;
if((fcntl(clientfd,F_SETFL,O_NONBLOCK))0){
printf("%s:fcntlnonblockingfailed,%d\n",__func__,MAX_EPOLL_EVENTS);
return-1;
structntyevent*event=ntyreactor_find_event_idx(reactor,clientfd);
nty_event_set(event,clientfd,recv_cb,reactor);
nty_event_add(reactor-epfd,EPOLLIN,event);
printf("newconnect[%s:%d],pos[%d]\n",
inet_ntoa(client_addr.sin_addr),ntohs(client_addr.sin_port),clientfd);
return0;
intinit_sock(shortport){
intfd=socket(AF_INET,SOCK_STREAM,0);
fcntl(fd,F_SETFL,O_NONBLOCK);
structsockaddr_inserver_addr;
memset(server_addr,0,sizeof(server_addr));
server_addr.sin_family=AF_INET;
server_addr.sin_addr.s_addr=htonl(INADDR_ANY);
server_addr.sin_port=htons(port);
bind(fd,(structsockaddr*)server_addr,sizeof(server_addr));
if(listen(fd,20)0){
printf("listenfailed:%s\n",strerror(errno));
returnfd;
intntyreactor_alloc(structntyreactor*reactor){
if(reactor==NULL)return-1;
if(reactor-evblk==NULL)return-1;
structeventblock*blk=reactor-evblk;
while(blk-next!=NULL){
blk=blk-next;
structntyevent*evs=(structntyevent*)malloc((MAX_EPOLL_EVENTS)*sizeof(structntyevent));
if(evs==NULL){
printf("ntyreactor_allocntyeventsfailed\n");
return-2;
memset(evs,0,(MAX_EPOLL_EVENTS)*sizeof(structntyevent));
structeventblock*block=(structeventblock*)malloc(sizeof(structeventblock));
if(block==NULL){
printf("ntyreactor_alloceventblockfailed\n");
return-2;
memset(block,0,sizeof(structeventblock));
block-events=evs;
block-next=NULL;
blk-next=block;
reactor-blkcnt++;//
return0;
structntyevent*ntyreactor_find_event_idx(structntyreactor*reactor,intsockfd){
intblkidx=sockfd/MAX_EPOLL_EVENTS;
while(blkidx=reactor-blkcnt){
ntyreactor_alloc(reactor);
inti=0;
structeventblock*blk=reactor-evblk;
while(i++blkidxblk!=NULL){
blk=blk-next;
returnblk-events[sockfd%MAX_EPOLL_EVENTS];
intntyreactor_init(structntyreactor*reactor){
if(reactor==NULL)return-1;
memset(reactor,0,sizeof(structntyreactor));
reactor-epfd=epoll_create(1);
if(reactor-epfd=0){
printf("createepfdin%serr%s\n",__func__,strerror(errno));
return-2;
structntyevent*evs=(structntyevent*)malloc((MAX_EPOLL_EVENTS)*sizeof(structntyevent));
if(evs==NULL){
printf("ntyreactor_allocntyeventsfailed\n");
return-2;
memset(evs,0,(MAX_EPOLL_EVENTS)*sizeof(structntyevent));
structeventblock*block=(structeventbl
溫馨提示
- 1. 本站所有資源如無(wú)特殊說(shuō)明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁(yè)內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒(méi)有圖紙預(yù)覽就沒(méi)有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫(kù)網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。
最新文檔
- 學(xué)校培訓(xùn)機(jī)構(gòu)會(huì)計(jì)制度
- 學(xué)院培訓(xùn)處工作制度
- 新教師教研培訓(xùn)制度
- 水電線路改造日常維護(hù)方案
- 作業(yè)人員崗前培訓(xùn)制度
- 應(yīng)屆畢業(yè)生集中培訓(xùn)制度
- 英語(yǔ)培訓(xùn)班各項(xiàng)管理制度
- 培訓(xùn)與演練工作制度
- 重癥感染培訓(xùn)制度
- 內(nèi)部崗前培訓(xùn)制度
- 特種工安全崗前培訓(xùn)課件
- 新疆維吾爾自治區(qū)普通高中2026屆高二上數(shù)學(xué)期末監(jiān)測(cè)試題含解析
- 2026屆福建省三明市第一中學(xué)高三上學(xué)期12月月考?xì)v史試題(含答案)
- 2026年遼寧金融職業(yè)學(xué)院?jiǎn)握新殬I(yè)技能測(cè)試題庫(kù)附答案解析
- (正式版)DB51∕T 3342-2025 《爐灶用合成液體燃料經(jīng)營(yíng)管理規(guī)范》
- 2026北京海淀初三上學(xué)期期末語(yǔ)文試卷和答案
- 2024-2025學(xué)年北京市東城區(qū)五年級(jí)(上)期末語(yǔ)文試題(含答案)
- 人工智能在醫(yī)療領(lǐng)域的應(yīng)用
- 2025學(xué)年度人教PEP五年級(jí)英語(yǔ)上冊(cè)期末模擬考試試卷(含答案含聽(tīng)力原文)
- 【10篇】新部編五年級(jí)上冊(cè)語(yǔ)文課內(nèi)外閱讀理解專項(xiàng)練習(xí)題及答案
- 南京市雨花臺(tái)區(qū)醫(yī)療保險(xiǎn)管理中心等單位2025年公開(kāi)招聘編外工作人員備考題庫(kù)有完整答案詳解
評(píng)論
0/150
提交評(píng)論