从epoll机制看MessageQueue

epoll机制

一句话解释:epoll机制可以监听特定的fd,当fd收到内容时,发送事件回调。相比selectpoll机制,效率更高。

epoll API

  1. epoll_create(int size)

参数:

  • size:表示最多可以监听多少个fd,新版本已弃用。

返回值:epoll实例的fd

  • >= 0 成功
  • < 0 失败

作用:
初始化epoll机制,调用API后,操作系统内核会产生一个eventpoll实例,并返回一个fd,这个fd就是epoll实例的句柄。

  1. epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)

参数:

  • epfd: 方法1中创建的epoll实例的fd
  • op: 操作指令
    • EPOLL_CTL_ADD: 注册新的fd到epfd中
    • EPOLL_CTL_MOD:修改已注册的fd的监听事件
    • EPOLL_CTL_DEL:从epfd中删除一个fd
  • fd:要监听的fd
  • event:要监听的event
typedef union epoll_data {
   void        *ptr;
   int          fd;
   uint32_t     u32;
   uint64_t     u64;
} epoll_data_t;

struct epoll_event {
   uint32_t     events;  // 表示监听的事件类型(EPOLLIN/EPOLLHUP/EPOLLOUT...)
   epoll_data_t data; // 用户自定义数据,当事件发生时将会原样返回给用户
};

返回值:

  • >= 0 成功
  • < 0 失败

作用:
注册、修改或删除监听的fd和事件。

  1. epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout)

参数:

  • epfd:方法1中创建的epoll实例的fd
  • events:和2中结构一样
  • maxevents:events数量
  • timeout:等待超时时间。如果超过timeout还没有事件,则返回

返回值:
到来事件的个数,返回的事件存储在events数组中。

MessageQueue原理

大家都知道,Android的主线程的Looper,本质是运行了一个死循环,不断从MessageQueue中取消息执行,如果没有消息,则会等待在nativePollOnce方法上,这个方法的底层原理就是epoll_wait.

下面一起来看源码,弄清楚Looper的整个流程是怎样的。

首先看Looper.java中的loop方法,这是我们启动一个线程looper的入口。
// Looper.java

public static void loop() {
  final Looper me = myLooper();
  for (;;) {
    if (!loopOnce(me, ident, thresholdOverride)) {
      return;
    }
  }
}

这个方法就是开启一个无限循环,调用loopOnce
// Looper.java

private static boolean loopOnce(final Looper me, final long ident, final int thresholdOverride) {
  // 从messagequeue中获取下一条message
  Message msg = me.mQueue.next();
  // 执行message
  msg.target.dispatchMessage(msg);
  // Android10开始,可以通过添加observer的方式,监听messsage的执行情况
  if (observer != null) {
    observer.messageDispatched(token, msg);
  }
  //  回收message
  msg.recycleUnchecked();
  return true;
}

loopOnce方法返回true之后,又会再次循环调到loopOnce,重复执行。
接下来我们看MessageQueuenext()方法。

// MessageQueue.java

Message next() {
  for (;;) {
    // 通过epoll机制,等待消息,或超时唤醒
    nativePollOnce(ptr, nextPollTimeoutMills);
    synchronized (this) {
       Message prevMsg = null;
       Message msg = mMessages;
       if (msg != null && msg.target == null) {
          // msg.target == null 表示是同步屏障
          // 如果有同步屏障,则直接跳到下一个异步的消息(同步的消息都过滤掉,先不处理)
          do {
             prevMsg = msg;
             msg = msg.next;
          } while (msg != null && !msg.isAsynchronous());
       }
       if (msg != null) {
         if (now < msg.when) {
          // 如果当前还没到达message的执行时间, 则获取当前的时间差作为timeout
          nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
         }else{
           // 省略一些链表的操作 prevMsg.next = msg.next; msg.next = null;
           // 直接返回已经到达执行时间的,第一条message
           return msg;
         }
       }
    }
  }
}

这个方法,首先会调用nativePollOnce这个native方法,等nativePollOnce返回后,会去MessageQueue的链表中取下一条待执行的message。

取message的方法:

  • 取链表头的第一个message(MessageQueue中的message是按照时间顺序排列的,所以第一个就是最近的待执行的message)
  • 如果这个消息是同步屏障,则跳过所有同步消息,直接取下一个异步消息,返回
  • 否则,判断当前message是否到执行时间,如果到执行时间,则直接返回,否则继续调nativePollOnce等待。

接下来看nativePollOnce的实现。

// android_os_MessageQueue_nativePollOnce()

static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj, jlong ptr, jint timeoutMillis) {
    //将Java层传递下来的mPtr转换为nativeMessageQueue
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    nativeMessageQueue->pollOnce(env, obj, timeoutMillis); 【3】
}

经过一系列调用,最后调到了Looper中的pollInner方法

Looper.cpp

int Looper::pollInner(int timeoutMillis) {
    ...
    struct epoll_event eventItems[EPOLL_MAX_EVENTS]; //fd最大个数为16
    //等待事件发生或者超时,在nativeWake()方法,向管道写端写入字符,则该方法会返回;
    int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);

    //循环遍历,处理所有的事件
    for (int i = 0; i < eventCount; i++) {
        int fd = eventItems[i].data.fd;
        uint32_t epollEvents = eventItems[i].events;
        if (fd == mWakeEventFd) {
            if (epollEvents & EPOLLIN) {
                // 如果是唤醒事件,则读取并清空管道数据
                awoken(); 
            }
        } else {
            ssize_t requestIndex = mRequests.indexOfKey(fd);
            if (requestIndex >= 0) {
                // 如果是之前在mRequests中注册过的fd
                //处理request,生成对应的reponse对象,push到响应数组
                pushResponse(events, mRequests.valueAt(requestIndex));
            }
        }
    }
Done: ;
    //处理Native的Message,调用相应回调方法
    while (mMessageEnvelopes.size() != 0) {
        const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
        if (messageEnvelope.uptime <= now) {
            {
                handler->handleMessage(message);  // 处理消息事件
            }
        } else {
            mNextMessageUptime = messageEnvelope.uptime;
            break;
        }
    }

    //处理带有Callback()方法的Response事件,执行Reponse相应的回调方法
    for (size_t i = 0; i < mResponses.size(); i++) {
        if (response.request.ident == POLL_CALLBACK) {
            // 处理请求的回调方法
            int callbackResult = response.request.callback->handleEvent(fd, events, data);
            if (callbackResult == 0) {
                removeFd(fd, response.request.seq); //移除fd
            }
        }
    }
    return result;
}

Looper.pollInner主要做如下事情:

  • 调用epoll_wait,等待在一些特定的fd上
  • epoll_wait返回后(fd发生写入或超时时间到),执行唤醒的事件。
    • 如果唤醒的是mWakeEventFd,则直接调用awoken方法。
    • 如果唤醒的是之前注册在mRequests中的fd,则将Request生成一个对应的Response,加入mResponses集合
  • 处理native message,执行相应的回调方法
  • 处理mResponses集合中的所有Response事件,调用他们callbackhandleEvent回调方法。(点击事件就是在这里被执行的)

我们来看看awoken方法。它的逻辑很简单,就是循环读取fd中的全部内容。
// Looper.cpp

void Looper::awoken() {
    char buffer[16];
    ssize_t nRead;
    do {
        nRead = read(mWakeReadPipeFd, buffer, sizeof(buffer));
    } while ((nRead == -1 && errno == EINTR) || nRead == sizeof(buffer));
}

关于mRequestsmResponses的逻辑,先挖个坑,后面的文章再讲。

epoll使用示例

下面我们写一个epoll机制使用的示例代码。

在这个例子中,我们监听了一个sockfd的管道端口,启动一个线程,等待在epoll_wait上。一旦sockfd中写入数据,就可以唤醒我们的线程,进行读取。

#include <sys/socket.h>

void MonitorInit::createEpoll(int sockfd) {
    if(mSockFd == sockfd) return;
    mEpollFd = epoll_create(EPOLL_MAX_EVENTS);
    int epollEvents = 0;
    epollEvents |= EPOLLIN;
    struct epoll_event eventItem;
    memset(&eventItem, 0, sizeof(epoll_event));
    eventItem.events = epollEvents;
    eventItem.data.fd = sockfd;
    int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, sockfd, &eventItem);
    LLOG_ERROR(TAG_LOOPER, "Adding epoll event resutl %d", epollResult);
    if(epollResult < 0){
        LLOG_ERROR(TAG_LOOPER, "Error adding epoll event, fd %d, errno %d", sockfd, epollResult);
    }
    pthread_t thd;
    // 开启一个线程,这个线程用来监听epoll_wait
    pthread_create(&thd, nullptr, epollCallback, nullptr);
    pthread_detach(thd);
    mSockFd = sockfd;
}

void epollCallback(void *arg){
    // 死循环,等待fd消息
    while(loop){
        int timeoutMillis = 100000;
        struct epoll_event eventItems[EPOLL_MAX_EVENTS];
        int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
        LLOG_ERROR(TAG_LOOPER, "receive event count %d", eventCount);
        if(eventCount < 0){
            LLOG_ERROR(TAG_LOOPER, "Poll failed with an unexpected error, errno=%d", errno);
        }
        if(eventCount == 0){
            // 超时时间到
            LLOG_ERROR(TAG_LOOPER, "pollOnce - timeout");
        }
        for(int i = 0; i < eventCount; i++){
            int fd = eventItems[i].data.fd;
            uint32_t epollEvents = eventItems[i].events;
            if(fd == mSockFd){
                // 将fd的内容读出来
            }
        }
    }
}

总结

MessageQueue核心原理:主线程通过Looper中的死循环,不断从MessageQueue中获取待指定的message。

  • 如果有到执行时间的消息时,直接执行。
  • 如果还没有到执行时间的消息,会通过epoll_wait等待在mWakeReadPipeFd端口,等待内容写入,超时时间是下一个message执行时间到现在的时间差。
    • 如果在等待的过程中,有新的消息插入队列,会往mWakeReadPipeFd端口写入数据,这样就能唤醒等待在这个上面的pollInner方法,从而继续执行之后的message
    • 如果等待的过程中,没有新的消息插入,则会在timeout时间到达的时候,唤醒,处理后面的message

扩展阅读

深入理解Linux的epoll机制

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,457评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,837评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,696评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,183评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,057评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,105评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,520评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,211评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,482评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,574评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,353评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,213评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,576评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,897评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,174评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,489评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,683评论 2 335

推荐阅读更多精彩内容