Linux下套接字詳解(十)---epoll模式下的IO多路復用服務器


1 epoll模型簡介


epoll可是當前在Linux下開發大規模並發網絡程序的熱門人選,epoll 在Linux2.6內核中正式引入,和select相似,其實都I/O多路復用技術而已,並沒有什么神秘的。

其實在Linux下設計並發網絡程序,向來不缺少方法,比如典型的Apache模型(Process Per Connection,簡稱PPC),TPC(Thread PerConnection)模型,以及select模型和poll模型,那為何還要再引入Epoll這個東東呢?那還是有得說說的…

2 常用模型的缺點


如果不擺出來其他模型的缺點,怎么能對比出Epoll的優點呢。

2.1 多進程PPC/多線程TPC模型


這兩種模型思想類似,就是讓每一個到來的連接一邊自己做事去,別再來煩我。只是PPC是為它開了一個進程,而TPC開了一個線程。可是別煩我是有代價的,它要時間和空間啊,連接多了之后,那么多的進程/線程切換,這開銷就上來了;

因此這類模型能接受的最大連接數都不會高,一般在幾百個左右。

2.2 select模型-O(n)


多進程多線程的模型龐大而且繁瑣,因此我們出現了select模型

  int select(int nfds, fd_set *readfds, fd_set *writefds,
fd_set *exceptfds, struct timeval *timeout);

void FD_CLR(int fd, fd_set *set);
int FD_ISSET(int fd, fd_set *set);
void FD_SET(int fd, fd_set *set);
void FD_ZERO(fd_set *set);

select系統調用是用來讓我們的程序監視多個文件句柄(file descrīptor)的狀態變化的。通過select()系統調用來監視多個文件描述符的數組,當select()返回后,該數組中就緒的文件描述符便會被內核修改標志位,使得進程可以獲得這些文件描述符從而進行后續的讀寫操作。

select系統調用是用來讓我們的程序監視多個文件描述符的狀態變化的。程序會停在select這里等待,直到被監視的文件描述符有某一個或多個發生了狀態改變。

select()的機制中提供一fd_set的數據結構,實際上是一long類型的數組,每一個數組元素都能與一打開的文件句柄建立聯系,建立聯系的工作由程序員完成,當調用select()時,由內核根據IO狀態修改fd_set的內容,由此來通知執行了select()的進程哪些Socket或文件可讀可寫。

當某些描述符可以讀寫之后,select返回數據(沒有數據讀寫時,select也會返回,因為select是同步)時就掃描一遍描述符fd_set來查詢那些有數據請求的描述符,並進行處理。時間復雜度為O(n)

因此性能比那些阻塞的多進程或者多線程模型性能提高不少,但是仍然不夠。因為select有很多限制

  1. 最大並發數限制,因為一個進程所打開的FD(文件描述符)是有限制的,由FD_SETSIZE設置(可以查看深入解析為何select最多只能監聽1024個),默認值是1024/2048,因此Select模型的最大並發數就被相應限制了。用戶可以自己修改FD_SETSIZE,然后重新編譯,但是其實,並不推薦這么做

    linux 下 fd_set 是個 1024 位的位圖,每個位代表一個 fd 的值,返回后需要掃描位圖,這也是效率低的原因。性能問題且不提,正確性問題則更值得重視。

    因為這是一個 1024 位的位圖,因此當進程內的 fd 值 >= 1024 時,就會越界,可能會造成崩潰。對於服務器程序,fd >= 1024 很容易達到,只要連接數 + 打開的文件數足夠大即可發生。

    include/linux/posix_types.h:

    #define __FD_SETSIZE 1024
  2. 效率問題,select每次調用都會線性掃描全部的FD集合,這樣效率就會呈現線性下降,把FD_SETSIZE改大的后果就是,大家都慢慢來,什么?都超時了??!!

  3. 內核/用戶空間 內存拷貝問題,如何讓內核把FD消息通知給用戶空間呢?在這個問題上select采取了內存拷貝方法。

