聊聊linux定时器


title: 聊聊Linux定时器
tags: longzy:2018-11-17


在上一篇文章<如何选择TCP长连接与短连接>中,我介绍了 如何根据业务场景来选择TCP长连接还是短连接,其中在选择的长连接中,我提到了在长连接中用什么方法来检测长连接的存在与否?

为什么要去检测连接呢?连接建立以后不就可以收发数据了吗?no no no!其实TCP连接建立以后,并不是一直保持的,操作系统在实现TCP协议的时候做了一个限制,就是keepAlive。

所以我们需要一种方法来检测连接,那使用什么方法呢?这里我介绍两种方法,也是最常用的。

  1. 操作系统的KeepAlive机制
  2. 应用层实现心跳检测
KeepAlive机制

keepalive机制可以参考以下配置

cat /proc/sys/net/ipv4/tcp_keepalive_time
cat /proc/sys/net/ipv4/tcp_keepalive_intvl
cat /proc/sys/net/ipv4/tcp_keepalive_probes

大概含义是:

  1. tcp_keepalive_time: KeepAlive的空闲时长,或者说每次正常发送心跳的周期,默认值为7200s(2小时)
  2. tcp_keepalive_intvl: KeepAlive探测包的发送间隔,默认值为75s
  3. tcp_keepalive_probes:在tcp_keepalive_time之后,没有接收到对方确认,继续发送保活探测包次数,默认值为9(次)

keepalive默认不是开启的,如果想使用,则先修改配置文件 /etc/sysctl.conf:

net.ipv4.tcp_keepalive_time=7200
net.ipv4.tcp_keepalive_intvl=75
net.ipv4.tcp_keepalive_probes=9

然后通过linux底层函数 setsockopt ,原型为:

#include <sys/socket.h>
int setsockopt(int socket, int level, int option_name,const void *option_value, socklen_t option_len);

具体的使用这里就不介绍了,google有很多文章介绍的。

应用层心跳检测

应用层的心跳检测就是在server和client的接口协议中,设计一种用于心跳检测的协议格式,然后一般由客户端定时发送给服务端,因为这个连接一般是由服务端来管理的。

这里提到了定时,终于说到我这篇文章的重点了,对,接下来,该讨论重点:定时器!

定时器

在这里,我从三个方面来说定时器

  1. linux提供的定时器
  2. 高性能定时器
  3. 工业级定时器
linux提供的定时器

linux提供了三种定时器

  1. socket选项 SO_RCVTIMEO和SO_SNDTIMEO
  2. SIGALRM信号
  3. I/O复用系统调用的超时函数。

socket选项 SO_RCVTIMEO和SO_SNDTIMEO
这个选项分别是用来设置socket接受和发送数据的超时时间,因此这两个选项仅对与数据接受和发送相关的socket专用系统调用,这些调用包括send、sendmsg、recv、recvmsg、accept、和connect。我们将这两个选项对这些系统调用的影响总结为下表

系统调用 有效选项 系统调用超时后的行为
send SO_SNDTIMEO return -1,errno is EAGAIN or EWOULDBLOCK
sendmsg SO_SNDTIMEO return -1,errno is EAGAIN or EWOULDBLOCK
recv SO_RCVTIMEO return -1,errno is EAGAIN or EWOULDBLOCK
recvmsg SO_RCVTIMEO return -1,errno is EAGAIN or EWOULDBLOCK
accept SO_RCVTIMEO return -1,errno is EAGAIN or EWOULDBLOCK
connect SO_SNDTIMEO return -1,errno is EINPROGRESS

这些选项在l实际inux服务器开发中很少用,因为如果设置这些选项后,意味着可能就选择了阻塞,然后在如今服务器开发过程中,基本不会采用阻塞。所以这里具体的用法我就不用举例了,大家去google,毕竟这篇文章的重点打算说的是工业级定时器

SIGALRM信号
由alarm和setitimer、sleep等函数设置的时间闹钟一旦超时,将会触发SIGALRM信号,因此可以利用该信号的信号处理函数来处理定时任务。这里不作过多的描述,如果想进一步了解的话,推荐大家去读<Linux高性能服务器编程>一书。

I/O复用系统调用的超时函数
Linux下的3组I/O复用(select、poll、epoll)系统调用都带有超时参数,他们不仅能处理信号事件和I/O事件,同样也能处理定时事件。由于I/O复用系统调用可能在超时时间到期之前就有返回(信号或I/O事件到来),所有如果利用I/O复用来做定时,每一次返回之后要重新更新时间,以保证处理最小定时器。
利用epoll设置定时的简单example

