IO多路復用(select poll epoll)


IO多路復用是高性能網絡編程一個重要的手段。

一,IO多路復用的概念
以前我們用多線程來處理並發的請求,現在可以只用單線程來實現。單線程,通過記錄跟蹤每個每個I/O流(sock)的狀態,來達到同時管理多個I/O流的目的,提高了服務器的吞吐能力。

這里寫圖片描述
如圖所示,IO多路復用,就如同中間的開關,哪個sock就緒就連上開關,達到了單開關處理了多個I/O流的目的。這就是單線程卻能處理多個Sock傳輸數據的IO多路復用模型。(當Sock請求很多時,可以進行分組,一個線程控制一個組,把多線程與IO復用結合使用)

之前的博客介紹了多線程的方式,兩者的區別在於
1. 多線程模型適合於處理短連接,且連接的打開關閉非常頻繁的情況,但不適合處理長連接。
2. 多線程畢竟是要耗費資源的,IO多路復用基本不耗費資源,也不必創建,維護線程,使得系統的開銷大大減小,效率變得更快。(效率與單線程比較,不一定比多線程快)

I/O復用典型使用在下列網絡應用場合:
1. 當客戶處理多個描述符(通常是交互式輸入和網絡套接字)時,必須使用I/O復用。
2. 如果一個TCP服務器既要處理TCP,又要處理UDP,一般就要用I/O復用。
3. 如果一個服務器要處理多個服務或者多個協議,一般就要使用I/O復用。

二,I/O多路復用的實現
在I/O多路復用這個概念被提出以后,select()是第一個實現的(1983年)。

select()原理:

1、使用copy_from_user從用戶空間拷貝fd_set到內核空間
2、注冊回調函數__pollwait
3、遍歷所有fd,調用其對應的poll方法(對於socket,這個poll方法是sock_poll,sock_poll根據情況會調用到tcp_poll,udp_poll或者datagram_poll)
4、以tcp_poll為例,其核心實現就是__pollwait,也就是上面注冊的回調函數。
5、__pollwait的主要工作就是把current(當前進程)掛到設備的等待隊列中,不同的設備有不同的等待隊列,對於tcp_poll來說,其等待隊列是sk->sk_sleep(注意把進程掛到等待隊列中並不代表進程已經睡眠了)。在設備收到一條消息(網絡設備)或填寫完文件數據(磁盤設備)后,會喚醒設備等待隊列上睡眠的進程,這時current便被喚醒了。
6、poll方法返回時會返回一個描述讀寫操作是否就緒的mask掩碼,根據這個mask掩碼給fd_set賦值。
7、如果遍歷完所有的fd,還沒有返回一個可讀寫的mask掩碼,則會調用schedule_timeout是調用select的進程(也就是current)進入睡眠。當設備驅動發生自身資源可讀寫后,會喚醒其等待隊列上睡眠的進程。如果超過一定的超時時間(schedule_timeout指定),還是沒人喚醒,則調用select的進程會重新被喚醒獲得CPU,進而重新遍歷fd,判斷有沒有就緒的fd。
8、把fd_set從內核空間拷貝到用戶空間

如下圖流程所示:
這里寫圖片描述
通俗點說,select將所有fd放入一個集合中,不停輪詢遍歷集合,並將未就緒的fd踢出集合,系統可以通過集合來調動就緒fd,只要fd就緒就會被放入集合處理,可以等價於select管理了所有的fd!

當然select細節還有很多,后面提供代碼分析。先來總結select的不足之處:
1. 每次調用select,都需要把fd集合從用戶態拷貝到內核態,這個開銷在fd很多時會很大。
2. 同時每次調用select都需要在內核遍歷傳遞進來的所有fd,這個開銷在fd很多時也很大。
3. select 如果任何一個sock(I/O stream)出現了數據,select 僅僅會返回,但是並不會告訴你是那個sock上有數據,只能自己一個一個的找,數據大時不方便。
4. select不是線程安全的,不同線程不可對同一sock操作。
5. select支持的文件描述符數量太小了,默認是1024。

