拆轮子系列——EventBus 源码解析

项目地址:EventBus,本文分析版本: 3.1.1

一、概述

EventBus 是一个 Android 事件发布/订阅框架,通过解耦发布者和订阅者简化 Android 事件传递,这里的事件可以理解为消息,本文中统一称为事件。事件传递既可用于 Android 四大组件间通讯,也可以用户异步线程和主线程间通讯等等。
传统的事件传递方式包括:Handler、BroadCastReceiver、Interface 回调,相比之下 EventBus 的优点是代码简洁,使用简单,并将事件发布和订阅充分解耦。

  • 事件(Event):又可称为消息,本文中统一用事件表示。其实就是一个对象,可以是网络请求返回的字符串,也可以是某个开关状态等等。事件类型(EventType)指事件所属的 Class。
    • 事件分为一般事件和 Sticky 事件,相对于一般事件,Sticky 事件不同之处在于,当事件发布后,再有订阅者开始订阅该类型事件,依然能收到该类型事件最近一个 Sticky 事件(所谓「最近一个」指的就是该类型事件「最后一次发出」)。
  • 订阅者(Subscriber):订阅某种事件类型的对象。当有发布者发布这类事件后,EventBus 会执行订阅者的 onEvent 函数,这个函数叫事件响应函数。订阅者通过 register 接口订阅某个事件类型,unregister 接口退订。订阅者存在优先级,优先级高的订阅者可以取消事件继续向优先级低的订阅者分发,默认所有订阅者优先级都为 0。
  • 发布者(Publisher):发布某事件的对象,通过 post 接口发布事件。

二、如何使用

2.1 添加依赖

方式一,运行期处理注解

在app 的 build.gradle 文件中添加依赖

dependencies {
    implementation 'org.greenrobot:eventbus:3.1.1'
}

方式二,编译期预处理注解

Android Studio 3.0 及以上

在 app 的 build.gradle 文件中添加

android {
    //……
    defaultConfig {
        //……
        javaCompileOptions {
            annotationProcessorOptions {
                arguments = [eventBusIndex: 'org.greenrobot.eventbusperf.MyEventBusIndex']
            }
        }
    }
}

dependencies {
    implementation 'org.greenrobot:eventbus:3.1.1'
    annotationProcessor  'org.greenrobot:eventbus-annotation-processor:3.0.1'
}

build 之后,会生成一个 MyEventBusIndex.java类。

然后在使用 EventBus 实例之前,又有两种方式可以将配置MyEventBusIndex.java配置到类中是哟经。

方式一 在构造 EventBus 时传入我们自定义的 EventBusIndex,
EventBus mEventBus = EventBus.builder().addIndex(new MyEventBusIndex()).build();
方式二 将索引应用到默认的单例中

使用 EventBus 之前,先调用下面的代码初始化 EventBus。

EventBus.builder().addIndex(new MyEventBusIndex()).installDefaultEventBus();

2.2 定义事件类

public class CustomEvent {
    private String mEventName;

    public CustomEvent() {
    }

    public String getEventName() {
        return mEventName;
    }

    public void setEventName(String eventName) {
        mEventName = eventName;
    }
}

2.3 注册为监听者