int timeout = 2000;
time_t start = time(NULL);
time_t end = time(NULL);

while(1)
{
  start = time(NULL);
  int num = epoll_wait(efd,events,events_size,timeout);
  if (num < 0 && errno != EINTR) {
    printf("epoll_wait failed!\n");
    break;
  }
  if (0 == timeout)
  {
    printf("timeout is arrived\n");
    continue;
  }
  end = time(NULL);
  //重新计算下一次的超时时间
  timeout -= (end - start)*1000;
  //timeout有可能为0,这时设置成默认的超时时间好了
  if (timeout <=0 ) {
    timeout = 2000;
  }
  //处理到来的定时事件
}

这里只是简单提一下,后面在工业级定时器中会分析redis的定时器设置,以及后面再专门讲一下epoll

高性能定时器

这里有几个理论知识要讲,我怕自己描述不清,给大家造成知识上的误解,所以在这里我推荐大家去读<Linux高性能服务器编程>一书,里面讲得特别清楚,如果大家需要电子版,可以给我留言或加我微信(个人资料中有微信号)。

工业级定时器

这里主要分析两个比较流行的开源代码,看看人家是怎么设计定时器的!

  1. redis
  2. muduo

redis
我们来看看redis的定时器是如何做的
1,创建一个定时器,调用如下

if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
        serverPanic("Can't create event loop timers.");
        exit(1);
    }

我们来看看实现:

long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
        aeTimeProc *proc, void *clientData,
        aeEventFinalizerProc *finalizerProc)
{
   /*这里解释下每个参数
   eventLoop: 暂时不用管
   milliseconds: 定时器的到期时间,是相对时间
   proc: 定时器回调函数
   clientData: 用户数据
   finalizerProc: 最后处理函数
   */
    long long id = eventLoop->timeEventNextId++;//定时器编号
    aeTimeEvent *te;//具体结构不用管,表示一个具体定时器事件

    te = zmalloc(sizeof(*te));
    if (te == NULL) return AE_ERR;
    te->id = id;
    aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms);
    //aeAddMillisecondsToNow这个函数是把相对时间转为绝对时间
    te->timeProc = proc;//定时器处理函数
    te->finalizerProc = finalizerProc;
    te->clientData = clientData;
    te->prev = NULL;
    te->next = eventLoop->timeEventHead;
    if (te->next)
        te->next->prev = te;
    eventLoop->timeEventHead = te;//放入到eventLoop维护的定时器列表中
    return id;
}

在主处理事件的过程中

