Looper循环中,如果messageQueue没有消失,还会一直循环下去吗
这个问题涉及linuex里面的pipe(管道)和epoll机制,
先给出答案:不会一直循环下去,阻塞起来
首先说下pipe
pipe:中文意思是管道,使用I/O流操作,实现跨进程通信,管道的一端的读,另一端写,标准的生产者消费者模式
下面说下5种I/O模型 参考大话 Select、Poll、Epoll https://cloud.tencent.com/developer/article/1005481
[1] blocking IO - 阻塞IO [2] nonblocking IO - 非阻塞IO [3] IO multiplexing - IO多路复用 [4] signal driven IO - 信号驱动IO [5] asynchronous IO - 异步IO
其中前面4种IO都可以归类为synchronous IO - 同步IO,在介绍select、poll、epoll之前,首先介绍一下这几种IO模型,signal driven IO平时用的比较少,这里就不介绍了。
1. IO - 同步、异步、阻塞、非阻塞
下面以network IO中的read读操作为切入点,来讲述同步(synchronous) IO和异步(asynchronous) IO、阻塞(blocking) IO和非阻塞(non-blocking)IO的异同。一般情况下,一次网络IO读操作会涉及两个系统对象:(1) 用户进程(线程)Process;(2)内核对象kernel,两个处理阶段:
[1] Waiting for the data to be ready - 等待数据准备好 [2] Copying the data from the kernel to the process - 将数据从内核空间的buffer拷贝到用户空间进程的buffer
IO模型的异同点就是区分在这两个系统对象、两个处理阶段的不同上。
1.1 同步IO 之 Blocking IO
官方描述: 如上图所示,用户进程process在Blocking IO读recvfrom操作的两个阶段都是等待的。在数据没准备好的时候,process原地等待kernel准备数据。kernel准备好数据后,process继续等待kernel将数据copy到自己的buffer。在kernel完成数据的copy后process才会从recvfrom系统调用中返回。
本人描述: 用户进程向内核对象请求数据,如果内核对象数据没准备好,则用户进程一直等下去,直到内核将数据准备好,并且复制到用户进程
1.2 同步IO 之 NonBlocking IO
官方描述: 从图中可以看出,process在NonBlocking IO读recvfrom操作的第一个阶段是不会block等待的,如果kernel数据还没准备好,那么recvfrom会立刻返回一个EWOULDBLOCK错误。当kernel准备好数据后,进入处理的第二阶段的时候,process会等待kernel将数据copy到自己的buffer,在kernel完成数据的copy后process才会从recvfrom系统调用中返回。
本人描述:实际上就是用户进程不断轮寻,看内核进程的数据是否准备好,当然第二阶段(kernel将数据copy到自己的buffer)用户进程是需要等待的
1.3 同步IO 之 IO multiplexing
IO多路复用,就是我们熟知的select、poll、epoll模型。从图上可见,在IO多路复用的时候,process在两个处理阶段都是block住等待的。初看好像IO多路复用没什么用,其实select、poll、epoll的优势在于可以以较少的代价来同时监听处理多个IO。
1.4 异步IO
从上图看出,异步IO要求process在recvfrom操作的两个处理阶段上都不能等待,也就是process调用recvfrom后立刻返回,kernel自行去准备好数据并将数据从kernel的buffer中copy到process的buffer在通知process读操作完成了,然后process在去处理。遗憾的是,linux的网络IO中是不存在异步IO的,linux的网络IO处理的第二阶段总是阻塞等待数据copy完成的。真正意义上的网络异步IO是Windows下的IOCP(IO完成端口)模型。
很多时候,我们比较容易混淆non-blocking IO和asynchronous IO,认为是一样的。但是通过上图,几种IO模型的比较,会发现non-blocking IO和asynchronous IO的区别还是很明显的,non-blocking IO仅仅要求处理的第一阶段不block即可,而asynchronous IO要求两个阶段都不能block住。
select与epoll
阻塞I/O模式下,一个线程只能处理一个流的I/O事件。如果想要同时处理多个流,要么多进程(fork),要么多线程(pthread_create),很不幸这两种方法效率都不高。
我们只要不停的把所有流从头到尾问一遍,又从头开始。这样就可以处理多个流了,但这样的做法显然不好,因为如果所有的流都没有数据,那么只会白白浪费CPU。这里要补充一点,阻塞模式下,内核对于I/O事件的处理是阻塞或者唤醒,而非阻塞模式下则把I/O事件交给其他对象(后文介绍的select以及epoll)处理甚至直接忽略。
为了避免CPU空转,可以引进了一个代理(一开始有一位叫做select的代理,后来又有一位叫做poll的代理,不过两者的本质是一样的)。这个代理比较厉害,可以同时观察许多流的I/O事件,在空闲的时候,会把当前线程阻塞掉,当有一个或多个流有I/O事件时,就从阻塞态中醒来,于是我们的程序就会轮询一遍所有的流(于是我们可以把“忙”字去掉了)
于是,如果没有I/O事件产生,我们的程序就会阻塞在select处。但是依然有个问题,我们从select那里仅仅知道了,有I/O事件发生了,但却并不知道是那几个流(可能有一个,多个,甚至全部),我们只能无差别轮询所有流,找出能读出数据,或者写入数据的流,对他们进行操作。
但是使用select,我们有O(n)的无差别轮询复杂度,同时处理的流越多,没一次无差别轮询时间就越长。再次
说了这么多,终于能好好解释epoll了
epoll可以理解为event poll,不同于忙轮询和无差别轮询,epoll之会把哪个流发生了怎样的I/O事件通知我们。此时我们对这些流的操作都是有意义的。(复杂度降低到了O(1))
epoll与select/poll的区别
select,poll,epoll都是IO多路复用的机制。I/O多路复用就通过一种机制,可以监视多个描述符,一旦某个描述符就绪,能够通知程序进行相应的操作。
select的本质是采用32个整数的32位,即32*32= 1024来标识,fd值为1-1024。当fd的值超过1024限制时,就必须修改FD_SETSIZE的大小。这个时候就可以标识32*max值范围的fd。
poll与select不同,通过一个pollfd数组向内核传递需要关注的事件,故没有描述符个数的限制,pollfd中的events字段和revents分别用于标示关注的事件和发生的事件,故pollfd数组只需要被初始化一次。
epoll还是poll的一种优化,返回后不需要对所有的fd进行遍历,在内核中维持了fd的列表。select和poll是将这个内核列表维持在用户态,然后传递到内核中。与poll/select不同,epoll不再是一个单独的系统调用,而是由epoll_create/epoll_ctl/epoll_wait三个系统调用组成,后面将会看到这样做的好处。epoll在2.6以后的内核才支持。
select/poll的几大缺点:
1、每次调用select/poll,都需要把fd集合从用户态拷贝到内核态,这个开销在fd很多时会很大
2、同时每次调用select/poll都需要在内核遍历传递进来的所有fd,这个开销在fd很多时也很大
3、针对select支持的文件描述符数量太小了,默认是1024
为什么epoll相比select/poll更高效
传统的poll函数相当于每次调用都重起炉灶,从用户空间完整读入ufds,完成后再次完全拷贝到用户空间,另外每次poll都需要对所有设备做至少做一次加入和删除等待队列操作,这些都是低效的原因。
epoll的解决方案中。每次注册新的事件到epoll句柄中时(在epoll_ctl中指定EPOLL_CTL_ADD),会把所有的fd拷贝进内核,而不是在epoll_wait的时候重复拷贝。epoll保证了每个fd在整个过程中只会拷贝一次。select, poll和epoll都是使用waitqueue调用callback函数去wakeup你的异步等待线程的,如果设置了timeout的话就起一个hrtimer,select和poll的callback函数并没有做什么事情,但epoll的waitqueue callback函数把当前的有效fd加到ready list,然后唤醒异步等待进程,所以epoll函数返回的就是这个ready list, ready list中包含所有有效的fd,这样一来kernel不用去遍历所有的fd,用户空间程序也不用遍历所有的fd,而只是遍历返回有效fd链表。
epoll用法
1. int epoll_create(int size);
创建一个epoll的句柄,size用来告诉内核需要监听的数目一共有多大。当创建好epoll句柄后,它就是会占用一个fd值,在linux下如果查看/proc/进程id/fd/,是能够看到这个fd的,所以在使用完epoll后,必须调用close() 关闭,否则可能导致fd被耗尽。epoll通过epoll_ctl来对监控的fds集合来进行增、删、改,那么必须涉及到fd的快速查找问题,于是,一个低时间复杂度的增、删、改、查的数据结构来组织被监控的fds集合是必不可少的了。在linux 2.6.8之前的内核,epoll使用hash来组织fds集合,于是在创建epoll fd的时候,epoll需要初始化hash的大小。于是epoll_create(int size)有一个参数size,以便内核根据size的大小来分配hash的大小。在linux 2.6.8以后的内核中,epoll使用红黑树来组织监控的fds集合,于是epoll_create(int size)的参数size实际上已经没有意义了。
2.int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
epoll的事件注册函数,第一个参数是 epoll_create() 的返回值,第二个参数表示动作,使用如下三个宏来表示:
EPOLL_CTL_ADD //注册新的fd到epfd中;
EPOLL_CTL_MOD //修改已经注册的fd的监听事件;
EPOLL_CTL_DEL //从epfd中删除一个fd;
第三个参数是需要监听的fd,第四个参数是告诉内核需要监听什么事,struct epoll_event 结构如下:
typedef union epoll_data
{
void *ptr;
int fd;
__uint32_t u32;
__uint64_t u64;
} epoll_data_t;
struct epoll_event {
__uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
events 可以是以下几个宏的集合:
EPOLLIN //表示对应的文件描述符可以读(包括对端SOCKET正常关闭);
EPOLLOUT //表示对应的文件描述符可以写;
EPOLLPRI //表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
EPOLLERR //表示对应的文件描述符发生错误;
EPOLLHUP //表示对应的文件描述符被挂断;
EPOLLET //将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。
EPOLLONESHOT//只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里。
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
参数events用来从内核得到事件的集合,maxevents 告之内核这个events有多大,这个 maxevents 的值不能大于创建 epoll_create() 时的size,参数 timeout 是超时时间(毫秒,0会立即返回,-1将不确定,也有说法说是永久阻塞)。该函数返回需要处理的事件数目,如返回0表示已超时。
LT VS ET
说这个之前,先说两个概念
阻塞模式:针对的是fd的read/write阻塞,如果当前没有fd数据可读,那么read阻塞,如果当前没有数据可写(可能管道满了),那么write阻塞
非阻塞模式:针对的是fd的read/write,如果当前没有fd数据可读,那么read后返回调用返回-1,errno值为EAGAIN,如果当前没有数据可写(可能管道满了),那么write后返回调用返回-1,errno值为EAGAIN
EPOLL事件有两种模型 Level Triggered (LT) 和 Edge Triggered (ET):
LT(level triggered,水平触发模式)是缺省的工作方式,并且同时支持 block 和 non-block socket。在这种做法中,内核告诉你一个文件描述符是否就绪了,然后你可以对这个就绪的fd进行IO操作。如果你不作任何操作,内核还是会继续通知你的,所以,这种模式编程出错误可能性要小一点。
说白了就是以下两点
当管道不为空时,有数据可读,则可读事件一直触发直到将管道中的数据read完毕
当管道不为空时,有数据可写,则可写事件一直触发
它强调的是处于某种状态
补充一点:EPOLL可以监听多个fd,如果fd1的模式设为阻塞模式的话,当对fd1进行read操作时,不要单独把read放在死循环里面读取,因为read完所有数据之后,再去read的话会阻塞,导致该此代码阻塞,此时如果fd2有数据可读时(来自其他线程或者进程向fd2写数据),也不会解除阻塞,导致无法处理数据,这样明显是不对的,除非有其他线程或者进程向fd1写数据,才能解除fd1的read阻塞,
所以fd1的模式设为阻塞模式的话,可以将epoll_wait和read按顺序一起放在死循环,每次循环执行epoll_wait和reada读取固定的数据(epoll_wait在LT模式下,如果有数据可读,不会阻塞),这样当fd1的数据读取完毕之后,epoll_wait就会阻塞,不会执行read操作,避免read阻塞,此时如果fd2有数据可读时,epoll_wait就可以解除阻塞,处理fd2的数据了,不会受其他fd的影响
ET(edge-triggered,边缘触发模式)是高速工作方式,只支持no-block socket。在这种模式下,当描述符从未就绪变为就绪时,内核通过epoll告诉你。然后它会假设你知道文件描述符已经就绪,并且不会再为那个文件描述符发送更多的就绪通知,等到下次有新的数据进来的时候才会再次出发就绪事件。
说白了就是以下两点
当管道中有新的数据来临时(有空变为非空或者在非空的情况下有新的数据),则触发可读事件
当管道中的有满载变为非满载状态时,有数据可写,则触发可写事件
它强调的是某种状态变化时触发事件,只触发一次,过了就不会触发,
也就是如果管道中有新的数据来临时,epoll_wait结束等待,此时如果不读取管道中的数据,那么代码再次循环到epoll_wait,阻塞,直到下次有新的数据来临时才会返回,所以如果是ET模式的话,可读事件触发时一定要将数据及时取出来,不要死扛着,过了这村就没那店了,另一方面因为可能数据比较大,所以要多次读取,比如在while里面读取例如110代码,如果此时fd是阻塞模式,那么当读完毕之后,管道为null,必然会导致read阻塞,影响其他fd的数据处理,明显不符合题意,所以这里设置fd为非阻塞模式,当读取完毕之后会返回-1,而且errno == EAGAIN(115行代码),这样就可以直接break,结束这次读取工作,不会导致阻塞.所以ET强制fd为非阻塞模型
来看个示例代码
1 //
2 // Created by user on 2020/1/31.
3 //
4
5 #include "TestBKEPoll.h"
6
7 #include <sys/epoll.h>
8 #include <unistd.h>
9 #include <iostream>
10 #include <iostream>
11 #include <pthread.h>
12 #include <unistd.h>
13 #include <thread>
14 #include <stdio.h>
15 #include <sys/prctl.h>
16 #include <fcntl.h>
17
18 using namespace std;
19
20 void *threadwrite(void *data) {
21 printf("write pid=%d,ppid=%d,tid=%d\n", getpid(), getppid(),
22 this_thread::get_id());
23 sleep(5);
24 int *fd = (int *) data;
25 char *buf = "iamcj";
26 write(*fd, buf, 3);
27 write(fd[1], buf, 3);
28 write(fd[2], buf, 3); char *buf1 = "kl";
29 // sleep(5);
30 // write (*fd, buf1, 5);
31 printf("\n");
32 printf("write *fd=%d\n", *fd);
33 // printf("start pid=%d,ppid=%d,tid=%d\n", getpid(), getppid(),this_thread::get_id());
34 }
35
36 int setfd_nonblock(int fd) {
37 int old_flags = fcntl(fd, F_GETFD);
38 fcntl(fd, F_SETFL, old_flags | O_NONBLOCK);
39 return old_flags;
40 }
41
42 int testepoll() {
43 int ret = 0;
44 int pipe_fd[2];
45 int pipe_fd1[2];
46 int pipe_fd2[2];
47 /**
48 * 调用pipe函数时在内核中开辟一块缓冲区(称为管道)用于通信,它有一个读端和一个写端,
49 * 然后通过fd参数传出给用户程序两个文件描述符,fd[0]指向管道的读端,fd[1]指向管道的写端(
50 * 很好记,就像0是标准输入1是标准输出一样)。
51 * 所以管道在用户程序看起来就像一个打开的文件,
52 * 通过read(fd[0])或者write(fd[1])向这个文件读写数据其实是在读写内核缓冲区。
53 * pipe函数调用成功返回0,调用失败返回-1。
54 */
55 if ((ret = pipe(pipe_fd)) < 0) {
56 cout << "create pipe fail:" << ret << ",errno:" << errno << endl;
57 return -1;
58 }
59 if ((ret = pipe(pipe_fd1)) < 0) {
60 cout << "create pipe1 fail:" << ret << ",errno:" << errno << endl;
61 return -1;
62 }
63 if ((ret = pipe(pipe_fd2)) < 0) {
64 cout << "create pipe1 fail:" << ret << ",errno:" << errno << endl;
65 return -1;
66 }
67 // child=fork();
68 printf("testCreateThread pid=%d,ppid=%d,tid=%d\n", getpid(), getppid(), this_thread::get_id());
69
70 pthread_t thread;
71 //创建一个线程并自动执行
72 int pd[3] = {pipe_fd[1], pipe_fd1[1], pipe_fd2[1]};
73 int id = pthread_create(&thread, NULL, threadwrite, (void *) pd);
74 printf("testCreateThread pipe_fd[0]=%d,pipe_fd1[0]=%d,pipe_fd2[0]=%d\n",
75 pipe_fd[0], pipe_fd1[0], pipe_fd2[0]);
76 printf("testCreateThread pipe_fd[1]=%d,pipe_fd1[1]=%d,pipe_fd2[1]=%d\n",
77 pipe_fd[1], pipe_fd1[1], pipe_fd2[1]);
78 struct epoll_event ev, ev1, ev2, events[20]; //事件临时pipe_fd[1]变量
79 ev.data.fd = pipe_fd[0]; //设置监听文件描述符
80 ev.events = EPOLLET | EPOLLIN;
81 ev1.data.fd = pipe_fd1[0]; //设置监听文件描述符
82 ev1.events = EPOLLET | EPOLLIN; //设置要处理的事件类型
83 ev2.data.fd = pipe_fd2[0]; //设置监听文件描述符
84 ev2.events = EPOLLET | EPOLLIN; //设置要处理的事件类型
85 int epfd = epoll_create(1);
86 printf("epfd=%d\n", epfd);
87 if (ev.events & EPOLLET) {
88 set_nonblock(pipe_fd[0]);
89 }
90 ret = epoll_ctl(epfd, EPOLL_CTL_ADD, pipe_fd[0], &ev);
91 ret = epoll_ctl(epfd, EPOLL_CTL_ADD, pipe_fd1[0], &ev1);
92 ret = epoll_ctl(epfd, EPOLL_CTL_ADD, pipe_fd2[0], &ev2);
93 for (;;) {
94 printf("sleep before\n");
95 sleep(2);
96 printf("sleep after\n");
97 int count = epoll_wait(epfd, events, 2, -1);
98 printf("count is %d,pid=%d,ppid=%d\n", count, getpid(), getppid());
99 char r_buf[100];
100 for (int i = 0; i < count; i++) {
101 printf("pipe_fd[0]=%d,events[%d].data.fd=%d,events=%d,%d\n",
102 pipe_fd[0], i, events[i].data.fd,
103 events[i].events, (events[i].events & EPOLLIN));
104 if ((events[i].data.fd == pipe_fd[0] ||
105 events[i].data.fd == pipe_fd1[0] ||
106 events[i].data.fd == pipe_fd2[0])
107 && (events[i].events & EPOLLIN)) {
108 printf("ok\n");
109 printf("EPOLLET\n");
110 // while (true) {
111 int r_num = read(events[i].data.fd, r_buf, 1);
112 printf(
113 "read r_num is %d bytes data from the pipe,value is %s atoi is %d and errno is %d \n",
114 r_num, r_buf, atoi(r_buf), errno);
115 if (r_num < 0 && errno == EAGAIN) {
116 break;
117 }
118 // }
119
120 }
121 }
122 }
123 return 0;
124 }
下面来解读下这部分代码,注意因为mac上没有epoll机制,所以这段代码只能在linux上运行
先看看55行-66行,这部分代码主要是调用pipe函数时在内核中开辟一块缓冲区(称为管道)用于通信,它有一个读端和一个写端,
然后通过fd参数传出给用户程序两个文件描述符,fd[0]指向管道的读端,fd[1]指向管道的写端
执行完这段代码之后会生成相应的文件描述符,可以通过
ls -l /proc/pid/fd查看,比如
lr-x------ 1 user user 64 1月 31 20:24 3 -> pipe:[83141]
l-wx------ 1 user user 64 1月 31 20:24 4 -> pipe:[83141]
lr-x------ 1 user user 64 1月 31 20:24 5 -> pipe:[83142]
l-wx------ 1 user user 64 1月 31 20:24 6 -> pipe:[83142]
lr-x------ 1 user user 64 1月 31 20:24 7 -> pipe:[83143]
l-wx------ 1 user user 64 1月 31 20:24 8 -> pipe:[83143]
3,4,5,6,7,8就是对应的fd,在代码里面打印fd的值也是跟这个是输出是匹配的,其实3和4是一对,表示读写两端,其他的以此类推,记住这些fd用完之后要释放(close),避免占用资源73行:创建一个线程,为了向pipe中写入数据,触发可读事件
79行:这行表示事件关联的文件描述符,说白了就是当有可读或者可写事件发生时,epoll得知道这个事件是来源于哪个fd,以便从这个fd读写数据,这行代码就是这个作用,跟104行的代码对应起来了
85行:创建一个epoll的句柄,返回一个文件描述符,可以认为是操作多个fd监听事件的操作句柄,集合了一下,可以通过ls -l /proc/pid/fd查看该fd,示例如下
lrwx------ 1 user user 64 1月 31 20:24 9 -> anon_inode:[eventpoll]
后面也表示是eventpoll的fd87-89行:如果事件是ET(edge trigger)边缘模式,则设置监听该事件的fd是非阻塞了,上面LT VS ET已经解释过为什么这么做了。
90-92行:在三个不同的fd上注册监听事件,这里坚监听的都是EPOLLIN,表示可读事件,其实笔者思考了下,综合79行代码,可以看出event与fd是双向关联关系,相互持有,其实我很困惑为什么不在注册监听event的时候,把fd的句柄赋值到ev.data.fd,为什么还要单独显式的写出来,刚开始学习这个的时候很容易忘记写ev.data.fd = pipe_fd[0],导致104行的判断代码一致失败,很疑惑。可能是开放给开发者,给开发者更大的发挥空间
111行:读取管道中的数据,
对于LT模式来说,如果这次read没有读取完毕(比如只读取一部分数据)或者把这样代码注释掉,那么在循环中epoll_wait将一直返回,它会执行97行之后的语句,不会阻塞,,因为epoll_wait原本的语意是:监控并探测socket是否有数据可读(对于读事件而言)。LT模式保留了其原本的语意,只要socket还有数据可读,它就能不断反馈,于是,我们想什么时候读取处理都可以,我们永远有再次poll的机会去探测是否有数据可以处理,这样带来了编程上的很大方便,不容易死锁造成某些socket饿死。相反,ET模式修改了epoll_wait原本的语意,变成了:监控并探测socket是否有新的数据可读。
对于ET模式来说
即使这次read没有读取完毕(比如只读取一部分数据)或者把这样代码注释掉,再次循环回到epoll_wait时,它会阻塞,因为可读事件只会触发一次
补充:在Android的中epoll跟上面的说代码很类似,只是Android的fd不是通过pipe获取,而是通过eventfd()函数获取,Android代码如下
大概了解下eventfd()函数
eventfd()函数会创建一个“eventfd”对象,用户空间的应用程序可以用这个eventfd来实现事件的等待或通知机制,也可以用于内核通知新的事件到用户空间应用程序。 这个对象包含一个64-bit的整形计数器,内核空间维护这个计数器,创建这个计数器的时候使用第一个入参initval来初始化计数器。
对于eventfd的读写操作对应函数read和write
多余的不说了,可以参考下这两篇文章
通过实例来理解 eventfd 函数机制
Linux进程间通信——eventfd
用我个人的了解就是eventfd创建的时候会初始化一个计数器,对于epoll_wait来说,只要计数器的值不为0,那么epoll_wait就不会阻塞,因为可读,当向eventfd中write数据时,write函数相当于是向计数器中进行“添加”,比如说计数器中的值原本是2,如果write了一个3,那么计数器中的值就变成了5,
当read数据时:
针对无EFD_SEMAPHORE的flag来说,read一次就把计数器清零,
执行完read,继续epoll_wait等待
针对有EFD_SEMAPHORE的flag来说,read一次计数器减1,循环read,直到计数器为0,设置条件,退出read,继续epoll_wait等待
所以write函数里面的count参数,直对EFD_SEMAPHORE有效,没有EFD_SEMAPHORE时,其实count>0即可,无关次数.
eventfd的示例代码:
#include "TestBKEPoll.h"
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <unistd.h>
#include <iostream>
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <thread>
#include <stdio.h>
#include <sys/prctl.h>
#include <fcntl.h>
using namespace std;
void *threadwrite(void *data) {
printf("write pid=%d,ppid=%d,tid=%d\n", getpid(), getppid(),
this_thread::get_id());
sleep(5);
int *fd = (int *) data;
uint64_t count = 3;//写入的计数器的值
write(*fd, &count, sizeof(uint64_t));
printf("\n");
printf("write *fd=%d\n", *fd);
}
int testepoll() {
printf("testCreateThread pid=%d,ppid=%d,tid=%d\n", getpid(), getppid(), this_thread::get_id());
int testEventfd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
pthread_t thread;
//创建一个线程并自动执行
int id = pthread_create(&thread, NULL, threadwrite, (void *) pd);
struct epoll_event ev; //事件临时pipe_fd[1]变量
ev.data.fd = testEventfd; //设置监听文件描述符
ev.events = EPOLLET | EPOLLIN;
ret = epoll_ctl(epfd, EPOLL_CTL_ADD, eventfd, &ev);
for (;;) {
printf("sleep before\n");
sleep(2);
printf("sleep after\n");
int count = epoll_wait(epfd, events, 2, -1);
printf("count is %d,pid=%d,ppid=%d\n", count, getpid(), getppid());
char r_buf[100];
for (int i = 0; i < count; i++) {
printf("pipe_fd[0]=%d,events[%d].data.fd=%d,events=%d,%d\n",
pipe_fd[0], i, events[i].data.fd,
events[i].events, (events[i].events & EPOLLIN));
if ((events[i].data.fd == testEventfd)
&& (events[i].events & EPOLLIN)) {
printf("ok\n");
printf("EPOLLET\n");
uint64_t count = 0;//读取的计数器值,eventfd的flag有EFD_SEMAPHORE时,每次read时count减1,没有EFD_SEMAPHORE时,只read一次,count为write过来的数字,即3
while (true) {
int r_num = read(events[i].data.fd, &count, sizeof(uint64_t));
printf(
"read r_num is %d bytes data from the pipe,value is %s atoi is %d and errno is %d \n",
r_num, count, atoi(count), errno);
if (r_num < 0 && errno == EAGAIN) {
break;
}
}
}
}
}
return 0;
}
思考
1.如果messagequeue里面没有message,那么looper会阻塞,相当于主线程阻塞,那么点击事件是怎么传入到主线程呢?
首先上面说loop之所有会阻塞,是因为epoll机制,代码位于Loop.cpp中的pollOnce方法,所以要让主线程接触阻塞,必须要往管道里面write,那么当点击屏幕时
谁来往当前app的主线程的管道wrtite呢,看看调用栈
"main@4011" prio=5 runnable
java.lang.Thread.State: RUNNABLE
at android.os.MessageQueue.enqueueMessage(MessageQueue.java:534)
at android.os.Handler.enqueueMessage(Handler.java:631)
at android.os.Handler.sendMessageAtTime(Handler.java:600)
at android.os.Handler.sendMessageDelayed(Handler.java:570)
at android.os.Handler.postDelayed(Handler.java:398)
at android.view.View.postDelayed(View.java:13011)
at android.view.View.onTouchEvent(View.java:10370)
at android.widget.TextView.onTouchEvent(TextView.java:8300)
at android.view.View.dispatchTouchEvent(View.java:9300)
at android.view.ViewGroup.dispatchTransformedTouchEvent(ViewGroup.java:2553)
at android.view.ViewGroup.dispatchTouchEvent(ViewGroup.java:2197)
at android.view.ViewGroup.dispatchTransformedTouchEvent(ViewGroup.java:2553)
at android.view.ViewGroup.dispatchTouchEvent(ViewGroup.java:2197)
at android.view.ViewGroup.dispatchTransformedTouchEvent(ViewGroup.java:2553)
at android.view.ViewGroup.dispatchTouchEvent(ViewGroup.java:2197)
at android.view.ViewGroup.dispatchTransformedTouchEvent(ViewGroup.java:2553)
at android.view.ViewGroup.dispatchTouchEvent(ViewGroup.java:2197)
at android.view.ViewGroup.dispatchTransformedTouchEvent(ViewGroup.java:2553)
at android.view.ViewGroup.dispatchTouchEvent(ViewGroup.java:2197)
at android.view.ViewGroup.dispatchTransformedTouchEvent(ViewGroup.java:2553)
at android.view.ViewGroup.dispatchTouchEvent(ViewGroup.java:2197)
at android.view.ViewGroup.dispatchTransformedTouchEvent(ViewGroup.java:2553)
at android.view.ViewGroup.dispatchTouchEvent(ViewGroup.java:2197)
at com.android.internal.policy.PhoneWindow$DecorView.superDispatchTouchEvent(PhoneWindow.java:2403)
at com.android.internal.policy.PhoneWindow.superDispatchTouchEvent(PhoneWindow.java:1737)
at android.app.Activity.dispatchTouchEvent(Activity.java:2771)
at com.android.internal.policy.PhoneWindow$DecorView.dispatchTouchEvent(PhoneWindow.java:2364)
at android.view.View.dispatchPointerEvent(View.java:9520)
at android.view.ViewRootImpl$ViewPostImeInputStage.processPointerEvent(ViewRootImpl.java:4230)
at android.view.ViewRootImpl$ViewPostImeInputStage.onProcess(ViewRootImpl.java:4096)
at android.view.ViewRootImpl$InputStage.deliver(ViewRootImpl.java:3642)
at android.view.ViewRootImpl$InputStage.onDeliverToNext(ViewRootImpl.java:3695)
at android.view.ViewRootImpl$InputStage.forward(ViewRootImpl.java:3661)
at android.view.ViewRootImpl$AsyncInputStage.forward(ViewRootImpl.java:3787)
at android.view.ViewRootImpl$InputStage.apply(ViewRootImpl.java:3669)
at android.view.ViewRootImpl$AsyncInputStage.apply(ViewRootImpl.java:3844)
at android.view.ViewRootImpl$InputStage.deliver(ViewRootImpl.java:3642)
at android.view.ViewRootImpl$InputStage.onDeliverToNext(ViewRootImpl.java:3695)
at android.view.ViewRootImpl$InputStage.forward(ViewRootImpl.java:3661)
at android.view.ViewRootImpl$InputStage.apply(ViewRootImpl.java:3669)
at android.view.ViewRootImpl$InputStage.deliver(ViewRootImpl.java:3642)
at android.view.ViewRootImpl.deliverInputEvent(ViewRootImpl.java:5922)
at android.view.ViewRootImpl.doProcessInputEvents(ViewRootImpl.java:5896)
at android.view.ViewRootImpl.enqueueInputEvent(ViewRootImpl.java:5857)
at android.view.ViewRootImpl$WindowInputEventReceiver.onInputEvent(ViewRootImpl.java:6025)
at android.view.InputEventReceiver.dispatchInputEvent(InputEventReceiver.java:185)
at android.os.MessageQueue.nativePollOnce(MessageQueue.java:-1)
at android.os.MessageQueue.next(MessageQueue.java:323)
at android.os.Looper.loop(Looper.java:135)
at android.app.ActivityThread.main(ActivityThread.java:5417)
at java.lang.reflect.Method.invoke(Method.java:-1)
at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:726)
at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:616)
这个dispatchInputEvent实际上是从native方法里面调用的,Looper.cpp 中pollInner方法中的handleEvent最终会调用dispatchInputEvent,这个里面没有
的调用实际上已经在app的主线程啦,所以write不是在这儿,这里只是想看下调用栈
真正write是在system_server进程中主要是InputTransport.cpp中的send方法,这里面的调用比较复杂,可以参考
Input系统—事件处理全过程 http://gityuan.com/2016/12/31/input-ipc/,这篇文章
2.如果messagequeue里面没有message,那么looper会阻塞,相当于主线程阻塞,那么广播事件怎么传入主线程?
直接看调用栈
"Binder_3@4082" prio=5 runnable
java.lang.Thread.State: RUNNABLE
at android.os.MessageQueue.enqueueMessage(MessageQueue.java:534)
at android.os.Handler.enqueueMessage(Handler.java:631)
at android.os.Handler.sendMessageAtTime(Handler.java:600)
at android.os.Handler.sendMessageDelayed(Handler.java:570)
at android.os.Handler.sendMessage(Handler.java:507)
at android.app.ActivityThread.sendMessage(ActivityThread.java:2281)
at android.app.ActivityThread.sendMessage(ActivityThread.java:2258)
at android.app.ActivityThread.-wrap25(ActivityThread.java:-1)
at android.app.ActivityThread$ApplicationThread.scheduleReceiver(ActivityThread.java:696)
at android.app.ApplicationThreadNative.onTransact(ApplicationThreadNative.java:217)
at android.os.Binder.execTransact(Binder.java:453),
,这个就很清晰了,在binder线程调用enqueueMessage,enqueueMessage方法里面有nativewait即可唤醒主线程
总结:也就是说唤醒主线程有两种方式
1.外部进程通过管道唤醒
2.自己的进程通过bind线程唤醒
参考链接:
我读过的最好的epoll讲解:http://blog.51cto.com/yaocoder/888374
大话 Select、Poll、Epoll : https://cloud.tencent.com/developer/article/1005481