事件总线框架-EventBus源码解析

只有越来越强大,才能越来越童话。——《美人鱼》

前言

该系列以源码阅读为主,框架基础使用请自行查看官方文档。另源码基于3.2.0版本

第一步:获取Eventbus实例

使用Eventbus前需要先获取Eventbus实例,获取方式有两种,下面分开来分析源码(这部分的逻辑比较简单,请仔细源码源码注释部分为后文理解打基础):

方式一:EventBus.getDefault()

其源码如下:

public class EventBus {
    
    public static String TAG = "EventBus";
    //单例模式下的默认实例
    static volatile EventBus defaultInstance;
    //默认EventBusBuilder实例
    private static final EventBusBuilder DEFAULT_BUILDER = new EventBusBuilder();
    //事件类型缓存集合
    private static final Map<Class<?>, List<Class<?>>> eventTypesCache = new HashMap<>();
    //事件订阅集合,key:订阅的事件类型,value:订阅这个事件的所有订阅者集合
    private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType;
    //订阅者类型的集合,key:订阅者对象,value:这个订阅者订阅的事件集合
    private final Map<Object, List<Class<?>>> typesBySubscriber;
    //粘性事件集合,粘性事件就是指订阅者注册事件后,会发送一次最近发生的事件。key:粘性事件类型, value:事件对象
    private final Map<Class<?>, Object> stickyEvents;
    //currentPostingThreadState是一个ThreadLocal,他的特点是获取当前线程一份独有的变量数据,不受其他线程影响。
    private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {
        @Override
        protected PostingThreadState initialValue() {
            return new PostingThreadState();
        }
    };

    // @Nullable Eventbus支持Android和Java,这个在Android中,用来标记Android的UI线程
    private final MainThreadSupport mainThreadSupport;
    // 1 @Nullable 主线程的poster,获取主线程的loop,通过handler执行
    private final Poster mainThreadPoster;
    // 2 Background线程的poster(通过Executors.newCachedThreadPool()一个事件一个事件执行。)
    private final BackgroundPoster backgroundPoster;
    // 3 事件异步处理的poster
    private final AsyncPoster asyncPoster;
    //订阅者响应函数信息存储和查找类
    private final SubscriberMethodFinder subscriberMethodFinder;
    //ExecutorService是Executor直接的扩展接口,也是最常用的线程池接口
    //Eventbus中设置其默认的值为Executors.newCachedThreadPool(),源码见EventBusBuilder类中的private final static ExecutorService DEFAULT_EXECUTOR_SERVICE = Executors.newCachedThreadPool();
    private final ExecutorService executorService;
    //以下为EventBusBuilder类中的成员变量,在EventBusBuilder类中加以说明
    private final boolean throwSubscriberException;
    private final boolean logSubscriberExceptions;
    private final boolean logNoSubscriberMessages;
    private final boolean sendSubscriberExceptionEvent;
    private final boolean sendNoSubscriberEvent;
    private final boolean eventInheritance;
    //是EventBuilder.subscriberInfoIndexes.size()的值,即subscriberInfo索引数量,subscriberInfoIndexes是什么会在后面讲解
    private final int indexCount;
    //日志管理类
    private final Logger logger;
    
    /** 使用单例便捷创建进程范围内EventBus实例*/
    public static EventBus getDefault() {
        EventBus instance = defaultInstance;
        if (instance == null) {
            synchronized (EventBus.class) {
                instance = EventBus.defaultInstance;
                if (instance == null) {
                    instance = EventBus.defaultInstance = new EventBus();
                }
            }
        }
        return instance;
    }

    public EventBus() {
        //4
        this(DEFAULT_BUILDER);
    }
    //5
    EventBus(EventBusBuilder builder) {
        logger = builder.getLogger();
        subscriptionsByEventType = new HashMap<>();
        typesBySubscriber = new HashMap<>();
        stickyEvents = new ConcurrentHashMap<>();
        //返回持有UI线程looper的MainThreadSupport对象
        mainThreadSupport = builder.getMainThreadSupport();
        //构建与主线程通信的HandlerPoster对象
        mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;
        backgroundPoster = new BackgroundPoster(this);
        asyncPoster = new AsyncPoster(this);
        indexCount = builder.subscriberInfoIndexes != null ? builder.subscriberInfoIndexes.size() : 0;
        //构建SubscriberMethodFinder实例,由下文EventBusBuilder 源码可知ignoreGeneratedIndex默认为false,即默认SubscriberMethodFinder查找订阅方法时不能忽略注解器生成的MyEventBusIndex索引
        subscriberMethodFinder = new SubscriberMethodFinder(builder.subscriberInfoIndexes,
                builder.strictMethodVerification, builder.ignoreGeneratedIndex);
        logSubscriberExceptions = builder.logSubscriberExceptions;
        logNoSubscriberMessages = builder.logNoSubscriberMessages;
        sendSubscriberExceptionEvent = builder.sendSubscriberExceptionEvent;
        sendNoSubscriberEvent = builder.sendNoSubscriberEvent;
        throwSubscriberException = builder.throwSubscriberException;
        eventInheritance = builder.eventInheritance;
        executorService = builder.executorService;
    }
...
}

可见EventBus 内部使用双重检查加锁实现了进程范围的单例模式,其中注释1、2、3处分别创建了三个线程模型,他们是实现发送和处理消息线程切换的关键,关于他们的讲解在后面给出。注释4处可见其会以new EventBusBuilder()作为参数来调用注释儿处的构造方法,而EventBusBuilder()源码为空构造函数,所以通过EventBus.getDefault()方法获取实例是以EventBusBuilder成员变量的默认值以及EventBus成员变量的默认值去构建EventBus实例。

方式二:通过EventBus.builder().build()构建实例

很明显,此方式为通过构建者模式去获取实例,当你希望自定义参数设置时可使用该方法,EventBus.builder()源码如下:

public static EventBusBuilder builder() {
        return new EventBusBuilder();
    }

再来看EventBusBuilder类的源码(同样请仔细阅读注释部分,帮助理解后文):

public class EventBusBuilder {
    //EventBus内部默认维护的线程池
    private final static ExecutorService DEFAULT_EXECUTOR_SERVICE = Executors.newCachedThreadPool();
    //订阅者异常日志是否开启,默认为true
    boolean logSubscriberExceptions = true;
    //没有订阅者消息异常是否开启,默认为true
    boolean logNoSubscriberMessages = true;
    //是否发送订阅者事件异常,默认为true
    boolean sendSubscriberExceptionEvent = true;
    //如果没有订阅者是否发送事件,默认为true
    boolean sendNoSubscriberEvent = true;
    //是否抛出订阅者异常,默认false
    boolean throwSubscriberException;
    //有继承关系的事件的订阅者是否接收该事件,默认为true
    boolean eventInheritance = true;
    // 1 是否忽略注解器生成的MyEventBusIndex索引,默认为false(注解生成器EventBusAnnotationProcessor为3.0以后新增特性,其可以在编译期读取@Subscribe()注解并且解析其中所包含的信息,以供运行时直接使用,从而运行时的性能大大提高了)
    boolean ignoreGeneratedIndex;
    //是否开启方法严格验证,默认为false
    boolean strictMethodVerification;
    //在ThreadMode.ASYNC模式下默认的线程池
    ExecutorService executorService = DEFAULT_EXECUTOR_SERVICE;
    //不进行方法验证的类的集合
    List<Class<?>> skipMethodVerificationForClasses;
    //订阅者索引信息集合
    List<SubscriberInfoIndex> subscriberInfoIndexes;
    //日志管理者
    Logger logger;
    //Eventbus支持Android和Java,这个在Android中,用来标记Android的UI线程
    MainThreadSupport mainThreadSupport;

    EventBusBuilder() {
    }

    public EventBus build() {
        return new EventBus(this);
    }
  ...
  //对以onEvent开头的方法进行方法名称验证,以避免输入错误; 使用此方法,可以从此检查中排除订阅者类型。 同时禁用对方法修饰符(公共,非静态或抽象)的检查
  public EventBusBuilder skipMethodVerificationFor(Class<?> clazz) {
        if (skipMethodVerificationForClasses == null) {
            skipMethodVerificationForClasses = new ArrayList<>();
        }
        skipMethodVerificationForClasses.add(clazz);
        return this;
    }
  /** 添加由EventBus的注释预处理器生成的索引。 */
    public EventBusBuilder addIndex(SubscriberInfoIndex index) {
        if (subscriberInfoIndexes == null) {
            subscriberInfoIndexes = new ArrayList<>();
        }
        subscriberInfoIndexes.add(index);
        return this;
    }
  
