一、前言
上一篇,我们分析了Framework层的代码。从Framework层分析了消息的发送、存储、调度逻辑。但是我们并没有解释清楚,Handler为什么没有阻塞其线程,底层使用的是Epoll多路复用机制。接下来我们从MessageQueue中的JNI入手,分析下Native层的功能。
如上图(图源至袁辉辉博客)所示,Native层也有一套对应的消息逻辑,用于处理Native层的消息分发。Framework中的MessageQueue有几个JNI方法,是连接着FrameWork和Native层的入口:
// 用于初始化Native相关
private native static long nativeInit();
// 用于销毁Native相关
private native static void nativeDestroy(long ptr);
// 用于轮询当前的消息集合
private native void nativePollOnce(long ptr, int timeoutMillis); /*non-static for callbacks*/
// 用于唤醒操作
private native static void nativeWake(long ptr);
// 判断是否正在轮询,即处在epoll_wait过程中
private native static boolean nativeIsPolling(long ptr);
// 对外开放了一个监听文件的输入输出等事件的方法
private native static void nativeSetFileDescriptorEvents(long ptr, int fd, int events);
接下来主要按照以上六个JNI方法来阐述Handler Native层逻辑。
用到的Native层代码:
framework/base/core/java/andorid/os/MessageQueue.java
framework/base/core/jni/android_os_MessageQueue.cpp
framework/base/core/java/andorid/os/Looper.java
system/core/libutils/Looper.cpp
system/core/include/utils/Looper.h
system/core/libutils/RefBase.cpp
framework/base/native/android/looper.cpp
framework/native/include/android/looper.h
Tips:
- 可以采用在线阅读的方式阅读代码:http://androidxref.com
- 可以从GItHub上下载源码: github
二、Epoll机制
因为Handler底层就是采用了Epoll多路复用的IO机制,为了更好地理解Handler,必须先了解下Epoll机制。加上最近也才刚研究Epoll机制,难免有理解的不准确的地方,而且网上的相关资料全且深入,所以这里我就说明下Epoll的用法与优点。
【1】、作用
在Linux中,一切对象皆文件,这也说明了文件的作用。为了的增加效率,而出现了监听文件描述符的机制,如果一个线程去监听一个文件描述符,这就大材小用,对于成千上万个文件描述符,主机处理起来就吃力了。事实上,文件的事件很少才会发生状态改变,所以可以采用一个线程监听多个文件描述符事件,如果文件事件响应了再通知调用者进一步的处理,这样会减少线程阻塞事件,毕竟IO阻塞的事件对于CPU来说还是有点长的。
【2】、用法
Android中头文件 #include<sys/epoll.h>
- epoll_create
int epoll_create(int __size)
用于创建一个fd,来管理所监听的fd句柄。__size是我们所监听的fd集合的初始大小,后期可以扩容。返回的就是一个int形的文件描述符,即管理类。
在Android中文件句柄都在/proc/<pid>/fd/中存储着,而且每个进程的文件句柄是有上限的,所以不用时及时清理,否则只有等该进程死亡时文件句柄才会清空。
- epoll_ctl
int epoll_ctl(int __epoll_fd, int __op, int __fd, struct epoll_event* __event);
struct epoll_event {
uint32_t events;
epoll_data_t data;
}
__epoll_fd:即epoll_create创建的文件句柄
__op : 对集合操作事件(增删改)
__fd :被监听的文件句柄
__event :监听文件哪些事件(可读可写等等)
- int epoll_wait
用于等待监听事件的相应
int epoll_wait(int __epoll_fd, struct epoll_event* __events, int __event_count, int __timeout_ms);
__epoll_fd:epoll_create创建的文件句柄
__events:获取具体的响应事件集合;
__event_count : 响应的数量 ;
__timeout_ms:超时时间。
具体可看:
【3】、优点
Epoll的优点也是相对于Poll、Select多路复用机制而言的。
首先,Epoll是基于回调的方式,而Poll和Select是基于轮询的方式,Epoll会在连接多且闲连接多的情况下,优势比较突出
其次,Epoll的被监听的事件是存放在内核空间,这样只需要Copy一次。而Poll和Select是每次都要将其Copy到内核态。
三、MessageQueue
【1】、nativeInit()
nativeInit()用于初始化底层对象,创建该线程的一个监听文件句柄事件的Pipe;
① : MessageQueue中
MessageQueue(boolean quitAllowed) {
mQuitAllowed = quitAllowed;
mPtr = nativeInit();
}
在Framework层MessageQueue中的构造方法中,对nativeInit进行初始化工作,返回的mPtr就是NativeMessageQueue对象的句柄。保证一个线程对应一个NativeMessageQueue对象,也用于之后的其他操作,因为mPtr也即NativeMessageQueue对象,也便于获取NativeMessageQueue对象。
②:android_os_MessageQueue_nativeInit()
在android_os_MessageQueue.cpp中:
static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
if (!nativeMessageQueue) {
jniThrowRuntimeException(env, "Unable to allocate native queue");
return 0;
}
// 智能指针RefBase 加一
nativeMessageQueue->incStrong(env);
// reinterpret_cast强制类型转换符 比特位不变
return reinterpret_cast<jlong>(nativeMessageQueue);
}
- 首先初始化一个NativeMessageQueue对象
- 智能指针RefBase 加一
- 返回mPtr,其中reinterpret_cast强制类型转换符,但他们的内存是一致的
③:NativeMessageQueue::NativeMessageQueue()
在android_os_MessageQueue.cpp的构造方法中:
NativeMessageQueue::NativeMessageQueue() :
mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {
// 从ThreadLocal中获取一个Looper对象,该Looper是Native中的
mLooper = Looper::getForThread();
if (mLooper == NULL) {
// 没有则创建一个Looper对象
mLooper = new Looper(false);
// 并将Looper对象和当前线程绑定,并放入到Thread中
Looper::setForThread(mLooper);
}
}
- 从ThreadLocal中获取一个Looper对象,该Looper是Native中的
- 没有则创建一个Looper对象
- 并将Looper对象和当前线程绑定,并放入到Thread中
④:Looper
在Looper.cpp
在NativeMessageQueue()中,第一次会初始化一个Looper对象
71Looper::Looper(bool allowNonCallbacks) :
72 mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
73 mPolling(false), mEpollFd(-1), mEpollRebuildRequired(false),
74 mNextRequestSeq(0), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
// 构造唤醒事件的mWakeEventFd
75 mWakeEventFd = eventfd(0, EFD_NONBLOCK);
76 LOG_ALWAYS_FATAL_IF(mWakeEventFd < 0, "Could not make wake event fd. errno=%d", errno);
77
78 AutoMutex _l(mLock);
// 重建Epoll事件 用于监听mWakeEventFd等状态
79 rebuildEpollLocked();
80}
- 首先会初始化mPolling、mEpollFd等对象。mPolling就是是否在轮询的标志,而mEpollFd监听的文件句柄
- 构造唤醒事件的mWakeEventFd,mEpollFd会监听该对象,从而唤醒当前线程,继续轮询。eventfd(0, EFD_NONBLOCK)构造非阻塞形的Pipe文件
- 重建Epoll事件 用于监听mWakeEventFd等状态
⑤:rebuildEpollLocked()
在Looper.cpp中
rebuildEpollLocked()方法主要就是构造一个Epoll事件,用于监听注册的文件句柄
140void Looper::rebuildEpollLocked() {
141 // Close old epoll instance if we have one.
// 关闭上一次的mEpollFd
142 if (mEpollFd >= 0) {
143#if DEBUG_CALLBACKS
144 ALOGD("%p ~ rebuildEpollLocked - rebuilding epoll set", this);
145#endif
146 close(mEpollFd);
147 }
148
149 // Allocate the new epoll instance and register the wake pipe.
// 重新的构建一个epoll fd。
150 mEpollFd = epoll_create(EPOLL_SIZE_HINT);
151 LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance. errno=%d", errno);
152
153 struct epoll_event eventItem;
154 memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
155 eventItem.events = EPOLLIN;
156 eventItem.data.fd = mWakeEventFd;
// 将mWakeEventFd添加到mEpollFd中,这样mEpollFd就可以管理唤醒事件了
157 int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, & eventItem);
158 LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake event fd to epoll instance. errno=%d",
159 errno);
160 // 上层调用nativeSetFileDescriptorEvents时候会mRequests增加
// 他会监听mRequests中fd的events
161 for (size_t i = 0; i < mRequests.size(); i++) {
162 const Request& request = mRequests.valueAt(i);
163 struct epoll_event eventItem;
164 request.initEventItem(&eventItem);
165 // 将request.fd添加到mEpollFd中
166 int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, request.fd, & eventItem);
167 if (epollResult < 0) {
168 ALOGE("Error adding epoll events for fd %d while rebuilding epoll set, errno=%d",
169 request.fd, errno);
170 }
171 }
172}
- 因为是重建,所以关闭上一次的mEpollFd,清空上一次Epoll事件
- 构建Epoll事件,初始化容量EPOLL_SIZE_HINT
mEpollFd = epoll_create(EPOLL_SIZE_HINT);
- 将mWakeEventFd添加到mEpollFd中,这样mEpollFd就可以管理唤醒事件了
// 添加操作
int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, & eventItem);
- 添加监听nativeSetFileDescriptorEvents层事件(后面会说)
主要代码: request.fd添加到mEpollFd中。
int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, request.fd, & eventItem);
【2】、nativePollOnce(long ptr, int timeoutMillis)
nativePollOnce是开启一次轮询。即开始监听是否有事件来了
ptr:NativeMessageQueue对象
timeoutMillis:等待的时间。
-1代表一直有事件响应
0代表立刻回复
timeoutMillis则代表超时时间
①:android_os_MessageQueue_nativePollOnce
在android_os_MessageQueue.cpp中:
static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
jlong ptr, jint timeoutMillis) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}
- 首先将传入的ptr转化成nativeMessageQueue对象。
- 调用Native中nativeMessageQueue->pollOnce(env, obj, timeoutMillis);方法
②:pollOnce()
在android_os_MessageQueue.cpp中。
这里的主要的逻辑就是调用mLooper->pollOnce(timeoutMillis);
void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
mPollEnv = env;
mPollObj = pollObj;
mLooper->pollOnce(timeoutMillis);
mPollObj = NULL;
mPollEnv = NULL;
if (mExceptionObj) {
env->Throw(mExceptionObj);
env->DeleteLocalRef(mExceptionObj);
mExceptionObj = NULL;
}
}
②:mLooper->pollOnce
在Looper.cpp中。
184int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
185 int result = 0;
186 for (;;) {
187 while (mResponseIndex < mResponses.size()) {
188 const Response& response = mResponses.itemAt(mResponseIndex++);
189 int ident = response.request.ident;
// POLL_CALLBACK = -2,
190 if (ident >= 0) {
191 int fd = response.request.fd;
192 int events = response.events;
193 void* data = response.request.data;
194#if DEBUG_POLL_AND_WAKE
195 ALOGD("%p ~ pollOnce - returning signalled identifier %d: "
196 "fd=%d, events=0x%x, data=%p",
197 this, ident, fd, events, data);
198#endif
199 if (outFd != NULL) *outFd = fd;
200 if (outEvents != NULL) *outEvents = events;
201 if (outData != NULL) *outData = data;
202 return ident;
203 }
204 }
205
206 if (result != 0) {
207#if DEBUG_POLL_AND_WAKE
208 ALOGD("%p ~ pollOnce - returning result %d", this, result);
209#endif
210 if (outFd != NULL) *outFd = 0;
211 if (outEvents != NULL) *outEvents = 0;
212 if (outData != NULL) *outData = NULL;
213 return result;
214 }
215
216 result = pollInner(timeoutMillis);
217 }
218}
轮询逻辑都在Looper::pollOnce中。
- 首先处理MResponses队列,java传入的监听文件句柄ident = POLL_CALLBACK 即-2,所以这里面处理的显示Native中的消息
- result = pollInner(timeoutMillis);在这里处理的是我们的Framework层的Message事件。
③:pollInner(timeoutMillis)
在Looper.cpp中。
220int Looper::pollInner(int timeoutMillis) {
...........
238
239 // Poll.
240 int result = POLL_WAKE;
241 mResponses.clear();
242 mResponseIndex = 0;
243
244 // We are about to idle.
// 当前是否正在轮询中
245 mPolling = true;
246
247 struct epoll_event eventItems[EPOLL_MAX_EVENTS];
248 int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
249
250 // No longer idling.
251 mPolling = false;
252
253 // Acquire lock.
254 mLock.lock();
255
256 // Rebuild epoll set if needed.
257 if (mEpollRebuildRequired) {
258 mEpollRebuildRequired = false;
259 rebuildEpollLocked();
260 goto Done;
261 }
262
263 // Check for poll error.
// 轮询错误
264 if (eventCount < 0) {
265 if (errno == EINTR) {
266 goto Done;
267 }
268 ALOGW("Poll failed with an unexpected error, errno=%d", errno);
269 result = POLL_ERROR;
270 goto Done;
271 }
272
273 // Check for poll timeout.
// 轮询timeout
274 if (eventCount == 0) {
275#if DEBUG_POLL_AND_WAKE
276 ALOGD("%p ~ pollOnce - timeout", this);
277#endif
278 result = POLL_TIMEOUT;
279 goto Done;
280 }
281
282 // Handle all events.
283#if DEBUG_POLL_AND_WAKE
284 ALOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount);
285#endif
286
287 for (int i = 0; i < eventCount; i++) {
288 int fd = eventItems[i].data.fd;
289 uint32_t epollEvents = eventItems[i].events;
// 区分事件 是否是唤醒事件
290 if (fd == mWakeEventFd) {
291 if (epollEvents & EPOLLIN) {
// 处理唤醒操作
292 awoken();
293 } else {
294 ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);
295 }
296 } else { // 其它事件 将request放入到 responses中
297 ssize_t requestIndex = mRequests.indexOfKey(fd);
298 if (requestIndex >= 0) {
299 int events = 0;
300 if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
301 if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
302 if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
303 if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
304 pushResponse(events, mRequests.valueAt(requestIndex));
305 } else {
306 ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
307 "no longer registered.", epollEvents, fd);
308 }
309 }
310 }
Done: ;
312
313 // Invoke pending message callbacks.
314 mNextMessageUptime = LLONG_MAX;
// 处理Native中Message
315 while (mMessageEnvelopes.size() != 0) {
316 nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
// mMessageEnvelopes 信封集合(持有uptime/Message/Handler)
317 const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
318 if (messageEnvelope.uptime <= now) {
319 // Remove the envelope from the list.
320 // We keep a strong reference to the handler until the call to handleMessage
321 // finishes. Then we drop it so that the handler can be deleted *before*
322 // we reacquire our lock.
323 { // obtain handler
324 sp<MessageHandler> handler = messageEnvelope.handler;
325 Message message = messageEnvelope.message;
326 mMessageEnvelopes.removeAt(0);
327 mSendingMessage = true;
328 mLock.unlock();
329
330#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
331 ALOGD("%p ~ pollOnce - sending message: handler=%p, what=%d",
332 this, handler.get(), message.what);
333#endif
// 处理消息
334 handler->handleMessage(message);
335 } // release handler
336
337 mLock.lock();
338 mSendingMessage = false;
//POLL_CALLBACK = -2,
339 result = POLL_CALLBACK;
340 } else {
341 // The last message left at the head of the queue determines the next wakeup time.
342 mNextMessageUptime = messageEnvelope.uptime;
343 break;
344 }
345 }
346
347 // Release lock.
348 mLock.unlock();
349
350 // Invoke all response callbacks.
351 for (size_t i = 0; i < mResponses.size(); i++) {
352 Response& response = mResponses.editItemAt(i);
// 处理POLL_CALLBACK 事件
353 if (response.request.ident == POLL_CALLBACK) {
354 int fd = response.request.fd;
355 int events = response.events;
356 void* data = response.request.data;
357#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
358 ALOGD("%p ~ pollOnce - invoking fd event callback %p: fd=%d, events=0x%x, data=%p",
359 this, response.request.callback.get(), fd, events, data);
360#endif
361 // Invoke the callback. Note that the file descriptor may be closed by
362 // the callback (and potentially even reused) before the function returns so
363 // we need to be a little careful when removing the file descriptor afterwards.
364 int callbackResult = response.request.callback->handleEvent(fd, events, data);
365 if (callbackResult == 0) {
366 removeFd(fd, response.request.seq);
367 }
368
369 // Clear the callback reference in the response structure promptly because we
370 // will not clear the response vector itself until the next poll.
371 response.request.callback.clear();
372 result = POLL_CALLBACK;
373 }
374 }
375 return result;
376}
- epoll_wait,首先开始轮询事件。传入的时间也是调用者传入的。
- 需要rebuildEpollLocked、错误、以及等待事件超时,都将进入到DONE中
接下来是处理触发的事件
287 for (int i = 0; i < eventCount; i++) {
288 int fd = eventItems[i].data.fd;
289 uint32_t epollEvents = eventItems[i].events;
// 区分事件 是否是唤醒事件
290 if (fd == mWakeEventFd) {
291 if (epollEvents & EPOLLIN) {
// 处理唤醒操作
292 awoken();
293 } else {
294 ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);
295 }
296 } else { // 其它事件 将request放入到 responses中
297 ssize_t requestIndex = mRequests.indexOfKey(fd);
298 if (requestIndex >= 0) {
299 int events = 0;
300 if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
301 if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
302 if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
303 if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
304 pushResponse(events, mRequests.valueAt(requestIndex));
305 } else {
306 ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
307 "no longer registered.", epollEvents, fd);
308 }
309 }
310 }
- 如果fd == mWakeEventFd,处理的是唤醒管道句柄。以及是EPOLLIN事件,那么处理awoken();唤起工作。这里面就是将mWakeEventFd文件里面的数据清空。
- 如果是mRequests中的事件,那边放入到mResponses队列中,等到后面再做处理,这里面涉及到了处理的优先级
接下来会走到Done逻辑中。
- 在Done中开始逻辑中(代码315行),这里面处理的是Native层Message。前提是messageEnvelope.uptime <= now,即事件到时间了。
- Native中的Message封装并存在mMessageEnvelopes中,最终调用handler->handleMessage(message);方法
接下来会走到对mResponses处理中,即最开始存入的。
- 这里面只会处理response.request.ident == POLL_CALLBACK,也就是调用nativeSetFileDescriptorEvents而注册的事件。这个优先级是最低的。
- 具体的调用在nativeSetFileDescriptorEvents节中介绍。
小结:
Framework层loop()的时候,会发生wait,也就是:
int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
分别在以下的情况下才会被唤醒:
- timeoutMillis超时,自动唤醒
- 管道有消息传入,即可以EPOLLIN操作的时候
- 发生错误
【3】、nativeWake(long ptr)
nativeWake是用来唤醒阻塞这的Epoll事件的。比如当有一个插入了一个急需处理的Message,就会触发该方法。
①:MessageQueue#enqueueMessage()
在MessageQueue.java中
boolean enqueueMessage(Message msg, long when) {
..........
// We can assume mPtr != 0 because mQuitting is false.
if (needWake) {
nativeWake(mPtr);
}
}
return true;
}
- 在插入一个消息的时候,会触发对nativeWake方法调用的检查,如果needWake等于true的时候,就会唤醒阻塞的Epoll。
②:android_os_MessageQueue_nativeWake()
在android_os_MessageQueue.cpp中
static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->wake();
}
- JNI中就是调用nativeMessageQueue->wake();
③:nativeMessageQueue->wake()
在android_os_MessageQueue.cpp中
void NativeMessageQueue::wake() {
mLooper->wake();
}
- NativeMessageQueue中也只是一个代理,最终还是调用Looper中的代码
④:mLooper->wake()
在Looper.cpp中
void Looper::wake() {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ wake", this);
#endif
uint64_t inc = 1;
ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t)));
if (nWrite != sizeof(uint64_t)) {
if (errno != EAGAIN) {
ALOGW("Could not write wake signal, errno=%d", errno);
}
}
}
唤醒的操作就是向Pipe文件中写入数据,这样pollOnce的时候就可以监听到,从而达到唤醒操作,上面提到监听到唤醒时,执行awoken(),清空Pipe文件。所以唤醒操作就是简单的写入数据TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t)));
TEMP_FAILURE_RETRY() 就是一个知道写成功才会退出的宏。
小结:
可以看出来,唤醒工作就是向初始化的时候创建的Pipe文件写入一个uint64_t数据。监听Pipe的Epoll就会被唤醒。
【4】、nativeDestroy(long ptr)
Destroy见名知意,就是销毁的时候,做些清理工作。
①:dispose()
在MessageQueue.java中
private void dispose() {
if (mPtr != 0) {
nativeDestroy(mPtr);
mPtr = 0;
}
}
- 如果mPtr 不等于 0,就触发nativeDestroy,之后再将mPtr置0,防止多次调用出错。
②:dispose()
NativeMessageQueue.cpp中
static void android_os_MessageQueue_nativeDestroy(JNIEnv* env, jclass clazz, jlong ptr) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->decStrong(env);
}
- 直接调用nativeMessageQueue->decStrong(env);而nativeMessageQueue继承自RefBase,decStrong调用的是RefBase->decStrong()
③:decStrong(env)
RefBase.cpp中
void RefBase::decStrong(const void* id) const
{
weakref_impl* const refs = mRefs;
// 移除强引用
refs->removeStrongRef(id);
const int32_t c = android_atomic_dec(&refs->mStrong);
if (c == 1) {
refs->mBase->onLastStrongRef(id);
if ((refs->mFlags&OBJECT_LIFETIME_MASK) == OBJECT_LIFETIME_STRONG) {
delete this;
}
}
// 移除弱引用
refs->decWeak(id);
}
- RefBase中,也就是清除掉引用
【5】、nativeIsPolling(long ptr)
nativeIsPolling是用来判断当前Epoll是否在阻塞中。
同理,最终调用到Looper.cpp中的isPolling()方法;直接返回mPolling变量。
bool Looper::isPolling() const {
return mPolling;
}
在Looper的pollOnce中,epoll_wait轮询结束后就赋值为false。
// We are about to idle.
mPolling = true;
struct epoll_event eventItems[EPOLL_MAX_EVENTS];
int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
// No longer idling.
mPolling = false;
【6】、nativeSetFileDescriptorEvents(long ptr, int fd, int events);
由之前的介绍,Handler底层就是使用Epoll机制,来监听fd文件句柄,之前注册的事件(读或者写等等)。
所以Handler提供一个入口,用来监听传入的文件句柄。
①:MessageQueue中
public void addOnFileDescriptorEventListener(@NonNull FileDescriptor fd,
@OnFileDescriptorEventListener.Events int events,
@NonNull OnFileDescriptorEventListener listener) {
if (fd == null) {
throw new IllegalArgumentException("fd must not be null");
}
if (listener == null) {
throw new IllegalArgumentException("listener must not be null");
}
synchronized (this) {
updateOnFileDescriptorEventListenerLocked(fd, events, listener);
}
}
- fd : 被监听的文件句柄
- events :触发的事件
/** @hide */
@Retention(RetentionPolicy.SOURCE)
@IntDef(flag=true, value={EVENT_INPUT, EVENT_OUTPUT, EVENT_ERROR})
public @interface Events {}
主要是EVENT_INPUT和EVENT_OUTPUT事件。
- OnFileDescriptorEventListener : 事件响应后的回调
public interface OnFileDescriptorEventListener {
@Events int onFileDescriptorEvents(@NonNull FileDescriptor fd, @Events int events);
}
当事件被响应后,就会触发该回调,那我们就可以根据事件来做对应的处理,从而避免IO阻塞。
private void updateOnFileDescriptorEventListenerLocked(FileDescriptor fd, int events,
OnFileDescriptorEventListener listener) {
final int fdNum = fd.getInt$();
int index = -1;
FileDescriptorRecord record = null;
if (mFileDescriptorRecords != null) {
index = mFileDescriptorRecords.indexOfKey(fdNum);
if (index >= 0) {
record = mFileDescriptorRecords.valueAt(index);
if (record != null && record.mEvents == events) {
return;
}
}
}
if (events != 0) {
events |= OnFileDescriptorEventListener.EVENT_ERROR;
if (record == null) {
if (mFileDescriptorRecords == null) {
mFileDescriptorRecords = new SparseArray<FileDescriptorRecord>();
}
record = new FileDescriptorRecord(fd, events, listener);
mFileDescriptorRecords.put(fdNum, record);
} else {
record.mListener = listener;
record.mEvents = events;
record.mSeq += 1;
}
nativeSetFileDescriptorEvents(mPtr, fdNum, events);
} else if (record != null) {
record.mEvents = 0;
mFileDescriptorRecords.removeAt(index);
}
}
- updateOnFileDescriptorEventListenerLocked中主要做些存储工作,将fd、event与回调封装在FileDescriptorRecord并保存在MessageQueue中,当事件触发的时候就可以执行回调了。最终会执行nativeSetFileDescriptorEvents方法。
②:NativeMessageQueue.cpp中
JNI层会调用Native中的setFileDescriptorEvents方法。
void NativeMessageQueue::setFileDescriptorEvents(int fd, int events) {
if (events) {
int looperEvents = 0;
if (events & CALLBACK_EVENT_INPUT) {
looperEvents |= Looper::EVENT_INPUT;
}
if (events & CALLBACK_EVENT_OUTPUT) {
looperEvents |= Looper::EVENT_OUTPUT;
}
mLooper->addFd(fd, Looper::POLL_CALLBACK, looperEvents, this,
reinterpret_cast<void*>(events));
} else {
mLooper->removeFd(fd);
}
}
- 如果events大于0,就会注册,反之就是解注册
- 将传入的events转化成Native中的looperEvents ,并触发 mLooper->addFd
③:Looper.cpp中
int Looper::addFd(int fd, int ident, int events, const sp<LooperCallback>& callback, void* data) {
#if DEBUG_CALLBACKS
ALOGD("%p ~ addFd - fd=%d, ident=%d, events=0x%x, callback=%p, data=%p", this, fd, ident,
events, callback.get(), data);
#endif
if (!callback.get()) {
if (! mAllowNonCallbacks) {
ALOGE("Invalid attempt to set NULL callback but not allowed for this looper.");
return -1;
}
if (ident < 0) {
ALOGE("Invalid attempt to set NULL callback with ident < 0.");
return -1;
}
} else {
ident = POLL_CALLBACK;
}
{ // acquire lock
AutoMutex _l(mLock);
Request request;
request.fd = fd;
request.ident = ident;
request.events = events;
request.seq = mNextRequestSeq++;
request.callback = callback;
request.data = data;
if (mNextRequestSeq == -1) mNextRequestSeq = 0; // reserve sequence number -1
struct epoll_event eventItem;
request.initEventItem(&eventItem);
ssize_t requestIndex = mRequests.indexOfKey(fd);
if (requestIndex < 0) {
int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, & eventItem);
if (epollResult < 0) {
ALOGE("Error adding epoll events for fd %d, errno=%d", fd, errno);
return -1;
}
mRequests.add(fd, request);
} else {
int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_MOD, fd, & eventItem);
if (epollResult < 0) {
if (errno == ENOENT) {
// Tolerate ENOENT because it means that an older file descriptor was
// closed before its callback was unregistered and meanwhile a new
// file descriptor with the same number has been created and is now
// being registered for the first time. This error may occur naturally
// when a callback has the side-effect of closing the file descriptor
// before returning and unregistering itself. Callback sequence number
// checks further ensure that the race is benign.
//
// Unfortunately due to kernel limitations we need to rebuild the epoll
// set from scratch because it may contain an old file handle that we are
// now unable to remove since its file descriptor is no longer valid.
// No such problem would have occurred if we were using the poll system
// call instead, but that approach carries others disadvantages.
#if DEBUG_CALLBACKS
ALOGD("%p ~ addFd - EPOLL_CTL_MOD failed due to file descriptor "
"being recycled, falling back on EPOLL_CTL_ADD, errno=%d",
this, errno);
#endif
epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, & eventItem);
if (epollResult < 0) {
ALOGE("Error modifying or adding epoll events for fd %d, errno=%d",
fd, errno);
return -1;
}
scheduleEpollRebuildLocked();
} else {
ALOGE("Error modifying epoll events for fd %d, errno=%d", fd, errno);
return -1;
}
}
mRequests.replaceValueAt(requestIndex, request);
}
} // release lock
return 1;
}
- 因为没有callback所以 ident = POLL_CALLBACK;即 ident = -2
- 将必要的参数存放入Request ,如果mRequests之前没有监听该文件句柄,那就添加。
int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, & eventItem);
if (epollResult < 0) {
ALOGE("Error adding epoll events for fd %d, errno=%d", fd, errno);
return -1;
}
mRequests.add(fd, request);
- 如果该文件句柄注册过,那就修改EPOLL_CTL_MOD,以及检测其它错误。
④:Event事件响应
在之前的pollOnce代码中
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
..........
// Invoke all response callbacks.
for (size_t i = 0; i < mResponses.size(); i++) {
Response& response = mResponses.editItemAt(i);
if (response.request.ident == POLL_CALLBACK) {
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
ALOGD("%p ~ pollOnce - invoking fd event callback %p: fd=%d, events=0x%x, data=%p",
this, response.request.callback.get(), fd, events, data);
#endif
// Invoke the callback. Note that the file descriptor may be closed by
// the callback (and potentially even reused) before the function returns so
// we need to be a little careful when removing the file descriptor afterwards.
int callbackResult = response.request.callback->handleEvent(fd, events, data);
if (callbackResult == 0) {
removeFd(fd, response.request.seq);
}
// Clear the callback reference in the response structure promptly because we
// will not clear the response vector itself until the next poll.
response.request.callback.clear();
result = POLL_CALLBACK;
}
}
}
- response.request.ident == POLL_CALLBACK,这个就是之前注册过的事件
- callback就是NativeMessageQueue对象,也就是执行其中handleEvent的方法。
- 如果返回为0,那就解绑。Framework层OnFileDescriptorEventListener的回调,如果返回0,那之后就不监听该事件了。
int callbackResult = response.request.callback->handleEvent(fd, events, data);
if (callbackResult == 0) {
removeFd(fd, response.request.seq);
}
接下来会带NativeMessageQueue中:
int NativeMessageQueue::handleEvent(int fd, int looperEvents, void* data) {
........
int newWatchedEvents = mPollEnv->CallIntMethod(mPollObj,
gMessageQueueClassInfo.dispatchEvents, fd, events);
if (!newWatchedEvents) {
return 0; // unregister the fd
}
if (newWatchedEvents != oldWatchedEvents) {
setFileDescriptorEvents(fd, newWatchedEvents);
}
return 1;
}
- mPollEnv->CallIntMethod就会Native层调用Java层代码。gMessageQueueClassInfo.dispatchEvents:
int register_android_os_NativeHandle(JNIEnv *env) {
jclass clazz = FindClassOrDie(env, CLASS_PATH);
gNativeHandleFields.clazz = MakeGlobalRefOrDie(env, clazz);
gNativeHandleFields.constructID = GetMethodIDOrDie(env, clazz, "<init>", "([I[IZ)V");
gNativeHandleFields.getFdsID = GetMethodIDOrDie(env, clazz, "getFdsAsIntArray", "()[I");
gNativeHandleFields.getIntsID = GetMethodIDOrDie(env, clazz, "getInts", "()[I");
return 0;
}
这样我们又回到了MessageQueue中了。
// Called from native code.
private int dispatchEvents(int fd, int events) {
// Get the file descriptor record and any state that might change.
final FileDescriptorRecord record;
final int oldWatchedEvents;
final OnFileDescriptorEventListener listener;
final int seq;
synchronized (this) {
record = mFileDescriptorRecords.get(fd);
if (record == null) {
return 0; // spurious, no listener registered
}
oldWatchedEvents = record.mEvents;
events &= oldWatchedEvents; // filter events based on current watched set
if (events == 0) {
return oldWatchedEvents; // spurious, watched events changed
}
listener = record.mListener;
seq = record.mSeq;
}
// Invoke the listener outside of the lock.
int newWatchedEvents = listener.onFileDescriptorEvents(
record.mDescriptor, events);
if (newWatchedEvents != 0) {
newWatchedEvents |= OnFileDescriptorEventListener.EVENT_ERROR;
}
// Update the file descriptor record if the listener changed the set of
// events to watch and the listener itself hasn't been updated since.
if (newWatchedEvents != oldWatchedEvents) {
synchronized (this) {
int index = mFileDescriptorRecords.indexOfKey(fd);
if (index >= 0 && mFileDescriptorRecords.valueAt(index) == record
&& record.mSeq == seq) {
record.mEvents = newWatchedEvents;
if (newWatchedEvents == 0) {
mFileDescriptorRecords.removeAt(index);
}
}
}
}
// Return the new set of events to watch for native code to take care of.
return newWatchedEvents;
}
- dispatchEvents方法中,从mFileDescriptorRecords根据fd查找,回调方法,最终listener.onFileDescriptorEvents(record.mDescriptor, events);返回值为0代表取消监听,大于0就是重新注册的事件。