基于epoll的多線程網(wǎng)絡服務程序設計_第1頁
基于epoll的多線程網(wǎng)絡服務程序設計_第2頁
基于epoll的多線程網(wǎng)絡服務程序設計_第3頁
基于epoll的多線程網(wǎng)絡服務程序設計_第4頁
基于epoll的多線程網(wǎng)絡服務程序設計_第5頁
已閱讀5頁,還剩8頁未讀, 繼續(xù)免費閱讀

下載本文檔

版權說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權,請進行舉報或認領

文檔簡介

第基于epoll的多線程網(wǎng)絡服務程序設計基于epoll的多線程網(wǎng)絡服務程序設計C語言?

采用C語言設計了一個基于epoll的多線程網(wǎng)絡服務程序。每個線程都有一個epoll來捕獲處于這個線程的socket事件。當子線程數(shù)量為0,即只有一個線程,則網(wǎng)絡監(jiān)聽服務與socket消息處理處于同一個epoll。當子線程數(shù)量大于0時,主線程監(jiān)聽socket連接,當有新的連接到來時將其加入到活躍socket數(shù)量最小的子線程的epoll中。

server.h

#ifndefEPOLL_C_SERVER_H

#defineEPOLL_C_SERVER_H

#includestdio.h

#includestdlib.h

#includestring.h

#includeunistd.h

#includeerrno.h

#includesys/epoll.h

#includepthread.h

#includenetinet/in.h

#includesys/types.h

#includesys/socket.h

#defineRESULT_OK0

#defineRESULT_ERROR-1

/**************************************************************************

*Function

:*MSG_HANDLE

*Input

:socket_fd--socket文件描述符

*

:arg--void*參數(shù)

*Output

:

*Return

:1處理成功,繼續(xù)等待下次消息;0處理完畢完畢該連接;-1異常錯誤發(fā)生

*Description:消息處理函數(shù)指針

****************************************************************************/

typedefint(*MSG_HANDLE)(intsocket_fd,void*arg);

typedefstruct

intepoll_fd;

pthread_tthd_fd;

//消息處理函數(shù),各個線程會調(diào)用該函數(shù)進行消息處理

MSG_HANDLEdata_func;

//一個線程里面的有效socket數(shù)量

unsignedintactive_conection_cnt;

//線程互斥鎖,用于實時更新有效socket數(shù)量

pthread_mutex_tthd_mutex;

}socket_thd_struct;

//表示處理socket的子線程

typedefstruct

intepoll_fd;

unsignedshortip_port;

//消息處理函數(shù),當只有一個線程時,會調(diào)用該函數(shù)進行消息處理

MSG_HANDLEdata_func;

//子線程數(shù)量,可以為0,為0表示server與socket處理處于同一個線程

unsignedintsocket_pthread_count;

//子線程結構體指針

socket_thd_struct*socket_thd;

}server_struct;

//一個網(wǎng)絡服務結構體

/**************************************************************************

*Function

:initServerStruct

*Input

:param_port--服務端口號

*

:param_thd_count--子線程數(shù)量,用于處理連接的client

*

:param_handle--socket數(shù)據(jù)處理函數(shù)指針

*Output

:

*Return

:初始化好的server結構體

*Description:初始化server結構體

****************************************************************************/

server_struct*initServerStruct(unsignedshortparam_port,unsignedintparam_thd_count,MSG_HANDLEparam_handle);

/**************************************************************************

*Function

:serverRun

*Input

:param_server--server服務結構體參數(shù)

*Output

:

*Return

:RESULT_OK(0):執(zhí)行成功;RESULT_ERROR(-1):執(zhí)行失敗

*Description:運行網(wǎng)絡服務,監(jiān)聽服務端口

****************************************************************************/

intserverRun(server_struct*param_server);

#endif//EPOLL_C_SERVER_H

server.c

#include"server.h"

staticvoid*socketPthreadRun(void*arg)

socket_thd_struct*pa_sock_st=(socket_thd_struct*)arg;

intactive_counts=0;

structepoll_eventev;

structepoll_eventevents[5];

intret=0;

while(1)

