epoll封裝reactor原理剖析示例詳解_第1頁(yè)
epoll封裝reactor原理剖析示例詳解_第2頁(yè)
epoll封裝reactor原理剖析示例詳解_第3頁(yè)
epoll封裝reactor原理剖析示例詳解_第4頁(yè)
epoll封裝reactor原理剖析示例詳解_第5頁(yè)
已閱讀5頁(yè),還剩24頁(yè)未讀 繼續(xù)免費(fèi)閱讀

下載本文檔

版權(quán)說(shuō)明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)

文檔簡(jiǎn)介

第epoll封裝reactor原理剖析示例詳解目錄reactor是什么?reactor模型三個(gè)重要組件與流程分析組件流程將epoll封裝成reactor事件驅(qū)動(dòng)封裝每一個(gè)連接sockfd變成ntyevent封裝epfd和ntyevent變成ntyreactor封裝讀、寫、接收連接等事件對(duì)應(yīng)的操作變成callback給每個(gè)客戶端的ntyevent設(shè)置屬性將ntyevent加入到epoll中由內(nèi)核監(jiān)聽(tīng)將ntyevent從epoll中去除讀事件回調(diào)函數(shù)寫事件回調(diào)函數(shù)接受新連接事件回調(diào)函數(shù)reactor運(yùn)行reactor簡(jiǎn)單版代碼與測(cè)試reactor優(yōu)點(diǎn)reactor多種模型單reactor+單線程模型單reactor+線程池(ThreadPool)模型多reactor+多線程模型多reactor+線程池(ThreadPool)模型注意點(diǎn)reactor完善版代碼

reactor是什么?

本文將由淺入深的介紹reactor,深入淺出的封裝epoll,一步步變成reactor模型,并在文末介紹reactor的四種模型。

reactor是一種高并發(fā)服務(wù)器模型,是一種框架,一個(gè)概念,所以reactor沒(méi)有一個(gè)固定的代碼,可以有很多變種,后續(xù)會(huì)介紹到。

組成:?阻塞的IO(如果是阻塞IO,發(fā)送緩沖區(qū)滿了怎么辦,就阻塞了)+io多路復(fù)?;特征:基于事件循環(huán),以事件驅(qū)動(dòng)或者事件回調(diào)的?式來(lái)實(shí)現(xiàn)業(yè)務(wù)邏輯。

reactor中的IO使用的是select,poll,epoll這些IO多路復(fù)用,使用IO多路復(fù)用系統(tǒng)不必創(chuàng)建維護(hù)大量線程,只使用一個(gè)線程、一個(gè)選擇器就可同時(shí)處理成千上萬(wàn)連接,大大減少了系統(tǒng)開(kāi)銷。

reactor中文譯為反應(yīng)堆,將epoll中的IO變成事件驅(qū)動(dòng),比如讀事件,寫事件。來(lái)了個(gè)讀事件,立馬進(jìn)行反應(yīng),執(zhí)行提前注冊(cè)好的事件回調(diào)函數(shù)。

回想一下普通函數(shù)調(diào)用的機(jī)制:程序調(diào)用某函數(shù),函數(shù)執(zhí)行,程序等待,函數(shù)將結(jié)果和控制權(quán)返回給程序,程序繼續(xù)處理。reactor反應(yīng)堆,是一種事件驅(qū)動(dòng)機(jī)制,和普通函數(shù)調(diào)用的不同之處在于:應(yīng)用程序不是主動(dòng)的調(diào)用某個(gè)API完成處理,而是恰恰相反,reactor逆置了事件處理流程,應(yīng)用程序需要提供相應(yīng)的接口并注冊(cè)到reactor上,如果相應(yīng)的事件發(fā)生,reactor將主動(dòng)調(diào)用應(yīng)用程序注冊(cè)的接口,這些接口又稱為回調(diào)函數(shù)。

說(shuō)白了,reactor就是對(duì)epoll進(jìn)行封裝,進(jìn)行網(wǎng)絡(luò)IO與業(yè)務(wù)的解耦,將epoll管理IO變成管理事件,整個(gè)程序由事件進(jìn)行驅(qū)動(dòng)執(zhí)行。就像下圖一樣,有就緒事件返回,reactor:由事件驅(qū)動(dòng)執(zhí)行對(duì)應(yīng)的回調(diào)函數(shù);epoll:需要自己判斷。

reactor模型三個(gè)重要組件與流程分析

