Muduo網絡庫源碼分析(一) EventLoop事件循環(Poller和Channel)


從這一篇博文起,我們開始剖析Muduo網絡庫的源碼,主要結合《Linux多線程服務端編程》和網上的一些學習資料!

(一)TCP網絡編程的本質:三個半事件

1. 連接的建立,包括服務端接受(accept) 新連接和客戶端成功發起(connect) 連接。TCP 連接一旦建立,客戶端和服務端是平等的,可以各自收發數據。

2. 連接的斷開,包括主動斷開(close 或shutdown) 和被動斷開(read(2) 返回0)。

3. 消息到達,文件描述符可讀。這是最為重要的一個事件,對它的處理方式決定了網絡編程的風格(阻塞還是非阻塞,如何處理分包,應用層的緩沖如何設計等等)。

3.5 消息發送完畢,這算半個。對於低流量的服務,可以不必關心這個事件;另外,這里“發送完畢”是指將數據寫入操作系統的緩沖區,將由TCP 協議棧負責數據的發送與重傳,不代表對方已經收到數據。


這其中,最主要的便是第三點: 消息到達,文件描述符可讀。下面我們來仔細分析(順便分析消息發送完畢):


(1)消息到達,文件可讀:

內核接收-> 網絡庫可讀事件觸發-->將數據從內核轉至應用緩沖區(並且回調函數OnMessage根據協議判斷是否是完整的數據包,如果不是立即返回)-->如果完整就取出讀走、解包、處理、發送(read decode compute encode write)

(2)消息發送完畢:

應用緩沖區-->內核緩沖區(可全填)--->觸發發送完成的事件,回調Onwrite。如果內核緩沖區不足以容納數據(高流量的服務),要把數據追加到應用層發送緩沖區中內核數據發送之后,觸發socket可寫事件,應用層-->內核;當全發送至內核時,又會回調Onwrite(可繼續寫)

(二)事件循環類圖


EventLoop類:

EventLoop是對Reactor模式的封裝,由於Muduo的並發原型是 Multiple reactors + threadpool  (one loop per thread + threadpool),所以每個線程最多只能有一個EventLoop對象。EventLoop對象構造的時候,會檢查當前線程是否已經創建了其他EventLoop對象,如果已創建,終止程序(LOG_FATAL),EventLoop類的構造函數會記錄本對象所屬線程(threadld_),創建了EventLoop對象的線程稱為IO線程,其功能是運行事件循環(EventLooploop),啥也不干==

下面是簡化版的EventLoop(內部的Poller尚未實現,只是一個框架)

EventLoop.h

#ifndef MUDUO_NET_EVENTLOOP_H
#define MUDUO_NET_EVENTLOOP_H

#include <boost/noncopyable.hpp>
#include <muduo/base/CurrentThread.h>
#include <muduo/base/Thread.h>

namespace muduo
{
namespace net
{
/// Reactor, at most one per thread.
/// This is an interface class, so don't expose too much details.
class EventLoop : boost::noncopyable
{
public:
EventLoop();
~EventLoop(); // force out-line dtor, for scoped_ptr members.
/// Loops forever.
/// Must be called in the same thread as creation of the object.
void loop();
void assertInLoopThread()
{
if (!isInLoopThread())
{
abortNotInLoopThread();
}
}
bool isInLoopThread() const { return threadId_ == CurrentThread::tid(); }

static EventLoop* getEventLoopOfCurrentThread();

private:
void abortNotInLoopThread();

bool looping_; /* atomic */
const pid_t threadId_;// 當前對象所屬線程ID
};

}
}
#endif // MUDUO_NET_EVENTLOOP_H


EventLoop.c

#include <muduo/net/EventLoop.h>
#include <muduo/base/Logging.h>
#include <poll.h>
using namespace muduo;
using namespace muduo::net;

namespace
{
// 當前線程EventLoop對象指針
// 線程局部存儲
__thread EventLoop* t_loopInThisThread = 0;
}

EventLoop* EventLoop::getEventLoopOfCurrentThread()
{
return t_loopInThisThread;
}

EventLoop::EventLoop()
: looping_(false),
threadId_(CurrentThread::tid())
{
LOG_TRACE << "EventLoop created " << this << " in thread " << threadId_;
// 如果當前線程已經創建了EventLoop對象,終止(LOG_FATAL)
if (t_loopInThisThread)
{
LOG_FATAL << "Another EventLoop " << t_loopInThisThread
<< " exists in this thread " << threadId_;
}
else
{
t_loopInThisThread = this;
}
}

