redis 文件事件模型


參考文獻:

  1. 深入剖析 redis 事件驅動
  2. Redis 中的事件循環
  3. 深入了解epoll (轉)
  4. Redis自己的事件模型 ae
  5. EPOLL(7)
  6. Linux IO模式及 select、poll、epoll詳解
  7. epoll為什么這么快,epoll的實現原理

    概述

    在redis中,對於對於文件事件的處理采用了Reactor模型。總體來說,就是將io多路復用所監聽到的文件去處,並放入一個隊列中依次處理。接下去本文以一個io多路復用的例子開始,一步步還原redis文件事件的運行過程

epoll (本節從Linux IO模式及 select、poll、epoll詳解摘抄)

epoll使用的過程中需要如下的三個接口:

int epoll_create(int size);//創建一個epoll的句柄,size用來告訴內核這個監聽的數目一共有多大
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

int epoll_create(int size)

創建一個epoll的句柄,size用來告訴內核這個監聽的數目一共有多大,這個參數不同於select()中的第一個參數,給出最大監聽的fd+1的值,參數size並不是限制了epoll所能監聽的描述符最大個數,只是對內核初始分配內部數據結構的一個建議。
當創建好epoll句柄后,它就會占用一個fd值,在linux下如果查看/proc/進程id/fd/,是能夠看到這個fd的,所以在使用完epoll后,必須調用close()關閉,否則可能導致fd被耗盡。

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
函數是對指定描述符fd執行op操作。

  • epfd:是epoll_create()的返回值。
  • op:表示op操作,用三個宏來表示:添加EPOLL_CTL_ADD,刪除EPOLL_CTL_DEL,修改EPOLL_CTL_MOD。分別添加、刪除和修改對fd的監聽事件。
  • fd:是需要監聽的fd(文件描述符)
  • epoll_event:是告訴內核需要監聽什么事,struct epoll_event結構如下:
struct epoll_event {
  __uint32_t events;  /* Epoll events */
  epoll_data_t data;  /* User data variable */
};

//events可以是以下幾個宏的集合:
EPOLLIN :表示對應的文件描述符可以讀(包括對端SOCKET正常關閉);
EPOLLOUT:表示對應的文件描述符可以寫;
EPOLLPRI:表示對應的文件描述符有緊急的數據可讀(這里應該表示有帶外數據到來);
EPOLLERR:表示對應的文件描述符發生錯誤;
EPOLLHUP:表示對應的文件描述符被掛斷;
EPOLLET: 將EPOLL設為邊緣觸發(Edge Triggered)模式,這是相對於水平觸發(Level Triggered)來說的。
EPOLLONESHOT:只監聽一次事件,當監聽完這次事件之后,如果還需要繼續監聽這個socket的話,需要再次把這個socket加入到EPOLL隊列里

int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

等待epfd上的io事件,最多返回maxevents個事件。
參數events用來從內核得到事件的集合,maxevents告之內核這個events有多大,這個maxevents的值不能大於創建epoll_create()時的size,參數timeout是超時時間(毫秒,0會立即返回,-1將不確定,也有說法說是永久阻塞)。該函數返回需要處理的事件數目,如返回0表示已超時。

一個epoll的示例

有了上面的論述,用一個簡單的例子來說明下epoll的使用(來自http://man7.org/linux/man-pages/man7/epoll.7.html):

#define MAX_EVENTS 10
struct epoll_event ev, events[MAX_EVENTS];
int listen_sock, conn_sock, nfds, epollfd;

/* Code to set up listening socket, 'listen_sock',
   (socket(), bind(), listen()) omitted */

epollfd = epoll_create1(0);
if (epollfd == -1) {
    perror("epoll_create1");
    exit(EXIT_FAILURE);
}

ev.events = EPOLLIN;
ev.data.fd = listen_sock;
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, listen_sock, &ev) == -1) {
    perror("epoll_ctl: listen_sock");
    exit(EXIT_FAILURE);
}