2.3 poll模型


poll的實現和select非常相似,只是描述fd集合的方式不同,poll使用pollfd結構而不是select的fd_set結構,其他的都差不多。

他通過注冊一堆事件組,當有事件請求時返回,然后仍然需要輪詢一遍pollfd才能知道查找到對應的文件描述符,數據也需要在內核空間和用戶空間來回拷貝。時間復雜度為O(n)

因此他只解決了select的問題1,但是問題2,3仍然得不帶解決。

3 epoll模型


這里寫圖片描述

3.1 epoll的性能提升


把其他模型逐個批判了一下,再來看看Epoll的改進之處吧,其實把select的缺點反過來那就是Epoll的優點了。

  1. epoll沒有最大並發連接的限制,上限是最大可以打開文件的數目,這個數字一般遠大於2048, 一般來說這個數目和系統內存關系很大,具體數目可以cat /proc/sys/fs/file-max察看。

  2. 效率提升,Epoll最大的優點就在於它只管你“活躍”的連接,而跟連接總數無關,因此在實際的網絡環境中,Epoll的效率就會遠遠高於select和poll。

  3. 內存拷貝,Epoll在這點上使用了“共享內存”,這個內存拷貝也省略了。

3.2 如何解決上述的3個缺點


epoll既然是對select和poll的改進,就避免上述的三個缺點。那epoll都是怎么解決的呢?

在此之前,我們先看一下epoll和select和poll的調用接口上的不同,select和poll都只提供了一個函數——select或者poll函數。

而epoll提供了三個函數,epoll_create,epoll_ctlepoll_wait

  • epoll_create是創建一個epoll句柄;

  • epoll_ctl是注冊要監聽的事件類型;

  • epoll_wait則是等待事件的產生。

3.2.1 支持一個進程打開大數 目的socket描述符(FD)


對於第一個缺點並發數目限制

epoll沒有這個限制,它所支持的FD上限是最大可以打開文件的數目,這個數字一般遠大於2048,舉個例子,在1GB內存的機器上大約是10萬左右,具體數目可以cat /proc/sys/fs/file-max察看,一般來說這個數目和系統內存關系很大。

這里寫圖片描述

select 最不能忍受的是一個進程所打開的FD是有一定限制的,由FD_SETSIZE設置,默認值是2048。對於那些需要支持的上萬連接數目的IM服務器來說顯 然太少了。這時候你一是可以選擇修改這個宏然后重新編譯內核,不過資料也同時指出這樣會帶來網絡效率的下降,二是可以選擇多進程的解決方案(傳統的 Apache方案),不過雖然linux上面創建進程的代價比較小,但仍舊是不可忽視的,加上進程間數據同步遠比不上線程間同步的高效,所以也不是一種完 美的方案。不過 epoll則沒有這個限制,它所支持的FD上限是最大可以打開文件的數目,這個數字一般遠大於2048,舉個例子,在1GB內存的機器上大約是10萬左 右,具體數目可以cat /proc/sys/fs/file-max察看,一般來說這個數目和系統內存關系很大。

3.2.2 IO效率不隨FD數目增加而線性下降


對於第二個缺點輪詢描述符的線性復雜度

epoll的解決方案不像select或poll一樣每次都把current輪流加入fd對應的設備等待隊列中,而只在epoll_ctl時把current掛一遍(這一遍必不可少)並為每個fd指定一個回調函數,當設備就緒,喚醒等待隊列上的等待者時,就會調用這個回調函數,而這個回調函數會把就緒的fd加入一個就緒鏈表)。epoll_wait的工作實際上就是在這個就緒鏈表中查看有沒有就緒的f