select代碼如下:

#include"../unp.h"
#include<malloc.h>

typedef struct server_context_st
{
int cli_cnt;
int clifds[SIZE];
fd_set allfds;
int maxfd;
}server_context_st;

static server_context_st *s_srv_ctx = NULL;

int server_init()
{
s_srv_ctx = (server_context_st*)malloc(sizeof(server_context_st));
if(s_srv_ctx == NULL)
return -1;
memset(s_srv_ctx, 0, sizeof(server_context_st));
for(int i=0; i<SIZE; ++i)
{
s_srv_ctx->clifds[i] = -1;
}
return 0;
}
void server_uninit()
{
if(s_srv_ctx)
{
free(s_srv_ctx);
s_srv_ctx = NULL;
}
}

int create_server_proc(const char *ip, short port)
{
int fd;
fd = socket(AF_INET, SOCK_STREAM, 0);
if(fd == -1)
{
perror("socket");
return -1;
}
struct sockaddr_in addrSer;
addrSer.sin_family = AF_INET;
addrSer.sin_port = htons(port);
addrSer.sin_addr.s_addr = inet_addr(ip);

socklen_t addrlen = sizeof(struct sockaddr);
int res = bind(fd, (struct sockaddr*)&addrSer, addrlen);
if(res == -1)
{
perror("bind");
return -1;
}
listen(fd, LISTENQ);
return fd;
}

int accept_client_proc(int srvfd)
{
struct sockaddr_in addrCli;
socklen_t addrlen = sizeof(struct sockaddr);

int clifd;
ACCEPT:
clifd = accept(srvfd, (struct sockaddr*)&addrCli, &addrlen);
if(clifd == -1)
{
goto ACCEPT;
}
printf("accept a new client: %s:%d\n",inet_ntoa(addrCli.sin_addr),addrCli.sin_port);

int i;
for(i=0; i<SIZE; ++i)
{
if(s_srv_ctx->clifds[i] == -1)
{
s_srv_ctx->clifds[i] = clifd;
s_srv_ctx->cli_cnt++;
break;
}
}
if(i == SIZE)
{
printf("Server Over Load.\n");
return -1;
}
}

void handle_client_msg(int fd, char *buf)
{
printf("recv buf is:> %s\n",buf);
send(fd, buf, strlen(buf)+1, 0);
}

void recv_client_msg(fd_set *readfds)
{
int clifd;
char buffer[256];
int n;
for(int i=0; i<s_srv_ctx->cli_cnt; ++i)
{
clifd = s_srv_ctx->clifds[i];
if(clifd < 0)
continue;
if(FD_ISSET(clifd, readfds))
{
n = recv(clifd, buffer, 256, 0);
if(n <= 0)
{
FD_CLR(clifd, &s_srv_ctx->allfds);
close(clifd);
s_srv_ctx->clifds[i] = -1;
s_srv_ctx->cli_cnt--;
continue;
}

handle_client_msg(clifd, buffer);

}
}
}

int handle_client_proc(int srvfd)
{
int clifd = -1;
int retval = 0;
fd_set *readfds = &s_srv_ctx->allfds;
struct timeval tv;

while(1)
{
FD_ZERO(readfds);
FD_SET(srvfd, readfds);
s_srv_ctx->maxfd = srvfd;
tv.tv_sec = 30;
tv.tv_usec = 0;

int i;
for(i=0; i<s_srv_ctx->cli_cnt; ++i)
{
clifd = s_srv_ctx->clifds[i];
FD_SET(clifd, readfds);
s_srv_ctx->maxfd = (clifd > s_srv_ctx->maxfd ? clifd : s_srv_ctx->maxfd);
}

retval = select(s_srv_ctx->maxfd+1, readfds, NULL, NULL, &tv);
if(retval == -1)
{
perror("select");
return -1;
}
if(retval == 0)
{
printf("server time out.\n");
continue;
}

//accept
if(FD_ISSET(srvfd, readfds))
{
accept_client_proc(srvfd);
}
else
{
recv_client_msg(readfds);
}
}
}