EventLoop::~EventLoop()
{
t_loopInThisThread = NULL;
}

// 事件循環,該函數不能跨線程調用
// 只能在創建該對象的線程中調用
void EventLoop::loop()
{
assert(!looping_);
// 斷言當前處於創建該對象的線程中
assertInLoopThread();
looping_ = true;
LOG_TRACE << "EventLoop " << this << " start looping";

::poll(NULL, 0, 5*1000);

LOG_TRACE << "EventLoop " << this << " stop looping";
looping_ = false;
}

void EventLoop::abortNotInLoopThread()
{
LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this
<< " was created in threadId_ = " << threadId_
<< ", current thread id = " << CurrentThread::tid();
}


Poller類:

時序圖:



Poller是個抽象類,具體可以是EPollPoller(默認) 或者PollPoller,需要去實現(唯一使用面向對象的一個類)

對於PollPoller來說,存在一個map,用來關聯fd和channel的,我們可以根據fd快速找到對應的channel。一個fd對應一個struct pollfd(pollfd.fd),一個fd 對應一個channel*;這個fd 可以是socket, eventfd, timerfd, signalfd。

Poller的作用是更新IO復用中的channel(IO事件),添加、刪除Channel。我們看一下PollPoller的實現:

PollPoller.h

#ifndef MUDUO_NET_POLLER_POLLPOLLER_H
#define MUDUO_NET_POLLER_POLLPOLLER_H

#include <muduo/net/Poller.h>
#include <map>
#include <vector>

struct pollfd;

namespace muduo
{
namespace net
{

/// IO Multiplexing with poll(2).
class PollPoller : public Poller
{
public:

PollPoller(EventLoop* loop);
virtual ~PollPoller();

virtual Timestamp poll(int timeoutMs, ChannelList* activeChannels);
virtual void updateChannel(Channel* channel);
virtual void removeChannel(Channel* channel);

private:
void fillActiveChannels(int numEvents,
ChannelList* activeChannels) const;

typedef std::vector<struct pollfd> PollFdList;
typedef std::map<int, Channel*> ChannelMap;// key是文件描述符,value是Channel*
PollFdList pollfds_;
ChannelMap channels_;
};

}
}
#endif // MUDUO_NET_POLLER_POLLPOLLER_H

PollPoller.c

#include <muduo/net/poller/PollPoller.h>
#include <muduo/base/Logging.h>
#include <muduo/base/Types.h>
#include <muduo/net/Channel.h>
#include <assert.h>
#include <poll.h>

using namespace muduo;
using namespace muduo::net;

PollPoller::PollPoller(EventLoop* loop)
: Poller(loop)
{
}

PollPoller::~PollPoller()
{
}

Timestamp PollPoller::poll(int timeoutMs, ChannelList* activeChannels)
{
// XXX pollfds_ shouldn't change
int numEvents = ::poll(&*pollfds_.begin(), pollfds_.size(), timeoutMs);
Timestamp now(Timestamp::now());
if (numEvents > 0)
{
LOG_TRACE << numEvents << " events happended";
fillActiveChannels(numEvents, activeChannels);
}
else if (numEvents == 0)
{
LOG_TRACE << " nothing happended";
}
else
{
LOG_SYSERR << "PollPoller::poll()";
}
return now;
}

void PollPoller::fillActiveChannels(int numEvents,
ChannelList* activeChannels) const
{
for (PollFdList::const_iterator pfd = pollfds_.begin();
pfd != pollfds_.end() && numEvents > 0; ++pfd)
{
if (pfd->revents > 0)
{
--numEvents;
ChannelMap::const_iterator ch = channels_.find(pfd->fd);
assert(ch != channels_.end());
Channel* channel = ch->second;
assert(channel->fd() == pfd->fd);
channel->set_revents(pfd->revents);
// pfd->revents = 0;
activeChannels->push_back(channel);
}
}
}