傳統的select/poll另一個致命弱點就是當你擁有一個很大的socket集合,不過由於網絡延時,任一時間只有部分的socket是”活躍”的, 但是select/poll每次調用都會線性掃描全部的集合,導致效率呈現線性下降。但是epoll不存在這個問題,它只會對”活躍”的socket進行 操作—這是因為在內核實現中epoll是根據每個fd上面的callback函數實現的。那么,只有”活躍”的socket才會主動的去調用 callback函數,其他idle狀態socket則不會,在這點上,epoll實現了一個”偽”AIO,因為這時候推動力在os內核。在一些 benchmark中,如果所有的socket基本上都是活躍的—比如一個高速LAN環境,epoll並不比select/poll有什么效率,相 反,如果過多使用epoll_ctl,效率相比還有稍微的下降。但是一旦使用idle connections模擬WAN環境,epoll的效率就遠在select/poll之上了。

3.2.3 使用mmap加速內核 與用戶空間的消息傳遞


對於第三缺點數據在內核空間和用戶空間的拷貝

epoll的解決方案在epoll_ctl函數中。每次注冊新的事件到epoll句柄中時(在epoll_ctl中指定EPOLL_CTL_ADD),會把所有的fd拷貝進內核,而不是在epoll_wait的時候重復拷貝。epoll保證了每個fd在整個過程中只會拷貝一次。

這點實際上涉及到epoll的具體實現了。無論是select,poll還是epoll都需要內核把FD消息通知給用戶空間,如何避免不必要的內存拷貝就 很重要,在這點上,epoll是通過內核於用戶空間mmap同一塊內存實現的。而如果你想我一樣從2.5內核就關注epoll的話,一定不會忘記手工 mmap這一步的。

3.3 總結


  1. select,poll實現需要自己不斷輪詢所有fd集合,直到設備就緒,期間可能要睡眠和喚醒多次交替。而epoll其實也需要調用epoll_wait不斷輪詢就緒鏈表,期間也可能多次睡眠和喚醒交替,但是它是設備就緒時,調用回調函數,把就緒fd放入就緒鏈表中,並喚醒在epoll_wait中進入睡眠的進程。雖然都要睡眠和交替,但是select和poll在“醒着”的時候要遍歷整個fd集合,而epoll在“醒着”的時候只要判斷一下就緒鏈表是否為空就行了,這節省了大量的CPU時間。這就是回調機制帶來的性能提升。

  2. select,poll每次調用都要把fd集合從用戶態往內核態拷貝一次,並且要把current往設備等待隊列中掛一次,而epoll只要一次拷貝,而且把current往等待隊列上掛也只掛一次(在epoll_wait的開始,注意這里的等待隊列並不是設備等待隊列,只是一個epoll內部定義的等待隊列)。這也能節省不少的開銷。

4 Epoll的使用


4.1 epoll關鍵數據結構


前面提到Epoll速度快和其數據結構密不可分,其關鍵數據結構就是:

structepoll_event {

__uint32_t events; // Epoll events

epoll_data_t data; // User datavariable

};

typedef union epoll_data {

void *ptr;

int fd;

__uint32_t u32;

__uint64_t u64;

} epoll_data_t;

可見epoll_data是一個union結構體,借助於它應用程序可以保存很多類型的信息:fd、指針等等。有了它,應用程序就可以直接定位目標了。

4.2 使用Epoll


首先回憶一下select模型,當有I/O事件到來時,select通知應用程序有事件到了快去處理,而應用程序必須輪詢所有的FD集合,測試每個FD是否有事件發生,並處理事件;代碼像下面這樣:
Epoll的高效和其數據結構的設計是密不可分的,這個下面就會提到。

首先回憶一下select模型,當有I/O事件到來時,select通知應用程序有事件到了快去處理,而應用程序必須輪詢所有的FD集合,測試每個FD是否有事件發生,並處理事件;

代碼像下面這樣:


int res = select(maxfd+1, &readfds, NULL, NULL, 120);
if(res > 0)
{

for(int i = 0; i < MAX_CONNECTION; i++)
{
if(FD_ISSET(allConnection[i],&readfds))
{
handleEvent(allConnection[i]);
}
}
}
// if(res == 0) handle timeout, res < 0 handle error