  MainThreadSupport getMainThreadSupport() {
        //如果我们自己传入了MainThreadSupport则直接返回
        if (mainThreadSupport != null) {
            return mainThreadSupport;
         //这里isAndroidLogAvailable方法判断Android SDK里包含android.util.Log类没有
        } else if (Logger.AndroidLogger.isAndroidLogAvailable()) {
        //通过getAndroidMainLooperOrNull方法取得一个主线程的消息循环对象,该方法只是调用了 Looper.getMainLooper()
            Object looperOrNull = getAndroidMainLooperOrNull();
            //不为空则利用这个消息循环对象构建出AndroidHandlerMainThreadSupport对象,也就是MainThreadSupport的子类
            return looperOrNull == null ? null :
                    new MainThreadSupport.AndroidHandlerMainThreadSupport((Looper) looperOrNull);
        } else {
            return null;
        }
    }
  static Object getAndroidMainLooperOrNull() {
        try {
            return Looper.getMainLooper();
        } catch (RuntimeException e) {
            // Not really a functional Android (e.g. "Stub!" maven dependencies)
            return null;
        }
    }
  //将当前设置应用于 Default EventBus
  public EventBus installDefaultEventBus() {
        synchronized (EventBus.class) {
            if (EventBus.defaultInstance != null) {
                throw new EventBusException("Default instance already exists." +
                        " It may be only set once before it's used the first time to ensure consistent behavior.");
            }
            EventBus.defaultInstance = build();
            return EventBus.defaultInstance;
        }
    }
  
    public EventBus build() {
        return new EventBus(this);
    }
}

省略代码为其成员变量的设置方法,在调用build()方法前可以调用这些方法来自定义成员变量的值。在调用build() 方法时又会去调用上面注释5中的EventBus(EventBusBuilder builder) 方法,所以这种方式构建实例其实只是比第一种多了一步EventBusBuilder参数的自定义步骤。其中注释1处的注解生成器EventBusAnnotationProcessor为3.0以后新增特性用以优化其性能,其可以在编译期读取@Subscribe()注解并且解析其中所包含的信息生成MyEventBusIndex类我们称之为订阅者索引,以供运行时直接使用,从而使运行时的性能大大提高了,我们通常使用EventBus时不会去设置该功能,关于注解生成器的作用在后面部分再加以补充,我们先重点关注EventBus的整体逻辑实现。
总结:两种方式本质相同,只是第一种方式下使用默认设置,第二种方式可自定义EventBusBuilder参数设置

第二步:得到eventbus对象后进行注册订阅

注册代码eventBus.register(this),其源码如下:

/**
     * 注册监听者去接收事件. 监听者在对事件不感兴趣时必须调用unregister(Object)方法来取消监听
     * 订阅方法必须使用@Subscribe进行标注,同时可以设置@ThreadMode以及优先级
     */
    public void register(Object subscriber) {
        //首先获取注册的对象的类型
        Class<?> subscriberClass = subscriber.getClass();
        //1然后获取注册的对象的订阅方法
        List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
        //2对当前实例加锁并循环注册订阅方法
        synchronized (this) {
            for (SubscriberMethod subscriberMethod : subscriberMethods) {
                // 对订阅方法进行注册订阅
                subscribe(subscriber, subscriberMethod);
            }
        }
    }

代码逻辑注释已写出,这里的SubscriberMethod封装了订阅方法(使用@Subscribe注解的方法)的信息,其源码如下所示:

public class SubscriberMethod {
    final Method method;// 订阅方法
    final ThreadMode threadMode; // 标识在哪个线程执行,有POSTING,MAIN,BACKGROUND,ASYNC 四种模式
    final Class<?> eventType; // 事件类型
    final int priority;// 优先级
    final boolean sticky;//是否为粘性事件
    /** Used for efficient comparison */
    String methodString;

    public SubscriberMethod(Method method, Class<?> eventType, ThreadMode threadMode, int priority, boolean sticky) {
        this.method = method;
        this.threadMode = threadMode;
        this.eventType = eventType;
        this.priority = priority;
        this.sticky = sticky;
    }
...
}

可见,在SubscriberMethod类中,主要用来保存@Subscribe注解的订阅方法的Method对象、线程模式、事件类型、优先级、是否是黏性事件等属性。接着来看上面register(Object subscriber)源码中注释1部分的逻辑,其通过subscriberMethodFinder.findSubscriberMethods(Class<?> subscriberClass)方法从注册者类中找出并返回了所有订阅方法的SubscriberMethod实例组成的列表,来看看该方法的源码:

List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
        // 这里首先从缓存当中尝试去取该订阅者的订阅方法
        List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
        if (subscriberMethods != null) {
            return subscriberMethods;
        }
        // 当缓存中没有找到该订阅者的订阅方法的时候使用下面的两种方法获取方法信息
        //ignoreGeneratedIndex参数表示是否忽略注解器生成的MyEventBusIndex索引,该值默认为false
        if (ignoreGeneratedIndex) {
            //如果 ignoreGeneratedIndex为true 就通过反射获取订阅方法信息
            subscriberMethods = findUsingReflection(subscriberClass);
        } else {
            //如果ignoreGeneratedIndex为false,通过EventBusAnnotationProcessor(注解处理器)生成的MyEventBusIndex中获取订阅方法信息
            subscriberMethods = findUsingInfo(subscriberClass);
        }
        //如果未找到订阅方法则报错
        if (subscriberMethods.isEmpty()) {
            throw new EventBusException("Subscriber " + subscriberClass
                    + " and its super classes have no public methods with the @Subscribe annotation");
        } else {
            //将找到的订阅方法放置到缓存当中
            METHOD_CACHE.put(subscriberClass, subscriberMethods);
            return subscriberMethods;
        }
    }

代码逻辑注释已注明,其中ignoreGeneratedIndex默认为false,这在前面EventBus(EventBusBuilder builder) 方法中已经分析过了,那么我们就先来看看ignoreGeneratedIndex为false时执行的findUsingInfo(subscriberClass)的源码:

private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
        // 通过FindState对象来存储找到的方法信息
        FindState findState = prepareFindState();
        // 初始化FindState
        findState.initForSubscriber(subscriberClass);
        // 从当前类开始循环遍历该类的所有父类
        while (findState.clazz != null) {
            // 1 如果我们通过EventBusBuilder配置了MyEventBusIndex,便会通过getSubscriberInfo(findState)方法获取到subscriberInfo ,通常情况下我们使用EventBus时并没有配置
            findState.subscriberInfo = getSubscriberInfo(findState);
            if (findState.subscriberInfo != null) {
                // 如果使用了MyEventBusIndex,将会进入到这里并获取订阅方法信息
                SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
                for (SubscriberMethod subscriberMethod : array) {
                    if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
                        findState.subscriberMethods.add(subscriberMethod);
                    }
                }
            } else {
                // 2 未使用MyEventBusIndex将会进入这里使用反射获取订阅方法信息
                findUsingReflectionInSingleClass(findState);
            }
            // 将findState.clazz设置为当前的findState.clazz的父类
            findState.moveToSuperclass();
        }
        //返回查找到的所有订阅信息并释放FindState对象
        return getMethodsAndRelease(findState);
    }

这里注释1处先通过prepareFindState()找到方法信息并封装成FindState对象存储起来,先来看看FindState类的源码定义:

static class FindState {
        final List<SubscriberMethod> subscriberMethods = new ArrayList<>(); // 订阅方法信息列表
        final Map<Class, Object> anyMethodByEventType = new HashMap<>(); //以事件类型为key, 方法信息为value的集合 
        final Map<String, Class> subscriberClassByMethodKey = new HashMap<>(); // 以methodkey为key,订阅者类为value的集合
        final StringBuilder methodKeyBuilder = new StringBuilder(128); // 生成methodkey的stringbuilder