void PollPoller::updateChannel(Channel* channel)
{
Poller::assertInLoopThread();
LOG_TRACE << "fd = " << channel->fd() << " events = " << channel->events();
if (channel->index() < 0)
{
// index < 0說明是一個新的通道
// a new one, add to pollfds_
assert(channels_.find(channel->fd()) == channels_.end());
struct pollfd pfd;
pfd.fd = channel->fd();
pfd.events = static_cast<short>(channel->events());
pfd.revents = 0;
pollfds_.push_back(pfd);
int idx = static_cast<int>(pollfds_.size())-1;
channel->set_index(idx);
channels_[pfd.fd] = channel;
}
else
{
// update existing one
assert(channels_.find(channel->fd()) != channels_.end());
assert(channels_[channel->fd()] == channel);
int idx = channel->index();
assert(0 <= idx && idx < static_cast<int>(pollfds_.size()));
struct pollfd& pfd = pollfds_[idx];
assert(pfd.fd == channel->fd() || pfd.fd == -channel->fd()-1);
pfd.events = static_cast<short>(channel->events());
pfd.revents = 0;
// 將一個通道暫時更改為不關注事件,但不從Poller中移除該通道
if (channel->isNoneEvent())
{
// ignore this pollfd
// 暫時忽略該文件描述符的事件
// 這里pfd.fd 可以直接設置為-1
pfd.fd = -channel->fd()-1;// 這樣子設置是為了removeChannel優化
}
}
}

void PollPoller::removeChannel(Channel* channel)
{
Poller::assertInLoopThread();
LOG_TRACE << "fd = " << channel->fd();
assert(channels_.find(channel->fd()) != channels_.end());
assert(channels_[channel->fd()] == channel);
assert(channel->isNoneEvent());
int idx = channel->index();
assert(0 <= idx && idx < static_cast<int>(pollfds_.size()));
const struct pollfd& pfd = pollfds_[idx]; (void)pfd;
assert(pfd.fd == -channel->fd()-1 && pfd.events == channel->events());
size_t n = channels_.erase(channel->fd());
assert(n == 1); (void)n;
if (implicit_cast<size_t>(idx) == pollfds_.size()-1)
{
pollfds_.pop_back();
}
else
{
// 這里移除的算法復雜度是O(1),將待刪除元素與最后一個元素交換再pop_back
int channelAtEnd = pollfds_.back().fd;
iter_swap(pollfds_.begin()+idx, pollfds_.end()-1);
if (channelAtEnd < 0)
{
channelAtEnd = -channelAtEnd-1;
}
channels_[channelAtEnd]->set_index(idx);
pollfds_.pop_back();
}
}
代碼中的幾個技巧都在注釋中標出。

Channel類:

Channel是selectable IO channel,負責注冊與響應IO 事件,它不擁有file descriptor。

Channel是Reactor結構中的“事件”,它自始至終都屬於一個EventLoop(一個EventLoop對應多個Channel,處理多個IO),負責一個文件描述符的IO事件,它包含又文件描述符fd_,但實際上它不擁有fd_,不用負責將其關閉。在Channel類中保存這IO事件的類型以及對應的回調函數,當IO事件發生時,最終會調用到Channel類中的回調函數。Channel類一般不單獨使用,它常常包含在其他類中(Acceptor、Connector、EventLoop、TimerQueue、TcpConnection)使用。Channel類有EventLoop的指針 loop_,通過這個指針可以向EventLoop中添加當前Channel事件。事件類型用events_表示,不同事件類型對應不同回調函數。

以下兩個都由Channel注冊:

Acceptor是被動連接的抽象--->關注監聽套接字的可讀事件,回調handleRead

Connector對主動連接的抽象。

時序圖:


Channel.h

#ifndef MUDUO_NET_CHANNEL_H
#define MUDUO_NET_CHANNEL_H

#include <boost/function.hpp>
#include <boost/noncopyable.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/weak_ptr.hpp>
#include <muduo/base/Timestamp.h>