{

//等待讀寫事件的到來

active_counts=epoll_wait(pa_sock_st-epoll_fd,events,5,-1);

fprintf(stdout,"activecount:%d\n",active_counts);

intindex=0;

for(index=0;indexactive_counts;index++)

{

if(events[index].eventsEPOLLERR)//發(fā)生異常錯誤

{

fprintf(stderr,"errorhappened:errno(%d)-%s\n",errno,strerror(errno));

epoll_ctl(pa_sock_st-epoll_fd,EPOLL_CTL_DEL,events[index].data.fd,NULL);

close(events[index].data.fd);

pthread_mutex_lock((pa_sock_st-thd_mutex));

pa_sock_st-active_conection_cnt--;

pthread_mutex_unlock((pa_sock_st-thd_mutex));

}

elseif(events[index].eventsEPOLLRDHUP)//對端異常關閉連接

{

fprintf(stdout,"clientclosethissocket\n");

epoll_ctl(pa_sock_st-epoll_fd,EPOLL_CTL_DEL,events[index].data.fd,NULL);

close(events[index].data.fd);

pthread_mutex_lock((pa_sock_st-thd_mutex));

pa_sock_st-active_conection_cnt--;

pthread_mutex_unlock((pa_sock_st-thd_mutex));

}

elseif(events[index].eventsEPOLLIN)//讀事件到來,進行消息處理

{

ret=pa_sock_st-data_func(events[index].data.fd,NULL);

if(ret==-1)

{

fprintf(stderr,"clientsocketexceptionhappened\n");

epoll_ctl(pa_sock_st-epoll_fd,EPOLL_CTL_DEL,events[index].data.fd,NULL);

close(events[index].data.fd);

pthread_mutex_lock((pa_sock_st-thd_mutex));

pa_sock_st-active_conection_cnt--;

pthread_mutex_unlock((pa_sock_st-thd_mutex));

}

if(ret==0)

{

fprintf(stdout,"clientclosethissocket\n");

epoll_ctl(pa_sock_st-epoll_fd,EPOLL_CTL_DEL,events[index].data.fd,NULL);

close(events[index].data.fd);

pthread_mutex_lock((pa_sock_st-thd_mutex));

pa_sock_st-active_conection_cnt--;

pthread_mutex_unlock((pa_sock_st-thd_mutex));

}

elseif(ret==1)

{

}

}

}

}

pthread_exit(NULL);

server_struct*initServerStruct(unsignedshortparam_port,unsignedintparam_thd_count,MSG_HANDLEparam_handle)

server_struct*serv_st=(server_struct*)malloc(sizeof(server_struct));

serv_st-ip_port=param_port;

serv_st-data_func=param_handle;

serv_st-epoll_fd=epoll_create(256);

serv_st-socket_pthread_count=param_thd_count;

serv_st-socket_thd=NULL;

if(serv_st-socket_pthread_count0)

{

fprintf(stdout,"createclientsocketsubthread\n");

serv_st-socket_thd=(socket_thd_struct*)malloc(sizeof(socket_thd_struct)*serv_st-socket_pthread_count);

intindex=0;

for(index=0;indexserv_st-socket_pthread_count;index++)

{

serv_st-socket_thd[index].epoll_fd=epoll_create(256);

serv_st-socket_thd[index].data_func=param_handle;

serv_st-socket_thd[index].active_conection_cnt=0;

serv_st-socket_thd[index].thd_fd=0;

//創(chuàng)建子線程

pthread_create((serv_st-socket_thd[index].thd_fd),NULL,socketPthreadRun,(void*)(serv_st-socket_thd[index]));

//初始化線程互斥鎖

pthread_mutex_init((serv_st-socket_thd[index].thd_mutex),NULL);

}

}

returnserv_st;

intserverRun(server_struct*param_server)

intret=RESULT_OK;

intserver_socket=0;

structsockaddr_inserver_addr;

bzero(server_addr,sizeof(server_addr));

structepoll_eventev;

structepoll_eventevents[5];

intactive_count=0;

intindex=0;

intnew_socket=0;

structsockaddr_inclient_info;

socklen_tclient_info_len=0;

server_addr.sin_family=AF_INET;