epoll不僅會告訴應用程序有I/0事件到來,還會告訴應用程序相關的信息,這些信息是應用程序填充的,因此根據這些信息應用程序就能直接定位到事件,而不必遍歷整個FD集合。

intres = epoll_wait(epfd, events, 20, 120);

for(int i = 0; i < res;i++)
{
handleEvent(events[n]);
}

首先通過create_epoll(int maxfds)來創建一個epoll的句柄,其中maxfds為你epoll所支持的最大句柄數。這個函數會返回一個新的epoll句柄,之后的所有操作 將通過這個句柄來進行操作。在用完之后,記得用close()來關閉這個創建出來的epoll句柄。之后在你的網絡主循環里面,每一幀的調用epoll_wait(int epfd, epoll_event events, int max events, int timeout)來查詢所有的網絡接口,看哪一個可以讀,哪一個可以寫了。基本的語法為:

nfds = epoll_wait(kdpfd, events, maxevents, -1);

其中kdpfd為用epoll_create創建之后的句柄,events是一個 epoll_event*的指針,當epoll_wait這個函數操作成功之后,epoll_events里面將儲存所有的讀寫事件。 max_events是當前需要監聽的所有socket句柄數。最后一個timeout是 epoll_wait的超時,為0的時候表示馬上返回,為-1的時候表示一直等下去,直到有事件范圍,為任意正整數的時候表示等這么長的時間,如果一直沒 有事件,則范圍。一般如果網絡主循環是單獨的線程的話,可以用-1來等,這樣可以保證一些效率,如果是和主邏輯在同一個線程的話,則可以用0來保證主循環 的效率。
既然epoll相比select這么好,那么用起來如何呢?會不會很繁瑣啊…先看看下面的三個函數吧,就知道epoll的易用了。

intepoll_create(int size);

生成一個Epoll專用的文件描述符,其實是申請一個內核空間,用來存放你想關注的socket fd上是否發生以及發生了什么事件。size就是你在這個Epoll fd上能關注的最大socket fd數,大小自定,只要內存足夠。

int epoll_ctl(int epfd, intop, int fd, structepoll_event *event);

控制某個Epoll文件描述符上的事件:注冊、修改、刪除。其中參數epfd是epoll_create()創建Epoll專用的文件描述符。相對於select模型中的FD_SET和FD_CLR宏。

int epoll_wait(int epfd,structepoll_event * events,int maxevents,int timeout);

等待I/O事件的發生,返回發生事件數;

功能類似與select函數

參數說明:

參數 描述
epfd 由epoll_create() 生成的Epoll專用的文件描述符
epoll_event 用於回傳代處理事件的數組
maxevents 每次能處理的事件數
timeout 等待I/O事件發生的超時值

4.3 epoll的工作模式


4.3.1 LT和ET模式


令人高興的是,2.6內核的epoll比其2.5開發版本的/dev/epoll簡潔了許多,所以,大部分情況下,強大的東西往往是簡單的。

唯一有點麻煩

epoll對文件描述符的操作有2種模式: LT和ET

模式 名稱 設置 描述
LT Level Trigger, 電平觸發 默認 只有文件描述符號上有未處理的讀寫事件都會通知, 只要存在着事件就會不斷的觸發,直到處理完成
ET Edge Trigger, 邊沿觸發 通過EPOLLET來設置 當且僅當讀寫事件到來時通知, 只觸發一次相同事件或者說只在從非觸發到觸發兩個狀態轉換的時候兒才觸發


LT(level triggered)是缺省的工作方式,並且同時支持block和no-block socket.在這種做法中,內核告訴你一個文件描述符是否就緒了,然后你可以對這個就緒的fd進行IO操作。如果你不作任何操作,內核還是會繼續通知你的,所以,這種模式編程出錯誤可能性要小一點。傳統的select/poll都是這種模型的代表. 這種模式相當於一個效率高的poll

