一起来写web server 08 -- 多线程+非阻塞IO+epoll
到了多线程,一些东西就变得耐人寻味了.
这个版本是在前面单线程epoll
的基础上引入了线程池,当然不是前面玩具一样的线程池,而是一个通用的组件,生产者消费者队列.
生产者消费者队列
生产者消费者问题是操作系统中一个很经典的同步互斥问题,已经有了很不错的解决方案,将它的解决方案拓展一下,就可以用于我们的实践啦.
我自己写了一个生产者消费者的队列,然后发现muduo
中已经内置了这种模型,而且使用起来比我写的更加顺手,所以我就引用它的实现,我这里稍微来讲解一下它的实现,然后我会顺带讲解一下我的思路.
muduo
库的生产者消费者模型
这是ThreadPool
类的一个声明:
class ThreadPool : noncopyable
{
public:
typedef boost::function<void()> Task; /* 需要执行的任务 */
private:
bool isFull();
Task take();
size_t queueSize();
int threadNum_; /* 线程的数目 */
int maxQueueSize_;
std::list<Task> queue_; /* 工作队列 */
MutexLock mutex_;
Condition notEmpty_;
Condition notFull_;
};
这里的boost::function
其实在cpp 11
标准中已经加入了,你如果没有安装boost库的话,可以缓存std
的版本,效果是一样的.因为boost
本来就是cpp
的std
的一个备选库.
为什么要使用boost::function
不用我多说,你可以查看这里:http://blog.csdn.net/solstice/article/details/3066268
我们来看一下代码的实现,首先是构造函数:
ThreadPool::ThreadPool(int threadNum, int maxQueueSize)
: threadNum_(threadNum)
, maxQueueSize_(maxQueueSize)
, mutex_()
, notEmpty_(mutex_)
, notFull_(mutex_)
{
assert(threadNum >= 1 && maxQueueSize >= 1);
/* 接下来要构建threadNum个线程 */
pthread_t tid_t;
for (int i = 0; i < threadNum; i++) {
Pthread_create(&tid_t, NULL, startThread, this);
}
}
这里ThreadPool
有两个条件变量,一个是notEmpty_
,一个是notFull_
,构造函数接受两个参数,一个是线程的数目,一个是最大的队列的大小.
接下来是所有的线程都运行的函数startThread
:
void* ThreadPool::startThread(void* obj)
{ /* 工作者线程 */
Pthread_detach(Pthread_self());
ThreadPool* pool = static_cast<ThreadPool*>(obj);
pool->run();
return pool;
}
它们都开始调用run
函数:
void ThreadPool::run()
{
for ( ; ; ) { /* 一直运行下去 */
Task task(take());
if (task) {
//mylog("task run!");
task();
}
//mylog("task over!");
}
}
run
函数非常简单,就是不断从队列中取出任务,然后运行任务,没有任务的话,会阻塞在那里.
我们来看take
函数:
ThreadPool::Task ThreadPool::take()
{
MutexLockGuard lock(mutex_); /* 加锁 */
while (queue_.empty()) { /* 如果队列为空 */
notEmpty_.wait(); /* 等待 */
}
Task task;
if (!queue_.empty()) {
task = queue_.front();
queue_.pop_front();
if (maxQueueSize_ > 0) { /* 通知生产者队列有空位置了 */
notFull_.notify();
}
}
//mylog("threadpool take 1 task!");
return task;
}
对于生产者而言,有一个非常重要的函数,那就是append
:
bool ThreadPool::append(Task&& task)
{ /* 使用了右值引用 */
{
MutexLockGuard lock(mutex_); /* 首先加锁 */
while (isFull()) { /* 如果队列已满 */
notFull_.wait(); /* 等待queue有空闲位置 */
}
assert(!isFull());
queue_.push_back(std::move(task)); /* 直接用move语义,提高了效率 */
//mylog("put task onto queue!");
}
notEmpty_.notify(); /* 通知消费者有任务可做了 */
}
生产者消费者队列的代码就是这么简单,但是muduo
库写的确实很漂亮.
我的思路
其实代码基本上和前面的类似,不同的是,我压根就没有考虑过使用boost::funciton
和boost::bind
这对神器,因为我之前也压根就没有这样编过码.
如果不用boost::funciton
和boost::bind
这两样东西,我们要实现类似的代码的话,可能的一个解决方案是使用模版(template
).
队列里面放的是T
类型,然后消费者取出一个T
类型,调用T
类型的一个run
或者别的什么不带参数的方法.这样以来,对T
类型就有了限制,要求T
类型必须实现run
之类的方法.
而且代码变得不太容易读.加了模版的玩意总是不容易读,不是吗?所以要积极使用cpp
的新特性.
主程序变成了生产者
这一次的代码变得简洁多了,
int main(int argc, char *argv[])
{
int listenfd = Open_listenfd(8080); /* 8080号端口监听 */
epoll_event events[MAXEVENTNUM];
sockaddr clnaddr;
socklen_t clnlen = sizeof(clnaddr);
block_sigpipe(); /* 首先要将SIGPIPE消息阻塞掉 */
int epollfd = Epoll_create(1024); /* 10基本上没有什么用处 */
addfd(epollfd, listenfd, false); /* epollfd要监听listenfd上的可读事件 */
ThreadPool pools(10, 30000); /* 10个线程,300个任务 */
HttpHandle::setEpollfd(epollfd);
HttpHandle handle[2000];
for ( ; ;) {
int eventnum = Epoll_wait(epollfd, events, MAXEVENTNUM, -1);
for (int i = 0; i < eventnum; ++i) {
int sockfd = events[i].data.fd;
if (sockfd == listenfd) { /* 有连接到来 */
//mylog("connection comes!");
for ( ; ; ) {
int connfd = accept(listenfd, &clnaddr, &clnlen);
if (connfd == -1) {
if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) { /* 将连接已经建立完了 */
break;
}
unix_error("accept error");
}
handle[connfd].init(connfd); /* 初始化 */
addfd(epollfd, connfd, false); /* 加入监听 */
}
}
else { /* 有数据可读或者可写 */
pools.append(boost::bind(&HttpHandle::process, &handle[sockfd]));
}
}
}
return 0;
}
注意最后的一句boost::bind(&HttpHandle::process, &handle[sockfd])
,直接将对象往函数上一绑定,就往队列里面扔.非常爽.
这一次,我们终于将SIGPIPE消息给忽略掉了,主要是调用下面这个函数:
void block_sigpipe()
{
sigset_t signal_mask;
sigemptyset(&signal_mask);
sigaddset(&signal_mask, SIGPIPE);
int rc = pthread_sigmask(SIG_BLOCK, &signal_mask, NULL);
if (rc != 0) {
printf("block sigpipe error\n");
}
}
shared_ptr并不是线程安全的
正如文章开头所讲的,多线程一来,很多事情就变得莫名奇妙了,比如说shared_ptr
,因为这个玩意的线程不安全性,我调了半天bug
,才发现原来是Cache的查找函数出了问题,下面是修改过后的线程安全版本的函数:
/* 线程安全版本的getFileAddr */
void getFileAddr(std::string fileName, int fileSize, boost::shared_ptr<FileInfo>& ptr) {
/*-
* shared_ptr并不是线程安全的,对其的读写都需要加锁.
*/
MutexLockGuard lock(mutex_);
if (cache_.end() != cache_.find(fileName)) { /* 如果在cache中找到了 */
ptr = cache_[fileName];
return;
}
if (cache_.size() >= MAX_CACHE_SIZE) { /* 文件数目过多,需要删除一个元素 */
cache_.erase(cache_.begin()); /* 直接移除掉最前一个元素 */
}
boost::shared_ptr<FileInfo> fileInfo(new FileInfo(fileName, fileSize));
cache_[fileName] = fileInfo;
ptr = std::move(fileInfo); /* 直接使用move语义 */
}
至于为什么不安全,可以查看这里,写的再好不过了:http://blog.csdn.net/solstice/article/details/8547547
多线程的调试
原谅我到了这接近尾声的时候,才提起多线程的调试,首先要说一句的是,多线程真的不太好调,因为很难重现错误,但是我在这里稍稍介绍一下我的技巧.
打印
打印算是屡试不爽的一种方法,对于我们这个简陋的web server
,我封装了一个日志函数mylog
:
void mlog(pthread_t tid, const char *fileName, int lineNum, const char *func, const char *log_str, ...)
{
va_list vArgList; //定义一个va_list型的变量,这个变量是指向参数的指针.
char buf[1024];
va_start(vArgList, log_str); //用va_start宏初始化变量,这个宏的第二个参数是第一个可变参数的前一个参数,是一个固定的参数
vsnprintf(buf, 1024, log_str, vArgList); //注意,不要漏掉前面的_
va_end(vArgList); //用va_end宏结束可变参数的获取
printf("%lu:%s:%d:%s --> %s\n", tid, fileName, lineNum, func, buf);
}
然后定义了一个宏,方便使用这个函数:
#define mylog(formatPM, args...)\
mlog(pthread_self(), __FILE__, __LINE__, __FUNCTION__, (formatPM) , ##args)
需要日志的时候,可以像printf
函数一样使用:
mylog("My simple web server! %d, %s\n", 1, "hello, workd!");
这个宏展开后会调用mlog
函数,打印出行,文件名,函数名等信息,对付我们这个小玩意足够了.
用VS来调试
VS
其实也内置了线程的调试,你可以结合Visual Gdb
一起来调试linux
下的代码.一两个线程问题倒是不大,不过线程多了的话,这个玩意就不好调了,要我说,最好的方法还是分析日志.
总结
这个版本已经算是比较强劲的一个版本了,修复了前面的一些bug
,但是引入了新的bug
,这个bug
我也是折腾了很久才弄出来.
一般在单线程下不可能出现这样的bug
,只有在多线程的条件下,这样的代码才变成了bug
,正如前面见到的,每个HttpHandle
处理一个连接,试想这样一种情形:客户端不知道因为什么原因,第一次发送了这样的数据:
GET /
隔了很短时间才会发送余下的数据.这时,第一次发送的数据正在被另外一个线程处理,在多线程条件下,对于第二次到来的数据,这个HttpHandle
会交由另外一个线程处理,也就是说,有两个线程在不加锁地使用同一个HttpHandle
,不出问题才怪.
解决方案是有的,那就是EPOLL
的ONESHOT
参数.不过那是下一个版本的故事啦.
和之前类似的,代码在这里:https://github.com/lishuhuakai/Spweb