        Class<?> subscriberClass; // 订阅者类
        Class<?> clazz; 
        boolean skipSuperClasses; // 是否跳过父类
        SubscriberInfo subscriberInfo; 
        //初始化订阅者类下的FindState
        void initForSubscriber(Class<?> subscriberClass) {
            this.subscriberClass = clazz = subscriberClass;
            skipSuperClasses = false;
            subscriberInfo = null;
        }
        //回收FindState占用的资源,释放内存
        void recycle() {
            subscriberMethods.clear();
            anyMethodByEventType.clear();
            subscriberClassByMethodKey.clear();
            methodKeyBuilder.setLength(0);
            subscriberClass = null;
            clazz = null;
            skipSuperClasses = false;
            subscriberInfo = null;
        }
    // checkAdd这部分的作用是检测订阅者中注册的事件响应方法是否可以合法的加入到订阅方法信息中,分为两层检查
        boolean checkAdd(Method method, Class<?> eventType) {
            //第一层检查同一事件类型下是否已有订阅方法信息
            Object existing = anyMethodByEventType.put(eventType, method);
            if (existing == null) {
                return true;//如果还未有订阅方法监听此事件,则可添加此订阅方法信息
            } else {
                if (existing instanceof Method) {
                // 检测到同一事件类型下已经有订阅事件响应方法,则继续进行第二层方法签名的检查
                    if (!checkAddWithMethodSignature((Method) existing, eventType)) {
                        // Paranoia check
                        throw new IllegalStateException();
                    }
                    // Put any non-Method object to "consume" the existing Method
                    anyMethodByEventType.put(eventType, this);
                }
                return checkAddWithMethodSignature(method, eventType);
            }
        }
        // 使用订阅方法签名检测是否可以加入订阅方法信息列表
        private boolean checkAddWithMethodSignature(Method method, Class<?> eventType) {
            methodKeyBuilder.setLength(0);
            methodKeyBuilder.append(method.getName());
            methodKeyBuilder.append('>').append(eventType.getName());//使用订阅方法名以及订阅事件类型构造methordKey

            String methodKey = methodKeyBuilder.toString();
            Class<?> methodClass = method.getDeclaringClass();
            Class<?> methodClassOld = subscriberClassByMethodKey.put(methodKey, methodClass); //判断是否存方法签名相同的事件响应方法,并比较相同方法签名的订阅方法所在类的关系
            if (methodClassOld == null || methodClassOld.isAssignableFrom(methodClass)) {
                // Only add if not already found in a sub class
                // 如果不存在同样方法签名的订阅方法或者,之前保存的订阅方法所在的类为当前将要添加的订阅方法所在的类的子类(目前不存在此情况,因为只会从子类向父类查找),则可以合法添加此订阅方法信息
                return true;
            } else {
                // Revert the put, old class is further down the class hierarchy
                //subscriberClassByMethodKey只保存父子继承关系的最下层子类,目的是为了在子类注册监听事件时,如果父类中有相同的事件响应方法,应该调用子类的覆写方法。
                subscriberClassByMethodKey.put(methodKey, methodClassOld);
                return false;
            }
        }

        void moveToSuperclass() {
            if (skipSuperClasses) {
                clazz = null;
            } else {
                clazz = clazz.getSuperclass();
                String clazzName = clazz.getName();
                /** Skip system classes, this just degrades performance. */
                if (clazzName.startsWith("java.") || clazzName.startsWith("javax.") || clazzName.startsWith("android.")) {
                    clazz = null;
                }
            }
        }
    }

可见其主要用来帮助从订阅者类中查找出订阅方法信息并封装起来,其中方法用于条件校验及查找订阅者类的父类。这里不多做分析。回到上面findUsingInfo(Class<?> subscriberClass)源码中,通过注释1可知通常我们会走到注释2逻辑中,来看看findUsingReflectionInSingleClass(findState)方法的源码:

private void findUsingReflectionInSingleClass(FindState findState) {
        Method[] methods;
        try {
            // This is faster than getMethods, especially when subscribers are fat classes like Activities
            methods = findState.clazz.getDeclaredMethods(); // 通过反射来获取到订阅者类的所有方法
        } catch (Throwable th) {
            // Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149
            methods = findState.clazz.getMethods();
            findState.skipSuperClasses = true;
        }
        for (Method method : methods) {
            int modifiers = method.getModifiers();
            // 找到所有声明为 public 的方法
            if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
                Class<?>[] parameterTypes = method.getParameterTypes(); // 找到方法参数并且参数数量为1
                if (parameterTypes.length == 1) {
                    Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class); // 得到注解,并将方法添加到订阅信息中去
                    if (subscribeAnnotation != null) {
                        Class<?> eventType = parameterTypes[0];
                        if (findState.checkAdd(method, eventType)) {
                            // 将订阅方法信息加入到列表中
                            ThreadMode threadMode = subscribeAnnotation.threadMode();
                            findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
                                    subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
                        }
                    }
                } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                    String methodName = method.getDeclaringClass().getName() + "." + method.getName();
                    throw new EventBusException("@Subscribe method " + methodName +
                            "must have exactly 1 parameter but has " + parameterTypes.length);
                }
            } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                String methodName = method.getDeclaringClass().getName() + "." + method.getName();
                throw new EventBusException(methodName +
                        " is a illegal @Subscribe method: must be public, non-static, and non-abstract");
            }
        }
    }

可见其逻辑为:

  1. 通过反射得到当前 class 的所有方法
  2. 过滤掉不是 public 和是 abstract、static、bridge、synthetic 的方法
  3. 找出所有参数只有一个的方法
  4. 找出被Subscribe注解的方法
  5. 把method 方法和 事件类型eventtype添加到 findState 中
  6. 把 method 方法、事件类型、threadMode、priority、sticky 封装成 SubscriberMethod 对象,然后添加到 findState.subscriberMethods
    这里我们再回到findSubscriberMethods(Class<?> subscriberClass) 源码中,如果 ignoreGeneratedIndex为true 就是忽略注解生成器生成的订阅者索引MyEventBusIndex类,便会运行findUsingReflection(subscriberClass)方法,来看其源码:
private List<SubscriberMethod> findUsingReflection(Class<?> subscriberClass) {
        // 通过FindState对象来存储找到的方法信息
        FindState findState = prepareFindState(); 
        findState.initForSubscriber(subscriberClass); // 初始化FindState
        while (findState.clazz != null) { 
            findUsingReflectionInSingleClass(findState);// 寻找某个类中的所有事件响应方法
            findState.moveToSuperclass(); //继续寻找当前类父类中注册的事件响应方法
        }
         //返回查找到的所有订阅信息并释放FindState对象
        return getMethodsAndRelease(findState);
    }

可见其逻辑与findUsingInfo(Class<?> subscriberClass)方法的逻辑基本相同,只是findUsingInfo(Class<?> subscriberClass)多一个判断我们使用EventBusBuilder时是否配置了MyEventBusIndex的逻辑,如果我们没有配置MyEventBusIndex其之后逻辑与findUsingReflection逻辑相同,都是通过findUsingReflectionInSingleClass(findState)方法通过反射获取订阅方法信息的。所以我们了解到要使用订阅者索引MyEventBusIndex必须同时满足两个条件,一是ignoreGeneratedIndex为false(其默认为false,EventBus默认推荐我们使用订阅者索引),二是我们使用EventBus时通过EventBusBuilder配置了MyEventBusIndex。通过订阅者索引MyEventBusIndex我们能更高效的查找到订阅者方法的信息,但我们重点关注EventBus的实现逻辑,先暂且不论性能问题,不论使不使用订阅者索引MyEventBusIndex我们都拿到了订阅者方法信息并封装了起来,接下来回到上面的register(Object subscriber) 源码看注释2后面的逻辑,其拿到所有订阅方法后会去遍历并通过subscribe(subscriber, subscriberMethod)方法订阅所有订阅方法,subscribe(subscriber, subscriberMethod)源码如下:

private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
        Class<?> eventType = subscriberMethod.eventType; // 获取订阅方法的事件类
        Subscription newSubscription = new Subscription(subscriber, subscriberMethod); // 创建订阅类
        CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType); // 获取订阅了此事件类的所有订阅者信息列表,如果不存在则将其加入订阅者信息列表,如果已经存在此订阅者信息则抛出已注册的异常
        if (subscriptions == null) {
            subscriptions = new CopyOnWriteArrayList<>();
            subscriptionsByEventType.put(eventType, subscriptions);
        } else {
            if (subscriptions.contains(newSubscription)) {
                throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
                        + eventType);
            }
        }
        
        int size = subscriptions.size();
        for (int i = 0; i <= size; i++) {
            if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
                subscriptions.add(i, newSubscription);
                break;
            }
        }
        // 将订阅者信息按照优先级加入到订阅者信息列表中
        List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber); // 获取此订阅者实例订阅的所有事件类的列表
        if (subscribedEvents == null) {
            subscribedEvents = new ArrayList<>();
            typesBySubscriber.put(subscriber, subscribedEvents);
        }
        subscribedEvents.add(eventType); // 将此事件类加入 订阅者事件类列表中
        // 判断是否是粘性事件,对粘性事件立即进行分发
        if (subscriberMethod.sticky) {
            if (eventInheritance) { 
                // 所有事件类的子类都要处理
                // Existing sticky events of all subclasses of eventType have to be considered.
                // Note: Iterating over all events may be inefficient with lots of sticky events,
                // thus data structure should be changed to allow a more efficient lookup
                // (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).
                Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
                for (Map.Entry<Class<?>, Object> entry : entries) {
                    Class<?> candidateEventType = entry.getKey();
                    if (eventType.isAssignableFrom(candidateEventType)) {
                        Object stickyEvent = entry.getValue();
                        checkPostStickyEventToSubscription(newSubscription, stickyEvent);
                    }
                }
            } else {
                // 如果当前粘性事件集合中存在此事件类对应的粘性事件对象,则进入事件分发流程
                Object stickyEvent = stickyEvents.get(eventType);
                checkPostStickyEventToSubscription(newSubscription, stickyEvent);
            }
        }
    }

其主要逻辑如下:

  1. 如果当前订阅信息没有注册过,则按照订阅类型和优先级将订阅信息的封装类Subscription加入到订阅信息集合subscriptionsByEventType中(subscriptionsByEventType以事件类型为key,以订阅信息列表为value的订阅信息集合)。
  2. 将事件类型加入到订阅事件类型集合typesBySubscriber(typesBySubscriber 以订阅者对象为key,以订阅事件类型为value的订阅类型信息集合)。
  3. 如果当前订阅的为粘性事件,则进入事件分发的流程。
    Map<Class<?>, Object> stickyEvents; 以事件类为key,以事件对象为value的粘性事件集合

方法中的Subscription是订阅信息类,封装了订阅者对象,订阅方法,是否处于激活状态等信息,源码如下:

final class Subscription {
    final Object subscriber; // 订阅者对象
    final SubscriberMethod subscriberMethod;  // 订阅方法
    /**
     * Becomes false as soon as {@link EventBus#unregister(Object)} is called, which is checked by queued event delivery
     * {@link EventBus#invokeSubscriber(PendingPost)} to prevent race conditions.
     */
    volatile boolean active; // 是否处于激活状态

    Subscription(Object subscriber, SubscriberMethod subscriberMethod) {
        this.subscriber = subscriber;
        this.subscriberMethod = subscriberMethod;
        active = true;
    }

    @Override
    public boolean equals(Object other) {
        if (other instanceof Subscription) {
            Subscription otherSubscription = (Subscription) other;
            return subscriber == otherSubscription.subscriber
                    && subscriberMethod.equals(otherSubscription.subscriberMethod);
        } else {
            return false;
        }
    }

    @Override
    public int hashCode() {
        return subscriber.hashCode() + subscriberMethod.methodString.hashCode();
    }
}

到此为止,EventBus的注册流程就分析完了,总结如下:


EventBus的注册流程图

第三步:注册流程结束后,发送消息并执行订阅方法

继续来讲解EventBus的事件分发,即post一个事件之后怎么传达到对应的订阅者的。发送消息的方式有两种,一种为发送普通消息post(Object event),一种为发送粘性消息postSticky(Object event),先来看看发送普通消息post(Object event)方法的源码:

private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {
    @Override
    protected PostingThreadState initialValue() {
        return new PostingThreadState();
    }
};
/** Posts the given event to the event bus. */
public void post(Object event) {
    //currentPostingThreadState是一个ThreadLocal,他的特点是获取当前线程一份独有的变量数据,不受其他线程影响。
    PostingThreadState postingState = currentPostingThreadState.get();
    //postingState就是获取到的线程独有的变量数据 
    List<Object> eventQueue = postingState.eventQueue;
    //把post的事件添加到事件队列
    eventQueue.add(event);
    // 如果没有处在事件发布状态,那么开始发送事件并一直保持发布状态
    if (!postingState.isPosting) {
        postingState.isMainThread = Looper.getMainLooper() == Looper.myLooper();
        postingState.isPosting = true;
        if (postingState.canceled) {
            throw new EventBusException("Internal error. Abort state was not reset");
        }
        try {
            while (!eventQueue.isEmpty()) {
                postSingleEvent(eventQueue.remove(0), postingState);
            }
        } finally {
            postingState.isPosting = false;
            postingState.isMainThread = false;
        }
    }
}

/** For ThreadLocal, much faster to set (and get multiple values). */
final static class PostingThreadState {
    //事件队列
    final List<Object> eventQueue = new ArrayList<Object>();
    boolean isPosting;
    boolean isMainThread;
    Subscription subscription;
    Object event;
    boolean canceled;
}

可见post方法的大致流程为 :

  1. 首先根据 currentPostingThreadState 获取当前线程状态 postingState 。currentPostingThreadState 其实就是一个 ThreadLocal 类的对象,不同的线程根据自己独有的索引值可以得到相应属于自己的 postingState 数据。
  2. 然后把事件 event 加入到 eventQueue 队列中排队。
  3. 循环遍历 eventQueue ,取出事件发送事件。

发送单个事件是调用 postSingleEvent(Object event, PostingThreadState postingState) 方法。其方法源码:

private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
    // 得到事件的类型
    Class<?> eventClass = event.getClass();
    // 是否找到订阅者
    boolean subscriptionFound = false;
    // 如果支持事件继承,默认为支持
    if (eventInheritance) {
        // 查找 eventClass 的所有父类和接口
        List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
        int countTypes = eventTypes.size();
        for (int h = 0; h < countTypes; h++) {
            Class<?> clazz = eventTypes.get(h);
            // 依次向 eventClass 的父类或接口的订阅方法发送事件
            // 只要有一个事件发送成功,返回 true ,那么 subscriptionFound 就为 true
            subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
        }
    } else {
        // 1 发送事件
        subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
    }
    // 如果没有订阅者
    if (!subscriptionFound) {
        if (logNoSubscriberMessages) {
            Log.d(TAG, "No subscribers registered for event " + eventClass);
        }
        if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
                eventClass != SubscriberExceptionEvent.class) {
            // 发送 NoSubscriberEvent 事件,可以自定义接收
            post(new NoSubscriberEvent(this, event));
        }
    }
}

在postSingleEvent方法中,事件发送逻辑根据 eventInheritance分成两种,大致流程为:
第一种:eventInheritance 默认为true,支持事件继承:得到 eventClass 的所有父类和接口,然后循环依次发送事件;
第二种:eventInheritance 为false,不支持事件继承:直接发送eventClass 事件。
若找不到订阅者,默认会发送 NoSubscriberEvent 事件。开发者可以自定义订阅方法接收这个事件。
关于事件发送的具体操作由注释1处可知需进一步到 postSingleEventForEventType方法中去看:

private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
    CopyOnWriteArrayList<Subscription> subscriptions;
    synchronized (this) {
        // 得到订阅者
        subscriptions = subscriptionsByEventType.get(eventClass);
    }
    if (subscriptions != null && !subscriptions.isEmpty()) {
        // 依次遍历订阅者
        for (Subscription subscription : subscriptions) {
            postingState.event = event;
            postingState.subscription = subscription;
            boolean aborted = false;
            try {
                // 发送事件
                postToSubscription(subscription, event, postingState.isMainThread);
                // 是否被取消了
                aborted = postingState.canceled;
            } finally {
                postingState.event = null;
                postingState.subscription = null;
                postingState.canceled = false;
            }
            // 如果被取消,则跳出循环
            if (aborted) {
                break;
            }
        }
        return true;
    }
    return false;
}

postSingleEventForEventType方法主要做的事情:

  • 从订阅者注册列表中取出eventClass事件对应的订阅者Subscription列表,
  • 遍历了订阅者Subscription,然后依次调用 postToSubscription 方法发送事件。
    postToSubscription(subscription, event, postingState.isMainThread)源码如下:
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
    // 根据不同的线程模式执行对应
    switch (subscription.subscriberMethod.threadMode) {
        case POSTING: // 和发送事件处于同一个线程
            invokeSubscriber(subscription, event);
            break;
        case MAIN: // 主线程
            if (isMainThread) {
                invokeSubscriber(subscription, event);
            } else {
                mainThreadPoster.enqueue(subscription, event);
            }
            break;
        case BACKGROUND: // 子线程
            if (isMainThread) {
                backgroundPoster.enqueue(subscription, event);
            } else {
                invokeSubscriber(subscription, event);
            }
            break;
        case ASYNC: // 和发送事件处于不同的线程
            asyncPoster.enqueue(subscription, event);
            break;
        default: // 抛出异常
            throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
    }
}

void invokeSubscriber(Subscription subscription, Object event) {
    try {
        // 通过反射执行订阅方法
        subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
    } catch (InvocationTargetException e) {
        handleSubscriberException(subscription, event, e.getCause());
    } catch (IllegalAccessException e) {
        throw new IllegalStateException("Unexpected exception", e);
    }
}

看到了这里不知道你还记不记一开始说过的EventBus维护的3个线程模型变量mainThreadPoster、
backgroundPoster、asyncPoster,我们知道在使用EventBus时,我们在使用@Subscribe注解订阅方法时也可以指定threadMode,代表订阅方法所运行的线程;指定sticky,代表是否是粘性事件;指定priority,代表优先级。其中threadMode有五种模式:

  • (POSTING)同一个线程:表示订阅方法所处的线程和发布事件的线程是同一个线程。
  • (MAIN)主线程:如果发布事件的线程是主线程则直接执行,若不是主线程需回调到主线程执行
  • (MAIN_ORDERED)主线程顺序执行:订阅者在主线程中调用。但和 MAIN 模式不同的,是本模式的消息会直接进入属于主线程的消息队列中排队等待执行,而不是阻塞主线程直接运行。
  • (BACKGROUND)子线程:如果发布事件的线程是主线程,那么创建子线程执行;否则直接执行;
  • (ASYNC)异步线程:无论发布事件执行在主线程还是子线程,都利用一个子线程来执行订阅方法。
    这四种设置的实现原理就在上面这出代码里,前面也说过了其中mainThreadPoster、
    backgroundPoster、asyncPoster三个线程模型是实现这4中模式的关键。根据我们对四种模式的理解和源码逻辑不难看出,invokeSubscriber(subscription, event)方法的作用应该就是在当前线程调用订阅方法,而其中的mainThreadPoster.enqueue(subscription, event)作用应该是回到到主线程然后调用订阅方法,backgroundPoster.enqueue(subscription, event)和asyncPoster.enqueue(subscription, event)作用应该都是启动子线程调用订阅方法。现在我来分别分析一下这三个线程模型的源码,先来看mainThreadPoster,跟踪其源码至初始化代码处为:
ainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;

根据前面的分析知android环境下mainThreadSupport!=null,那么来看mainThreadSupport.createPoster(this) 方法源码:

public interface MainThreadSupport {

    boolean isMainThread();

    Poster createPoster(EventBus eventBus);

    class AndroidHandlerMainThreadSupport implements MainThreadSupport {
        
        private final Looper looper;

        public AndroidHandlerMainThreadSupport(Looper looper) {
            this.looper = looper;
        }

        @Override
        public boolean isMainThread() {
            return looper == Looper.myLooper();
        }

        @Override
        public Poster createPoster(EventBus eventBus) {
            return new HandlerPoster(eventBus, looper, 10);
        }
    }

}

可见mainThreadPoster其实为HandlerPoster对象该对象持有主线程的 looper对象。在分析HandlerPoster类之前我先来说明几个概念便于讲解HandlerPoster类的原理

Poster接口
发送事件是一个具体的行为,为此EventBus抽象出来一个Poster接口出来;其源码如下:

interface Poster {
   //由于发送事件有先后顺序,这里定义了一个enqueue方法
  //subscription:事件接收者
  //event:具体的事件
   void enqueue(Subscription subscription, Object event);
}

PendingPost
PendingPost为EventBus 中一个重要的类,其源码如下:

final class PendingPost {
    private final static List<PendingPost> pendingPostPool = new ArrayList<PendingPost>();

    Object event;
    Subscription subscription;
    PendingPost next;

    private PendingPost(Object event, Subscription subscription) {
        this.event = event;
        this.subscription = subscription;
    }
    
    static PendingPost obtainPendingPost(Subscription subscription, Object event) {
       //从pendingPostPool获取一个PendingPost
       synchronized (pendingPostPool) {
            int size = pendingPostPool.size();
            if (size > 0) {
                //从集合末尾获取一个
                PendingPost pendingPost = pendingPostPool.remove(size - 1);
                pendingPost.event = event;
                pendingPost.subscription = subscription;
                pendingPost.next = null;
                return pendingPost;
            }
        }
        //如果pendingPostPool没有,则创建一个
        return new PendingPost(event, subscription);
    }
    //释放PendingPost对象,其实为保留PendingPost对象,仅仅将其变量置null,类似倒了水的空瓶以备下次复用
    static void releasePendingPost(PendingPost pendingPost) {
        pendingPost.event = null;
        pendingPost.subscription = null;
        pendingPost.next = null;
        synchronized (pendingPostPool) {
            // Don't let the pool grow indefinitely
            if (pendingPostPool.size() < 10000) {
                //将pendingPost对象放入pendingPost池中,便于之后复用
                pendingPostPool.add(pendingPost);
            }
        }
    }

}

通过该类的成员变量可以看出PendingPost除了封装了事件对象event,事件接收者对象Subscription之外,还弄了个next引用,说明其希望构建一个链表结构releasePendingPost。而pendingPostPool为PendingPost对象池用于保存PendingPost对象。其结合obtainPendingPost(Subscription subscription, Object event) 与releasePendingPost(PendingPost pendingPost) 方法实现PendingPost实例的复用线程池。首先所以我们来看看EventBus是怎么构建这个PendingPost链表的,具体的构建过程在PendingPostQueue类里面,先看看这个类的源码:

PendingPostQueue

final class PendingPostQueue {
    //链表头部引用
    private PendingPost head;
    //链表尾部引用
    private PendingPost tail;

    synchronized void enqueue(PendingPost pendingPost) {
        if (pendingPost == null) {
            throw new NullPointerException("null cannot be enqueued");
        }
        if (tail != null) {
            tail.next = pendingPost;
            tail = pendingPost;
        } else if (head == null) {
            //第一次执行是走这个分支
            head = tail = pendingPost;
        } else {
            throw new IllegalStateException("Head present, but no tail");
        }
        //唤醒等待的线程
        notifyAll();
    }

    synchronized PendingPost poll() {
        PendingPost pendingPost = head;
        if (head != null) {
            head = head.next;
            if (head == null) {
                tail = null;
            }
        }
        return pendingPost;
    }

    synchronized PendingPost poll(int maxMillisToWait) throws InterruptedException {
        if (head == null) {
            wait(maxMillisToWait);
        }
        return poll();
    }

}

该类也算是EventBus的核心类之一,在AsyncPoster,BackgroundPost,HandlerPoster里都有用到,该类的作用就是构建具有head和tail引用的PendingPost链表结构。具体的构建行为在PendingPostQueue对象的enqueue方法里面,而其取出表头对象的逻辑在poll()方法里面,其构建了一个先进先出队列。

现在再回到HandlerPoster类的源码中就容易理解多了:

public class HandlerPoster extends Handler implements Poster {
    //维护PendingPost的队列
    private final PendingPostQueue queue;
    //handler中最长处理message的时间
    private final int maxMillisInsideHandleMessage;
    private final EventBus eventBus;
    //handler活跃状态标志
    private boolean handlerActive;

    protected HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
        super(looper);
        this.eventBus = eventBus;
        this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
        queue = new PendingPostQueue();
    }

    public void enqueue(Subscription subscription, Object event) {
         //获取一个PendingPost对象
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            //将PendingPost入队到PendingPostQueue队列中
            queue.enqueue(pendingPost);
            if (!handlerActive) {
                //如果handler处于非活跃状态则是其活跃起来
                handlerActive = true;
                //发送Message消息
                if (!sendMessage(obtainMessage())) {
                    throw new EventBusException("Could not send handler message");
                }
            }
        }
    }

    @Override
    public void handleMessage(Message msg) {
        boolean rescheduled = false;
        try {
            //执行开始计时
            long started = SystemClock.uptimeMillis();
            //不断循环遍历PendingPostQueue中的PendingPost对象并处理直到队列为null
            while (true) {
                //取出一个PendingPost对象
                PendingPost pendingPost = queue.poll();
                if (pendingPost == null) {
                    synchronized (this) {
                        // Check again, this time in synchronized
                        //双重检查加锁后再次获取PendingPost对象
                        pendingPost = queue.poll();
                        if (pendingPost == null) {
                            //如果取出的PendingPost对象仍然为null,则将handler设置为非活跃状态
                            handlerActive = false;
                            return;
                        }
                    }
                }
                //如果取出的PendingPost对象不为空则请求其封装的订阅方法
                eventBus.invokeSubscriber(pendingPost);
                //如果在规定的时间内还没有处理完毕,则重新回调handleMessage处理
                long timeInMethod = SystemClock.uptimeMillis() - started;
                if (timeInMethod >= maxMillisInsideHandleMessage) {
                    if (!sendMessage(obtainMessage())) {
                        throw new EventBusException("Could not send handler message");
                    }
                    rescheduled = true;
                    return;
                }
            }
        } finally {
            handlerActive = rescheduled;
        }
    }
}

可见HandlerPoster 为Handler子类,其又持有主线程的looper对象,所以它可通过hander机制与主线程进行通信。在postToSubscription(Subscription subscription, Object event, boolean isMainThread) 方法中如果threadMode为MAIN类型,但当前线程在子线程时会去调用mainThreadPoster.enqueue(subscription, event)方法,即HandlerPoster的enqueue(subscription, event)方法,其会将subscription, event封装成PendingPost对象并入队到维护的PendingPostQueue变量中,并发送Message消息触发UI线程去执行其定义的handleMessage(Message msg) 方法,方法中会不断循环获取链表中的PendingPost,调用PendingPost中封装的订阅方法,当链表中的事件处理完毕后退出while循环。或者是在规定的时间内maxMillisInsideHandleMessage没有将事件处理完,则继续调用sendMessage。继而重新回调handleMessage方法,事件会继续执行。(这是为了避免队列过长或者事件耗时较长时,while循环阻塞主线程造成卡顿。算是个小技巧。)从而达到通过handler通信机制完成订阅方法在主线程执行的任务。
接着再来分析一下BackgroundPoster ,其源码如下:

final class BackgroundPoster implements Runnable, Poster {
    //维护PendingPost的队列
    private final PendingPostQueue queue;
    private final EventBus eventBus;
    //线程池是否在运行的标志
    private volatile boolean executorRunning;

    BackgroundPoster(EventBus eventBus) {
        this.eventBus = eventBus;
        queue = new PendingPostQueue();
    }

    public void enqueue(Subscription subscription, Object event) {
        //获取一个PendingPost对象
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            //将PendingPost入队到PendingPostQueue队列中
            queue.enqueue(pendingPost);
            if (!executorRunning) {
                如果线程池未在运行状态则使其运行
                executorRunning = true; 
                eventBus.getExecutorService().execute(this);
            }
        }
    }

    @Override
    public void run() {
        try {
            try {
                //不断循环遍历PendingPostQueue中的PendingPost对象并处理直到队列为null
                while (true) {
                    //1、从连表中获取一个PendingPost,链表为null则等待一秒
                    PendingPost pendingPost = queue.poll(1000);
                    if (pendingPost == null) {
                        synchronized (this) {
                            // Check again, this time in synchronized
                            //双重检查加锁后再次获取PendingPost对象
                            pendingPost = queue.poll();
                            if (pendingPost == null) {
                                //如果还为null,认定此时没有事件产生
                                executorRunning = false;
                                //退出run方法
                                return;
                            }
                        }
                    }
                    eventBus.invokeSubscriber(pendingPost);
                }
            } catch (InterruptedException e) {
                eventBus.getLogger().log(Level.WARNING, Thread.currentThread().getName() + " was interruppted", e);
            }
        } finally {
            executorRunning = false;
        }
    }

}

可见BackgroundPoster类的代码逻辑与HandlerPoster大同小异,只是BackgroundPoster实现了Runnable方法并在postToSubscription(Subscription subscription, Object event, boolean isMainThread) 方法中调用backgroundPoster.enqueue(subscription, event)方法后其会把自身交于EventBus自身维护的线程池ExecutorService去子线程中执行,即执行其实现的run()方法。其run()方法逻辑与HandlerPoster的handleMessage(Message msg) 逻辑相似,需要额外注意的是,在while循环中,其先调用poll(1000)方法,如果事件链表为null,则wait(1000),等待事件产生;而后再次在用poll()获取,如果此时仍然为null,则EventBus认为现阶段没有事件,则run方法执行结束,直到下次事件来临后继续调用eventBus.getExecutorService().execute(this)执行。还需要注意的是因为在fun方法中循环便利事件链表,所以为了防止事件阻塞,在使用ThreadModel.BACKGROUND的时候,要求此@Subricebe 方法不要执行耗时的逻辑。那么耗时的逻辑应该在哪儿执行呢?这就用到了ThreadModel.ASYNC模式,其实现依赖AsyncPoster类,由于AsyncPoster类与BackgroundPoster类内容除了实现的run()方法外其他内容相同,这里就只贴出核心的不同部分:

 //AsyncPoster的run方法
  public void run() {
        PendingPost pendingPost = queue.poll();
        if(pendingPost == null) {
            throw new IllegalStateException("No pending post available");
        }
        eventBus.invokeSubscriber(pendingPost);
    }

可见其逻辑很简单就是获取链表中的一个事件,执行之;而不像BackGround那样循环整个事件链表挨个执行之。也因此不会导致事件阻塞。

分析道这里我们已经弄清楚了EventBus是如何控制订阅方法在哪个线程执行的了,其中在执行invokeSubscriber(pendingPost)方法源码中

void invokeSubscriber(PendingPost pendingPost) {
        Object event = pendingPost.event;
        Subscription subscription = pendingPost.subscription;
        //前面说的PendingPost通过空瓶回收实现的PendingPost复用池
        PendingPost.releasePendingPost(pendingPost);
        if (subscription.active) {
            invokeSubscriber(subscription, event);
        }
    }

可以知道,不管使用哪种ThreadMode,其最终都会去调用invokeSubscriber(subscription, event)方法通过反射的方式调用订阅方法,源码如下:

void invokeSubscriber(Subscription subscription, Object event) {
        try {
            subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
        } catch (InvocationTargetException e) {
            handleSubscriberException(subscription, event, e.getCause());
        } catch (IllegalAccessException e) {
            throw new IllegalStateException("Unexpected exception", e);
        }
    }

至此,EventBus的Post和执行订阅方法逻辑就分析完了,总结如下:


EventBus的Post和执行订阅方法逻辑

那EventBus发送粘性事件的逻辑是怎样的呢?一般情况,我们使用 EventBus 都是准备好订阅事件的方法,然后注册事件,最后在发送事件,即要先有事件的接收者。但粘性事件却恰恰相反,我们可以先发送事件,后续再准备订阅事件的方法、注册事件。

由于这种差异,我们分析粘性事件原理时,先从事件发送开始,发送一个粘性事件通过如下方式:

EventBus.getDefault().postSticky("Hello World!");

来看postSticky()方法是如何实现的:

 public void postSticky(Object event) {
        synchronized (stickyEvents) {
            stickyEvents.put(event.getClass(), event);
        }
        // Should be posted after it is putted, in case the subscriber wants to remove immediately
        post(event);
    }

postSticky()方法主要做了两件事,先将事件类型和对应事件保存到stickyEvents中,方便后续使用;然后执行post(event)继续发送事件,这个post()方法就是之前发送的post()方法。所以,如果在发送粘性事件前,已经有了对应类型事件的订阅者,及时它是非粘性的,依然可以接收到发送出的粘性事件。

发送完粘性事件后,再准备订阅粘性事件的方法,并完成注册。核心的注册事件流程还是我们之前的register()方法中的subscribe()方法,前边分析subscribe()方法时,有一段没有分析的代码,就是用来处理粘性事件的:

private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
        ......
        ......
        ......
        // 如果当前订阅事件的方法的Subscribe注解的sticky属性为true,即该方法可接受粘性事件
        if (subscriberMethod.sticky) {
            // 默认为true,表示是否向上查找事件的父类
            if (eventInheritance) {
                // stickyEvents就是发送粘性事件时,保存了事件类型和对应事件
                Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
                for (Map.Entry<Class<?>, Object> entry : entries) {
                    Class<?> candidateEventType = entry.getKey();
                    // 如果candidateEventType是eventType的子类或
                    if (eventType.isAssignableFrom(candidateEventType)) {
                        // 获得对应的事件
                        Object stickyEvent = entry.getValue();
                        // 处理粘性事件
                        checkPostStickyEventToSubscription(newSubscription, stickyEvent);
                    }
                }
            } else {
                Object stickyEvent = stickyEvents.get(eventType);
                checkPostStickyEventToSubscription(newSubscription, stickyEvent);
            }
        }
    }

可以看到,处理粘性事件就是在 EventBus 注册时,遍历stickyEvents,如果当前要注册的事件订阅方法是粘性的,并且该方法接收的事件类型和stickyEvents中某个事件类型相同或者是其父类,则取出stickyEvents中对应事件类型的具体事件,做进一步处理。继续看checkPostStickyEventToSubscription()处理方法:

private void checkPostStickyEventToSubscription(Subscription newSubscription, Object stickyEvent) {
        if (stickyEvent != null) {
            postToSubscription(newSubscription, stickyEvent, isMainThread());
        }
    }

最终还是通过postToSubscription()方法完成粘性事件的处理,这就是粘性事件的整个处理流程。

第四步:解除注册订阅

当我们不在需要订阅EventBus发送的时间时,我们可以通过EventBus的unregister(Object subscriber)方法解除订阅,其源码如下:

public synchronized void unregister(Object subscriber) {
        List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber); // 获取订阅对象的所有订阅事件类列表
        if (subscribedTypes != null) {
            for (Class<?> eventType : subscribedTypes) {
                unsubscribeByEventType(subscriber, eventType); // 将订阅者的订阅信息移除
            }
            typesBySubscriber.remove(subscriber); // 将订阅者从列表中移除
        } else {
            Log.w(TAG, "Subscriber to unregister was not registered before: " + subscriber.getClass());
        }
    }

private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
        List<Subscription> subscriptions = subscriptionsByEventType.get(eventType); // 获取事件类的所有订阅信息列表,将订阅信息从订阅信息集合中移除,同时将订阅信息中的active属性置为FALSE
        if (subscriptions != null) {
            int size = subscriptions.size();
            for (int i = 0; i < size; i++) {
                Subscription subscription = subscriptions.get(i);
                if (subscription.subscriber == subscriber) {
                    subscription.active = false; // 将订阅信息激活状态置为FALSE
                    subscriptions.remove(i); // 将订阅信息从集合中移除
                    i--;
                    size--;
                }
            }
        }
    }

可见解除注册订阅的流程为:

  1. 将订阅者的订阅信息从订阅信息集合中移除,同时将订阅信息激活状态置为FALSE
  2. 将订阅者信息从订阅者订阅类型集合中移除
    解除注册订阅的流程图如下:


    解除注册订阅的流程

至此我们对EventBus源码的逻辑和实现原理就分析完了。

补充

Subscriber Index

EventBus默认的实现是运行时通过反射来查找订阅事件的方法进行注册,如果项目中有大量的订阅事件的方法,必然会对项目运行时的性能产生影响。所以EventBus3.0以后。增加了EventBusAnnotationProcessor新特性用以优化其性能,其可以在编译期读取@Subscribe()注解并且解析其中所包含的信息生成一个辅助的索引类Subscriber Index来代替运行时通过反射查找注解方法导致的性能下降的问题,下面就来说说这种方式下实现逻辑。

使用Subscriber Index的前提

订阅者和订阅事件类必须是Public的,订阅者方法才能被索引到;另外,由于Java注解机制自身的限制,在匿名类中的@Subscribe注解是不能被识别的,从而不能生成索引。

假如订阅方法不满足这个前提也这不会造成致命问题,因为在之前订阅注册过程中存在这样的逻辑:如果使用不了索引,那么程序将自动使用反射机制。虽然这又回到了运行时期做事情的老路上,使得性能下降,但EventBus依然可以正常工作。

使用Subscriber Index需要的配置

Android Gradle 插件2.2.0之前,我们使用android-apt的方式,在编译期生成Subscriber Index类。所谓APT,即Annotation Processing Tool,其结合EventBus提供的EventBusAnnotationProcessor,就可以就在编译期,对源代码文件进行检测,找出其中的Annotation,从而生成目标文件,即Subscriber Index类。
然而,毕竟android-apt是第三方插件,自Android Gradle 插件2.2.0 之后,其官方提供了相同的能力,即annotationProcessor ,来完全代替 android-apt,配置起来更简单,推荐大家使用。
方式一:使用android-apt

buildscript {
    dependencies {
        classpath 'com.neenbedankt.gradle.plugins:android-apt:1.8'
    }
}
 
apply plugin: 'com.neenbedankt.android-apt'
 
dependencies {
    compile 'org.greenrobot:eventbus:3.0.0'
    apt 'org.greenrobot:eventbus-annotation-processor:3.0.1'
}
 
apt {
    arguments {
        eventBusIndex "com.monster.android.wild.MyEventBusIndex"
    }
}

其中com.monster.android.wild.MyEventBusIndex就是我们想要生成的Subscriber Index类。
方式二:使用annotationProcessor

android {
    defaultConfig {
        javaCompileOptions {
            annotationProcessorOptions {
                arguments = [eventBusIndex:'com.monster.android.wild.MyEventBusIndex']
            }
        }
    }
}

dependencies {
    compile 'org.greenrobot:eventbus:3.0.0'
    annotationProcessor 'org.greenrobot:eventbus-annotation-processor:3.0.1'
}   

同上,com.monster.android.wild.MyEventBusIndex就是我们想要生成的Subscriber Index类。可以看到,这样配置起来更简单。
在build.gradle中配置完毕之后,再进行编译,Subscriber Index类MyEventBusIndex就会自动为我们生成了,见下图工程目录中。

我们的工程中需要有使用EventBus相关的代码才能生成Subscriber Index类。

工程目录

Subscriber Index类MyEventBusIndex是如何生成的
<meta charset="utf-8">

首先,我们需要明确,MyEventBusIndex是在编译时期,通过第三方提供的android-apt或者Gradle本身提供的annotationProcessor,结合EventBus提供的EventBusAnnotationProcessor,共同协作而生成的。而EventBusAnnotationProcessor由EventBus提供,也就说明具体生成逻辑是由EventBus控制,这算是经典的模板模式。
下面,我们大致看一下EventBusAnnotationProcessor是如何定义生成逻辑并生成目标Java类文件的。大家可以从EventBus 的Github上面看到完整源代码,这里只是简要的阐述:

@SupportedAnnotationTypes("org.greenrobot.eventbus.Subscribe")
@SupportedOptions(value = {"eventBusIndex", "verbose"})
public class EventBusAnnotationProcessor extends AbstractProcessor {

    // process()是该类的核心函数,就是在这里,实现收集和评估注解的代码,以及生成Java文件
    @Override
    public boolean process(Set<? extends TypeElement> annotations, RoundEnvironment env) {
    // 保存订阅者的订阅方法信息
    private final ListMap<TypeElement, ExecutableElement> methodsByClass = new ListMap<>();
    // 保存需要跳过的订阅者
    private final Set<TypeElement> classesToSkip = new HashSet<>();
    
        try {
            // 从build.gradle中读取到arguments,即com.monster.android.wild.MyEventBusIndex
            String index = processingEnv.getOptions().get(OPTION_EVENT_BUS_INDEX);
            // 收集,填充methodsByClass 
            collectSubscribers(annotations, env, messager);
            // 评估,填充classesToSkip 
            checkForSubscribersToSkip(messager, indexPackage);

            if (!methodsByClass.isEmpty()) {
                // 生成java文件(根据methodsByClass和classesToSkip)
                createInfoIndexFile(index);
            } else {
            }
        } catch (RuntimeException e) {
        }
        return true;
    }
}

    // 生成java文件,其中有一部分是hardcode
    private void createInfoIndexFile(String index) {
        BufferedWriter writer = null;
        try {
            JavaFileObject sourceFile = processingEnv.getFiler().createSourceFile(index);
            int period = index.lastIndexOf('.');
            String myPackage = period > 0 ? index.substring(0, period) : null;
            String clazz = index.substring(period + 1);
            writer = new BufferedWriter(sourceFile.openWriter());
            if (myPackage != null) {
                writer.write("package " + myPackage + ";\n\n");
            }
            writer.write("import org.greenrobot.eventbus.meta.SimpleSubscriberInfo;\n");
            writer.write("import org.greenrobot.eventbus.meta.SubscriberMethodInfo;\n");
            writer.write("import org.greenrobot.eventbus.meta.SubscriberInfo;\n");
            writer.write("import org.greenrobot.eventbus.meta.SubscriberInfoIndex;\n\n");
            writer.write("import org.greenrobot.eventbus.ThreadMode;\n\n");
            writer.write("import java.util.HashMap;\n");
            writer.write("import java.util.Map;\n\n");
            writer.write("/** This class is generated by EventBus, do not edit. */\n");
            writer.write("public class " + clazz + " implements SubscriberInfoIndex {\n");
            writer.write("    private static final Map<Class<?>, SubscriberInfo> SUBSCRIBER_INDEX;\n\n");
            writer.write("    static {\n");
            writer.write("        SUBSCRIBER_INDEX = new HashMap<Class<?>, SubscriberInfo>();\n\n");
            writeIndexLines(writer, myPackage);
            writer.write("    }\n\n");
            writer.write("    private static void putIndex(SubscriberInfo info) {\n");
            writer.write("        SUBSCRIBER_INDEX.put(info.getSubscriberClass(), info);\n");
            writer.write("    }\n\n");
            writer.write("    @Override\n");
            writer.write("    public SubscriberInfo getSubscriberInfo(Class<?> subscriberClass) {\n");
            writer.write("        SubscriberInfo info = SUBSCRIBER_INDEX.get(subscriberClass);\n");
            writer.write("        if (info != null) {\n");
            writer.write("            return info;\n");
            writer.write("        } else {\n");
            writer.write("            return null;\n");
            writer.write("        }\n");
            writer.write("    }\n");
            writer.write("}\n");
        } catch (IOException e) {
        } finally {
        }
    }

接下来我们看看生成的Subscriber Index类MyEventBusIndex

/** This class is generated by EventBus, do not edit. */
public class MyEventBusIndex implements SubscriberInfoIndex {
    private static final Map<Class<?>, SubscriberInfo> SUBSCRIBER_INDEX;

    static {
        //用来保存当前注册类的 Class 类型和其中事件订阅方法的信息。
        SUBSCRIBER_INDEX = new HashMap<Class<?>, SubscriberInfo>();

        // 从以上createInfoIndexFile()的实现来看,除了类名MyEventBusIndex,只有这一段代码是非hardcode。
        // 以订阅者类为单位,将该订阅者类中的所有订阅函数以及相关参数,封装到SimpleSubscriberInfo类对象中,
        // 以供EventBus在注册过程中使用。注意SimpleSubscriberInfo类对象是在编译时期就生成的,
        // 因而在运行时期可以直接使用,省去了运行时期通过反射做相似操作的时间和资源消耗,从而提高效率,这里就是全文的精髓所在。
        putIndex(new SimpleSubscriberInfo(com.monster.android.wild.myeventbusdemo.MainActivity.class, true,
                new SubscriberMethodInfo[] {
            new SubscriberMethodInfo("onEvent",
                    com.monster.android.wild.myeventbusdemo.MainActivity.EventBusEvent.class, ThreadMode.MAIN),
        }));

    }

    private static void putIndex(SubscriberInfo info) {
        SUBSCRIBER_INDEX.put(info.getSubscriberClass(), info);
    }

    // 将会在EventBus在注册过程中使用,等会大家会看到
    @Override
    public SubscriberInfo getSubscriberInfo(Class<?> subscriberClass) {
        SubscriberInfo info = SUBSCRIBER_INDEX.get(subscriberClass);
        if (info != null) {
            return info;
        } else {
            return null;
        }
    }
}

接着,我们想使用生成的Subscriber Index类,就可以在构造EventBus时,如下这样

EventBus eventBus = EventBus.builder().addIndex(new MyEventBusIndex()).build();

但如果每次都这样写一大串,未免有些繁琐。既然我们引入了Subscriber Index,我们当然希望我们的工程中全部使用这样方式,这时,就可以这样

// 只设置一次,并且要在我们第一次使用EventBus之前进行设置
EventBus.builder().addIndex(new MyEventBusIndex()).installDefaultEventBus();
// 这样每次获取到默认实例,都是使用Subscriber Index的了,代码得到了精简。
EventBus eventBus = EventBus.getDefault();

接下来分析下使用 Subscriber Index 时 EventBus 的注册流程,我们先分析:

EventBus.builder().addIndex(new MyEventBusIndex()).installDefaultEventBus();

首先创建一个EventBusBuilder,然后通过addIndex()方法添加索引类的实例:

public EventBusBuilder addIndex(SubscriberInfoIndex index) {
        if (subscriberInfoIndexes == null) {
            subscriberInfoIndexes = new ArrayList<>();
        }
        subscriberInfoIndexes.add(index);
        return this;
    }

即把生成的索引类MyEventBusIndex的实例保存在subscriberInfoIndexes集合中,然后用installDefaultEventBus()创建默认的 EventBus实例:

public EventBus installDefaultEventBus() {
        synchronized (EventBus.class) {
            if (EventBus.defaultInstance != null) {
                throw new EventBusException("Default instance already exists." +
                        " It may be only set once before it's used the first time to ensure consistent behavior.");
            }
            EventBus.defaultInstance = build();
            return EventBus.defaultInstance;
        }
    }
public EventBus build() {
        // this 代表当前EventBusBuilder对象
        return new EventBus(this);
    }

即用当前EventBusBuilder对象创建一个 EventBus 实例,这样我们通过EventBusBuilder配置的 Subscriber Index 也就传递到了EventBus实例中,然后赋值给EventBus的 defaultInstance成员变量。之前我们在分析 EventBus 的getDefault()方法时已经见到了defaultInstance:

public static EventBus getDefault() {
        if (defaultInstance == null) {
            synchronized (EventBus.class) {
                if (defaultInstance == null) {
                    defaultInstance = new EventBus();
                }
            }
        }
        return defaultInstance;
    }

所以在 Application 中生成了 EventBus 的默认单例,这样就保证了在项目其它地方执行EventBus.getDefault()就能得到唯一的 EventBus 实例!之前在分析注册流程时有一个
方法findUsingInfo():

private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
        // 通过FindState对象来存储找到的方法信息
        FindState findState = prepareFindState();
        // 初始化FindState
        findState.initForSubscriber(subscriberClass);
        // 从当前类开始循环遍历该类的所有父类
        while (findState.clazz != null) {
            // 1 如果我们通过EventBusBuilder配置了MyEventBusIndex,便会通过getSubscriberInfo(findState)方法获取到subscriberInfo ,通常情况下我们使用EventBus时并没有配置
            findState.subscriberInfo = getSubscriberInfo(findState);
            if (findState.subscriberInfo != null) {
                // 如果使用了MyEventBusIndex,将会进入到这里并获取订阅方法信息
                SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
                for (SubscriberMethod subscriberMethod : array) {
                    if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
                        findState.subscriberMethods.add(subscriberMethod);
                    }
                }
            } else {
                // 未使用MyEventBusIndex将会进入这里使用反射获取订阅方法信息
                findUsingReflectionInSingleClass(findState);
            }
            // 将findState.clazz设置为当前的findState.clazz的父类
            findState.moveToSuperclass();
        }
        //返回查找到的所有订阅信息并释放FindState对象
        return getMethodsAndRelease(findState);
    }

当我们使用了MyEventBusIndex时再来看看注释1处的逻辑,getSubscriberInfo(findState)的源码如下:

private SubscriberInfo getSubscriberInfo(FindState findState) {
        // 该条件不成立
        if (findState.subscriberInfo != null && findState.subscriberInfo.getSuperSubscriberInfo() != null) {
            SubscriberInfo superclassInfo = findState.subscriberInfo.getSuperSubscriberInfo();
            if (findState.clazz == superclassInfo.getSubscriberClass()) {
                return superclassInfo;
            }
        }
        // 该条件成立
        if (subscriberInfoIndexes != null) {
            // 遍历索引类实例集合
            for (SubscriberInfoIndex index : subscriberInfoIndexes) {
                // 根据注册类的 Class 类查找SubscriberInfo
                SubscriberInfo info = index.getSubscriberInfo(findState.clazz);
                if (info != null) {
                    return info;
                }
            }
        }
        return null;
    }

其中subscriberInfoIndexes就是在前边addIndex()方法中创建的保存了项目中索引类MyEventBusIndex的实例
,遍历并调用MyEventBusIndex类对象的getSubscriberInfo()方法便得到了封装了订阅方法信息的SubscriberInfo对象。至此,代码逻辑就捋顺了~~~
总结
到这里,如何生成Subscriber Index类以及如何使用Subscriber Index类就分析完毕了。分析过程中,大家应该就可以看到了,核心思想就是把运行时所需要做的耗时操作转移到了编译时期去做,从而提高了整体性能。

参考文章
https://www.jianshu.com/p/e9fc6484bd9d
https://www.jianshu.com/p/d9516884dbd4

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,711评论 5 468
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 83,932评论 2 376
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,770评论 0 330
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,799评论 1 271
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,697评论 5 359
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,069评论 1 276
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,535评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,200评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,353评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,290评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,331评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,020评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,610评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,694评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,927评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,330评论 2 346
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,904评论 2 341

推荐阅读更多精彩内容