reactor是處理并發(fā)I/O比較常見(jiàn)的一種模式,用于同步I/O,中心思想是將所有要處理的I/O事件注冊(cè)到一個(gè)中心I/O多路復(fù)用器(epoll)上,同時(shí)主線程/進(jìn)程阻塞在多路復(fù)用器上;

一旦有I/O事件到來(lái)或是準(zhǔn)備就緒(文件描述符或socket可讀、寫),多路復(fù)用器返回并將事先注冊(cè)的相應(yīng)I/O事件分發(fā)到對(duì)應(yīng)的處理器中。

組件

reactor模型有三個(gè)重要的組件

多路復(fù)用器:由操作系統(tǒng)提供,在linux上一般是select,poll,epoll等系統(tǒng)調(diào)用。

事件分發(fā)器:將多路復(fù)用器中返回的就緒事件分到對(duì)應(yīng)的處理函數(shù)中。

事件處理器:負(fù)責(zé)處理特定事件的處理函數(shù)。

流程

具體流程:

注冊(cè)相應(yīng)的事件處理器(剛開(kāi)始listenfd注冊(cè)都就緒事件)多路復(fù)用器等待事件事件到來(lái),激活事件分發(fā)器,分發(fā)器調(diào)用事件到對(duì)應(yīng)的處理器事件處理器處理事件,然后注冊(cè)新的事件(比如讀事件,完成讀操作后,根據(jù)業(yè)務(wù)處理數(shù)據(jù),注冊(cè)寫事件,寫事件根據(jù)業(yè)務(wù)響應(yīng)請(qǐng)求;比如listen讀事件,肯定要給新的連接注冊(cè)讀事件)

將epoll封裝成reactor事件驅(qū)動(dòng)

封裝每一個(gè)連接sockfd變成ntyevent

我們知道一個(gè)連接對(duì)應(yīng)一個(gè)文件描述符fd,對(duì)于這個(gè)連接(fd)來(lái)說(shuō),它有自己的事件(讀,寫)。我們將fd都設(shè)置成非阻塞的,所以這里我們需要添加兩個(gè)buffer,至于大小就是看業(yè)務(wù)需求了。