在合适的地方(比如 Activity#onCreate、Fragment#onCreateView)通过下方代码进行注册

EventBus.getDefault().register(this);

2.4 编写响应事件的订阅方法

@Subscribe(threadMode = ThreadMode.BACKGROUND, sticky = true, priority = 100)
public void onMessage(CustomEvent event) {
    Log.d(TAG, "onMessage: " + event.getEventName());
}

使用编译期注解处理的情况下,订阅方法的访问控制权限必须是 非 private 并且非 static 的

使用运行期反射处理的情况下,订阅方法的访问控制权限必须是 public 的

  • 通过 ThreadMode 可以指定订阅方法在哪个线程执行,有四种选择
    • ThreadMode.MAIN 事件订阅方法会在 UI 线程中执行
      • 使用此模式的事件订阅方法必须快速返回以避免阻塞主线程。
    • ThreadMode.POSTING (默认的模式)表示事件在哪个线程中发布出来的,事件订阅方法就会在这个线程中运行;
      • 该模式避免了线程切换,适用于那些在很短的时间内完成的简单任务,无需主线程。使用这种模式的事件订阅方法必须快速返回以避免阻塞发布线程(发布线程可能是主线程)。
    • ThreadMode.MAIN_ORDERED
      • 在 Android 上,订阅方法将在 Android 的主线程中被调用。事件将会排队等待交付,这确保了 post 调用是非阻塞的。
    • ThreadMode.BACKGROUND 子线程执行,如果本来就在子线程,直接在该子线程执行
      • EventBus 使用一个后台线程,将按顺序发送所有事件。使用这种模式的订阅方法应该尽快返回以避免阻塞后台线程。
        • 注意:「一个后台线程」所指的并不是 Executors.newSingleThreadPool(),而是使用 EventBus 在实例化时创建的 cacheThreadPool 中的某一个线程。
    • ThreadMode.ASYNC 新建子线程执行。适用于耗时操作
      • 发布事件永远不会等待使用此模式的订阅方法。适用于比较耗时的订阅方法,比如用于网络请求。使用时应该避免同时触发大量长时间运行的异步订阅方法来限制并发线程的数量。 EventBus 使用线程池有效地重用已完成的异步用户通知中的线程。
  • 通过 sticky 指定是否接收粘性事件,默认为 false
  • 通过 priority 设置接收订阅方法的优先级,相同的事件,优先级越高的订阅方法 越早收到事件

2.5 发送事件

通过EventBuspost()方法来发送事件, 发送之后就会执行注册过这个事件的对应类的方法. 或者通过postSticky()来发送一个粘性事件。

2.6 解除注册

在合适的地方(比如 Activity#onDestroy)使用下面的代码进行解除注册 EventBus.getDefault().unregister(this);

2.7 小结

要实现订阅,需要进行注册,以及解注册,订阅方法以「目标事件」作为方法的参数, 使用 Subscribe 注解,可以指定订阅方法执行的线程、是否接收 sticky 事件、订阅方法的优先级。

至于发送方,只需要创建相应的 事件实例,然后调用 post 或者 postSticky 将事件发送出去即可。

三、实现

3.1 初始化 EventBus

开发者通常是调用 EventBus#getDefault 方法获取 EventBus 实例。

public static EventBus getDefault() {
    if (defaultInstance == null) {
        synchronized (EventBus.class) {
            if (defaultInstance == null) {
                defaultInstance = new EventBus();
            }
        }
    }
    return defaultInstance;
}

getDefault 通过双重校验锁的方式来实现单例

构造方法

private static final EventBusBuilder DEFAULT_BUILDER = new EventBusBuilder();

public EventBus() {
    this(DEFAULT_BUILDER);
}

通过 getDefault 获取的 EventBus 对象是通过默认的 EventBusBuilder 构造而成的。

private final static ExecutorService DEFAULT_EXECUTOR_SERVICE = Executors.newCachedThreadPool();//默认为 CachedThreadPool,不限制线程数

boolean logSubscriberExceptions = true;
boolean logNoSubscriberMessages = true;
boolean sendSubscriberExceptionEvent = true;
boolean sendNoSubscriberEvent = true;
boolean throwSubscriberException;

boolean strictMethodVerification;//
ExecutorService executorService = DEFAULT_EXECUTOR_SERVICE;//默认的线程池
List<Class<?>> skipMethodVerificationForClasses;
List<SubscriberInfoIndex> subscriberInfoIndexes;
MainThreadSupport mainThreadSupport;

EventBus(EventBusBuilder builder) {
    //……
    subscriptionsByEventType = new HashMap<>();
    typesBySubscriber = new HashMap<>();
    stickyEvents = new ConcurrentHashMap<>();
    //……
    eventInheritance = builder.eventInheritance;
    executorService = builder.executorService;
}

主要看以下几个单例的实现。

boolean eventInheritance = true;//是否允许事件继承
boolean ignoreGeneratedIndex;//是否忽略 生成的 index,默认为 false,也就是会先尝试寻找编译期注解生成的订阅方法信息,找不到再使用反射去获取。

private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType;
private final Map<Object, List<Class<?>>> typesBySubscriber;
private final Map<Class<?>, Object> stickyEvents;

private final HandlerPoster mainThreadPoster;
private final BackgroundPoster backgroundPoster;
private final AsyncPoster asyncPoster;

  • subscriptionsByEventType ,key 是事件类型,value 为 订阅了该事件的方法列表
  • typesBySubscriber,key 为订阅者,value 某个订阅者订阅的事件列表
  • stickyEvents,key 为事件类型,value 为具体的事件实例
  • mainThreadPoster 主线程分发
  • backgroundPoster 后台线程分发
  • asyncPoster 异步线程分发

3.2 注册订阅

org.greenrobot.eventbus.EventBus#register

public void register(Object subscriber) {
    Class<?> subscriberClass = subscriber.getClass();//获取订阅者的 class 对象
    List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);//查找订阅者中所有的订阅方法
    synchronized (this) {
        //迭代遍历订阅者中所有的订阅方法
        for (SubscriberMethod subscriberMethod : subscriberMethods) {
            subscribe(subscriber, subscriberMethod);
        }
    }
}

EventBus#findSubscriberMethods

找出给定 class 中所有的订阅方法

private static final Map<Class<?>, List<SubscriberMethod>> METHOD_CACHE = new ConcurrentHashMap();//以 class 为 key,方法列表为 value 的,Map 作为缓存

List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
    List subscriberMethods = (List)METHOD_CACHE.get(subscriberClass);//缓存中获取
    if(subscriberMethods != null) {
        return subscriberMethods;//缓存命中,直接返回
    } else {
        if(this.ignoreGeneratedIndex) {//忽略编译期生成的 订阅方法信息
            subscriberMethods = this.findUsingReflection(subscriberClass);//通过反射获取订阅方法信息
        } else {
            //获取编译期生成的 订阅方法信息
            subscriberMethods = this.findUsingInfo(subscriberClass);//
        }

        if(subscriberMethods.isEmpty()) {
            throw new EventBusException("Subscriber " + subscriberClass + " and its super classes have no public methods with the @Subscribe annotation");
        } else {
            METHOD_CACHE.put(subscriberClass, subscriberMethods);//添加到缓存中
            return subscriberMethods;//返回
        }
    }
}