對於采用LT模式工作的文件描述符, 當epoll_wait檢測到其上有事件發生並將此事件通知應用程序后, 應用程序可以不用立即處理該事件. 這樣, 當應用程序下次調用epoll_wait時, epoll_wait還會再次向應用程序通告此事件,直到該事件被處理.

ET (edge-triggered)是高速工作方式,只支持no-block socket。在這種模式下,當描述符從未就緒變為就緒時,內核通過epoll告訴你。然后它會假設你知道文件描述符已經就緒,並且不會再為那個文件描述 符發送更多的就緒通知,直到你做了某些操作導致那個文件描述符不再為就緒狀態了(比如,你在發送,接收或者接收請求,或者發送接收的數據少於一定量時導致 了一個EWOULDBLOCK 錯誤)。但是請注意,如果一直不對這個fd作IO操作(從而導致它再次變成未就緒),內核不會發送更多的通知(only once),不過在TCP協議中,ET模式的加速效用仍需要更多的benchmark確認。

而當往epoll內核事件表中注冊一個文件描述符上的EPOLLET事件時, epoll將以ET模式來操作該文件描述符, ET模式是epoll的高效工作模式. 對於采用ET模式工作的文件描述符, 當epoll_wait檢測到其上有事件發生並將此事件通知應用程序后, 應用程序必須立即處理該事件, 因為后續的epoll_wait調用將不再向應用程序通知該事件.

可見, ET模式在很大程序上降低了同一個epoll事件被重復觸發的次數, 因此效率比LT模式高.


注意

每個使用ET模式的文件描述符都應該是非阻塞的.

如果文件描述符是阻塞的, 那么讀寫操作將會因為沒有后續的事件而一直處於阻塞狀態(飢渴狀態)

4.3.2 示例代碼


// 代碼清單9-3 LT和ET模式
// 參見Linux高性能服務器編程
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <pthread.h>

#define MAX_EVENT_NUMBER 1024
#define BUFFER_SIZE 10


#define DEFAULT_SERVER_PORT 6666



int setnonblocking( int fd )
{
int old_option = fcntl( fd, F_GETFL );
int new_option = old_option | O_NONBLOCK;
fcntl( fd, F_SETFL, new_option );
return old_option;
}

void addfd( int epollfd, int fd, bool enable_et )
{
epoll_event event;
event.data.fd = fd;
event.events = EPOLLIN;
if( enable_et )
{
event.events |= EPOLLET;
}
epoll_ctl( epollfd, EPOLL_CTL_ADD, fd, &event );
setnonblocking( fd );
}


/* LT(Level Trigger, 電平觸發)
* 相當於一個效率較高的poll
* 對於采用LT工作模式的文件描述符
* 當epoll_wait檢測到其上時間發生並將此事件通知應用程序后
* 應用程序可以不用立即處理
* 這樣下次調用時, 還會再次向應用程序通知此時間
*/

void lt( epoll_event* events, int number, int epollfd, int listenfd )
{
char buf[ BUFFER_SIZE ];
for ( int i = 0; i < number; i++ )
{
int sockfd = events[i].data.fd;
if ( sockfd == listenfd )
{
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof( client_address );
int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );
addfd( epollfd, connfd, false );
}
else if ( events[i].events & EPOLLIN )
{
printf( "LT-event trigger once\n" );
memset( buf, '\0', BUFFER_SIZE );
int ret = recv( sockfd, buf, BUFFER_SIZE-1, 0 );
if( ret <= 0 )
{
close( sockfd );
continue;
}
printf( "get %d bytes of content: %s\n", ret, buf );
}
else
{
printf( "something else happened \n" );
}
}
}