structntyevent{

intfd;//socketfd

intevents;//事件

charsbuffer[BUFFER_LENGTH];//寫緩沖buffer

intslength;

charrbuffer[BUFFER_LENGTH];//讀緩沖buffer

intrlength;

//typedefint(*NtyCallBack)(int,int,void*);

NtyCallBackcallback;//回調(diào)函數(shù)

void*arg;

intstatus;//1MOD0null

封裝epfd和ntyevent變成ntyreactor

我們知道socketfd已經(jīng)被封裝成了ntyevent,那么有多少個(gè)ntyevent呢?這里demo初始化reactor的時(shí)候其實(shí)是將*events指向了一個(gè)1024的ntyevent數(shù)組(按照道理來(lái)說(shuō)客戶端連接可以一直連,不止1024個(gè)客戶端,后續(xù)文章有解決方案,這里從簡(jiǎn))。epfd肯定要封裝進(jìn)行,不用多說(shuō)。

structntyreactor{

intepfd;

structntyevent*events;

//structntyeventevents[1024];

封裝讀、寫、接收連接等事件對(duì)應(yīng)的操作變成callback

前面已經(jīng)說(shuō)了,把事件寫成回調(diào)函數(shù),這里的參數(shù)fd肯定要知道自己的哪個(gè)連接,events是什么事件的意思,arg傳的是ntyreactor(考慮到后續(xù)多線程多進(jìn)程,如果將ntyreactor設(shè)為全局感覺(jué)不太好)

typedefint(*NtyCallBack)(int,int,void*);

intrecv_cb(intfd,intevents,void*arg);

intsend_cb(intfd,intevents,void*arg);

intaccept_cb(intfd,intevents,void*arg);

給每個(gè)客戶端的ntyevent設(shè)置屬性

具兩個(gè)例子,我們知道第一個(gè)socket一定是listenfd,用來(lái)監(jiān)聽(tīng)用的,那么首先肯定是設(shè)置ntyevent的各項(xiàng)屬性。本來(lái)是讀事件,讀完后要改成寫事件,那么必然要把原來(lái)的讀回調(diào)函數(shù)設(shè)置成寫事件回調(diào)。

voidnty_event_set(structntyevent*ev,intfd,NtyCallBackcallback,void*arg){

ev-fd=fd;

ev-callback=callback;

ev-events=0;

ev-arg=arg;

將ntyevent加入到epoll中由內(nèi)核監(jiān)聽(tīng)

intnty_event_add(intepfd,intevents,structntyevent*ntyev){

structepoll_eventev={0,{0}};

ev.data.ptr=ntyev;

ev.events=ntyev-events=events;

intop;

if(ntyev-status==1){

op=EPOLL_CTL_MOD;

else{

op=EPOLL_CTL_ADD;

ntyev-status=1;

if(epoll_ctl(epfd,op,ntyev-fd,ev)0){

printf("eventaddfailed[fd=%d],events[%d],err:%s,err:%d\n",ntyev-fd,events,strerror(errno),errno);

return-1;

return0;

將ntyevent從epoll中去除

intnty_event_del(intepfd,structntyevent*ev){

structepoll_eventep_ev={0,{0}};

if(ev-status!=1){

return-1;

ep_ev.data.ptr=ev;

ev-status=0;

epoll_ctl(epfd,EPOLL_CTL_DEL,ev-fd,ep_ev);

//epoll_ctl(epfd,EPOLL_CTL_DEL,ev-fd,NULL);

return0;

讀事件回調(diào)函數(shù)

這里就是被觸發(fā)的回調(diào)函數(shù),具體代碼要與業(yè)務(wù)結(jié)合,這里的參考意義不大(這里就是讀一次,改成寫事件)

intrecv_cb(intfd,intevents,void*arg){

structntyreactor*reactor=(structntyreactor*)arg;

structntyevent*ntyev=reactor-events[fd];

intlen=recv(fd,ntyev-buffer,BUFFER_LENGTH,0);

nty_event_del(reactor-epfd,ntyev);

if(len0){

ntyev-length=len;

ntyev-buffer[len]='\0';

printf("C[%d]:%s\n",fd,ntyev-buffer);

nty_event_set(ntyev,fd,send_cb,reactor);

nty_event_add(reactor-epfd,EPOLLOUT,ntyev);

elseif(len==0){

close(ntyev-

printf("[fd=%d]pos[%ld],closed\n",fd,ntyev-reactor-events);

else{

close(ntyev-

printf("recv[fd=%d]error[%d]:%s\n",fd,errno,strerror(errno));

returnlen;

寫事件回調(diào)函數(shù)

這里就是被觸發(fā)的回調(diào)函數(shù),具體代碼要與業(yè)務(wù)結(jié)合,這里的參考意義不大(將讀事件讀的數(shù)據(jù)寫回,再改成讀事件,相當(dāng)于echo)

intsend_cb(intfd,intevents,void*arg){

structntyreactor*reactor=(structntyreactor*)arg;

structntyevent*ntyev=reactor-events[fd];

intlen=send(fd,ntyev-buffer,ntyev-length,0);

if(len0){

printf("send[fd=%d],[%d]%s\n",fd,len,ntyev-buffer);

nty_event_del(reactor-epfd,ntyev);

nty_event_set(ntyev,fd,recv_cb,reactor);

nty_event_add(reactor-epfd,EPOLLIN,ntyev);

else{

close(ntyev-

nty_event_del(reactor-epfd,ntyev);

printf("send[fd=%d]error%s\n",fd,strerror(errno));

returnlen;

接受新連接事件回調(diào)函數(shù)

本質(zhì)上就是accept,然后將其加入到epoll監(jiān)聽(tīng)

intaccept_cb(intfd,intevents,void*arg){

structntyreactor*reactor=(structntyreactor*)arg;

if(reactor==NULL)return-1;

structsockaddr_inclient_addr;

socklen_tlen=sizeof(client_addr);

intclientfd;

if((clientfd=accept(fd,(structsockaddr*)client_addr,len))==-1){

printf("accept:%s\n",strerror(errno));

return-1;

printf("clientfd=%d\n",clientfd);

if((fcntl(clientfd,F_SETFL,O_NONBLOCK))0){

printf("%s:fcntlnonblockingfailed,%d\n",__func__,MAX_EPOLL_EVENTS);

return-1;

nty_event_set(reactor-events[clientfd],clientfd,recv_cb,reactor);

nty_event_add(reactor-epfd,EPOLLIN,reactor-events[clientfd]);

printf("newconnect[%s:%d][time:%ld],pos[%d]\n",inet_ntoa(client_addr.sin_addr),ntohs(client_addr.sin_port),

reactor-events[clientfd].last_active,clientfd);

return0;

reactor運(yùn)行

就是將原來(lái)的epoll_wait從main函數(shù)中封裝到ntyreactor_run函數(shù)中

intntyreactor_run(structntyreactor*reactor){

if(reactor==NULL)return-1;

if(reactor-epfd0)return-1;

if(reactor-events==NULL)return-1;

structepoll_eventevents[MAX_EPOLL_EVENTS];

intcheckpos=0,i;

while(1){

intnready=epoll_wait(reactor-epfd,events,MAX_EPOLL_EVENTS,1000);

if(nready0){

printf("epoll_waiterror,exit\n");

continue;

for(i=0;inready;i++){

structntyevent*ev=(structntyevent*)events[i].data.ptr;

ev-callback(ev-fd,events[i].events,ev-arg);

reactor簡(jiǎn)單版代碼與測(cè)試

后續(xù)會(huì)出一篇測(cè)試百萬(wàn)連接數(shù)量的文章

#includestdio.h

#includestdlib.h

#includestring.h

#includesys/socket.h

#includesys/epoll.h

#includearpa/inet.h

#includefcntl.h

#includeunistd.h

#includeerrno.h

#includetime.h

#defineBUFFER_LENGTH4096

#defineMAX_EPOLL_EVENTS1024

#defineSERVER_PORT8082

typedefint(*NtyCallBack)(int,int,void*);

structntyevent{

intfd;

intevents;

void*arg;

NtyCallBackcallback;

intstatus;//1MOD0null

charbuffer[BUFFER_LENGTH];

intlength;

longlast_active;

structntyreactor{

intepfd;

structntyevent*events;

intrecv_cb(intfd,intevents,void*arg);

intsend_cb(intfd,intevents,void*arg);

intaccept_cb(intfd,intevents,void*arg);

voidnty_event_set(structntyevent*ev,intfd,NtyCallBackcallback,void*arg){

ev-fd=fd;

ev-callback=callback;

ev-events=0;

ev-arg=arg;

ev-last_active=time(NULL);

intnty_event_add(intepfd,intevents,structntyevent*ntyev){

structepoll_eventev={0,{0}};

ev.data.ptr=ntyev;

ev.events=ntyev-events=events;

intop;

if(ntyev-status==1){

op=EPOLL_CTL_MOD;

else{

op=EPOLL_CTL_ADD;

ntyev-status=1;

if(epoll_ctl(epfd,op,ntyev-fd,ev)0){

printf("eventaddfailed[fd=%d],events[%d],err:%s,err:%d\n",ntyev-fd,events,strerror(errno),errno);

return-1;

return0;

intnty_event_del(intepfd,structntyevent*ev){

structepoll_eventep_ev={0,{0}};

if(ev-status!=1){

return-1;

ep_ev.data.ptr=ev;

ev-status=0;

epoll_ctl(epfd,EPOLL_CTL_DEL,ev-fd,ep_ev);

//epoll_ctl(epfd,EPOLL_CTL_DEL,ev-fd,NULL);

return0;

intrecv_cb(intfd,intevents,void*arg){

structntyreactor*reactor=(structntyreactor*)arg;

structntyevent*ntyev=reactor-events[fd];

intlen=recv(fd,ntyev-buffer,BUFFER_LENGTH,0);

nty_event_del(reactor-epfd,ntyev);

if(len0){

ntyev-length=len;

ntyev-buffer[len]='\0';

printf("C[%d]:%s\n",fd,ntyev-buffer);

nty_event_set(ntyev,fd,send_cb,reactor);

nty_event_add(reactor-epfd,EPOLLOUT,ntyev);

elseif(len==0){

close(ntyev-

printf("[fd=%d]pos[%ld],closed\n",fd,ntyev-reactor-events);

else{

close(ntyev-

printf("recv[fd=%d]error[%d]:%s\n",fd,errno,strerror(errno));

returnlen;

intsend_cb(intfd,intevents,void*arg){

structntyreactor*reactor=(structntyreactor*)arg;

structntyevent*ntyev=reactor-events[fd];

intlen=send(fd,ntyev-buffer,ntyev-length,0);

if(len0){

printf("send[fd=%d],[%d]%s\n",fd,len,ntyev-buffer);

nty_event_del(reactor-epfd,ntyev);

nty_event_set(ntyev,fd,recv_cb,reactor);

nty_event_add(reactor-epfd,EPOLLIN,ntyev);

else{

close(ntyev-

nty_event_del(reactor-epfd,ntyev);

printf("send[fd=%d]error%s\n",fd,strerror(errno));

returnlen;

intaccept_cb(intfd,intevents,void*arg){

structntyreactor*reactor=(structntyreactor*)arg;

if(reactor==NULL)return-1;

structsockaddr_inclient_addr;

socklen_tlen=sizeof(client_addr);

intclientfd;

if((clientfd=accept(fd,(structsockaddr*)client_addr,len))==-1){

printf("accept:%s\n",strerror(errno));

return-1;

printf("clientfd=%d\n",clientfd);

if((fcntl(clientfd,F_SETFL,O_NONBLOCK))0){

printf("%s:fcntlnonblockingfailed,%d\n",__func__,MAX_EPOLL_EVENTS);

return-1;

nty_event_set(reactor-events[clientfd],clientfd,recv_cb,reactor);

nty_event_add(reactor-epfd,EPOLLIN,reactor-events[clientfd]);

printf("newconnect[%s:%d][time:%ld],pos[%d]\n",inet_ntoa(client_addr.sin_addr),ntohs(client_addr.sin_port),

reactor-events[clientfd].last_active,clientfd);

return0;

intinit_sock(shortport){

intfd=socket(AF_INET,SOCK_STREAM,0);

structsockaddr_inserver_addr;

memset(server_addr,0,sizeof(server_addr));

server_addr.sin_family=AF_INET;

server_addr.sin_addr.s_addr=htonl(INADDR_ANY);

server_addr.sin_port=htons(port);

bind(fd,(structsockaddr*)server_addr,sizeof(server_addr));

if(listen(fd,20)0){

printf("listenfailed:%s\n",strerror(errno));

returnfd;

intntyreactor_init(structntyreactor*reactor){

if(reactor==NULL)return-1;

memset(reactor,0,sizeof(structntyreactor));

reactor-epfd=epoll_create(1);

if(reactor-epfd=0){

printf("createepfdin%serr%s\n",__func__,strerror(errno));

return-2;

reactor-events=(structntyevent*)malloc((MAX_EPOLL_EVENTS)*sizeof(structntyevent));

memset(reactor-events,0,(MAX_EPOLL_EVENTS)*sizeof(structntyevent));

if(reactor-events==NULL){

printf("createeplleventsin%serr%s\n",__func__,strerror(errno));

close(reactor-epfd);

return-3;

return0;

intntyreactor_destory(structntyreactor*reactor){

close(reactor-epfd);

free(reactor-events);

intntyreactor_addlistener(structntyreactor*reactor,intsockfd,NtyCallBackacceptor){

if(reactor==NULL)return-1;

if(reactor-events==NULL)return-1;

nty_event_set(reactor-events[sockfd],sockfd,acceptor,reactor);

nty_event_add(reactor-epfd,EPOLLIN,reactor-events[sockfd]);

return0;

_Noreturnintntyreactor_run(structntyreactor*reactor){

if(reactor==NULL)return-1;

if(reactor-epfd0)return-1;

if(reactor-events==NULL)return-1;

structepoll_eventevents[MAX_EPOLL_EVENTS];

intcheckpos=0,i;

while(1){

//心跳包60s超時(shí)則斷開(kāi)連接

longnow=time(NULL);

for(i=0;i100;i++,checkpos++){

if(checkpos==MAX_EPOLL_EVENTS){

checkpos=0;

if(reactor-events[checkpos].status!=1||checkpos==3){

continue;

longduration=now-reactor-events[checkpos].last_active;

if(duration=60){

close(reactor-events[checkpos].fd);

printf("[fd=%d]timeout\n",reactor-events[checkpos].fd);

nty_event_del(reactor-epfd,reactor-events[checkpos]);

intnready=epoll_wait(reactor-epfd,events,MAX_EPOLL_EVENTS,1000);

if(nready0){

printf("epoll_waiterror,exit\n");

continue;

for(i=0;inready;i++){

structntyevent*ev=(structntyevent*)events[i].data.ptr;

ev-callback(ev-fd,events[i].events,ev-arg);

intmain(intargc,char*argv[]){

intsockfd=init_sock(SERVER_PORT);

structntyreactor*reactor=(structntyreactor*)malloc(sizeof(structntyreactor));

if(ntyreactor_init(reactor)!=0){

return-1;

ntyreactor_addlistener(reactor,sockfd,accept_cb);

ntyreactor_run(reactor);

ntyreactor_destory(reactor);

close(sockfd);

return0;

reactor優(yōu)點(diǎn)

reactor模式是編寫高性能網(wǎng)絡(luò)服務(wù)器的必備技術(shù)之一,它具有如下優(yōu)點(diǎn):

響應(yīng)快,不必為單個(gè)同步時(shí)間所阻塞,雖然reactor本身依然是同步的編程相對(duì)簡(jiǎn)單,可以最大程度的避免復(fù)雜的多線程及同步問(wèn)題,并且避免了多線程/進(jìn)程的切換開(kāi)銷可擴(kuò)展性,可以方便的通過(guò)增加reactor實(shí)例個(gè)數(shù)來(lái)充分利用CPU資源可復(fù)用性,reactor框架本身與具體事件處理邏輯無(wú)關(guān),具有很高的復(fù)用性

reactor模型開(kāi)發(fā)效率上比起直接使用IO復(fù)用要高,它通常是單線程的,設(shè)計(jì)目標(biāo)是希望單線程使用一顆CPU的全部資源,但也有附帶優(yōu)點(diǎn),即每個(gè)事件處理中很多時(shí)候可以不考慮共享資源的互斥訪問(wèn)??墒侨秉c(diǎn)也是明顯的,現(xiàn)在的硬件發(fā)展,已經(jīng)不再遵循摩爾定律,CPU的頻率受制于材料的限制不再有大的提升,而改為是從核數(shù)的增加上提升能力,當(dāng)程序需要使用多核資源時(shí),reactor模型就會(huì)悲劇。

如果程序業(yè)務(wù)很簡(jiǎn)單,例如只是簡(jiǎn)單的訪問(wèn)一些提供了并發(fā)訪問(wèn)的服務(wù),就可以直接開(kāi)啟多個(gè)反應(yīng)堆,每個(gè)反應(yīng)堆對(duì)應(yīng)一顆CPU核心,這些反應(yīng)堆上跑的請(qǐng)求互不相關(guān),這是完全可以利用多核的。例如Nginx這樣的http靜態(tài)服務(wù)器。

reactor多種模型

單reactor+單線程模型

單reactor單線程模型,指的是所有的IO操作(讀,寫,建立連接)都在同一個(gè)線程上面完成

缺點(diǎn):

由于只有一個(gè)線程,因此事件是順序處理的,一個(gè)線程同時(shí)只能做一件事情,事件的優(yōu)先級(jí)得不到保證不能充分利用多核CPU

單reactor+線程池(ThreadPool)模型

相比于單reactor單線程模型,此模型中收到請(qǐng)求后,不在reactor線程計(jì)算,而是使用線程池來(lái)計(jì)算,這會(huì)充分的利用多核CPU。

采用此模式時(shí)有可能存在多個(gè)線程同時(shí)計(jì)算同一個(gè)連接上的多個(gè)請(qǐng)求,算出的結(jié)果的次序是不確定的,所以需要網(wǎng)絡(luò)框架在設(shè)計(jì)協(xié)議時(shí)帶一個(gè)id標(biāo)示,以便以便讓客戶端區(qū)分response對(duì)應(yīng)的是哪個(gè)request。

多reactor+多線程模型

此模式的特點(diǎn)是每個(gè)線程一個(gè)循環(huán),有一個(gè)mainreactor負(fù)責(zé)accept連接,然后把該連接掛在某個(gè)subreactor中,這樣該連接的所有操作都在那個(gè)subreactor所處的線程中完成。

多個(gè)連接可能被分配到多個(gè)線程中,充分利用CPU。在應(yīng)用場(chǎng)景中,reactor的個(gè)數(shù)可以采用固定的個(gè)數(shù),比如跟CPU數(shù)目一致。

此模型與單reactor多線程模型相比,減少了進(jìn)出threadpool兩次上下文切換,小規(guī)模的計(jì)算可以在當(dāng)前IO線程完成并且返回結(jié)果,降低響應(yīng)的延遲。

并可以有效防止當(dāng)IO壓力過(guò)大時(shí)一個(gè)reactor處理能力飽和問(wèn)題。

多reactor+線程池(ThreadPool)模型

此模型是上面兩個(gè)的混合體,它既使用多個(gè)reactors來(lái)處理IO,又使用線程池來(lái)處理計(jì)算。此模式適適合既有突發(fā)IO(利用MultipleReactor分擔(dān)),又有突發(fā)計(jì)算的應(yīng)用(利用線程池把一個(gè)連接上的計(jì)算任務(wù)分配給多個(gè)線程)。

注意點(diǎn)

注意:

前面介紹的四種reactor模式在具體實(shí)現(xiàn)時(shí)為了簡(jiǎn)應(yīng)該遵循的原則是:每個(gè)文件描述符只由一個(gè)線程操作。

這樣可以輕輕松松解決消息收發(fā)的順序性問(wèn)題,也避免了關(guān)閉文件描述符的各種racecondition。一個(gè)線程可以操作多個(gè)文件描述符,但是一個(gè)線程不能操作別的線程擁有的文件描述符。

這一點(diǎn)不難做到。epoll也遵循了相同的原則。Linux文檔中并沒(méi)有說(shuō)明,當(dāng)一個(gè)線程證阻塞在epoll_wait時(shí),另一個(gè)線程往epollfd添加一個(gè)新的監(jiān)控fd會(huì)發(fā)生什么。

新fd上的事件會(huì)不會(huì)在此次epoll_wait調(diào)用中返回?為了穩(wěn)妥起見(jiàn),我們應(yīng)該吧對(duì)同一個(gè)epollfd的操作(添加、刪除、修改等等)都放到同一個(gè)線程中執(zhí)行。

reactor完善版代碼

由于fd的數(shù)量未知,這里設(shè)計(jì)ntyreactor里面包含eventblock,eventblock包含1024個(gè)fd。每個(gè)fd通過(guò)fd/1024定位到在第幾個(gè)eventblock,通過(guò)fd%1024定位到在eventblock第幾個(gè)位置。

#includestdio.h

#includestdlib.h

#includestring.h

#includesys/socket.h

#includesys/epoll.h

#includearpa/inet.h

#includefcntl.h

#includeunistd.h

#includeerrno.h

#defineBUFFER_LENGTH4096

#defineMAX_EPOLL_EVENTS1024

#defineSERVER_PORT8081

#definePORT_COUNT100

typedefint(*NCALLBACK)(int,int,void*);

structntyevent{

intfd;

intevents;

void*arg;

NCALLBACKcallback;

intstatus;

charbuffer[BUFFER_LENGTH];

intlength;

structeventblock{

structeventblock*next;

structntyevent*events;

structntyreactor{

intepfd;

intblkcnt;

structeventblock*evblk;

intrecv_cb(intfd,intevents,void*arg);

intsend_cb(intfd,intevents,void*arg);

structntyevent*ntyreactor_find_event_idx(structntyreactor*reactor,intsockfd);

voidnty_event_set(structntyevent*ev,intfd,NCALLBACK*callback,void*arg){

ev-fd=fd;

ev-callback=callback;

ev-events=0;

ev-arg=arg;

intnty_event_add(intepfd,intevents,structntyevent*ev){

structepoll_eventep_ev={0,{0}};

ep_ev.data.ptr=ev;

ep_ev.events=ev-events=events;

intop;

if(ev-status==1){

op=EPOLL_CTL_MOD;

else{

op=EPOLL_CTL_ADD;

ev-status=1;

if(epoll_ctl(epfd,op,ev-fd,ep_ev)0){

printf("eventaddfailed[fd=%d],events[%d]\n",ev-fd,events);

return-1;

return0;

intnty_event_del(intepfd,structntyevent*ev){

structepoll_eventep_ev={0,{0}};

if(ev-status!=1){

return-1;

ep_ev.data.ptr=ev;

ev-status=0;

epoll_ctl(epfd,EPOLL_CTL_DEL,ev-fd,ep_ev);

return0;

intrecv_cb(intfd,intevents,void*arg){

structntyreactor*reactor=(structntyreactor*)arg;

structntyevent*ev=ntyreactor_find_event_idx(reactor,fd);

intlen=recv(fd,ev-buffer,BUFFER_LENGTH,0);//

nty_event_del(reactor-epfd,ev);

if(len0){

ev-length=len;

ev-buffer[len]='\0';

//printf("recv[%d]:%s\n",fd,ev-buffer);

printf("recvfd=[%d\n",fd);

nty_event_set(ev,fd,send_cb,reactor);

nty_event_add(reactor-epfd,EPOLLOUT,ev);

elseif(len==0){

close(ev-

//printf("[fd=%d]pos[%ld],closed\n",fd,ev-reactor-events);

else{

close(ev-

//printf("recv[fd=%d]error[%d]:%s\n",fd,errno,strerror(errno));

returnlen;

intsend_cb(intfd,intevents,void*arg){

structntyreactor*reactor=(structntyreactor*)arg;

structntyevent*ev=ntyreactor_find_event_idx(reactor,fd);

intlen=send(fd,ev-buffer,ev-length,0);

if(len0){

//printf("send[fd=%d],[%d]%s\n",fd,len,ev-buffer);

printf("sendfd=[%d\n]",fd);

nty_event_del(reactor-epfd,ev);

nty_event_set(ev,fd,recv_cb,reactor);

nty_event_add(reactor-epfd,EPOLLIN,ev);

else{

nty_event_del(reactor-epfd,ev);

close(ev-

printf("send[fd=%d]error%s\n",fd,strerror(errno));

returnlen;

intaccept_cb(intfd,intevents,void*arg){//非阻塞

structntyreactor*reactor=(structntyreactor*)arg;

if(reactor==NULL)return-1;

structsockaddr_inclient_addr;

socklen_tlen=sizeof(client_addr);

intclientfd;

if((clientfd=accept(fd,(structsockaddr*)client_addr,len))==-1){

printf("accept:%s\n",strerror(errno));

return-1;

if((fcntl(clientfd,F_SETFL,O_NONBLOCK))0){

printf("%s:fcntlnonblockingfailed,%d\n",__func__,MAX_EPOLL_EVENTS);

return-1;

structntyevent*event=ntyreactor_find_event_idx(reactor,clientfd);

nty_event_set(event,clientfd,recv_cb,reactor);

nty_event_add(reactor-epfd,EPOLLIN,event);

printf("newconnect[%s:%d],pos[%d]\n",

inet_ntoa(client_addr.sin_addr),ntohs(client_addr.sin_port),clientfd);

return0;

intinit_sock(shortport){

intfd=socket(AF_INET,SOCK_STREAM,0);

fcntl(fd,F_SETFL,O_NONBLOCK);

structsockaddr_inserver_addr;

memset(server_addr,0,sizeof(server_addr));

server_addr.sin_family=AF_INET;

server_addr.sin_addr.s_addr=htonl(INADDR_ANY);

server_addr.sin_port=htons(port);

bind(fd,(structsockaddr*)server_addr,sizeof(server_addr));

if(listen(fd,20)0){

printf("listenfailed:%s\n",strerror(errno));

returnfd;

intntyreactor_alloc(structntyreactor*reactor){

if(reactor==NULL)return-1;

if(reactor-evblk==NULL)return-1;

structeventblock*blk=reactor-evblk;

while(blk-next!=NULL){

blk=blk-next;

structntyevent*evs=(structntyevent*)malloc((MAX_EPOLL_EVENTS)*sizeof(structntyevent));

if(evs==NULL){

printf("ntyreactor_allocntyeventsfailed\n");

return-2;

memset(evs,0,(MAX_EPOLL_EVENTS)*sizeof(structntyevent));

structeventblock*block=(structeventblock*)malloc(sizeof(structeventblock));

if(block==NULL){

printf("ntyreactor_alloceventblockfailed\n");

return-2;

memset(block,0,sizeof(structeventblock));

block-events=evs;

block-next=NULL;

blk-next=block;

reactor-blkcnt++;//

return0;

structntyevent*ntyreactor_find_event_idx(structntyreactor*reactor,intsockfd){

intblkidx=sockfd/MAX_EPOLL_EVENTS;

while(blkidx=reactor-blkcnt){

ntyreactor_alloc(reactor);

inti=0;

structeventblock*blk=reactor-evblk;

while(i++blkidxblk!=NULL){

blk=blk-next;

returnblk-events[sockfd%MAX_EPOLL_EVENTS];

intntyreactor_init(structntyreactor*reactor){

if(reactor==NULL)return-1;

memset(reactor,0,sizeof(structntyreactor));

reactor-epfd=epoll_create(1);

if(reactor-epfd=0){

printf("createepfdin%serr%s\n",__func__,strerror(errno));

return-2;

structntyevent*evs=(structntyevent*)malloc((MAX_EPOLL_EVENTS)*sizeof(structntyevent));

if(evs==NULL){

printf("ntyreactor_allocntyeventsfailed\n");

return-2;

memset(evs,0,(MAX_EPOLL_EVENTS)*sizeof(structntyevent));

structeventblock*block=(structeventbl

溫馨提示

  • 1. 本站所有資源如無(wú)特殊說(shuō)明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁(yè)內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒(méi)有圖紙預(yù)覽就沒(méi)有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫(kù)網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。

最新文檔

評(píng)論

0/150

提交評(píng)論