EventBus#subscribe()

//必须从同步块中调用
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
    Class<?> eventType = subscriberMethod.eventType;//事件类型
    Subscription newSubscription = new Subscription(subscriber, subscriberMethod);//新建一个 Subscription,存储订阅的对象以及 响应的方法
    CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);//Map<Class<?>, CopyOnWriteArrayList<Subscription>> 从 map 中获取相应订阅类型的 列表
    if (subscriptions == null) {//如果没有则新建一个
        subscriptions = new CopyOnWriteArrayList<>();
        subscriptionsByEventType.put(eventType, subscriptions);
    } else {
        if (subscriptions.contains(newSubscription)) {
            throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
                    + eventType);//没有则抛出异常。
        }
    }

    int size = subscriptions.size();
    //遍历监听列表,将新的 subscription 插入到正确位置。列表按照优先级递减的顺序排序
    for (int i = 0; i <= size; i++) {
        if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
            subscriptions.add(i, newSubscription);
            break;
        }
    }

    List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
    if (subscribedEvents == null) {
        subscribedEvents = new ArrayList<>();
        typesBySubscriber.put(subscriber, subscribedEvents);
    }
    subscribedEvents.add(eventType);
    //如果触发的方法 要接收「粘性事件」,获取相应类型的 Event 并触发相应的方法
    if (subscriberMethod.sticky) {
        if (eventInheritance) {
            // Existing sticky events of all subclasses of eventType have to be considered.
            // Note: Iterating over all events may be inefficient with lots of sticky events,
            // thus data structure should be changed to allow a more efficient lookup
            // (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).
            Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
            for (Map.Entry<Class<?>, Object> entry : entries) {
                Class<?> candidateEventType = entry.getKey();
                if (eventType.isAssignableFrom(candidateEventType)) {
                    Object stickyEvent = entry.getValue();
                    checkPostStickyEventToSubscription(newSubscription, stickyEvent);
                }
            }
        } else {
            Object stickyEvent = stickyEvents.get(eventType);
            checkPostStickyEventToSubscription(newSubscription, stickyEvent);
        }
    }
}