server_addr.sin_addr.s_addr=htons(INADDR_ANY);

server_addr.sin_port=htons(param_server-ip_port);

server_socket=socket(PF_INET,SOCK_STREAM,0);

if(server_socket0)

{

fprintf(stderr,"createsocketerror:errno(%d)-%s\n",errno,strerror(errno));

returnRESULT_ERROR;

}

fprintf(stdout,"createserversocketssuccessful\n");

param_server-epoll_fd=epoll_create(256);

ev.data.fd=server_socket;

ev.events=EPOLLIN|EPOLLET;

if(epoll_ctl(param_server-epoll_fd,EPOLL_CTL_ADD,server_socket,ev)!=0)

{

fprintf(stderr,"serversocketaddtoepollerror:errno(%d)-%s\n",errno,strerror(errno));

returnRESULT_ERROR;

}

fprintf(stdout,"serversocketaddtoepollsuccessful\n");

if(bind(server_socket,(structsockaddr*)server_addr,sizeof(server_addr))!=0)

{

fprintf(stderr,"serverbindfailed:errno(%d)-%s\n",errno,strerror(errno));

returnRESULT_ERROR;

}

fprintf(stdout,"serversocketbindsuccessful\n");

if(listen(server_socket,param_server-ip_port)!=0)

{

fprintf(stderr,"serverlistenfailed:errno(%d)-%s\n",errno,strerror(errno));

returnRESULT_ERROR;

}

fprintf(stdout,"serversocketlistensuccessful\n");

while(1)

{

active_count=epoll_wait(param_server-epoll_fd,events,5,-1);

fprintf(stdout,"activecount:%d\n",active_count);

for(index=0;indexactive_count;index++)

{

if(events[index].data.fd==server_socket)//新連接過來

{

fprintf(stdout,"newsocketcomming\n");

client_info_len=sizeof(client_info);

new_socket=accept(server_socket,(structsockaddr*)client_info,client_info_len);

if(new_socket0)

{

fprintf(stderr,"serveracceptfailed:errno(%d)-%s\n",errno,strerror(errno));

continue;

}

fprintf(stdout,"newsocket:%d.%d.%d.%d:%d--connected\n",((unsignedchar*)(client_info.sin_addr))[0],((unsignedchar*)(client_info.sin_addr))[1],((unsignedchar*)(client_info.sin_addr))[2],((unsignedchar*)(client_info.sin_addr))[3],client_info.sin_port);

ev.data.fd=new_socket;

ev.events=EPOLLIN|EPOLLERR|EPOLLRDHUP;

if(param_server-socket_pthread_count==0)

{

epoll_ctl(param_server-epoll_fd,EPOLL_CTL_ADD,new_socket,ev);

}

else

{

inttmp_index=0;

intmix_cnt_thread_id=0;

unsignedintact_cnts=0;

for(tmp_index=0;tmp_indexparam_server-socket_pthread_count;tmp_index++)

{

pthread_mutex_lock((param_server-socket_thd[tmp_index].thd_mutex));

act_cnts=param_server-socket_thd[tmp_index].active_conection_cnt;

pthread_mutex_unlock((param_server-socket_thd[tmp_index].thd_mutex));

if(mix_cnt_thread_idact_cnts)

{

mix_cnt_thread_id=tmp_index;

}

}

epoll_ctl(param_server-socket_thd[mix_cnt_thread_id].epoll_fd,EPOLL_CTL_ADD,new_socket,ev);

pthread_mutex_lock((param_server-socket_thd[mix_cnt_thread_id].thd_mutex));

param_server-socket_thd[mix_cnt_thread_id].active_conection_cnt++;

pthread_mutex_unlock((param_server-socket_thd[mix_cnt_thread_id].thd_mutex));

}

fprintf(stdout,"addnewclientsockettoepoll\n");

}

elseif(events[index].eventsEPOLLERR||events[index].eventsEPOLLRDHUP)//對端關閉連接

{

fprin

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
  • 4. 未經(jīng)權益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負責。
  • 6. 下載文件中如有侵權或不適當內(nèi)容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

最新文檔

評論

0/150

提交評論