namespace muduo
{
namespace net
{

class EventLoop;

/// A selectable I/O channel.
/// This class doesn't own the file descriptor.
/// The file descriptor could be a socket,
/// an eventfd, a timerfd, or a signalfd
class Channel : boost::noncopyable
{
public:
typedef boost::function<void()> EventCallback;
typedef boost::function<void(Timestamp)> ReadEventCallback;

Channel(EventLoop* loop, int fd);
~Channel();

void handleEvent(Timestamp receiveTime);
void setReadCallback(const ReadEventCallback& cb)
{ readCallback_ = cb; }
void setWriteCallback(const EventCallback& cb)
{ writeCallback_ = cb; }
void setCloseCallback(const EventCallback& cb)
{ closeCallback_ = cb; }
void setErrorCallback(const EventCallback& cb)
{ errorCallback_ = cb; }

/// Tie this channel to the owner object managed by shared_ptr,
/// prevent the owner object being destroyed in handleEvent.
void tie(const boost::shared_ptr<void>&);

int fd() const { return fd_; }
int events() const { return events_; }
void set_revents(int revt) { revents_ = revt; } // used by pollers
// int revents() const { return revents_; }
bool isNoneEvent() const { return events_ == kNoneEvent; }

void enableReading() { events_ |= kReadEvent; update(); }
// void disableReading() { events_ &= ~kReadEvent; update(); }
void enableWriting() { events_ |= kWriteEvent; update(); }
void disableWriting() { events_ &= ~kWriteEvent; update(); }
void disableAll() { events_ = kNoneEvent; update(); }
bool isWriting() const { return events_ & kWriteEvent; }

// for Poller
int index() { return index_; }
void set_index(int idx) { index_ = idx; }

// for debug
string reventsToString() const;

void doNotLogHup() { logHup_ = false; }

EventLoop* ownerLoop() { return loop_; }
void remove();

private:
void update();
void handleEventWithGuard(Timestamp receiveTime);

static const int kNoneEvent;
static const int kReadEvent;
static const int kWriteEvent;

EventLoop* loop_;// 所屬EventLoop
const int fd_;// 文件描述符,但不負責關閉該文件描述符
int events_;// 關注的事件
int revents_;// poll/epoll返回的事件
int index_;// used by Poller.表示在poll的事件數組中的序號
bool logHup_;// for POLLHUP

boost::weak_ptr<void> tie_;
bool tied_;
bool eventHandling_;// 是否處於處理事件中
ReadEventCallback readCallback_;
EventCallback writeCallback_;
EventCallback closeCallback_;
EventCallback errorCallback_;
};

}
}
#endif // MUDUO_NET_CHANNEL_H
Channel.c

#include <muduo/base/Logging.h>
#include <muduo/net/Channel.h>
#include <muduo/net/EventLoop.h>
#include <sstream>
#include <poll.h>

using namespace muduo;
using namespace muduo::net;

const int Channel::kNoneEvent = 0;
const int Channel::kReadEvent = POLLIN | POLLPRI;
const int Channel::kWriteEvent = POLLOUT;

Channel::Channel(EventLoop* loop, int fd__)
: loop_(loop),
fd_(fd__),
events_(0),
revents_(0),
index_(-1),
logHup_(true),
tied_(false),
eventHandling_(false)
{
}

Channel::~Channel()
{
assert(!eventHandling_);
}

void Channel::tie(const boost::shared_ptr<void>& obj)
{
tie_ = obj;
tied_ = true;
}

void Channel::update()
{
loop_->updateChannel(this);
}

// 調用這個函數之前確保調用disableAll
void Channel::remove()
{
assert(isNoneEvent());
loop_->removeChannel(this);
}

void Channel::handleEvent(Timestamp receiveTime)
{
boost::shared_ptr<void> guard;
if (tied_)
{
guard = tie_.lock();
if (guard)
{
handleEventWithGuard(receiveTime);
}
}
else
{
handleEventWithGuard(receiveTime);
}
}

void Channel::handleEventWithGuard(Timestamp receiveTime)
{
eventHandling_ = true;
if ((revents_ & POLLHUP) && !(revents_ & POLLIN))
{
if (logHup_)
{
LOG_WARN << "Channel::handle_event() POLLHUP";
}
if (closeCallback_) closeCallback_();
}

if (revents_ & POLLNVAL)
{
LOG_WARN << "Channel::handle_event() POLLNVAL";
}

if (revents_ & (POLLERR | POLLNVAL))
{
if (errorCallback_) errorCallback_();
}
if (revents_ & (POLLIN | POLLPRI | POLLRDHUP))
{
if (readCallback_) readCallback_(receiveTime);
}
if (revents_ & POLLOUT)
{
if (writeCallback_) writeCallback_();
}
eventHandling_ = false;
}

string Channel::reventsToString() const
{
std::ostringstream oss;
oss << fd_ << ": ";
if (revents_ & POLLIN)
oss << "IN ";
if (revents_ & POLLPRI)
oss << "PRI ";
if (revents_ & POLLOUT)
oss << "OUT ";
if (revents_ & POLLHUP)
oss << "HUP ";
if (revents_ & POLLRDHUP)
oss << "RDHUP ";
if (revents_ & POLLERR)
oss << "ERR ";
if (revents_ & POLLNVAL)
oss << "NVAL ";

return oss.str().c_str();
}
這三個類之間的關系不難理解,其實本質就是一個Poll/Epoll,只不過進行了更高的抽象后划分出來的這些類,重點理解博客開頭的那張類圖即可。

參考:

《Muduo使用手冊》

《Linux多線程服務端編程》















注意!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系我们删除。



 
粤ICP备14056181号  © 2014-2020 ITdaan.com