该方法会将相应的事件插入对应事件的列表中。如果在方法注解中声明了 sticky,还会马上调用该方法。

检测 stick 事件,如果相应的事件定义有子类的话,会遍历事件的事件子类逐一通知该方法。

3.2.1 通过反射处理注解

SubscriberMethodFinder#findUsingReflection

private List<SubscriberMethod> findUsingReflection(Class<?> subscriberClass) {
    FindState findState = prepareFindState();
    findState.initForSubscriber(subscriberClass);
    while (findState.clazz != null) {
        findUsingReflectionInSingleClass(findState);
        findState.moveToSuperclass();
    }
    return getMethodsAndRelease(findState);
}

SubscriberMethodFinder#findUsingReflectionInSingleClass

private void findUsingReflectionInSingleClass(FindState findState) {
        Method[] methods;
        try {
            // This is faster than getMethods, especially when subscribers are fat classes like Activities
            methods = findState.clazz.getDeclaredMethods();//获取的是类的所有公有方法,这就包括自身 和从基类继承的、从接口实现的所有 public 方法。
            //getDeclareMethods 返回的是该类中定义的「所有方法」,但是不包括从父类继承而来的方法
        } catch (Throwable th) {
            methods = findState.clazz.getMethods();
            findState.skipSuperClasses = true;
        }
        for (Method method : methods) {//遍历方法
            int modifiers = method.getModifiers();
            if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
                Class<?>[] parameterTypes = method.getParameterTypes();
                if (parameterTypes.length == 1) {//参数列表长度为 0
                    Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
                    if (subscribeAnnotation != null) {
                        Class<?> eventType = parameterTypes[0];
                        if (findState.checkAdd(method, eventType)) {
                            ThreadMode threadMode = subscribeAnnotation.threadMode();
                            //将订阅方法信息添加到 findState 中
                            findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
                                    subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
                        }
                    }
                } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                    String methodName = method.getDeclaringClass().getName() + "." + method.getName();
                    throw new EventBusException("@Subscribe method " + methodName +
                            "must have exactly 1 parameter but has " + parameterTypes.length);
                }
            } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                String methodName = method.getDeclaringClass().getName() + "." + method.getName();
                throw new EventBusException(methodName +
                        " is a illegal @Subscribe method: must be public, non-static, and non-abstract");
            }
        }
    }
ignoreGeneratedIndex 是什么?

由于反射成本高,而且 EventBus 3.0 引入了 EventBusAnnotationProcessor,故默认 ignoreGeneratedIndex 为 false,需要注意的是,如果设置 ignoreGeneratedIndex 为 true,则前面使用的 MyEventBusIndex 无效,还是会走反射解析的分支

3.2.2 使用编译期生成的订阅方法信息

网上有很多介绍 EventBus 的文章,但是几乎没有提到 EventBusAnnotationProcessor 的。在 3.0 版本开始,EventBus提供了一个EventBusAnnotationProcessor注解处理器来在编译期通过读取@Subscribe()注解并解析,处理其中所包含的信息,然后生成java类来保存所有订阅者关于订阅的信息,这样就比在运行时使用反射来获得这些订阅者的信息速度要快.我们可以参考EventBus项目里的EventBusPerformance这个例子,编译后我们可以在build文件夹里找到这个类,MyEventBusIndex 类,当然类名是可以自定义的.我们大致看一下生成的MyEventBusIndex类是什么样的:

订阅者中的 订阅方法

public class ReceiveEventFragment extends Fragment {
    //……
    
    @Subscribe(threadMode = ThreadMode.MAIN, priority = 0)
    public void onReceive(MsgEvent event) {
        Log.d(TAG, "onReceive: " + event);
        mTvMsg.setText(String.format("msgId:%d\nmsg:%s", event.msgId, event.msg));
    }

    @Subscribe(threadMode = ThreadMode.MAIN, priority = 10000)
    public void ShowToast(MsgEvent event) {
        Toast.makeText(getActivity(), String.format("msgId:%d\nmsg:%s", event.msgId, event.msg), Toast.LENGTH_SHORT).show();
    }
}
/** This class is generated by EventBus, do not edit. */
public class MyEventBusIndex implements SubscriberInfoIndex {
    private static final Map<Class<?>, SubscriberInfo> SUBSCRIBER_INDEX;

    static {
        SUBSCRIBER_INDEX = new HashMap<Class<?>, SubscriberInfo>();//以订阅者为 key,以订阅者中的 订阅方法列表为 value 的 map
        //将 ReceiveEventFragment 的订阅信息存储到 map 中
        putIndex(new SimpleSubscriberInfo(com.test.commentdemo.eventbusdemo.ReceiveEventFragment.class,
                true, new SubscriberMethodInfo[] {
            new SubscriberMethodInfo("onReceive", com.test.commentdemo.eventbusdemo.MsgEvent.class,
                    ThreadMode.MAIN),
            new SubscriberMethodInfo("ShowToast", com.test.commentdemo.eventbusdemo.MsgEvent.class,
                    ThreadMode.MAIN, 10000, false),
        }));//订阅者中所有的订阅方法
        
        //……代码省略(存储其他订阅者的订阅信息到 map 中)
    }

    private static void putIndex(SubscriberInfo info) {
        SUBSCRIBER_INDEX.put(info.getSubscriberClass(), info);
    }

    @Override
    public SubscriberInfo getSubscriberInfo(Class<?> subscriberClass) {
        SubscriberInfo info = SUBSCRIBER_INDEX.get(subscriberClass);//根据类型从 Map 获取订阅者的订阅信息
        if (info != null) {
            return info;
        } else {
            return null;
        }
    }
}

继续前面 ignoreGeneratedIndex 为 false 时,会执行以下分支。

private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
    FindState findState = prepareFindState();
    findState.initForSubscriber(subscriberClass);
    while (findState.clazz != null) {
        findState.subscriberInfo = getSubscriberInfo(findState);
        if (findState.subscriberInfo != null) {
            SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();//获取订阅的方法信息
            for (SubscriberMethod subscriberMethod : array) {
                if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
                    findState.subscriberMethods.add(subscriberMethod);
                }
            }
        } else {
            findUsingReflectionInSingleClass(findState);//如果找不到相应的订阅方法信息(可能使用 EventBus 实例之前,没有将MyEventBusIndex ),需要通过反射获取订阅方法信息
        }
        findState.moveToSuperclass();
    }
    return getMethodsAndRelease(findState);
}

findUsingInfo()方法,其无非就是通过查找我们前面所说的MyEventBusIndex类中的信息,来转换成List<SubscriberMethod>从而获得订阅类的相关订阅函数的各种信息

SubscriberMethodFinder#getSubscriberInfo

private SubscriberInfo getSubscriberInfo(SubscriberMethodFinder.FindState findState) {
    if(findState.subscriberInfo != null && findState.subscriberInfo.getSuperSubscriberInfo() != null) 
        SubscriberInfo superclassInfo = findState.subscriberInfo.getSuperSubscriberInfo();//获取父类的订阅方法信息
        if(findState.clazz == superclassInfo.getSubscriberClass()) {//
            return superclassInfo;
        }
    }

    if(this.subscriberInfoIndexes != null) {
        Iterator superclassInfo1 = this.subscriberInfoIndexes.iterator();
        //遍历,从 index 中获取 订阅信息
        while(superclassInfo1.hasNext()) {
            SubscriberInfoIndex index = (SubscriberInfoIndex)superclassInfo1.next();
            SubscriberInfo info = index.getSubscriberInfo(findState.clazz);//从自动生成的 MyEventBusIndex 类中的 SUBSCRIBER_INDEX 里面获取订阅方法信息
            if(info != null) {
                return info;
            }
        }
    }
    return null;
}

3.3 解注册

/** Unregisters the given subscriber from all event classes. */
public synchronized void unregister(Object subscriber) {
    List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);//获取订阅者订阅的事件列表
    if (subscribedTypes != null) {
        for (Class<?> eventType : subscribedTypes) {//遍历订阅者所订阅的事件
            //从事件对应的订阅列表中 订阅者的订阅信息
            unsubscribeByEventType(subscriber, eventType);
        typesBySubscriber.remove(subscriber);//移除订阅者
    } else {
        logger.log(Level.WARNING, "Subscriber to unregister was not registered before: " + subscriber.getClass());
    }
}
/** Only updates subscriptionsByEventType, not typesBySubscriber! Caller must update typesBySubscriber. */
private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
    List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
    //从事件对应的订阅列表中 订阅者的订阅信息
    if (subscriptions != null) {
        int size = subscriptions.size();
        for (int i = 0; i < size; i++) {
            Subscription subscription = subscriptions.get(i);
            if (subscription.subscriber == subscriber) {
                subscription.active = false;
                subscriptions.remove(i);
                i--;
                size--;
            }
        }
    }
}

3.4 发送事件

EventBus#post

1. 获取订阅列表

public class EventBus {

    private final ThreadLocal<EventBus.PostingThreadState> currentPostingThreadState;//线程私有变量——当前发布线程状态

    public void post(Object event) {
        EventBus.PostingThreadState postingState = (EventBus.PostingThreadState)this.currentPostingThreadState.get();//记录发布线程的状态(比如是否是主线程)
        List eventQueue = postingState.eventQueue;//从发布状态中获取事件队列
        eventQueue.add(event);//添加进事件队列的队尾
        if(!postingState.isPosting) {//当前不是处于分发状态
            postingState.isMainThread = this.isMainThread();//是否在主线程
            postingState.isPosting = true;//将状态改为发布中
            if(postingState.canceled) {//取消发布
                throw new EventBusException("Internal error. Abort state was not reset
            }

            try {
                while(!eventQueue.isEmpty()) {//只要事件队列非空,就一直往外取出事件并发布
                    this.postSingleEvent(eventQueue.remove(0), postingState);
                }
            } finally {
                postingState.isPosting = false;
                postingState.isMainThread = false;
            }
        }
    }
                                            
    static final class PostingThreadState {
        final List<Object> eventQueue = new ArrayList();//事件队列
        boolean isPosting;
        boolean isMainThread;
        Subscription subscription;
        Object event;
        boolean canceled;

        PostingThreadState() {
        }
    }                                                                   
}
final static class PostingThreadState {
    final List<Object> eventQueue = new ArrayList<>();//事件队列
    boolean isPosting;//是否正在分发中
    boolean isMainThread;//是否在主线程
    Subscription subscription;
    Object event;//事件
    boolean canceled;//取消
}

private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {
    @Override
    protected PostingThreadState initialValue() {
        return new PostingThreadState();
    }
};//currentPostingThreadState 是一个线程局部变量


/** Posts the given event to the event bus. */
public void post(Object event) {
    PostingThreadState postingState = currentPostingThreadState.get();//获取当前线程事件分发状态
    List<Object> eventQueue = postingState.eventQueue;
    eventQueue.add(event);//添加到事务队列中

    if (!postingState.isPosting) {//如果当前不是在分发状态,则进入分发状态
        postingState.isMainThread = isMainThread();
        postingState.isPosting = true;
        if (postingState.canceled) {
            throw new EventBusException("Internal error. Abort state was not reset");
        }
        try {
            //编译事件队列,逐一进行分发
            while (!eventQueue.isEmpty()) {
                postSingleEvent(eventQueue.remove(0), postingState);
            }
        } finally {
            postingState.isPosting = false;
            postingState.isMainThread = false;
        }
    }
}

EventBus#postSingleEvent

private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
    Class<?> eventClass = event.getClass();
    boolean subscriptionFound = false;
    if (eventInheritance) {//如果事件支持子类型,查找该事件的所有子类型
        List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
        int countTypes = eventTypes.size();
        for (int h = 0; h < countTypes; h++) {
            Class<?> clazz = eventTypes.get(h);
            subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
        }
    } else {
        subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
    }
    if (!subscriptionFound) {
        if (logNoSubscriberMessages) {
            logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
        }
        if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
                eventClass != SubscriberExceptionEvent.class) {
            post(new NoSubscriberEvent(this, event));
        }
    }
}

EventBus#postSingleEventForEventType,

 //查找 事件所对应的订阅者列表,然后迭代列表,切换到目标线程执行
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
    CopyOnWriteArrayList<Subscription> subscriptions;
    synchronized (this) {
        subscriptions = subscriptionsByEventType.get(eventClass);//查找 事件所对应的订阅者列表
    }
    if (subscriptions != null && !subscriptions.isEmpty()) {
        for (Subscription subscription : subscriptions) {
            postingState.event = event;
            postingState.subscription = subscription;
            boolean aborted = false;
            try {
                postToSubscription(subscription, event, postingState.isMainThread);
                aborted = postingState.canceled;
            } finally {
                postingState.event = null;
                postingState.subscription = null;
                postingState.canceled = false;
            }
            if (aborted) {//事件取消了
                break;
            }
        }
        return true;
    }
    return false;
}

2.切换到指定线程

private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
    switch (subscription.subscriberMethod.threadMode) {
        case POSTING://
            invokeSubscriber(subscription, event);
            break;
        case MAIN:
            if (isMainThread) {
                invokeSubscriber(subscription, event);
            } else {
                mainThreadPoster.enqueue(subscription, event);
            }
            break;
        case MAIN_ORDERED:
            if (mainThreadPoster != null) {
                mainThreadPoster.enqueue(subscription, event);
            } else {
                // temporary: technically not correct as poster not decoupled from subscriber
                invokeSubscriber(subscription, event);
            }
            break;
        case BACKGROUND://后台线程
            if (isMainThread) {
                backgroundPoster.enqueue(subscription, event);
            } else {
                invokeSubscriber(subscription, event);
            }
            break;
        case ASYNC://新建一个子线程处理
            asyncPoster.enqueue(subscription, event);
            break;
        default:
            throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
    }
}
BACKGROUND
public void enqueue(Subscription subscription, Object event) {
    PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);//包装为一个 PendingPost
    synchronized (this) {//加锁,进入同步块
        queue.enqueue(pendingPost);//添加到队列中
        if (!executorRunning) {//如果现在没有在执行 后台任务,则获取一个新线程执行任务
            executorRunning = true;
            eventBus.getExecutorService().execute(this);
        }
    }
}

@Override
public void run() {
    try {
        try {
            while (true) {//循环,直到队列为空
                PendingPost pendingPost = queue.poll(1000);//获取 PendingPost,最多等到 1 秒

                if (pendingPost == null) {
                    //前面取不到 PendingPost,下面进行双重校验检查
                    synchronized (this) {
                        // Check again, this time in synchronized
                        pendingPost = queue.poll();//
                        if (pendingPost == null) {
                            //队列确实为空,停止运行
                            executorRunning = false;
                            return;
                        }
                    }
                }
                eventBus.invokeSubscriber(pendingPost);//通过反射调用相应的方法
            }
        } catch (InterruptedException e) {
            eventBus.getLogger().log(Level.WARNING, Thread.currentThread().getName() + " was interruppted", e);
        }
    } finally {
        executorRunning = false;
    }
}
MAIN
mainThreadPoster.enqueue(subscription, event);

HandlerPoster#enqueue

public void enqueue(Subscription subscription, Object event) {
    PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
    synchronized (this) {//加锁
        queue.enqueue(pendingPost);//加入 pendingQueue 中。该队列会在 handleMessage 方法中调用
        if (!handlerActive) {//如果当前 handler 不是处于活跃状态,则退出
            handlerActive = true;
            if (!sendMessage(obtainMessage())) {//发送信息,提示
                throw new EventBusException("Could not send handler message");
            }
        }
    }
}

org.greenrobot.eventbus.HandlerPoster#handleMessage

@Override
public void handleMessage(Message msg) {
    boolean rescheduled = false;
    try {
        long started = SystemClock.uptimeMillis();
        while (true) {
            PendingPost pendingPost = queue.poll();
            if (pendingPost == null) {
                synchronized (this) {
                    // Check again, this time in synchronized
                    pendingPost = queue.poll();
                    if (pendingPost == null) {
                        handlerActive = false;
                        return;
                    }
                }
            }
            eventBus.invokeSubscriber(pendingPost);
            long timeInMethod = SystemClock.uptimeMillis() - started;
            if (timeInMethod >= maxMillisInsideHandleMessage) {
                if (!sendMessage(obtainMessage())) {
                    throw new EventBusException("Could not send handler message");
                }
                rescheduled = true;
                return;
            }
        }
    } finally {
        handlerActive = rescheduled;
    }
}

在 handleMessage 方法内部停留时间不能大于 10 毫秒,从

MAIN_ORDERED

新增加的模式,但是当前版本的实现还不完善。

ASYN

将事件添加 cachedThreadPool 中执行(如果当前有空闲线程,则复用空闲线程,如果没有就创建新线程)

public void enqueue(Subscription subscription, Object event) {
    PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
    queue.enqueue(pendingPost);
    eventBus.getExecutorService().execute(this);//放到默认的 cachedThreaPool 中执行
}

@Override
public void run() {
    PendingPost pendingPost = queue.poll();
    if(pendingPost == null) {
        throw new IllegalStateException("No pending post available");
    }
    eventBus.invokeSubscriber(pendingPost);
}

3.反射调用订阅方法

org.greenrobot.eventbus.EventBus#invokeSubscriber(org.greenrobot.eventbus.PendingPost)

void invokeSubscriber(PendingPost pendingPost) {
    Object event = pendingPost.event;
    Subscription subscription = pendingPost.subscription;
    PendingPost.releasePendingPost(pendingPost);
    if (subscription.active) {
        invokeSubscriber(subscription, event);
    }
}
void invokeSubscriber(Subscription subscription, Object event) {
    try {
        subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
    } catch (InvocationTargetException e) {
        handleSubscriberException(subscription, event, e.getCause());
    } catch (IllegalAccessException e) {
        throw new IllegalStateException("Unexpected exception", e);
    }
}

切换到指定线程之后,通过反射的方法,调用订阅方法。

四、总结

EventBus 的实现原理可以归结为以下三点

  • 注册—>扫描订阅方法,添加到订阅方法列表中
  • 发送事件—>根据事件的类型,遍历方法列表,反射调用订阅方法
  • 解注册—>从订阅方法列表中移除相应的订阅方法

EventBus 虽然不是标准的观察者模式的实现, 但是它的整体就是一个发布 / 订阅框架, 也拥有观察者模式的优点, 比如: 发布者和订阅者的解耦。

五、参考资料与学习资源推荐

由于本人水平有限,可能出于误解或者笔误难免出错,如果发现有问题或者对文中内容存在疑问欢迎在下面评论区告诉我。谢谢!

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,324评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,303评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,192评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,555评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,569评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,566评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,927评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,583评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,827评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,590评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,669评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,365评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,941评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,928评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,159评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,880评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,399评论 2 342

推荐阅读更多精彩内容