前言
在上一节中以拆分的方式学习完 Linux 、C++、网络等知识后,这节会将这三个模块糅合起来,站在项目的基础上再次去学习这三个模块。 Linux 网路编程比较经典的有 Redis、Muduo、TeamTalk等开源项目。本文将以 Muduo 来介绍 网络编程的框架,学习完 Muduo 再去学习其他框架就容易许多了。注意, 文中贴的代码均为核心代码,且注释详细,认真阅读。
下载路径
github 地址: https://github.com/smilew12/muduo.git
项目结构
项目主要分为两个模块:
base 模块: 主要封装互斥锁、条件变量、线程池、日志等基础类;
net 模块: 主要根据 reactor 模型对 Linux 平台下 Epoll 的封装;
本文主要介绍服务器模型,所以只引导读者学习 net 模块, base 模块主要使用 RAII 技法封装的常用类。
epoll 由来
先来说说传统的迭代式服务器模型以及一个连接一个线程的模型的缺点,以下图一个连接一个线程为例说明:
有图中可以看到,进程将阻塞在 accept 函数处,当客户端调用 connect 函数时,accept 返回,当前进程创建一个线程用来处理本次会话的任务。但是创建线程是有一定的开销的,在 32 位 Linux 系统下用户空间有 3G, 一个线程栈 10M, 那么总共可以创建 300 多个线程,同时切换线程也会有时间开销,所以并发连接数不会很高。
IO 多路复用的产生
有没有一种方式:
我将我想要监听的链接、读写等事件全权委托给它;
有连接到来事件的时候通知我? 我直接调用 accept 函数去接收。
有数据到来的时候也通知我? 我直接调用 recv 函数去收取数据。
发送缓冲区有空间时也通知我? 我直接调用 send 函数去发送数据。
为了解决这种相当于代理一样的东西,linux 内核给用户提供了一系列系统调用,当有事件时,该系统调用会返回相应事件,用户只需拿到事件进行处理即可。那么这就是 IO 多路复用函数(select / poll / epoll)
因为本文是将 Linux 平台下的网络编程,所以选择了 Epoll:
epoll_create() 函数用来创建一个代理对象;
epoll_wait() 函数就是当有事件到来时,会返回响应事件的代理;
epoll_ctl() 主要是往这个代理中注册你想要监听的事件;
下面是一个根据 Reactor 模型对 Epoll 的封装例子:
//1)在父进程中,fork返回新创建子进程的进程ID;
//2)在子进程中,fork返回0;
//3)如果出现错误,fork返回一个负值;
pid=fork();
if(pid<0)
{
std::cout<<"fork error"<<std::endl;
exit(-1);
}
//父进程退出,子进程独立运行
elseif(pid>0) {
exit(0);
}
//之前parent和child运行在同一个session里,parent是会话(session)的领头进程,
//parent进程作为会话的领头进程,如果exit结束执行的话,那么子进程会成为孤儿进程,并被init收养。
//执行setsid()之后,child将重新获得一个新的会话(session)id。
//这时parent退出之后,将不会影响到child了。
setsid();
intfd;
fd=open("/dev/null",O_RDWR,0);
if(fd!=-1)
{
dup2(fd,STDIN_FILENO);
dup2(fd,STDOUT_FILENO);
dup2(fd,STDERR_FILENO);
}
if(fd>2)
close(fd);
}
intmain(intargc,char*argv[])
{
//设置信号处理
signal(SIGCHLD,SIG_DFL);
signal(SIGPIPE,SIG_IGN);
signal(SIGINT,prog_exit);
signal(SIGKILL,prog_exit);
signal(SIGTERM,prog_exit);
shortport=0;
intch;
boolbdaemon=false;
//根据传入参数,判断是否开启守护进程模式 -d
while((ch=getopt(argc,argv,"p:d"))!=-1)
{
switch(ch)
{
case'd':
bdaemon=true;
break;
case'p':
port=atol(optarg);
break;
}
}
if(bdaemon)
daemon_run();
if(port==0)
port=12345;
//根据ip、port 初始化socket,创建 epoll_wait()
if(!g_reator.init("0.0.0.0",12345))
return-1;
//进入 loop 循环
g_reator.main_loop(&g_reator);
return0;
}
为了方便理解,我画了一个大体流程图,如下所示:
执行流程:
1、创建监听 socket,并绑定、监听;
2、调用 epoll_create() , 创建 epollfd 代理;
3、将想要监听的 listenfd,通过 epoll_ctl() , 挂载到 epollfd 上,让 epollfd 代理监听;
4、在一个 while 循环中,调用 epoll_wait(), 程序阻塞在这里,等待客户端连接到来;
5、当某个客户端连接到来,epoll_wait() 第一次返回的是接受连接的 listenfd, 调用 accept 函数接受连接,将 accpet 返回的 connfd,挂载到 epollfd 上,继续让代理监听 connfd 的读写事件;
6、当再次到达 5 时,如果是接受连接的 listenfd,那么继续 accept, 如果是读写事件,则进行 7, 接受或发送数据。
至此,一个 reactor 模型基本已经完成了,相信你应该已经理解了,大体的流程了。那么在来看 Muduo 是怎养封装这个 epoll 的?
EventLoop
先来介绍最重要的模块 EventLoop : 他是上面第四步的实现:
//在一个循环中让 epoll_wait() 不断检测事件
voidEventLoop::loop() {
while() {
//vector, 用来返回 epoll_wait 中监听到的 有活动的事件(fd)
m_activeChannels.clear();
//m_activeChannels 是一个输入输出参数
m_poller->poll(kPollTimeMs,&m_activeChannels);
//遍历 epoll_wait 返回的结果,然后进行 accept / send / recv
for(ChannelList::iteratorit=m_activeChannels.begin();
it!=m_activeChannels.end();++it)
{
(*it)->handleEvent();
}
//处理其他事件, 此时不必关心
doPendingFunctors();
}
}
Channel
对于上面 epoll_wait() 返回的就绪事件,怎么去处理呢? 那么 Channel 类主要负责把不同的 IO 事件分发给不同的回调,例如 ReadCallback、 WriteCallBack 等;同时提供向 epollfd 中注册可读可写事件的接口。每个 Channel 自始至终只负责一个文件描述符的 IO 事件分发。
//主要处理 epoll_wait 返回的事件,并将他们分发到不同的回调
voidChannel::handleEvent()
{
//如果出错,那么分发到错误的回调
if(m_revents&(POLLERR|POLLNVAL)){
if(m_errorCallBack)m_errorCallBack();
}
//如果是 POLLIN, 说明是数据收发的回调
if(m_revents&(POLLIN|POLLPRI|POLLRDHUP)){
if(m_readCallBack)m_readCallBack();
}
//如果是 POLLOUT, 那么是可写事件的回调
if(m_revents&POLLOUT){
if(m_writeCallBack)m_writeCallBack();
}
}
//设置回调函数,供其他模块注册回调函数
voidsetReadCallback(constReadEventCallback&cb)
{readCallback_=cb; }
voidsetWriteCallback(constEventCallback&cb)
{writeCallback_=cb; }
voidsetCloseCallback(constEventCallback&cb)
{closeCallback_=cb; }
voidsetErrorCallback(constEventCallback&cb)
{errorCallback_=cb; }
//往 epoll 中注册可读事件
voidenableReading() {m_events|=kReadEvent;update(); }
//从 epoll 中移除可读事件
voiddisableReading() {m_events&=~kReadEvent;update(); }
voidenableWriting() {m_events|=kWriteEvent;update(); }
boolisWriting() {returnm_events&=kWriteEvent; }
boolisReading() {returnm_events&=kReadEvent; }
voiddisableWriting() {m_events&=~kWriteEvent;update(); }
//最终都调用 uodate函数,该函数会调用 EventLoop::uodateChannel(), 后者在调用 Poller::updateChannel();
voidChannel::update()
{
m_addedToLoop=true;
p_loop->runInLoop(std::bind(&EventLoop::updateChannel,p_loop,this));
}
Poller
Poller类是 IO multiplexing 的封装。在 Muduo 中是一个抽象类,因为 Muduo 同时支持 poll 和 epoll 两种 IO 多路复用机制,他们是真正调用 epoll_wait() 的地方
TimeStampPoller::poll(inttimeoutMs,ChannelList*activeChannels)
{
//真正的调用 epoll_wait / poll 的地方。
intnumEvents=::poll(/*&*m_pollfds.begin()*/m_pollfds.data(),m_pollfds.size(),timeoutMs);
TimeStampnow(TimeStamp::now());
if(numEvents>0){
//将返回的结果封装成 channel 返回给 Eventloop::loop 函数
fillActiveChannels(numEvents,activeChannels);
}
elseif(numEvents==0){
LOG_TRACE<<" nothing happended";
}
else{
LOG_SYSERR<<"Poller::poll()";
}
returnnow;
}
voidPoller::fillActiveChannels(intnumEvents,ChannelList*activeChannels)const
{
for(PollFdList::const_iteratorpfd=m_pollfds.begin();
pfd!=m_pollfds.end()&&numEvents>0;++pfd)
{
if(pfd->revents>0)
{
--numEvents;
ChannelMap::const_iteratorch=m_channels.find(pfd->fd);
assert(ch!=m_channels.end());
Channel*channel=ch->second;
assert(channel->fd()==pfd->fd);
channel->set_revents(pfd->revents);
//将返回的结果封装成 channel 返回给 Eventloop::loop 函数
activeChannels->push_back(channel);
}
}
}
讲解完 EventLoop、Poller、Channel 和 Poller 后,在从下面的时序图,看看他们的执行流程
上面提到的 Channel::handleEvent() 会将事件分发给注册此回调函数的模块,那么谁都需要注册呢?
接受客户端连接到来的类 ,需要注册 Channel::setReadCallback(), 以调用accept 进行接受;
接受有数据到来的类 ,需要注册 Channel::setReadCallback(), 以调用 recv 接收数据;
检测发送缓冲区是否可写, 需要注册 Channel::WriteCallback(), 以调用 send 函数发送数据;
Acceptor
接受客户端连接到来的类就是 Muduo::net::Acceptor 类
Acceptor::Acceptor(EventLoop*loop,constInetAddress&listenAddr)
:loop_(loop),
//在构造函数中同时创建 acceptsocket,并调用下面的 acceptChannel_.setReadCallback 将acceptSocket添加到 epoll 中,监听可读事件
acceptSocket_(sockets::createNonblockingOrDie()),
acceptChannel_(loop,acceptSocket_.fd()),
listenning_(false),
idleFd_(::open("/dev/null",O_RDONLY|O_CLOEXEC))
{
assert(idleFd_>=0);
acceptSocket_.setReuseAddr(true);
//绑定监听
acceptSocket_.bindAddress(listenAddr);
//注册可读事件,当有事件到来时, 调用 handleRead() 回调接受连接
acceptChannel_.setReadCallback(
boost::bind(&Acceptor::handleRead,this));
}
voidAcceptor::handleRead()
{
loop_->assertInLoopThread();
InetAddresspeerAddr(0);
//真正 调用 accept 的地方
intconnfd=acceptSocket_.accept(&peerAddr);
if(connfd>=0)
{
// string hostport = peerAddr.toIpPort();
// LOG_TRACE << "Accepts of " << hostport;
if(newConnectionCallback_)
{
//将接受的新连接,分配给 TcpConnection 函数。即代表一路连接
newConnectionCallback_(connfd,peerAddr);
}
else
{
sockets::close(connfd);
}
}
else
{
// Read the section named "The special problem of
// accept()ing when you can't" in libev's doc.
// By Marc Lehmann, author of livev.
if(errno==EMFILE)
{
::close(idleFd_);
idleFd_=::accept(acceptSocket_.fd(),NULL,NULL);
::close(idleFd_);
idleFd_=::open("/dev/null",O_RDONLY|O_CLOEXEC);
}
}
}
下面的时序图将更好的为你展示 Acceptor 的作用:
Acceptor::accpet() 会返回一个 connfd(客户端连接fd),同时会暴露setNewConnectionCallback() 回调函数,供需要使用 connfd 的类使用;那么那个类需要使用 connfd 呢? 首先想一想 connfd 主要是干嘛的? 它是 accpet 返回的文件描述符,可以用来完成数据收发,收取和发送用户的数据,完成具体业务。现在需要一个类去管理(TcpServer) connfd,并且每创建一个 connfd,都会创建这个文件描述符所对应的连接类(TcpConnection).用来管理本次会话。
那么 TcpServer 和 TcpConnection 类就相应而生:
TcpServer
TcpServer 类会注册 Acceptor::setNewConnectionCallback,同时会为每个连接创建一个 TcpConnection 类;
//TcpServer 类
TcpServer::TcpServer(EventLoop*loop,
constInetAddress&listenAddr,
conststring&nameArg)
:loop_(CHECK_NOTNULL(loop)),
hostport_(listenAddr.toIpPort()),
name_(nameArg),
acceptor_(newAcceptor(loop,listenAddr)),
threadPool_(newEventLoopThreadPool(loop)),
connectionCallback_(defaultConnectionCallback),
messageCallback_(defaultMessageCallback),
started_(false),
nextConnId_(1)
{
//在构造函数中 注册 Acceptor 的 connfd 回调函数,同时调用 newconnection 函数
acceptor_->setNewConnectionCallback(
boost::bind(&TcpServer::newConnection,this,_1,_2));
}
voidTcpServer::newConnection(intsockfd,constInetAddress&peerAddr)
{
loop_->assertInLoopThread();
// 按照轮叫的方式选择一个EventLoop
EventLoop*ioLoop=threadPool_->getNextLoop();
charbuf[32];
snprintf(buf,sizeofbuf,":%s#%d",hostport_.c_str(),nextConnId_);
++nextConnId_;
stringconnName=name_+buf;
LOG_INFO<<"TcpServer::newConnection ["<<name_
<<"] - new connection ["<<connName
<<"] from "<<peerAddr.toIpPort();
InetAddresslocalAddr(sockets::getLocalAddr(sockfd));
//每一个连接 新建一个 TcpConnection 类,同时设置他们的用来数据收发的回调函数
TcpConnectionPtrconn(newTcpConnection(ioLoop,
connName,
sockfd,
localAddr,
peerAddr));
connections_[connName]=conn;
conn->setConnectionCallback(connectionCallback_);
conn->setMessageCallback(messageCallback_);
conn->setWriteCompleteCallback(writeCompleteCallback_);
conn->setCloseCallback(
boost::bind(&TcpServer::removeConnection,this,_1));
// conn->connectEstablished();
ioLoop->runInLoop(boost::bind(&TcpConnection::connectEstablished,conn));
LOG_TRACE<<"[5] usecount="<<conn.use_count();
}
TcpConnection
TcpConnection 类会向 分发事件的类 Channel 注册接受数据和发送数据的回调 setReadCallback、setWriteCallback等,
// TcpConeciton 的构造函数
TcpConnection::TcpConnection(EventLoop* loop,
const string& nameArg,
int sockfd,
const InetAddress& localAddr,
const InetAddress& peerAddr)
: loop_(CHECK_NOTNULL(loop)),
name_(nameArg),
state_(kConnecting),
socket_(new Socket(sockfd)),
channel_(new Channel(loop, sockfd)),
localAddr_(localAddr),
peerAddr_(peerAddr),
highWaterMark_(64*1024*1024)
{
// 通道可读事件到来的时候,回调TcpConnection::handleRead,_1是事件发生时间
channel_->setReadCallback(
boost::bind(&TcpConnection::handleRead, this, _1));
// 通道可写事件到来的时候,回调TcpConnection::handleWrite
channel_->setWriteCallback(
boost::bind(&TcpConnection::handleWrite, this));
// 连接关闭,回调TcpConnection::handleClose
channel_->setCloseCallback(
boost::bind(&TcpConnection::handleClose, this));
// 发生错误,回调TcpConnection::handleError
channel_->setErrorCallback(
boost::bind(&TcpConnection::handleError, this));
LOG_DEBUG << "TcpConnection::ctor[" << name_ << "] at " << this
<< " fd=" << sockfd;
socket_->setKeepAlive(true);
}
//用于发送数据的函数
void TcpConnection::handleWrite()
//当有事件到来,Channel::handleEvent() 会分发事件,同时提供回调函数, TcpConnection 会注册 setReadCallback 完成真正的数据收发;
void TcpConnection::handleRead(Timestamp receiveTime)
由于篇幅有限,本文先介绍了 Muduo 中大体的类, 还有 Buffer 类将会在下文介绍。
总结
读者认真阅读上文中的所有类,结合代码注释,先了解 Reactor 模型的大体工作流程,然后再了解 Muduo 中每个类的功能,希望本片文章会对你有所帮助!
后期彩蛋
应用层收发缓冲区的设计?
为什么要有收发缓冲区?
ET / LT 模式
数据收发的过程