/* ET模式的工作流程
* 對於采用ET模式的文件描述符號
* 當epoll_wait檢測到其上由事件發生並將此時間通知應用程序后
* 應用程序應該立即處理該事件
* 因為后續的epoll_wait不會再向應用程序通知這一事件
*/

void et( epoll_event* events, int number, int epollfd, int listenfd )
{
char buf[ BUFFER_SIZE ];
for ( int i = 0; i < number; i++ )
{
int sockfd = events[i].data.fd;
if ( sockfd == listenfd )
{
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof( client_address );
int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );
addfd( epollfd, connfd, true );
}
else if ( events[i].events & EPOLLIN )
{
/* 這段代碼不會重復觸發
* 因此我嫩循環讀取數據
* 以確保把socket讀緩存中的所有哦數據讀出
*/

printf( "ET-event trigger only once\n" );
while( 1 )
{
memset( buf, '\0', BUFFER_SIZE );
int ret = recv( sockfd, buf, BUFFER_SIZE-1, 0 );
if( ret < 0 )
{
if( ( errno == EAGAIN ) || ( errno == EWOULDBLOCK ) )
{
printf( "read later\n" );
break;
}
close( sockfd );
break;
}
else if( ret == 0 )
{
close( sockfd );
}
else
{
printf( "get %d bytes of content: %s\n", ret, buf );
}
}
}
else
{
printf( "something else happened \n" );
}
}
}

int main( int argc, char* argv[] )
{
int port = DEFAULT_SERVER_PORT;
char *ip = NULL;

if( argc > 3 )
{
printf( "usage: %s [port_number [ip_address]]\n", basename( argv[0] ) );
return 1;
}
else if (argc == 2)
{
port = atoi( argv[1] );
}
else if(argc == 3)
{
port = atoi( argv[1] );
ip = argv[2];
}

int ret = 0;
struct sockaddr_in address;
bzero( &address, sizeof( address ) );
address.sin_family = AF_INET;
if(ip != NULL)
{
inet_pton( AF_INET, ip, &address.sin_addr );
}
else
{
address.sin_addr.s_addr = AF_INET;
}
address.sin_family = AF_INET;
address.sin_addr.s_addr = INADDR_ANY;
address.sin_port = htons(port);

int listenfd = socket( PF_INET, SOCK_STREAM, 0 );
assert( listenfd >= 0 );

ret = bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) );
assert( ret != -1 );

ret = listen( listenfd, 5 );
assert( ret != -1 );

epoll_event events[ MAX_EVENT_NUMBER ];
int epollfd = epoll_create( 5 );
assert( epollfd != -1 );
addfd( epollfd, listenfd, true );

while( 1 )
{
int ret = epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1 );
if ( ret < 0 )
{
printf( "epoll failure\n" );
break;
}

//lt( events, ret, epollfd, listenfd );
et( events, ret, epollfd, listenfd );
}

close( listenfd );
return 0;
}

4.4 EPOLLONESHOT事件


4.4.1 EPOLLONESHOT事件簡介


在前面說過,epoll有兩種觸發的方式即LT(水平觸發)和ET(邊緣觸發)兩種,在前者,只要存在着事件就會不斷的觸發,直到處理完成,而后者只觸發一次相同事件或者說只在從非觸發到觸發兩個狀態轉換的時候兒才觸發。

這會出現一種情況,就是即使我們使用ET模式, 一個socket上的某個事件還是可能被觸發多次, 這在並發程序中會引起一個問題.

比如, 一個線程(或者進程)在讀取完某個socket上的數據后開始處理這些數據, 而在處理數據的過程中該socket上又有新數據可讀(EPOLLIN再次被觸發), 此時如果應用程序調度另外一個線程來讀取這些數據, 就會出現兩個線程同時操作一個socket的局面, 這會使程序的健壯性大降低而編程的復雜度大大增加. 這顯然不是我們所期望的.

解決這種現象有兩種方法

  • 一種是在單獨的線程或進程里解析數據,也就是說,接收數據的線程接收到數據后立刻將數據轉移至另外的線程

