Linux C++ 网络编程(二)

前言

在上一节中以拆分的方式学习完 Linux 、C++、网络等知识后,这节会将这三个模块糅合起来,站在项目的基础上再次去学习这三个模块。 Linux 网路编程比较经典的有 Redis、Muduo、TeamTalk等开源项目。本文将以 Muduo 来介绍 网络编程的框架,学习完 Muduo 再去学习其他框架就容易许多了。注意, 文中贴的代码均为核心代码,且注释详细,认真阅读。

Linux C++ 网络编程 (一)

Linux C++ 开发环境搭建

下载路径

github 地址: https://github.com/smilew12/muduo.git

项目结构

项目主要分为两个模块:

base 模块: 主要封装互斥锁、条件变量、线程池、日志等基础类;

net    模块: 主要根据 reactor 模型对 Linux 平台下 Epoll 的封装;

本文主要介绍服务器模型,所以只引导读者学习 net 模块, base 模块主要使用 RAII 技法封装的常用类。

epoll 由来

先来说说传统的迭代式服务器模型以及一个连接一个线程的模型的缺点,以下图一个连接一个线程为例说明:


有图中可以看到,进程将阻塞在 accept 函数处,当客户端调用 connect 函数时,accept 返回,当前进程创建一个线程用来处理本次会话的任务。但是创建线程是有一定的开销的,在 32 位 Linux 系统下用户空间有 3G, 一个线程栈 10M, 那么总共可以创建 300 多个线程,同时切换线程也会有时间开销,所以并发连接数不会很高。

IO 多路复用的产生

有没有一种方式:

我将我想要监听的链接、读写等事件全权委托给它;

有连接到来事件的时候通知我? 我直接调用 accept 函数去接收。

有数据到来的时候也通知我?  我直接调用 recv 函数去收取数据。

发送缓冲区有空间时也通知我? 我直接调用 send 函数去发送数据。

为了解决这种相当于代理一样的东西,linux 内核给用户提供了一系列系统调用,当有事件时,该系统调用会返回相应事件,用户只需拿到事件进行处理即可。那么这就是 IO 多路复用函数(select / poll / epoll)

因为本文是将 Linux 平台下的网络编程,所以选择了 Epoll:

epoll_create()  函数用来创建一个代理对象;

epoll_wait()      函数就是当有事件到来时,会返回响应事件的代理;

epoll_ctl()        主要是往这个代理中注册你想要监听的事件;

下面是一个根据 Reactor 模型对 Epoll 的封装例子:


//1)在父进程中,fork返回新创建子进程的进程ID;

//2)在子进程中,fork返回0;

//3)如果出现错误,fork返回一个负值;

pid=fork();

if(pid<0)

   {

std::cout<<"fork error"<<std::endl;

exit(-1);

   }

//父进程退出,子进程独立运行

elseif(pid>0) {

exit(0);

   }

//之前parent和child运行在同一个session里,parent是会话(session)的领头进程,

//parent进程作为会话的领头进程,如果exit结束执行的话,那么子进程会成为孤儿进程,并被init收养。

//执行setsid()之后,child将重新获得一个新的会话(session)id。

//这时parent退出之后,将不会影响到child了。

setsid();

intfd;

fd=open("/dev/null",O_RDWR,0);

if(fd!=-1)

   {

dup2(fd,STDIN_FILENO);

dup2(fd,STDOUT_FILENO);

dup2(fd,STDERR_FILENO);

   }

if(fd>2)

close(fd);

}

intmain(intargc,char*argv[])

{  

    //设置信号处理

    signal(SIGCHLD,SIG_DFL);

    signal(SIGPIPE,SIG_IGN);

    signal(SIGINT,prog_exit);

    signal(SIGKILL,prog_exit);

    signal(SIGTERM,prog_exit);


    shortport=0;

    intch;

    boolbdaemon=false;

//根据传入参数,判断是否开启守护进程模式  -d

    while((ch=getopt(argc,argv,"p:d"))!=-1)

    {

        switch(ch)

        {

        case'd':

            bdaemon=true;

            break;

        case'p':

            port=atol(optarg);

            break;

        }

    }

    if(bdaemon)

        daemon_run();

    if(port==0)

        port=12345;

    //根据ip、port 初始化socket,创建 epoll_wait()

    if(!g_reator.init("0.0.0.0",12345))

        return-1;


//进入 loop 循环

    g_reator.main_loop(&g_reator);

return0;

}

为了方便理解,我画了一个大体流程图,如下所示:

执行流程

1、创建监听 socket,并绑定、监听;

2、调用 epoll_create() , 创建 epollfd 代理;

3、将想要监听的 listenfd,通过 epoll_ctl() , 挂载到 epollfd 上,让 epollfd 代理监听;

4、在一个 while 循环中,调用 epoll_wait(), 程序阻塞在这里,等待客户端连接到来;

5、当某个客户端连接到来,epoll_wait() 第一次返回的是接受连接的 listenfd, 调用 accept 函数接受连接,将   accpet 返回的 connfd,挂载到 epollfd 上,继续让代理监听 connfd 的读写事件;

6、当再次到达 5 时,如果是接受连接的 listenfd,那么继续 accept, 如果是读写事件,则进行 7, 接受或发送数据。

至此,一个 reactor 模型基本已经完成了,相信你应该已经理解了,大体的流程了。那么在来看 Muduo 是怎养封装这个 epoll 的?

EventLoop

先来介绍最重要的模块 EventLoop :  他是上面第四步的实现:

//在一个循环中让 epoll_wait() 不断检测事件

voidEventLoop::loop() {

    while() {


//vector, 用来返回 epoll_wait 中监听到的 有活动的事件(fd)

m_activeChannels.clear();


//m_activeChannels 是一个输入输出参数

    m_poller->poll(kPollTimeMs,&m_activeChannels);

        //遍历 epoll_wait 返回的结果,然后进行 accept / send / recv

    for(ChannelList::iteratorit=m_activeChannels.begin();


    it!=m_activeChannels.end();++it)

    {

      (*it)->handleEvent();

    }


    //处理其他事件, 此时不必关心

    doPendingFunctors();

   }

}

Channel

对于上面 epoll_wait() 返回的就绪事件,怎么去处理呢? 那么 Channel 类主要负责把不同的 IO 事件分发给不同的回调,例如 ReadCallback、 WriteCallBack 等;同时提供向 epollfd 中注册可读可写事件的接口。每个 Channel 自始至终只负责一个文件描述符的 IO 事件分发。

//主要处理 epoll_wait 返回的事件,并将他们分发到不同的回调

voidChannel::handleEvent()

{

    //如果出错,那么分发到错误的回调

  if(m_revents&(POLLERR|POLLNVAL)){

    if(m_errorCallBack)m_errorCallBack();

  }


//如果是 POLLIN, 说明是数据收发的回调

  if(m_revents&(POLLIN|POLLPRI|POLLRDHUP)){

    if(m_readCallBack)m_readCallBack();

  }


//如果是 POLLOUT, 那么是可写事件的回调

  if(m_revents&POLLOUT){

    if(m_writeCallBack)m_writeCallBack();

  }

}

//设置回调函数,供其他模块注册回调函数

voidsetReadCallback(constReadEventCallback&cb)

    {readCallback_=cb; }

voidsetWriteCallback(constEventCallback&cb)

    {writeCallback_=cb; }

voidsetCloseCallback(constEventCallback&cb)

    {closeCallback_=cb; }

voidsetErrorCallback(constEventCallback&cb)

{errorCallback_=cb; }

//往 epoll 中注册可读事件

voidenableReading() {m_events|=kReadEvent;update(); }

//从 epoll 中移除可读事件

voiddisableReading() {m_events&=~kReadEvent;update(); }

voidenableWriting() {m_events|=kWriteEvent;update(); }

boolisWriting() {returnm_events&=kWriteEvent; }

boolisReading() {returnm_events&=kReadEvent; }

voiddisableWriting() {m_events&=~kWriteEvent;update(); }

//最终都调用 uodate函数,该函数会调用 EventLoop::uodateChannel(), 后者在调用 Poller::updateChannel();

voidChannel::update()

{

m_addedToLoop=true;

p_loop->runInLoop(std::bind(&EventLoop::updateChannel,p_loop,this));

}

Poller

Poller类是 IO multiplexing 的封装。在 Muduo 中是一个抽象类,因为 Muduo 同时支持 poll 和 epoll 两种 IO 多路复用机制,他们是真正调用 epoll_wait() 的地方

TimeStampPoller::poll(inttimeoutMs,ChannelList*activeChannels)

{


//真正的调用 epoll_wait / poll 的地方。

intnumEvents=::poll(/*&*m_pollfds.begin()*/m_pollfds.data(),m_pollfds.size(),timeoutMs);


TimeStampnow(TimeStamp::now());


if(numEvents>0){

//将返回的结果封装成 channel 返回给 Eventloop::loop 函数

fillActiveChannels(numEvents,activeChannels);

  }

elseif(numEvents==0){

LOG_TRACE<<" nothing happended";

  }

else{

LOG_SYSERR<<"Poller::poll()";

  }

returnnow;

}

voidPoller::fillActiveChannels(intnumEvents,ChannelList*activeChannels)const

{

for(PollFdList::const_iteratorpfd=m_pollfds.begin();

pfd!=m_pollfds.end()&&numEvents>0;++pfd)

  {

if(pfd->revents>0)

   {

--numEvents;

ChannelMap::const_iteratorch=m_channels.find(pfd->fd);

assert(ch!=m_channels.end());

Channel*channel=ch->second;

assert(channel->fd()==pfd->fd);

channel->set_revents(pfd->revents);

//将返回的结果封装成 channel 返回给 Eventloop::loop 函数

activeChannels->push_back(channel);

   }

  }

}

讲解完 EventLoop、Poller、Channel 和 Poller 后,在从下面的时序图,看看他们的执行流程

上面提到的 Channel::handleEvent() 会将事件分发给注册此回调函数的模块,那么谁都需要注册呢?

接受客户端连接到来的类 ,需要注册 Channel::setReadCallback(),  以调用accept 进行接受;

接受有数据到来的类 ,需要注册 Channel::setReadCallback(), 以调用 recv 接收数据;

检测发送缓冲区是否可写, 需要注册 Channel::WriteCallback(), 以调用 send 函数发送数据;

Acceptor

接受客户端连接到来的类就是 Muduo::net::Acceptor 类

Acceptor::Acceptor(EventLoop*loop,constInetAddress&listenAddr)

:loop_(loop),

    //在构造函数中同时创建 acceptsocket,并调用下面的 acceptChannel_.setReadCallback 将acceptSocket添加到 epoll 中,监听可读事件

acceptSocket_(sockets::createNonblockingOrDie()),

acceptChannel_(loop,acceptSocket_.fd()),

listenning_(false),

idleFd_(::open("/dev/null",O_RDONLY|O_CLOEXEC))

{

assert(idleFd_>=0);

acceptSocket_.setReuseAddr(true);

//绑定监听

acceptSocket_.bindAddress(listenAddr);


//注册可读事件,当有事件到来时, 调用 handleRead() 回调接受连接

acceptChannel_.setReadCallback(

boost::bind(&Acceptor::handleRead,this));

}

voidAcceptor::handleRead()

{

loop_->assertInLoopThread();

InetAddresspeerAddr(0);


//真正 调用 accept 的地方

intconnfd=acceptSocket_.accept(&peerAddr);

if(connfd>=0)

  {

// string hostport = peerAddr.toIpPort();

// LOG_TRACE << "Accepts of " << hostport;

if(newConnectionCallback_)

   {

//将接受的新连接,分配给 TcpConnection 函数。即代表一路连接

newConnectionCallback_(connfd,peerAddr);

   }

else

   {

sockets::close(connfd);

   }

  }

else

  {

// Read the section named "The special problem of

// accept()ing when you can't" in libev's doc.

// By Marc Lehmann, author of livev.

if(errno==EMFILE)

   {

::close(idleFd_);

idleFd_=::accept(acceptSocket_.fd(),NULL,NULL);

::close(idleFd_);

idleFd_=::open("/dev/null",O_RDONLY|O_CLOEXEC);

   }

  }

}

下面的时序图将更好的为你展示 Acceptor 的作用:

Acceptor::accpet()  会返回一个 connfd(客户端连接fd),同时会暴露setNewConnectionCallback() 回调函数,供需要使用 connfd 的类使用;那么那个类需要使用 connfd 呢?  首先想一想 connfd 主要是干嘛的? 它是 accpet 返回的文件描述符,可以用来完成数据收发,收取和发送用户的数据,完成具体业务。现在需要一个类去管理(TcpServer) connfd,并且每创建一个 connfd,都会创建这个文件描述符所对应的连接类(TcpConnection).用来管理本次会话。

那么 TcpServer 和 TcpConnection 类就相应而生:

TcpServer

TcpServer 类会注册 Acceptor::setNewConnectionCallback,同时会为每个连接创建一个 TcpConnection 类;

//TcpServer 类

TcpServer::TcpServer(EventLoop*loop,

constInetAddress&listenAddr,

conststring&nameArg)

:loop_(CHECK_NOTNULL(loop)),

hostport_(listenAddr.toIpPort()),

name_(nameArg),

acceptor_(newAcceptor(loop,listenAddr)),

threadPool_(newEventLoopThreadPool(loop)),

connectionCallback_(defaultConnectionCallback),

messageCallback_(defaultMessageCallback),

started_(false),

nextConnId_(1)

{

//在构造函数中 注册 Acceptor 的 connfd 回调函数,同时调用 newconnection 函数

acceptor_->setNewConnectionCallback(

boost::bind(&TcpServer::newConnection,this,_1,_2));

}

voidTcpServer::newConnection(intsockfd,constInetAddress&peerAddr)

{

loop_->assertInLoopThread();

// 按照轮叫的方式选择一个EventLoop

EventLoop*ioLoop=threadPool_->getNextLoop();

charbuf[32];

snprintf(buf,sizeofbuf,":%s#%d",hostport_.c_str(),nextConnId_);

++nextConnId_;

stringconnName=name_+buf;

LOG_INFO<<"TcpServer::newConnection ["<<name_

<<"] - new connection ["<<connName

<<"] from "<<peerAddr.toIpPort();

InetAddresslocalAddr(sockets::getLocalAddr(sockfd));

//每一个连接 新建一个 TcpConnection 类,同时设置他们的用来数据收发的回调函数

TcpConnectionPtrconn(newTcpConnection(ioLoop,

connName,

sockfd,

localAddr,

peerAddr));

connections_[connName]=conn;


conn->setConnectionCallback(connectionCallback_);

conn->setMessageCallback(messageCallback_);

conn->setWriteCompleteCallback(writeCompleteCallback_);

conn->setCloseCallback(

boost::bind(&TcpServer::removeConnection,this,_1));

// conn->connectEstablished();

ioLoop->runInLoop(boost::bind(&TcpConnection::connectEstablished,conn));

LOG_TRACE<<"[5] usecount="<<conn.use_count();

}

TcpConnection

TcpConnection 类会向 分发事件的类 Channel 注册接受数据和发送数据的回调 setReadCallback、setWriteCallback等,

// TcpConeciton 的构造函数

TcpConnection::TcpConnection(EventLoop* loop,

                            const string& nameArg,

                            int sockfd,

                            const InetAddress& localAddr,

                            const InetAddress& peerAddr)

  : loop_(CHECK_NOTNULL(loop)),

    name_(nameArg),

    state_(kConnecting),

    socket_(new Socket(sockfd)),

    channel_(new Channel(loop, sockfd)),

    localAddr_(localAddr),

    peerAddr_(peerAddr),

    highWaterMark_(64*1024*1024)

{

  // 通道可读事件到来的时候,回调TcpConnection::handleRead,_1是事件发生时间

  channel_->setReadCallback(

      boost::bind(&TcpConnection::handleRead, this, _1));


  // 通道可写事件到来的时候,回调TcpConnection::handleWrite

  channel_->setWriteCallback(

      boost::bind(&TcpConnection::handleWrite, this));


  // 连接关闭,回调TcpConnection::handleClose

  channel_->setCloseCallback(

      boost::bind(&TcpConnection::handleClose, this));


  // 发生错误,回调TcpConnection::handleError

  channel_->setErrorCallback(

      boost::bind(&TcpConnection::handleError, this));

  LOG_DEBUG << "TcpConnection::ctor[" <<  name_ << "] at " << this

            << " fd=" << sockfd;

  socket_->setKeepAlive(true);

}

//用于发送数据的函数

void TcpConnection::handleWrite()


//当有事件到来,Channel::handleEvent() 会分发事件,同时提供回调函数, TcpConnection 会注册 setReadCallback 完成真正的数据收发;

void TcpConnection::handleRead(Timestamp receiveTime)

由于篇幅有限,本文先介绍了 Muduo 中大体的类, 还有 Buffer 类将会在下文介绍。

总结

读者认真阅读上文中的所有类,结合代码注释,先了解 Reactor 模型的大体工作流程,然后再了解 Muduo 中每个类的功能,希望本片文章会对你有所帮助!

后期彩蛋

应用层收发缓冲区的设计?

为什么要有收发缓冲区?

ET / LT 模式

数据收发的过程

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,009评论 5 474
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,808评论 2 378
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 148,891评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,283评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,285评论 5 363
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,409评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,809评论 3 393
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,487评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,680评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,499评论 2 318
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,548评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,268评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,815评论 3 304
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,872评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,102评论 1 258
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,683评论 2 348
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,253评论 2 341

推荐阅读更多精彩内容