前言
EventBus是一个发布/订阅框架。用两个字来概括它,解耦。它简化了组件间事件传递,也可用于线程间事件传递。
创建
我们先从向eventbus注册成为订阅者这行代码开始,点进getDefault(),
EventBus.getDefault().register(this);
这里我们看到了double check的单例模式,这意味着我们在任意地方调用EventBus.getDefault()得到的都是同一个实例。
public static EventBus getDefault() {
if (defaultInstance == null) {
synchronized (EventBus.class) {
if (defaultInstance == null) {
defaultInstance = new EventBus();
}
}
}
return defaultInstance;
}
再来看看EventBus的构造方法,可以看到构造方法是public的,再看看第一个参数,
EventBusBuilder ,在这里可以推测出我们可以通过构建者模式来构造不同的EventBus对象。而不同的EventBus对象之间是相互独立,相互隔离的。你在A对象中发布事件,向B对象注册成为订阅者是不可能收到事件的。
public EventBus() {
this(DEFAULT_BUILDER);
}
EventBus(EventBusBuilder builder) {
logger = builder.getLogger();
subscriptionsByEventType = new HashMap<>();
typesBySubscriber = new HashMap<>();
stickyEvents = new ConcurrentHashMap<>();
mainThreadSupport = builder.getMainThreadSupport();
mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;
backgroundPoster = new BackgroundPoster(this);
asyncPoster = new AsyncPoster(this);
//代码省略
}
我们来解释一下几个重要的成员变量
subscriptionsByEventType:从命名来看,以EventType为key,subscriptions为
value。subscription是一个包装类,包装了订阅者和订阅方法。注意CopyOnWriteArrayList是一个线程安全的容器。
typesBySubscriber:从命名来看,以Subscriber为key,EventType为value。
stickyEvents :存储了所有的粘性事件。
这里实例化了三个 poster ,用于线程调度,分别是mainThreadPoster、 backgroundPoster、asyncPoster等。
注册
下面我们来分析注册的过程
public void register(Object subscriber) {
Class<?> subscriberClass = subscriber.getClass();
//找到所有的订阅者方法
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
synchronized (this) {
// 遍历订阅者方法,订阅者逐一订阅方法
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod);
}
}
}
这里我们可以看到通过subscriberMethodFinde这个对象找到该类中所有的订阅方法,然后放入一个集合中,之后遍历集合,订阅者订阅每一个找到的方法.先来看看
SubscriberMethodFinder#findSubscriberMethods方法。
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
//判断缓存中有无subscriberMethods,有则直接返回
List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
if (subscriberMethods != null) {
return subscriberMethods;
}
//是否忽略注解器生成的类
if (ignoreGeneratedIndex) {
subscriberMethods = findUsingReflection(subscriberClass);
} else {
//从注解器生成的类中读取订阅类的订阅方法信息
//1. findUsingInfo
subscriberMethods = findUsingInfo(subscriberClass);
}
if (subscriberMethods.isEmpty()) {
throw new EventBusException("Subscriber " + subscriberClass
+ " and its super classes have no public methods with the @Subscribe annotation");
} else {
//将找到的subscriberMethods放入缓存中
METHOD_CACHE.put(subscriberClass, subscriberMethods);
return subscriberMethods;
}
}
从这里直接跟进SubscriberMethodFinder#finduserinfo
private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
//FindState保存了订阅者类的信息
FindState findState = prepareFindState();
findState.initForSubscriber(subscriberClass);
while (findState.clazz != null) {
findState.subscriberInfo = getSubscriberInfo(findState);
if (findState.subscriberInfo != null) {
SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
for (SubscriberMethod subscriberMethod : array) {
if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
findState.subscriberMethods.add(subscriberMethod);
}
}
} else {
//2、到了这里
findUsingReflectionInSingleClass(findState);
}
//移动到父类继续查找
findState.moveToSuperclass();
}
//3. getMethodsAndRelease(findState)
return getMethodsAndRelease(findState);
}
findstate 保存了 订阅者类的信息,我们来看看Findstate类
static class FindState {
//所有订阅方法
final List<SubscriberMethod> subscriberMethods = new ArrayList<>();
//以event为key,method为value
final Map<Class, Object> anyMethodByEventType = new HashMap<>();
//以method的名字生成一个methodkey,以订阅者类为value
final Map<String, Class> subscriberClassByMethodKey = new HashMap<>();
final StringBuilder methodKeyBuilder = new StringBuilder(128);
}
下面跟进SubscriberMethodFinder#findUsingReflectionInSingleClass(findstate)
private void findUsingReflectionInSingleClass(FindState findState) {
//省略代码
for (Method method : methods) {
int modifiers = method.getModifiers();
if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
Class<?>[] parameterTypes = method.getParameterTypes();
//参数是否为1
if (parameterTypes.length == 1) {
Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
//是否包含Subscribe注解
if (subscribeAnnotation != null) {
Class<?> eventType = parameterTypes[0];
//3.checkadd方法
if (findState.checkAdd(method, eventType)) {
ThreadMode threadMode = subscribeAnnotation.threadMode();
//把SubscriberMethod放入findsate的subscriberMethods成员变量中
findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
}
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException("@Subscribe method " + methodName +
"must have exactly 1 parameter but has " + parameterTypes.length);
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException(methodName +
" is a illegal @Subscribe method: must be public, non-static, and non-abstract");
}
}
}
注释里3.调用了SubscriberMethodFinder#checkadd方法,我们跟进来分析看看
boolean checkAdd(Method method, Class<?> eventType) {
// 2 level check: 1st level with event type only (fast), 2nd level with complete signature when required.
// Usually a subscriber doesn't have methods listening to the same event type.
//如果之前没有相同的EventType,existing 为null,则直接返回true
Object existing = anyMethodByEventType.put(eventType, method);
if (existing == null) {
return true;
} else {
if (existing instanceof Method) {
if (!checkAddWithMethodSignature((Method) existing, eventType)) {
// Paranoia check
throw new IllegalStateException();
}
// Put any non-Method object to "consume" the existing Method
anyMethodByEventType.put(eventType, this);
}
return checkAddWithMethodSignature(method, eventType);
}
}
private boolean checkAddWithMethodSignature(Method method, Class<?> eventType) {
methodKeyBuilder.setLength(0);
methodKeyBuilder.append(method.getName());
methodKeyBuilder.append('>').append(eventType.getName());
//method由方法名和eventtype共同决定
String methodKey = methodKeyBuilder.toString();
Class<?> methodClass = method.getDeclaringClass();
Class<?> methodClassOld = subscriberClassByMethodKey.put(methodKey, methodClass);
if (methodClassOld == null || methodClassOld.isAssignableFrom(methodClass)) {
// Only add if not already found in a sub class
return true;
} else {
// Revert the put, old class is further down the class hierarchy
//4.methodClassOld不为空
subscriberClassByMethodKey.put(methodKey, methodClassOld);
return false;
}
}
这里我们看到注释4,methodClassOld不为空,说明集合中已经存在methodKey相同,即方法名和eventtype都相同的订阅方法。就把旧的订阅方法放入集合中。这说明如果子类重写了父类的订阅方法,则父类的订阅方法无效。
我们在回到上面,在findUsingInfo方法的最后一行调用了SubscriberMethodFinder# getMethodsAndRelease方法
private List<SubscriberMethod> getMethodsAndRelease(FindState findState) {
//把findState.subscriberMethods赋值给subscriberMethods ,并回收findState,最后返回subscriberMethods
List<SubscriberMethod> subscriberMethods = new ArrayList<>(findState.subscriberMethods);
findState.recycle();
synchronized (FIND_STATE_POOL) {
for (int i = 0; i < POOL_SIZE; i++) {
if (FIND_STATE_POOL[i] == null) {
FIND_STATE_POOL[i] = findState;
break;
}
}
}
return subscriberMethods;
}
subscribe
// Must be called in synchronized block
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
Class<?> eventType = subscriberMethod.eventType;
//将订阅者,订阅方法封装进Subscription
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions == null) {
subscriptions = new CopyOnWriteArrayList<>();
//将eventType, subscriptions保存进 subscriptionsByEventType集合中
subscriptionsByEventType.put(eventType, subscriptions);
} else {
if (subscriptions.contains(newSubscription)) {
throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
+ eventType);
}
}
int size = subscriptions.size();
for (int i = 0; i <= size; i++) {
//将优先级高的方法放在前面
if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
subscriptions.add(i, newSubscription);
break;
}
}
List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
if (subscribedEvents == null) {
subscribedEvents = new ArrayList<>();
//将subscriber, subscribedEvents保存进 typesBySubscriber方法中
typesBySubscriber.put(subscriber, subscribedEvents);
}
subscribedEvents.add(eventType);
//省略代码
}
unregister
public synchronized void unregister(Object subscriber) {
//通过subscriber找到所有的订阅事件
List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
if (subscribedTypes != null) {
for (Class<?> eventType : subscribedTypes) {
//遍历
unsubscribeByEventType(subscriber, eventType);
}
typesBySubscriber.remove(subscriber);
} else {
logger.log(Level.WARNING, "Subscriber to unregister was not registered before: " + subscriber.getClass());
}
}
private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
//在subscriptionsByEventType中,找到所有的subscriptions
List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions != null) {
int size = subscriptions.size();
for (int i = 0; i < size; i++) {
Subscription subscription = subscriptions.get(i);
//如果当前的subscriber与subscription中的subscriber相等,则移除集合中的元素
if (subscription.subscriber == subscriber) {
subscription.active = false;
subscriptions.remove(i);
i--;
size--;
}
}
}
}
发送事件
EventBus.getDefault().post(new Message(message));
先来看EventBus#post方法,
public void post(Object event) {
//1. PostingThreadState
PostingThreadState postingState = currentPostingThreadState.get();
List<Object> eventQueue = postingState.eventQueue;
eventQueue.add(event);
if (!postingState.isPosting) {
postingState.isMainThread = isMainThread();
postingState.isPosting = true;
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
while (!eventQueue.isEmpty()) {
//2. postSingleEvent
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}
可以看到PostingThreadState 封装了当前信息,subscription,event等
final static class PostingThreadState {
final List<Object> eventQueue = new ArrayList<>();
boolean isPosting;
boolean isMainThread;
Subscription subscription;
Object event;
boolean canceled;
}
而currentPostingThreadState又是什么呢?
private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {
@Override
protected PostingThreadState initialValue() {
return new PostingThreadState();
}
};
可以看到currentPostingThreadState是ThreadLocal类型的。ThreadLocal是一个线程内部的数据存储类,通过它在指定的线程中存储数据,只能在指定线程中获取。
下面看 EventBus#postSingleEvent
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
Class<?> eventClass = event.getClass();
boolean subscriptionFound = false;
//是否要考虑事件的父类
if (eventInheritance) {
//查找事件的所有父类
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
//调用postSingleEventForEventType
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
if (!subscriptionFound) {
if (logNoSubscriberMessages) {
logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
}
if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
eventClass != SubscriberExceptionEvent.class) {
post(new NoSubscriberEvent(this, event));
}
}
}
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this) {
//在subscriptionsByEventType集合中找到所有的subscriptions
subscriptions = subscriptionsByEventType.get(eventClass);
}
if (subscriptions != null && !subscriptions.isEmpty()) {
for (Subscription subscription : subscriptions) {
postingState.event = event;
postingState.subscription = subscription;
boolean aborted = false;
try {
//调用postToSubscription
postToSubscription(subscription, event, postingState.isMainThread);
aborted = postingState.canceled;
} finally {
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
if (aborted) {
break;
}
}
return true;
}
return false;
}
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
//根据threadMode来判断
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
invokeSubscriber(subscription, event);
break;
case MAIN:
//当前是否是主线程,
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event);
}
break;
case MAIN_ORDERED:
if (mainThreadPoster != null) {
mainThreadPoster.enqueue(subscription, event);
} else {
// temporary: technically not correct as poster not decoupled from subscriber
invokeSubscriber(subscription, event);
}
break;
case BACKGROUND:
if (isMainThread) {
//如果发布事件是在主线程,就用 backgroundPoster切换到后台
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
//不管当前发布事件在哪个进程,都用asyncPoster新建一个线程来处理
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}
这里来讲解一下这三个Poster.
- handlerPoster继承了Handler,获取主线程的Looper。所以handleMessage方法在主线程中执行。在handleMessage方法中,开启了循环从队列中获取数据,调用 invokeSubscriber方法分发事件
2.BackGroundPoster实现了Runnable接口。从队列中获取数据进行分发,直到取完为止。
3.AsycPoster实现了Runnable接口。与BackGroundPoster不一样的是它只获取队列中的一个数据,然后进行分发。