*第二種方法就是使用EPOLLONESHOT事件. 對於注冊了EPOLLONESHOT事件的文件描述符, 操作系統最多觸發其上注冊的一個可讀, 可寫或者異常事件, 且只觸發一次, 除非我們使用epoll_ctl和函數重置該文件描述符上注冊的EPOLLONESHOT事件.

這樣, 當一個線程在處理某個socket的時候, 其他線程就不可能有機會操作該socket

但是反過來思考, 注冊了EPOLLONESHOT事件的socket一旦被某個線程處理完畢, 該線程就有責任立即重置這個socket上的EPOLLONESHOT事件, 以確保這個socket下一次可讀的時候, 其EPOLLIN事件能被再次觸發, 進而讓其他線程有機會處理這個socket.

4.4.2 EPOLLONESHOT事件示例程序


// 代碼清單9-4 使用EPOLLONESHOT事件
// 參見Linux高性能服務器編程
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <pthread.h>

#define MAX_EVENT_NUMBER 1024
#define BUFFER_SIZE 1024

#define DEFAULT_SERVER_PORT 6666


typedef struct fds_pthread_args
{
int epollfd;
int sockfd;
}fds_pthread_args;

int setnonblocking( int fd )
{
int old_option = fcntl( fd, F_GETFL );
int new_option = old_option | O_NONBLOCK;
fcntl( fd, F_SETFL, new_option );
return old_option;
}

/* 將fd上的EPOLLIN和EPOLLET事件注冊到epollfd指示的epoll內核事件表中
* 參數oneshot用來指定是否注冊fd上的EPOLLONESHOT事件
* */

void addfd( int epollfd, int fd, bool oneshot )
{
epoll_event event;
event.data.fd = fd;
event.events = EPOLLIN | EPOLLET;
/* 對於注冊了EPOLLONESHOT時間的文件描述符
* 操作系統最多觸發其上注冊的一個可讀, 可寫或者異常事件
* 且只觸發一次 */

if( oneshot )
{
event.events |= EPOLLONESHOT;
}
epoll_ctl( epollfd, EPOLL_CTL_ADD, fd, &event );
setnonblocking( fd );
}

/* 重置fd上的事件, 這樣操作后, 盡管fd上的EPOLLONESHOT事件被注冊后,
* 但是操作系統仍然會觸發fd上的EPOLLIN事件
* 且只觸發一次
* */

void reset_oneshot( int epollfd, int fd )
{
epoll_event event;
event.data.fd = fd;
event.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
epoll_ctl( epollfd, EPOLL_CTL_MOD, fd, &event );
}


/* 工作線程 */
void* worker( void* args )
{
fds_pthread_args *_args = (fds_pthread_args *)args;

int sockfd = _args->sockfd;
int epollfd = _args->epollfd;
printf( "start new thread to receive data on fd: %d\n", sockfd );

char buf[ BUFFER_SIZE ];
memset( buf, '\0', BUFFER_SIZE );

/* 循環讀取sockfd上的數據, 直到遇見EAGAIN錯誤 */
while( 1 )
{
int ret = recv( sockfd, buf, BUFFER_SIZE - 1, 0 );
if( ret == 0 )
{
close( sockfd );
printf( "foreiner closed the connection\n" );
break;
}
else if( ret < 0 )
{
/* 首先我們看看recv的返回值:
* EAGAIN、EWOULDBLOCK、EINTR與非阻塞 長連接
* EWOULDBLOCK 用於非阻塞模式,不需要重新讀或者寫
* EINTR 指操作被中斷喚醒,需要重新讀/寫
* 在Linux環境下開發經常會碰到很多錯誤(設置errno),
* 其中EAGAIN是其中比較常見的一個錯誤(比如用在非阻塞操作中)
* 從字面上來看, 是提示再試一次.
* 這個錯誤經常出現在當應用程序進行一些非阻塞(non-blocking)操作
* (對文件或socket)的時候
* 例如,以 O_NONBLOCK的標志打開文件/socket/FIFO,
* 如果你連續做read操作而沒有數據可讀.
* 此時程序不會阻塞起來等待數據准備就緒返回,
* read函數會返回一個錯誤EAGAIN,
* 提示你的應用程序現在沒有數據可讀請稍后再試重新讀數據,
* 對非阻塞socket而言, EAGAIN不是一種錯誤。在VxWorks和Windows上,
* EAGAIN的名字叫做EWOULDBLOCK
*/

if( errno == EAGAIN )
{
reset_oneshot( epollfd, sockfd );
printf( "read later\n" );
break;
}
}
else
{
printf( "get content: %s\n", buf );
/* 休眠5s, 模擬數據處理過程 */
sleep( 5 );
}
}
printf( "end thread receiving data on fd: %d\n", sockfd );

return NULL;
}

int main( int argc, char* argv[] )
{
int port = DEFAULT_SERVER_PORT;
char *ip = NULL;

if( argc > 3)
{
printf( "usage: %s port_number ip_address\n", basename( argv[0] ) );
return 1;
}
else if( argc == 2 )
{
port = atoi(argv[1]);
}
else if(argc == 3)
{
port = atoi(argv[1]);
ip = argv[2];
}

int ret = 0;
struct sockaddr_in address;
bzero( &address, sizeof( address ) );
address.sin_family = AF_INET;
if(ip != NULL)
{
inet_pton( AF_INET, ip, &address.sin_addr );
}
else
{
address.sin_addr.s_addr = INADDR_ANY;
}
address.sin_port = htons( port );

int listenfd = socket( PF_INET, SOCK_STREAM, 0 );
assert( listenfd >= 0 );

ret = bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) );
assert( ret != -1 );

ret = listen( listenfd, 5 );
assert( ret != -1 );

epoll_event events[ MAX_EVENT_NUMBER ];
int epollfd = epoll_create( 5 );
assert( epollfd != -1 );

/* 注意, 監聽套接字listen上不能注冊EPOLLONESHOT事件,
* 否則應用程序只能處理一個客戶端連接
* 因為由於EPOLLONESHOT被設置
* 后續的客戶端連接請求將不再觸發listenfd的EPOLLIN事件
*/

addfd( epollfd, listenfd, false );

while( 1 )
{
int ret = epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1 );
if ( ret < 0 )
{
printf( "epoll failure\n" );
break;
}

for ( int i = 0; i < ret; i++ )
{
int sockfd = events[i].data.fd;
if ( sockfd == listenfd )
{
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof( client_address );
int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );

/* 對每個非監聽文件描述符都注冊EPOLLONEHOT事件 */
addfd( epollfd, connfd, true );
}
else if ( events[i].events & EPOLLIN )
{
pthread_t thread;
fds_pthread_args fds_for_new_worker;

fds_for_new_worker.epollfd = epollfd;
fds_for_new_worker.sockfd = sockfd;

/* 新啟動一個工作縣城為sockfd服務 */
pthread_create( &thread, NULL, worker, ( void* )&fds_for_new_worker );
}
else
{
printf( "something else happened \n" );
}
}
}

close( listenfd );
return 0;
}

5 參考


epoll只有epoll_create,epoll_ctl,epoll_wait 3個系統調用

具體用法請參考http://www.xmailserver.org/linux-patches/nio-improve.html

http://www.kegel.com/rn/也有一個完整的例子

Leader/follower模式線程 pool實現,以及和epoll的配合。

Epoll的高效和其數據結構的設計是密不可分的,這個下面就會提到。

我讀過最好的Epoll模型講解

Epoll模型詳解

通過完整示例來理解如何使用 epol

epoll 使用詳解


注意!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系我们删除。



 
粤ICP备14056181号  © 2014-2021 ITdaan.com