int main(int argc, char *argv[])
{
int sockSer;
if(server_init() < 0)
perror("server_init");
sockSer = create_server_proc(IPADDR, PORT);
if(sockSer < 0)
{
perror("create_server_porc");
goto err;
}
handle_client_proc(sockSer);
return 0;
err:
server_uninit();
return -1;

針對select的不足,1997年提出了poll實現IO復用的模型。

poll模型實現:
和select差別不大,最主要的區別在於:
1. **集合方式不同**select將可操作的sock存入FD_SETSIZE集合中,內核默認32*32=1024。而poll是將對應fd列表由數組保存,大小沒有限制。所以poll將可操作的sock數量提升至無限大。
2. **結構不同**select將fd放入集合中后,每次輪詢將未就緒的fd刪除,有新的就緒的再加入。而poll處理是將每個fd對應的狀態更改,就緒為1,未就緒為0,這樣就不用每次輪詢刪除加入fd了。

poll其他與select類似,select的問題除了限制大小poll都存在,比如輪詢效率低下,不是線程安全的,沒有詳細的sock信息。代碼如下:

#include"../unp.h"
#include<stdlib.h>

int sock_bind(const char *ip, short port)
{
int fd;
fd = socket(AF_INET, SOCK_STREAM, 0);
struct sockaddr_in addrSer;
addrSer.sin_family = AF_INET;
addrSer.sin_port = htons(port);
addrSer.sin_addr.s_addr = inet_addr(ip);

socklen_t addrlen = sizeof(struct sockaddr);
bind(fd, (struct sockaddr*)&addrSer, addrlen);
return fd;
}

void handle_connection(struct pollfd *connfds, int num)
{
int n;
char buf[256];
for(int i=1; i<= num; ++i)
{
if(connfds[i].fd == -1)
continue;
if(connfds[i].revents & POLLIN)
{
n = recv(connfds[i].fd, buf, 256, 0);
if(n <= 0)
{
close(connfds[i].fd);
connfds[i].fd = -1;
continue;
}
printf("recv msg:>%s\n",buf);
send(connfds[i].fd, buf, n, 0);
}
}
}

void do_poll(int sockSer)
{
pollfd clientfds[OPEN_SIZE];
clientfds[0].fd = sockSer;
clientfds[0].events = POLLIN;

for(int i=1; i<OPEN_SIZE; ++i)
clientfds[i].fd = -1;

int maxi = 0;
int nready;
struct sockaddr_in addrCli;
socklen_t addrlen = sizeof(struct sockaddr);
int i;
for(;;)
{
nready = poll(clientfds, maxi+1,-1);
if(nready == -1)
{
perror("poll");
exit(1);
}
if(clientfds[0].revents & POLLIN)
{
int sockConn = accept(sockSer, (struct sockaddr*)&addrCli, &addrlen);
if(sockConn == -1)
{
perror("accept");
continue;
}
printf("accept a new client:%s:%d\n",inet_ntoa(addrCli.sin_addr),addrCli.sin_port);

for(i=1; i<OPEN_SIZE; ++i)
{
if(clientfds[i].fd < 0)
{
clientfds[i].fd = sockConn;
break;
}
}
if(i == OPEN_SIZE)
{
printf("Server Over Load.\n");
continue;
}
clientfds[i].events = POLLIN;
maxi = (i > maxi ? i : maxi);
if(--nready <= 0)
continue;
}
handle_connection(clientfds, maxi);
}
}

int main()
{
int sockSer;
sockSer = sock_bind(IPADDR, PORT);
listen(sockSer, LISTENQ);
do_poll(sockSer);
return 0;
}

終於,2002年實現了epoll模式!
epoll可以說是I/O多路復用最新的實現,epoll修復了poll和select絕大部分問題,比如:
epoll 是線程安全的。
epoll 對socket的數量無限制。
epoll 現在不僅告訴sock組里面數據,還會告訴具體哪個sock有數據。
epoll 不是效率低下的輪詢模式,是觸發模式。

epoll原理是:
epoll不同於select和poll的輪詢,只有注冊新的fd到epoll句柄中時,才會把所有新的fd拷貝進內核,epoll保證每個fd在整個過程中只被拷貝一次。並且為每個fd指定一個回調函數,當fd就緒時,回調函數就會將就緒的fd加入一個就緒鏈表中。epoll_wait的工作就是阻塞查看就緒鏈表中有沒有就緒的fd。而如果沒有就緒fd,就會把進程加入一個等待隊列中。直到有fd就緒,就會調用回調函數,將就緒的fd放入就緒列表中,並喚醒epoll_wail繼續等待,直到又有新的fd就緒。。。
可見,這種方式比輪詢的效率高出很多,又節約了CPU時間資源。

select,poll,epoll的效率測試圖:

代碼實現如下:

橫軸Dead connections 就是鏈接數,縱軸是每秒處理請求的數量。
可以看到,epoll每秒處理請求的數量基本不會隨着鏈接變多而下降的,而select和poll當鏈接數量很多時,效率就低的很多了。

epoll代碼實現如下:

#include"../unp.h"
#include"utili.h"
#include<stdlib.h>

int sock_bind(const char *ip, short port)
{
int fd;
fd = socket(AF_INET, SOCK_STREAM, 0);
struct sockaddr_in addrSer;
addrSer.sin_family = AF_INET;
addrSer.sin_port = htons(port);
addrSer.sin_addr.s_addr = inet_addr(ip);
socklen_t addrlen = sizeof(struct sockaddr);
bind(fd, (struct sockaddr*)&addrSer, addrlen);
return fd;
}

void handle_accept(int epollfd, int listenfd)
{
struct sockaddr_in addrCli;
int sockConn;
socklen_t addrlen = sizeof(struct sockaddr);
sockConn = accept(listenfd, (struct sockaddr*)&addrCli, &addrlen);
if(sockConn == -1)
perror("accept");
else
{
printf("accept a new client:%s:%d\n",inet_ntoa(addrCli.sin_addr),addrCli.sin_port);
add_event(epollfd, sockConn, EPOLLIN);
}
}

void do_read(int epollfd, int fd, char *buf)
{
int nread = read(fd, buf, 256);
if(nread <= 0)
{
printf("Server is Closed.\n");
close(fd);
delete_event(epollfd, fd, EPOLLIN);
}
printf("recv msg:>%s\n",buf);
modify_event(epollfd, fd, EPOLLOUT);
}

void do_write(int epollfd, int fd, char *buf)
{
int nwrite = write(fd, buf, strlen(buf)+1);
if(nwrite <= 0)
{
printf("client is closed.\n");
close(fd);
delete_event(epollfd, fd, EPOLLOUT);
}
else
modify_event(epollfd, fd, EPOLLIN);
}

void handle_events(int epollfd, epoll_event *events, int num, int listenfd, char *buf)
{
int fd;
for(int i=0; i<num; ++i)
{
fd = events[i].data.fd;
if((fd==listenfd) && (events[i].events & EPOLLIN))
handle_accept(epollfd, listenfd);
else if(events[i].events & EPOLLIN)
do_read(epollfd, fd, buf);
else if(events[i].events & EPOLLOUT)
do_write(epollfd, fd, buf);
}
}

void do_epoll(int listenfd)
{
int epollfd;
epoll_event events[1024];
epollfd = epoll_create(FDSIZE);
add_event(epollfd,listenfd, EPOLLIN);
int res;
char buf[256];
for(;;)
{
res = epoll_wait(epollfd, events, 1024,-1);
if(res == -1)
{
perror("epoll_wait");
exit(1);
}
handle_events(epollfd, events, res, listenfd, buf);
}
close(epollfd);
}

int main()
{
int listenfd;
listenfd = sock_bind(IPADDR, PORT);
listen(listenfd, LISTENQ);
do_epoll(listenfd);
return 0;
}

注意!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系我们删除。



 
  © 2014-2022 ITdaan.com