本文将从EventBus 创建 - > 注册--->发送消息--->解除注册 几个方面来读一下EventBus 3.0+ 的源码。
EventBus 简单订阅 (例如,在MainActivity.class ,注册String.class 类型的事件) 代码如下:
//MainActivity.class
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
//....
EventBus.getDefault().register(this);
}
//类中的,订阅方法, 事件类型为String
@Subscribe(threadMode = ThreadMode.MAIN, priority = 100, sticky = false)
public void onStringEvent(String eventString) {
}
上面代码在别的地方发送 事件类型为 String.class
则会调用到 MainActivity#onStringEvent
方法。
一、EventBus 创建
EventBus.getDefault(); // 返回的是EventBus对象
这里的 EventBus.getDefault()
其实是拿到的EventBus对象,其中涉及到了一个 线程安全的单利模式,代码如下(参数解释标注在了注释中):
EventBus.java
static volatile EventBus defaultInstance;
//线程安全的单例模式
public static EventBus getDefault() {
if (defaultInstance == null) {
synchronized (EventBus.class) {//加锁
if (defaultInstance == null) {
defaultInstance = new EventBus(); //如果不存在,则去创建
}
}
}
return defaultInstance;
}
如果不存在,就走下面的创建方法:
EventBus.java
//创建过程
EventBus(EventBusBuilder builder) {
//private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType;
//key:订阅的事件,value:订阅这个事件的所有订阅者集合
subscriptionsByEventType = new HashMap<>();
//private final Map<Object, List<Class<?>>> typesBySubscriber;
//key:订阅者对象,value:这个订阅者订阅的事件集合
typesBySubscriber = new HashMap<>();
//粘性事件 key:粘性事件的class对象, value:事件对象
//private final Map<Class<?>, Object> stickyEvents;
stickyEvents = new ConcurrentHashMap<>();
//ui线程的 Handler
mainThreadPoster = new HandlerPoster(this, Looper.getMainLooper(), 10);
//Background 处理
backgroundPoster = new BackgroundPoster(this);
//异步事件处理
asyncPoster = new AsyncPoster(this);
indexCount = builder.subscriberInfoIndexes != null ? builder.subscriberInfoIndexes.size() : 0;
subscriberMethodFinder = new SubscriberMethodFinder(builder.subscriberInfoIndexes,
builder.strictMethodVerification, builder.ignoreGeneratedIndex);
logSubscriberExceptions = builder.logSubscriberExceptions;
logNoSubscriberMessages = builder.logNoSubscriberMessages;
sendSubscriberExceptionEvent = builder.sendSubscriberExceptionEvent;
sendNoSubscriberEvent = builder.sendNoSubscriberEvent;
throwSubscriberException = builder.throwSubscriberException;
eventInheritance = builder.eventInheritance;
executorService = builder.executorService;
}
二、EventBus注册
1,使用(下面的this,就是上面例子中的 MainActivity.class
)
//最常用的EventBus注册
EventBus.getDefault().register(this);
1.1 主要看 EventBus.java ->register()
EventBus.java
/**
* 大概翻译一下,该方法上的注释的意思。
* 1,记得调用 {@link #unregister(Object)} 进行注册解除
* 2,订阅者,需要有一个包含 {@link Subscribe} 注解的方法。
*/
public void register(Object subscriber) {
//1,拿到订阅的class 上面例子中的 `MainActivity.class`
Class<?> subscriberClass = subscriber.getClass();
//2,通过subscriberMethodFinder来找到订阅者订阅了哪些事件.返回一个SubscriberMethod对象的List,SubscriberMethod
//里包含了这个方法的Method对象,以及将来响应订阅是在哪个线程的ThreadMode,以及订阅的事件类型eventType,以及订阅的优
//先级priority,以及是否接收粘性sticky事件的boolean值.
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
synchronized (this) {
//3,从步骤2得到的集合,说明一个类中可能有很多个订阅方法,在下面通过foreach依次 订阅。
for (SubscriberMethod subscriberMethod : subscriberMethods) {
//
subscribe(subscriber, subscriberMethod);
}
}
}
1.2 跟到:SubscriberMethodFinder.java --> findSubscriberMethods()
方法中,可以理解为:寻找到 类中所有带@Subscribe
注解的方法
/**
* METHOD_CACHE 本质是一个 key = class ,value = List<SubscriberMethod> 的线程安全的map集合。
*/
private static final Map<Class<?>, List<SubscriberMethod>> METHOD_CACHE = new ConcurrentHashMap<>();
//-------------------------------------------------
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
//METHOD_CACHE :缓存,先从缓存中获取
List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
if (subscriberMethods != null) {
//如果不为null,说明之前缓存的Map集合中有,直接return,返回
return subscriberMethods;
}
//ignoreGeneratedIndex 默认是 false
if (ignoreGeneratedIndex) {
// 使用反射方式获取(反射继续跟会到这个方法中:findUsingReflectionInSingleClass(findState);,在这个方法中,通过@Subscriber注解,找到相应的方法)
subscriberMethods = findUsingReflection(subscriberClass);
} else {
// 使用SubscriberIndex方式获取
subscriberMethods = findUsingInfo(subscriberClass);
}
if (subscriberMethods.isEmpty()) {
//在这里也可以看到,如果没有一个 声明为 public + 含有 @Subscribe 的方法,就会抛异常!
throw new EventBusException("Subscriber " + subscriberClass
+ " and its super classes have no public methods with the @Subscribe annotation");
} else {
//在这里,将传入的class 作为key,存到 subscriberMethods 中。
METHOD_CACHE.put(subscriberClass, subscriberMethods);
return subscriberMethods;
}
}
由于ignoreGeneratedIndex
默认是 false,则进入 else 中分析findUsingInfo()
方法:
private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
FindState findState = prepareFindState();
findState.initForSubscriber(subscriberClass);//初始化
while (findState.clazz != null) {
findState.subscriberInfo = getSubscriberInfo(findState);
if (findState.subscriberInfo != null) {
SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
for (SubscriberMethod subscriberMethod : array) {
if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
findState.subscriberMethods.add(subscriberMethod);
}
}
} else {
findUsingReflectionInSingleClass(findState);
}
findState.moveToSuperclass();
}
return getMethodsAndRelease(findState);
}
-
getSubscriberInfo(findState);
该方法默认为null,因为,在findState.initForSubscriber(subscriberClass)
初始化的时候,findState.subscriberInfo = null
第一个 if 判断不会走。subscriberInfoIndexes
涉及到了 apt ,默认也是 null (可以进入EventBusBuilder # addIndex()
进行验证)
private SubscriberInfo getSubscriberInfo(FindState findState) {
if (findState.subscriberInfo != null && findState.subscriberInfo.getSuperSubscriberInfo() != null) {
SubscriberInfo superclassInfo = findState.subscriberInfo.getSuperSubscriberInfo();
if (findState.clazz == superclassInfo.getSubscriberClass()) {
return superclassInfo;
}
}
if (subscriberInfoIndexes != null) {
for (SubscriberInfoIndex index : subscriberInfoIndexes) {
SubscriberInfo info = index.getSubscriberInfo(findState.clazz);
if (info != null) {
return info;
}
}
}
return null;
}
1.3 继续看 findUsingReflectionInSingleClass
, 找到 添加注解方法的核心(关键代码在注解中进行解释)
// SubscriberMethodFinder.java
private void findUsingReflectionInSingleClass(FindState findState) {
Method[] methods;
try {
// 这里的 clazz 也就是平常我们写的类,如:MainActivity等等
//反射,得到 类 方法的数组
methods = findState.clazz.getDeclaredMethods();
} catch (Throwable th) {
// Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149
methods = findState.clazz.getMethods();
findState.skipSuperClasses = true;
}
//遍历方法,找到包含注解的方法 (上面例子中 `onStringEvent(String eventString)` 方法)
for (Method method : methods) {
int modifiers = method.getModifiers();
//判断修饰符,是不是 public
if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
Class<?>[] parameterTypes = method.getParameterTypes();
//保证必须只有一个参数,其实就是注解方法中只能有一个事件(上面例子中的 String.class 类型)
if (parameterTypes.length == 1) {
// 拿到注解
Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
if (subscribeAnnotation != null) {
Class<?> eventType = parameterTypes[0];
if (findState.checkAdd(method, eventType)) {
ThreadMode threadMode = subscribeAnnotation.threadMode();
findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
}
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException("@Subscribe method " + methodName +
"must have exactly 1 parameter but has " + parameterTypes.length);
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException(methodName +
" is a illegal @Subscribe method: must be public, non-static, and non-abstract");
}
}
}
总结:
上面步骤主要做的是,找到订阅class中的订阅方法 SubscriberMethod
,并保存。
继续分析 EventBus.java ->register() --> subscribe()
订阅方法
//这里的 subscriber(订阅者) 也就是 `MainActivity.class` ,
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
//获取方法参数事件的 class ,例如上面例子的:`String.class`
Class<?> eventType = subscriberMethod.eventType;
//创建Subscription对象
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
//从subscriptionsByEventType里检查是否已经添加过该Subscription,如果添加过就抛出异常
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions == null) {
subscriptions = new CopyOnWriteArrayList<>();
subscriptionsByEventType.put(eventType, subscriptions);
} else {
if (subscriptions.contains(newSubscription)) {
throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
+ eventType);
}
}
//根据优先级priority来添加Subscription对象,ps:优先级越大越靠前
int size = subscriptions.size();
for (int i = 0; i <= size; i++) {
if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
subscriptions.add(i, newSubscription);
break;
}
}
//将订阅者对象以及订阅的事件保存到typesBySubscriber里.
List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
if (subscribedEvents == null) {
subscribedEvents = new ArrayList<>();
typesBySubscriber.put(subscriber, subscribedEvents);
}
subscribedEvents.add(eventType);
//如果接收sticky事件,立即分发sticky事件
if (subscriberMethod.sticky) {
//eventInheritance 表示是否分发订阅了响应事件类父类事件的方法
if (eventInheritance) {
//
Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
for (Map.Entry<Class<?>, Object> entry : entries) {
Class<?> candidateEventType = entry.getKey();
if (eventType.isAssignableFrom(candidateEventType)) {
Object stickyEvent = entry.getValue();
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
} else {
Object stickyEvent = stickyEvents.get(eventType);
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
}
Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType;
:key:代表的是事件类型,如String.class
;value:线程安全的List,保存的是一组订阅该(String.class)事件的订阅。通俗来说就是:String.class
事件可能被 A B C...等等类订阅,这个List就是订阅String.class
所有集合。
完成以上过程就完成了注册过程。黏性事件,在
EventBus # subscribe
方法中,如果sticky = true
,则会直接调用该类中的sticky = true
订阅方法,具体实现在EventBus # checkPostStickyEventToSubscription
方法中。
注意:黏性事件,如果不需要接收,则需要调用EventBus.getDefault().removeStickyEvent()
进行事件移除,否则每次进入该页面,都会收到该事件。
三、EventBus 事件发送过程
EventBus 通过调用 post(普通事件)、postSticky(粘性事件) 来发送事件。
EventBus.java
public void post(Object event) {
//获得当前发送线程相关状态(ThreadLocal保证线程安全)
PostingThreadState postingState = currentPostingThreadState.get();
//将事件添加到队列中
List<Object> eventQueue = postingState.eventQueue;
eventQueue.add(event);
if (!postingState.isPosting) {
postingState.isMainThread = Looper.getMainLooper() == Looper.myLooper();
postingState.isPosting = true;
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
//一直将队列中发送完
while (!eventQueue.isEmpty()) {
//发送单个事件
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}
注意:这里的 PostingThreadState
是通过 ThreadLocal(一个线程内部的数据存储类,通过它可以在指定的线程中存储数据) 来获取每个线程中的 PostingThreadState
状态。
继续看发送事件的方法:
EventBus.java
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
Class<?> eventClass = event.getClass();
boolean subscriptionFound = false;
//是否触发订阅了该事件(eventClass)的父类,以及接口的类的响应方法
if (eventInheritance) { // 这个变量默认为 true
//查找eventClass类所有的父类以及接口
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
//循环postSingleEventForEventType
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
//只要右边有一个为true,subscriptionFound就为true
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
//post单个
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
//如果没发现,订阅者
if (!subscriptionFound) {
if (logNoSubscriberMessages) {
Log.d(TAG, "No subscribers registered for event " + eventClass);
}
if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
eventClass != SubscriberExceptionEvent.class) {
//发送一个NoSubscriberEvent事件,如果我们需要处理这种状态,接收这个事件就可以了
post(new NoSubscriberEvent(this, event));
}
}
}
postSingleEventForEventType
方法:
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
//获取订阅了这个事件的Subscription列表.(这里的 eventClass,对应上面的`String.class`)
synchronized (this) {
subscriptions = subscriptionsByEventType.get(eventClass);
}
//判断订阅者是否为null
if (subscriptions != null && !subscriptions.isEmpty()) {
for (Subscription subscription : subscriptions) {
postingState.event = event;
postingState.subscription = subscription;
//是否被中断
boolean aborted = false;
try {
//分发给订阅者
postToSubscription(subscription, event, postingState.isMainThread);
aborted = postingState.canceled;
} finally {
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
if (aborted) {
break;
}
}
return true;
}
return false;
}
//将事件 post给具体的订阅者,涉及到线程切换
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
invokeSubscriber(subscription, event);
break;
case MAIN:
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event);
}
break;
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}
invokeSubscriber
方法:
void invokeSubscriber(Subscription subscription, Object event) {
try {
subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
} catch (InvocationTargetException e) {
handleSubscriberException(subscription, event, e.getCause());
} catch (IllegalAccessException e) {
throw new IllegalStateException("Unexpected exception", e);
}
}
从 subscriptionsByEventType
获得订阅这个 事件(event) 的 Subscription
订阅者列表,通过 postToSubscription
方法将事件分发给订阅者,这个方法中涉及到了线程的不同,最后是通过反射调用了 invoke
订阅者的方法
其中 ThreadMode
是一个枚举类,包含四种类型:
-
POSTING
:默认的 ThreadMode,表示在执行 Post 操作的线程直接调用订阅者的事件响应方法,不论该线程是否为主线程(UI 线程)。当该线程为主线程时,响应方法中不能有耗时操作,否则有卡主线程的风险。适用场景:对于是否在主线程执行无要求,但若 Post 线程为主线程,不能耗时的操作; -
MAIN
:在主线程中执行响应方法。如果发布线程就是主线程,则直接调用订阅者的事件响应方法,否则通过主线程的 Handler 发送消息在主线程中处理——调用订阅者的事件响应函数。显然,MainThread类的方法也不能有耗时操作,以避免卡主线程。适用场景:必须在主线程执行的操作; -
BACKGROUND
:在后台线程中执行响应方法。如果发布线程不是主线程,则直接调用订阅者的事件响应函数,否则启动唯一的后台线程去处理。由于后台线程是唯一的,当事件超过一个的时候,它们会被放在队列中依次执行,因此该类响应方法虽然没有PostThread类和MainThread类方法对性能敏感,但最好不要有重度耗时的操作或太频繁的轻度耗时操作,以造成其他操作等待。适用场景:操作轻微耗时且不会过于频繁,即一般的耗时操作都可以放在这里; -
ASYNC
:不论发布线程是否为主线程,都使用一个空闲线程来处理。和BackgroundThread不同的是,Async类的所有线程是相互独立的,因此不会出现卡线程的问题。适用场景:长耗时操作,例如网络访问。
Poster 包含Poster、BackgroundPoster、AsyncPoster
在postToSubscription
方法中也起了重要作用,简单分析一下mainThreadPoster.enqueue(subscription, event);
//EventBus.java
private final Poster mainThreadPoster;
//创建
mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;
MainThreadSupport
是一个接口,实现类为: AndroidHandlerMainThreadSupport
public interface MainThreadSupport {
boolean isMainThread();
Poster createPoster(EventBus eventBus);
class AndroidHandlerMainThreadSupport implements MainThreadSupport {
private final Looper looper;
//这里传入的是 主线程的Looper
public AndroidHandlerMainThreadSupport(Looper looper) {
this.looper = looper;
}
@Override
public boolean isMainThread() {
return looper == Looper.myLooper();
}
@Override
public Poster createPoster(EventBus eventBus) {
return new HandlerPoster(eventBus, looper, 10); //重点
}
}
}
上述代码重点在 new HandlerPoster(eventBus, looper, 10);
public class HandlerPoster extends Handler implements Poster {
private final PendingPostQueue queue;
private final int maxMillisInsideHandleMessage;
private final EventBus eventBus;
private boolean handlerActive;
protected HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
super(looper);
this.eventBus = eventBus;
this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
queue = new PendingPostQueue();
}
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
queue.enqueue(pendingPost);
if (!handlerActive) {
handlerActive = true;
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
}
}
}
@Override
public void handleMessage(Message msg) {
boolean rescheduled = false;
try {
long started = SystemClock.uptimeMillis();
while (true) {
PendingPost pendingPost = queue.poll();
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
handlerActive = false;
return;
}
}
}
eventBus.invokeSubscriber(pendingPost);
long timeInMethod = SystemClock.uptimeMillis() - started;
if (timeInMethod >= maxMillisInsideHandleMessage) {
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
rescheduled = true;
return;
}
}
} finally {
handlerActive = rescheduled;
}
}
}
HandlerPoster
需要关注的有:
- 该类是通过 Handler 机制来实现。
-
HandlerPoster # enqueue
方法,将事件添加到了队列中,并通过handler发送message ,通知handleMessage
进行处理(此时线程达到了切换)。 -
handleMessage
中,处理消息, 比较关键的代码是eventBus.invokeSubscriber(pendingPost);
//EventBus.java
void invokeSubscriber(PendingPost pendingPost) {
Object event = pendingPost.event; // 这里的event,对应时间,也就是上面的 String.class
Subscription subscription = pendingPost.subscription;
PendingPost.releasePendingPost(pendingPost);
if (subscription.active) {
invokeSubscriber(subscription, event);//通过反射,调用方法
}
}
四、EventBus 反注册
简单来说,就是把订阅者从 typesBySubscriber
移除,下次再发送消息的时候,就不会通知了。
EventBus.java
/** Unregisters the given subscriber from all event classes. */
public synchronized void unregister(Object subscriber) {
List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
//查找订阅事件类型
if (subscribedTypes != null) {
for (Class<?> eventType : subscribedTypes) {
//分别类中的每种类型
unsubscribeByEventType(subscriber, eventType);
}
//从 typesBySubscriber 移除
typesBySubscriber.remove(subscriber);
} else {
Log.w(TAG, "Subscriber to unregister was not registered before: " + subscriber.getClass());
}
}
unsubscribeByEventType
方法:
/** Only updates subscriptionsByEventType, not typesBySubscriber! Caller must update typesBySubscriber. */
private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions != null) {
int size = subscriptions.size();
//取消订阅
for (int i = 0; i < size; i++) {
Subscription subscription = subscriptions.get(i);
if (subscription.subscriber == subscriber) {
subscription.active = false;
subscriptions.remove(i);
i--;
size--;
}
}
}
}