for (;;) {
    nfds = epoll_wait(epollfd, events, MAX_EVENTS, -1);
    if (nfds == -1) {
        perror("epoll_wait");
        exit(EXIT_FAILURE);
    }

    for (n = 0; n < nfds; ++n) {
        if (events[n].data.fd == listen_sock) {
            conn_sock = accept(listen_sock,
                    (struct sockaddr *) &addr, &addrlen);
            if (conn_sock == -1) {
                perror("accept");
                exit(EXIT_FAILURE);
            }
            setnonblocking(conn_sock);
            ev.events = EPOLLIN | EPOLLET;
            ev.data.fd = conn_sock;
            if (epoll_ctl(epollfd, EPOLL_CTL_ADD, conn_sock,
                        &ev) == -1) {
                perror("epoll_ctl: conn_sock");
                exit(EXIT_FAILURE);
            }
        } else {
            do_use_fd(events[n].data.fd);
        }
    }
}

image
如圖所示,可以看出使用epoll的過程。接下來將介紹redis事件驅動模型中主要涉及的數據結構。

redis事件驅動模型

數據結構

redis事件驅動模型中主要涉及到如下的幾個數據結構:

  1. aeCreateEventLoop
  2. aeApiState
  3. aeFileEvent
  4. aeFiredEvent
    redis事件處理的核心是aeCreateEventLoop結構,如圖可以看出主要的結構體如下:
    image
    ```
    typedef struct aeEventLoop {

    // 目前已注冊的最大描述符
    int maxfd; /* highest file descriptor currently registered */

    // 目前已追蹤的最大描述符
    int setsize; /* max number of file descriptors tracked */

    // 用於生成時間事件 id
    long long timeEventNextId;

    // 最后一次執行時間事件的時間
    time_t lastTime; /* Used to detect system clock skew */

    // 已注冊的文件事件
    aeFileEvent events; / Registered events */

    // 已就緒的文件事件
    aeFiredEvent fired; / Fired events */

    // 時間事件
    aeTimeEvent *timeEventHead;

    // 事件處理器的開關
    int stop;

    // 多路復用庫的私有數據
    void apidata; / This is used for polling API specific data */

    // 在處理事件前要執行的函數
    aeBeforeSleepProc *beforesleep;

} aeEventLoop;

其中aeFileEvent 結構體為已經注冊並需要監聽的事件的結構體。在redis初始化的時候會創建一個 setSize*sizeof(aeFileEvent) 以及一個 setSize*siezeof(aeFiredEvent) 大小的內存,用文件描述符作為其索引。那么這個大小定位多少合適呢?在Linux個中,文件描述符是個有限的資源,當打開一個文件時就會消耗一個文件描述符,當關閉該文件描述符或者程序結束時會釋放該文件描述符資源,從而供其他文件打開操作使用。當文件描述符超過最大值后,打開文件就會出錯。那么這個最大值是多少呢?可以通過/proc/sys/fs/file-max看到系統支持的最大的文件描述符數。通過 ulimit -n 可以看到當前用戶能打開的最大的文件描述符。在我這里的一台8g內存的機器上,系統支持最大的文件描述是365146。而在這台64bit的機器上 sizeof(aeFiredEvent) + sizeof(aeFileEvent) 大小為40byte。按系統最大支持的文件描述符來算,固定消耗內存為14.6M。這樣以文件描述符作為數組的下標來索引,雖然這樣的哈希在接入量不大的情況下會有大量的浪費。但是最多也就浪費14M 的內存,因此這樣的設計是可取的。【4】

typedef struct aeFileEvent {

// 監聽事件類型掩碼,
// 值可以是 AE_READABLE 或 AE_WRITABLE ,
// 或者 AE_READABLE | AE_WRITABLE
int mask; /* one of AE_(READABLE|WRITABLE) */

// 讀事件處理器
aeFileProc *rfileProc;

// 寫事件處理器
aeFileProc *wfileProc;

// 多路復用庫的私有數據
void *clientData;

} aeFileEvent;


aeFiredEvent結構體是已經監聽到有事件發生的描述符的集合。

typedef struct aeFiredEvent {

// 已就緒文件描述符
int fd;

// 事件類型掩碼,
// 值可以是 AE_READABLE 或 AE_WRITABLE
// 或者是兩者的或
int mask;

} aeFiredEvent;


void *apidata;在ae創建的時候,會被賦值為aeApiState結構體,結構體的定義如下:

typedef struct aeApiState {

// epoll_event 實例描述符
int epfd;

// 事件槽
struct epoll_event *events;

} aeApiState;

可以見得,這個結構體是為了epoll所准備的數據結構。redis可以選擇不同的io多路復用方法。因此 apidata 是個void類型,根據不同的io多路復用庫來選擇。

## Reactor模型的創建與使用
###  aeEventLoop 的創建

aeEventLoop aeCreateEventLoop(int setsize) {
aeEventLoop
eventLoop;
int i;

// 創建事件狀態結構
if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;

// 初始化文件事件結構和已就緒文件事件結構數組
eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
// 設置數組大小
eventLoop->setsize = setsize;
// 初始化執行最近一次執行時間
eventLoop->lastTime = time(NULL);

// 初始化時間事件結構
eventLoop->timeEventHead = NULL;
eventLoop->timeEventNextId = 0;

eventLoop->stop = 0;
eventLoop->maxfd = -1;
eventLoop->beforesleep = NULL;
if (aeApiCreate(eventLoop) == -1) goto err;

/* Events with mask == AE_NONE are not set. So let's initialize the
 * vector with it. */
// 初始化監聽事件
for (i = 0; i < setsize; i++)
    eventLoop->events[i].mask = AE_NONE;

// 返回事件循環
return eventLoop;

err:
if (eventLoop) {
zfree(eventLoop->events);
zfree(eventLoop->fired);
zfree(eventLoop);
}
return NULL;
}

如下圖所示,可以見得在初始化的時候創建結構體的流程。

graph LR
創建aeFileEvent-->創建aeFireEvent
創建aeFireEvent-->調用aeApiCreate創建aeApiState

函數aeApiCreate則創建了一個epoll所需要的數據結構。

/*

  • 創建一個新的 epoll 實例,並將它賦值給 eventLoop
    /
    static int aeApiCreate(aeEventLoop
    eventLoop) {

    aeApiState *state = zmalloc(sizeof(aeApiState));

    if (!state) return -1;

    // 初始化事件槽空間
    state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
    if (!state->events) {
    zfree(state);
    return -1;
    }

    // 創建 epoll 實例
    state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
    if (state->epfd == -1) {
    zfree(state->events);
    zfree(state);
    return -1;
    }

    // 賦值給 eventLoop
    eventLoop->apidata = state;
    return 0;
    }

###  aeFileEvent的注冊
在創建了aeEventLoop之后,對於需要監聽的文件描述符需要進行注冊,在aeFileEvent結構體中,可以看到如下的兩個結構aeFileProc *rfileProc和aeFileProc *wfileProc,就是在注冊監聽事件的時候進行賦值的。
函數aeCreateFileEvent執行創建aeFileEvent和添加文件句柄到epoll中。

/*

  • 根據 mask 參數的值,監聽 fd 文件的狀態,
  • 當 fd 可用時,執行 proc 函數
    /
    int aeCreateFileEvent(aeEventLoop
    eventLoop, int fd, int mask,
    aeFileProc proc, void clientData)
    {
    if (fd >= eventLoop->setsize) {
    errno = ERANGE;
    return AE_ERR;
    }

    if (fd >= eventLoop->setsize) return AE_ERR;

    // 取出文件事件結構
    aeFileEvent *fe = &eventLoop->events[fd];

    // 監聽指定 fd 的指定事件
    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
    return AE_ERR;

    // 設置文件事件類型,以及事件的處理器
    fe->mask |= mask;
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;

    // 私有數據
    fe->clientData = clientData;

    // 如果有需要,更新事件處理器的最大 fd
    if (fd > eventLoop->maxfd)
    eventLoop->maxfd = fd;

    return AE_OK;
    }

其中aeApiAddEvent函數就是在開頭之中epoll例子中添加一個文件描述符到監聽集合中的方法封裝函數:

/*

  • 關聯給定事件到 fd
    /
    static int aeApiAddEvent(aeEventLoop
    eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee;

    /* If the fd was already monitored for some event, we need a MOD
    • operation. Otherwise we need an ADD operation.
    • 如果 fd 沒有關聯任何事件,那么這是一個 ADD 操作。
    • 如果已經關聯了某個/某些事件,那么這是一個 MOD 操作。
      */
      int op = eventLoop->events[fd].mask == AE_NONE ?
      EPOLL_CTL_ADD : EPOLL_CTL_MOD;

    // 注冊事件到 epoll
    ee.events = 0;
    mask |= eventLoop->events[fd].mask; /* Merge old events /
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.u64 = 0; /
    avoid valgrind warning */
    ee.data.fd = fd;

    if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;

    return 0;
    }

### 事件驅動模型的運行過程
到這里redis事件驅動的主要數據結構和初始化的方法已經介紹完畢。接下來將展示事件驅動的運行過程。在redis源碼中,省略去其他部分,跟事件驅動相關的代碼如下:
server.el = aeCreateEventLoop(server.maxclients+REDIS_EVENTLOOP_FDSET_INCR);
/* Create an event handler for accepting new connections in TCP and Unix
 * domain sockets. */
// 為 TCP 連接關聯連接應答(accept)處理器
// 用於接受並應答客戶端的 connect() 調用
for (j = 0; j < server.ipfd_count; j++) {
    if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
        acceptTcpHandler,NULL) == AE_ERR)
        {
            redisPanic(
                "Unrecoverable error creating server.ipfd file event.");
        }
}

// 為本地套接字關聯應答處理器
if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
    acceptUnixHandler,NULL) == AE_ERR) redisPanic("Unrecoverable error creating server.sofd file event.");
aeMain(server.el);
其中aeCreateEventLoop和aeCreateFileEvent函數在之前已經介紹過。接下來重點介紹下aeMain函數:

/*

  • 事件處理器的主循環
    /
    void aeMain(aeEventLoop
    eventLoop) {

    eventLoop->stop = 0;

    while (!eventLoop->stop) {

    // 如果有需要在事件處理前執行的函數,那么運行它
    if (eventLoop->beforesleep != NULL)
        eventLoop->beforesleep(eventLoop);
    
    // 開始處理事件
    aeProcessEvents(eventLoop, AE_ALL_EVENTS);

    }
    }
    ```
    我們可以看出,aeMain函數中主要調用了aeProcessEvents處理事件,aeProcessEvents中我們略去其他的代碼,主要關注如下的部分:

int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    ....
    // 處理文件事件,阻塞時間由 tvp 決定
        numevents = aeApiPoll(eventLoop, tvp);
        for (j = 0; j < numevents; j++) {
            // 從已就緒數組中獲取事件
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];

            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int rfired = 0;

           /* note the fe->mask & mask & ... code: maybe an already processed
             * event removed an element that fired and we still didn't
             * processed, so we check if the event is still valid. */
            // 讀事件
            if (fe->mask & mask & AE_READABLE) {
                // rfired 確保讀/寫事件只能執行其中一個
                rfired = 1;
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
            }
            // 寫事件
            if (fe->mask & mask & AE_WRITABLE) {
                if (!rfired || fe->wfileProc != fe->rfileProc)
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
            }

            processed++;
        }
    ....
}

可以看出函數aeProcessEvents調用了aeApiPoll獲取已經就緒的事件。在for循環中,從eventLoop->fired(已經就緒的事件)中取出事件結構體,然后根據是讀時間還是寫事件進行處理。在aeApiPoll中,就可以看到我們熟悉的
epoll_wait的身影。可以見得通過調用系統的epoll_wait函數,然后將已經就緒的事件放入 eventLoop->fired中。

/*
 * 獲取可執行事件
 */
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;

    // 等待時間
    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
            tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);

    // 有至少一個事件就緒?
    if (retval > 0) {
        int j;

        // 為已就緒事件設置相應的模式
        // 並加入到 eventLoop 的 fired 數組中
        numevents = retval;
        for (j = 0; j < numevents; j++) {
            int mask = 0;
            struct epoll_event *e = state->events+j;

            if (e->events & EPOLLIN) mask |= AE_READABLE;
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
            if (e->events & EPOLLERR) mask |= AE_WRITABLE;
            if (e->events & EPOLLHUP) mask |= AE_WRITABLE;

            eventLoop->fired[j].fd = e->data.fd;
            eventLoop->fired[j].mask = mask;
        }
    }

    // 返回已就緒事件個數
    return numevents;
}

到這里還有一個疑問,在redis初始化的時候只注冊了tcp和本地套接字的描述符,那么當有個新的客戶端連接進來的時候,是怎么將客戶端的描述符加到監聽隊列里面的呢?答案就在最開始的acceptTcpHandler函數中。在這個函數中依次調用了acceptCommonHandler->createClient->aeCreateFileEvent函數。可以見得當監聽的一個tcp或者本地socket產生了connect 事件的時候,就會依次調用這些函數,然后將新的客戶端端描述符加入監聽中。

總結

redis的事件驅動模型分析就到這里,總體而言還是比較直觀的。這中間也學習了很多,包括epoll的原理等。


注意!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系我们删除。



 
  © 2014-2022 ITdaan.com 联系我们: