【socket 要点回顾】I/ O 多路复用
为什么要使用I/ O 多路复用?
- 当一个进程有很多个I/O 需要处理的时候,1号I/O 可能没有数据可能,此时由于I/O 是阻塞读,整个进程就阻塞到从1号I/O 读数据的这个地方。什么其他事情都无法处理,即使,此时,如果有其他的I/O 有数据可以读了。这很明显是很低效的方式,那么你可能会想,我开n个线程,每个线程处理一个I/O读的事件,那么不就可以避免上述问题了吗?
- 确是这样的,但是呢,如果一个I/O流进来,我们就开启一个线程处理这个I/O流。那么假设现在有一百万个I/O流进来,那我们就需要开启一百万个线程一一对应处理这些I/O流(——这就是传统意义下的多进程并发处理)。思考一下,一百万个线程,你的CPU占有率会多高,这个实现方式及其的不合理。所以人们提出了I/O多路复用这个模型,一个线程,通过记录I/O流的状态来同时管理多个I/O,可以提高服务器的吞吐能力。
I/O 复用如何实现呢?
也就是说,我们希望应用需要同时在多个文件描述符上阻塞,并且在其中一个或多个文件描述可以读写,或出错的情况收到通知即被唤醒。
I/O 复用如何设计呢?
首先,既然要设计,我们就先忽略底层如何实现,专心于如何设计数据结构和输入输出来完成这个事情。
- 需要有个数据结构来保存多个文件描述符
- 需要有几个可以将一个文件描述符 添加,删除到该数据结构中的方法
- 需要有个方法,在唤醒后,来判断某个文件描述符是否是可读可写的
- 需要有个核心的方法,该方法接受这个数据结构,并会当没有文件可以读可写时阻塞,并且在某个或多个文件描述符可读可写后被唤醒
- 同时需要设置超时时间,来避免应用一直等等下去。
既然如此,我们看一下linux 下的三种实现方式 ,select ,poll,epoll
select
/* According to POSIX.1-2001, POSIX.1-2008 */
#include <sys/select.h>
/* According to earlier standards */
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
int select(int nfds, fd_set *readfds, fd_set *writefds,
fd_set *exceptfds, struct timeval *timeout);
void FD_CLR(int fd, fd_set *set);
int FD_ISSET(int fd, fd_set *set);
void FD_SET(int fd, fd_set *set);
void FD_ZERO(fd_set *set);
struct timeval {
long tv_sec; /* seconds */
long tv_usec; /* microseconds */
};
啊哈,看起来和我们所设想的实现方式蛮相似的
int select(int nfds, fd_set *readfds, fd_set *writefds,
fd_set *exceptfds, struct timeval *timeout);
// nfds:NFD应该设置为三个集合中编号最高的文件描述符的值 加上1。
// readfds,writefds,exceptfds:指定了我们让内核测试读、写和异常条件的描述字
// fd_set:为一个存放文件描述符的信息的结构体,可以通过下面的宏进行设置。
void FD_ZERO(fd_set *fdset);
//清空集合
void FD_SET(int fd, fd_set *fdset);
//将一个给定的文件描述符加入集合之中
void FD_CLR(int fd, fd_set *fdset);
//将一个给定的文件描述符从集合中删除
int FD_ISSET(int fd, fd_set *fdset);
// 检查集合中指定的文件描述符是否可以读写
注意 : nfds:nfds 应该设置为三个集合中编号最高的文件描述符的值 加上1
在这里注意一下linux select 的实现方式
- fd_set 实现方式是一个位数组。
- 因为fd_set 是一个位数组,为了使用FD_ISSET 必须用额外一个数组,来存储所有待监听的文件描述符。
- 当select 调用成功后,每个集合都会被修改成只包含I/O 就绪的文件描述符集合。
- 其中的时间的结构体中的内容会被修改,而且把值改为剩余时间。
## EXAMPLE
#include <stdio.h>
#include <stdlib.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
int
main(void)
{
fd_set rfds;
struct timeval tv;
int retval;
/* Watch stdin (fd 0) to see when it has input. */
FD_ZERO(&rfds);
FD_SET(0, &rfds);
/* Wait up to five seconds. */
tv.tv_sec = 5;
tv.tv_usec = 0;
retval = select(1, &rfds, NULL, NULL, &tv);
/* Don't rely on the value of tv now! */
if (retval == -1)
perror("select()");
else if (retval)
printf("Data is available now.\n");
/* FD_ISSET(0, &rfds) will be true. */
else
printf("No data within five seconds.\n");
exit(EXIT_SUCCESS);
}
小提一下,可以用select 替换sleep 来做到更通用的,精确的睡眠阻塞。
poll
看一下另一个实现方式,有种面向对象的感觉了。
#include <poll.h>
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
struct pollfd {
int fd; /* file descriptor */
short events; /* requested events */
short revents; /* returned events */
};
// The timeout argument specifies the number of milliseconds
很简洁,一个结构体,一个核心函数。封装了一下,这里抽象了一个事件的结构体。
设置事件就是给events 赋值,判断事件就看revents的值
man poll 就OK了
在正式述说epoll 前先说一下select 和 poll 的优缺点。
- 由于select 使用的是位数组,在文件描述符很大的时候,效率会低一些。
- 其次,select 需要计算最大的文件描述符的值。
- 但是呢 ,select 的时间精度更高一些。
- 而且呢 ,select 可移植性好一些。
select 和 poll 有两点不太好的地方
- 每次调用都需要传递你所要监控的所有socket给select/poll系统调用,这意味着需要将用户态的socket列表copy到内核态,如果以万计的句柄会导致每次都要copy几十几百KB的内存到内核态,非常低效。
- 每次为了获得可读可写的文件描述符都需要遍历存储了所有监听的文件描述符的表,非常低效。
epoll
那么epoll 是如何解决这个问题的呢?
首先我们分析一下这个过程,添加一个文件描述符是一个低频的操作,然而监听这个操作是一个高频的事件。那么,为何不把 注册监听和实际监听分离出来。这样,整个过程变成了,
- 创建一个上下文。
- 添加,删除或修改一个文件描述符到上下文中去。
- 进行事件等待。
同时,epoll 会返回一个数值,代表有几个事件就绪,并返回所有事件成立的数组。
epoll 的实现里,他将添加删除修改,封装成了一个函数,通过一个额外的一个控制位来控制具体的操作。
那么以下就一个epoll 实现I/ O 复用 的一个框架。
#define MAX_EVENTS 10
struct epoll_event ev, events[MAX_EVENTS];
int listen_sock, conn_sock, nfds, epollfd;
/* Code to set up listening socket, 'listen_sock',
(socket(), bind(), listen()) omitted */
epollfd = epoll_create1(0);
if (epollfd == -1) {
perror("epoll_create1");
exit(EXIT_FAILURE);
}
ev.events = EPOLLIN;
ev.data.fd = listen_sock;
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, listen_sock, &ev) == -1) {
perror("epoll_ctl: listen_sock");
exit(EXIT_FAILURE);
}
for (;;) {
nfds = epoll_wait(epollfd, events, MAX_EVENTS, -1);
if (nfds == -1) {
perror("epoll_wait");
exit(EXIT_FAILURE);
}
for (n = 0; n < nfds; ++n) {
if (events[n].data.fd == listen_sock) {
conn_sock = accept(listen_sock,
(struct sockaddr *) &addr, &addrlen);
if (conn_sock == -1) {
perror("accept");
exit(EXIT_FAILURE);
}
setnonblocking(conn_sock);
ev.events = EPOLLIN | EPOLLET;
ev.data.fd = conn_sock;
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, conn_sock,
&ev) == -1) {
perror("epoll_ctl: conn_sock");
exit(EXIT_FAILURE);
}
} else {
do_use_fd(events[n].data.fd);
}
}
}