int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;

    /* Nothing to do? return ASAP */
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

    /* Note that we want call select() even if there are no
     * file events to process as long as we want to process time
     * events, in order to sleep until the next time event is ready
     * to fire. */
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;
        /*如果是定时器事件,则去查找aeSearchNearestTimer最小定时器,
        也就是即将到来的定时器
        */
        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            shortest = aeSearchNearestTimer(eventLoop);
        if (shortest) {
            // 如果时间事件存在的话
            // 那么根据最近可执行时间事件和现在时间的时间差来决定文件事件的阻塞时间
            long now_sec, now_ms;
            // 计算距今最近的时间事件还要多久才能达到
            // 并将该时间距保存在 tv 结构中
            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;

            /* How many milliseconds we need to wait for the next
             * time event to fire? */
            long long ms =
                (shortest->when_sec - now_sec)*1000 +
                shortest->when_ms - now_ms;
            // 时间差小于 0 ,说明事件已经可以执行了,将秒和毫秒设为 0 (不阻塞)
            if (ms > 0) {
                tvp->tv_sec = ms/1000;
                tvp->tv_usec = (ms % 1000)*1000;
            } else {
                tvp->tv_sec = 0;
                tvp->tv_usec = 0;
            }
        } else {
            /* If we have to check for events but need to return
             * ASAP because of AE_DONT_WAIT we need to set the timeout
             * to zero */
             // 执行到这一步,说明没有时间事件
             // 那么根据 AE_DONT_WAIT 是否设置来决定是否阻塞,以及阻塞的时间长度
            if (flags & AE_DONT_WAIT) {
                // 设置文件事件不阻塞
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                /* Otherwise we can block */
                //设置阻塞
                tvp = NULL; /* wait forever */
            }
        }

        /* Call the multiplexing API, will return only on timeout or when
         * some event fires. */
         //调用epoll_wait
        numevents = aeApiPoll(eventLoop, tvp);
       
        /* After sleep callback. */
        if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
            eventLoop->aftersleep(eventLoop);

        for (j = 0; j < numevents; j++) {
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int fired = 0; /* Number of events fired for current fd. */

            /* Normally we execute the readable event first, and the writable
             * event laster. This is useful as sometimes we may be able
             * to serve the reply of a query immediately after processing the
             * query.
             *
             * However if AE_BARRIER is set in the mask, our application is
             * asking us to do the reverse: never fire the writable event
             * after the readable. In such a case, we invert the calls.
             * This is useful when, for instance, we want to do things
             * in the beforeSleep() hook, like fsynching a file to disk,
             * before replying to a client. */
            int invert = fe->mask & AE_BARRIER;

            /* Note the "fe->mask & mask & ..." code: maybe an already
             * processed event removed an element that fired and we still
             * didn't processed, so we check if the event is still valid.
             *
             * Fire the readable event if the call sequence is not
             * inverted. */
            if (!invert && fe->mask & mask & AE_READABLE) {
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                fired++;
            }

            /* Fire the writable event. */
            if (fe->mask & mask & AE_WRITABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            /* If we have to invert the call, fire the readable event now
             * after the writable one. */
            if (invert && fe->mask & mask & AE_READABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            processed++;
        }
    }
    /* Check time events */
    //这里才是关键,通过epoll_wait调用阻塞一段时间后,然后处理到期的定时器
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);

    return processed; /* return the number of processed file/time events */
}

这里有两个关键的函数processTimeEvents,aeApiPoll。
processTimeEvents:轮询eventLoop维护的定时器列表,依次处理到期的定时器,然后调用定时器处理函数,但这里有一个很关键点,如果有人调整过系统时间,然后又调回来(之前调向前,现在调回来)。怎么办?redis是这样处理的

//由于每一次定时器到来后,都会更新这个lastTime,如果现在的时间now小于最后一次到来时间,就说明系统时间肯定被调整过了,然后重置定时器的运行时间
   if (now < eventLoop->lastTime) {
        te = eventLoop->timeEventHead;
        while(te) {
            te->when_sec = 0;
            te = te->next;
        }
    }
    // 更新最后一次处理时间事件的时间
    eventLoop->lastTime = now;

aeApiPoll:其实就是调用 epoll_wait

muduo
muduo在定时器的设计上面首先选用了计时函数:gettimeofday,而没有用clock_gettime。
其实这两个函数各有优缺点,如下:

函数 优点 缺点
gettiemofday 不是系统调用,用户态实现的,没有上下文切换 精确到只到微妙(不过足以满足)
clock_gettime 精确到纳秒 系统调用,有上下文切换

定时函数选择timefd*系列

  • timerfd_create
  • timerfd_settime
  • timerfd_gettime

用这一系列函数是由原因的,把时间转换成了一个文件描述符,该文件描述符在定时器超时的时刻变成可读,这样就很方便融入到I/O复用框架中,用统一的方式来处理I/O事件和定时器事件。这就是Reactor模式的特点,而且比调用I/O复用的超时精度还要低。

只要转换成了文件描述符,那处理起来就相对简单了,大概流程如下

  1. 通过timerfd_create 创建一个定时器的文件描述符
  2. 加入到定时器管理队列中
  3. 添加事件,调用epoll_ctl
  4. 调用epoll_wait等待事件的到来
  5. 当事件变为可读时,读取内容
  6. 去定时器管理队列中查找,执行对应的定时器处理函数
  7. 然后根据最初设置的定时器类型是否需要更新下一个超时时间
总结

定时器在linux服务器开发中,是必备的。
redis的设计相当巧妙,调用了epoll_wait来做超时,但有一点不好的地方,就是时间精度,epoll_wait超时时间是毫秒级。而muduo则把时间转为了文件描述符,利用Reactor的特点,对统一事件进行处理。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,529评论 5 475
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,015评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,409评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,385评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,387评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,466评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,880评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,528评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,727评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,528评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,602评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,302评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,873评论 3 306
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,890评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,132评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,777评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,310评论 2 342

推荐阅读更多精彩内容