前言
当需要发布一个普通事件时,我们一般会这样写:
EventBus.getDefault().post(event);
那么,post发布方法里面到底做了什么呢?本章我们就来一探究竟。
一、主要流程
public void post(Object event) {
PostingThreadState postingState = currentPostingThreadState.get();
List<Object> eventQueue = postingState.eventQueue;
eventQueue.add(event);
if (!postingState.isPosting) {
postingState.isMainThread = isMainThread();
postingState.isPosting = true;
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
while (!eventQueue.isEmpty()) {
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}
在分析发布流程之前,我们需要先了解一下PostingThreadState这个类。
1.1 PostingThreadState
PostingThreadState是EventBus的一个静态内部类,它的定义如下:
/** For ThreadLocal, much faster to set (and get multiple values). */
final static class PostingThreadState {
final List<Object> eventQueue = new ArrayList<>();//事件队列
boolean isPosting;//是否正在发布
boolean isMainThread;//发布的操作是否处于主线程
Subscription subscription;//订阅信息
Object event;//当前事件
boolean canceled;//发布的事件是否被取消了
}
PostingThreadState记录的是发布线程的状态信息,它在EventBus中是作为ThreadLocal变量来使用的,即线程本地变量,这种使用方式可以避免多线程并发问题。
private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {
@Override
protected PostingThreadState initialValue() {
return new PostingThreadState();
}
};
eventQueue用于保存当前线程发布的事件(即调用post方法时所在的线程)。由于发布线程与订阅方法执行的线程可能是不同的,因此发布的事件会累积,所以需要eventQueue队列来维护。
1.2 步骤分析
现在回到post主流程,梳理步骤如下:
- 将发布的事件加入到线程本地对象postingState维护的事件队列中
- 如果isPosting的值是false,由于此时需要发布事件,所以将isPosting置位成true,并判断发布线程是否是主线程
- 遍历事件队列,取出每个事件,调用postSingleEvent处理,并将相应事件从队列中移除
- 直到事件队列都处理完毕,此次发布结束,将isPosting和isMainThread置位成false
那么,这里重点就需要关注postSingleEvent方法了
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
Class<?> eventClass = event.getClass();
boolean subscriptionFound = false;
if (eventInheritance) {
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
if (!subscriptionFound) {
if (logNoSubscriberMessages) {
logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
}
if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
eventClass != SubscriberExceptionEvent.class) {
post(new NoSubscriberEvent(this, event));
}
}
}
该方法的逻辑很清晰,如果事件是允许继承的,则找出事件的所有超类。之后,无论事件是否可继承,都会调用postSingleEventForEventType方法,该方法最终会返回一个布尔值,标识订阅信息是否存在。如果订阅信息没有查找到,则会给出如下log信息,并且会发送NoSubscriberEvent事件。
No subscribers registered for event " + eventClass
代码跟踪到这一步,已经越来越接近终点了,让我们一起来看看postSingleEventForEventType方法的逻辑。
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this) {
subscriptions = subscriptionsByEventType.get(eventClass);
}
if (subscriptions != null && !subscriptions.isEmpty()) {
for (Subscription subscription : subscriptions) {
postingState.event = event;
postingState.subscription = subscription;
boolean aborted = false;
try {
postToSubscription(subscription, event, postingState.isMainThread);
aborted = postingState.canceled;
} finally {
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
if (aborted) {
break;
}
}
return true;
}
return false;
}
postSingleEventForEventType的逻辑是这样的:
- 根据事件类型找出所有订阅了该事件的订阅信息
- 如果订阅信息为空,直接返回false
- 否则,遍历订阅信息集合,调用postToSubscription处理发布事件
看到这里有没有似曾相识的感觉?是的,postToSubscription在第三章的register方法里已经介绍过了,就是根据不同的threadMode,在不同的线程执行订阅方法。忘记的读者请自行移步地第三章查看,这里不再重复讲解了。
到这里,我们其实就清楚了,无论是普通事件还是粘性事件,它们的处理都是由postToSubscription来执行的,这也就是所谓的殊途同归了。
二、结束语
文章最后,我们可以将发布流程简单归纳成如下步骤
- 将待发布的事件加入到线程本地栈的事件队列中
- 遍历事件队列,根据不同线程模式,执行相应的订阅方法