博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
EventBus3.0源码解析
阅读量:6218 次
发布时间:2019-06-21

本文共 30224 字,大约阅读时间需要 100 分钟。

EventBus 是一个用于组件间通信的框架。它为开发提供一种非常简便的方式来是实现组件间解耦通信,并且提供了线程切换、优先级设置等功能。

从官方的示意图中不难看出,EventBus使用的是观察者模式Subscriber注册到EventBus, 当Publisher使用post方法将Event发送给EventBusEventBus就会回调SubscriberonEvent方法。观察者模式能将观察者和订阅者最大程度的解耦,这也是EventBus的功能所在。

具体用法就不多说了,具体可见官方主页

本文解析使用的EventBus版本是3.0.0

进行源码分析之前,先思考一下,如果是自己,会如何实现?

首先,我们需要将注册的类、声明的订阅方法,以及方法执行的线程、优先级都保存下来; 其次,我们要可以根据接收的事件去快速查找到对应的订阅者,当有消息通知时,可以高效通知; 再者,我们还需要自己去处理线程间的切换以满足不同的应用场景; 最后,我们应该提供注销功能以便取消消息订阅。

带着思路,一起来看看源码是如何一步一步实现的。本文根据我们使用EventBus的步骤来进行讲解的。

注解@Subscribe

在声明订阅方法时,要求使用 @Subscribe注解,先来看下它的具体定义

@Documented@Retention(RetentionPolicy.RUNTIME)@Target({ElementType.METHOD})public @interface Subscribe {    ThreadMode threadMode() default ThreadMode.POSTING;    boolean sticky() default false;    int priority() default 0;}复制代码

先来看三个元注解:

@Documented :生成java文档时,会将该注解也写进文档里

@Retention(RetentionPolicy.RUNTIME):有效周期是在运行时

@Target({ElementType.METHOD}):指定对方法有效。

注解里声明了三个成员变量:

  • threadMode(订阅方法执行的线程)
  • sticky(是否是粘性事件)
  • priority(优先级)。

ThreadMode是一个枚举类,定义了四种线程模式:

public enum ThreadMode {    POSTING,    MAIN,    BACKGROUND,    ASYNC}复制代码

POSTING: 和发送事件的线程在同一个线程,避免了线程切换开销。

MAIN:订阅在主线程,事件将排队等待交付(非阻塞)。使用此模式的订阅者必须快速返回,以避免阻塞主线程。

BACKGROUND:如果是在主线程发布,则会订阅在一个后台线程,依次排队执行;如果不是在主线程发布,则会订阅在发布所在的线程。

ASYNC: 在非主线程和发布线程中订阅。当处理事件的方法 是耗时的,需要使用此模式。尽量避免同时触发大量的耗时较长的异步操作,EventBus 使用线程池高效的复用已经完成异步操作的线程。

EventBus 之所以要求在订阅方法上加上@Subscribe注解,就是相当于给订阅者打标签,框架根据注解去找到订阅者。

注册

先来看看注册的过程

public void register(Object subscriber) {    //拿到订阅者的运行时类型Class    Class
subscriberClass = subscriber.getClass(); //利用订阅者的Class去查找类中声明的订阅方法 List
subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass); //循环遍历逐个将订阅者和订阅方法订阅到EventBus synchronized (this) { for (SubscriberMethod subscriberMethod : subscriberMethods) { subscribe(subscriber, subscriberMethod); } }}复制代码

主要有三步,第一步拿到订阅者的类对象,第二步通过订阅者类对象找到类中的所有订阅方法,第三步进行订阅。

查找订阅方法

在上述第二步使用了一个SubscriberMethodFinder实例来进行方法查找。SubscriberMethodFinder这个类是专门用来查找订阅方法的,findSubscriberMethods()最后返回了一个SubscriberMethod集合。SubscriberMethod类则就是对我们声明的订阅方法和参数的封装。可以先略过。

接着看findSubscriberMethods()是如何通过订阅者的Class对象来进行方法查找的。

List
findSubscriberMethods(Class
subscriberClass) { //从缓存中查找,如果已经有,则直接返回。 List
subscriberMethods = METHOD_CACHE.get(subscriberClass); if (subscriberMethods != null) { return subscriberMethods; } //如果缓存中没有查找到,就通过订阅者Class对象进行查找。 if (ignoreGeneratedIndex) { subscriberMethods = findUsingReflection(subscriberClass); } else { // 使用apt提前解析的订阅者信息 subscriberMethods = findUsingInfo(subscriberClass); } // 将查找到的方法存入缓存并返回 if (subscriberMethods.isEmpty()) { throw new EventBusException("Subscriber " + subscriberClass + " and its super classes have no public methods with the @Subscribe annotation"); } else { METHOD_CACHE.put(subscriberClass, subscriberMethods); return subscriberMethods; } }复制代码

这里做了个缓存处理METHOD_CACHE,因为查找是比较耗时的操作,缓存可以提高效率。

private static final Map
, List
> METHOD_CACHE = new ConcurrentHashMap<>();复制代码

METHOD_CACHE是一个以订阅者Class为Key, 订阅方法集合为Value的线程安全的的HashMap

第一次执行肯定是没有缓存的,然后会根据ignoreGeneratedIndex来执行不同的方法。从方法名来看,一个是使用反射去查找,另一个是使用已有的信息去查找。

其实这里就是3.0.0引入的优化点:3.0.0引入了APT(Annotation Processing Tool),它可以在编译阶段就提前解析注解,提前检索到订阅方法。这样就提高了运行时效率。

ignoreGeneratedIndex这个值默认是false,因为反射开销大,所以默认是走findUsingInfo()分支,但是在findUsingInfo()方法中会检查本地是否有apt预先解析出的订阅者信息,如果没有,还是会执行反射方法findUsingReflectionInSingleClass()

启动apt的部分涉及到了注解处理器,所以会单独起一篇来讲解,先来看看不使用apt的情况:

private List
findUsingReflection(Class
subscriberClass) { //准备一个FindeState实例 FindState findState = prepareFindState(); //将订阅者Class与FindeState实例关联 findState.initForSubscriber(subscriberClass); //从子类到父类去逐一查找订阅方法 while (findState.clazz != null) { //使用反射查找一个类的订阅方法 findUsingReflectionInSingleClass(findState); //将父类赋给findState.clazz,往上进行查找 findState.moveToSuperclass(); } //返回订阅方法集合并回收FindState实例 return getMethodsAndRelease(findState); }复制代码

这里去查找订阅方法,因为子类会继承父类的方法,所以当子类找不到时,需要去查找父类。这里使用了一个while循环递归进行查找。查找过程使用了一个新的对象FindState,它是用来存储查找过程中的一些信息,方便进行迭代查找。它的类定义:

static class FindState {        //订阅方法集合        final List
subscriberMethods = new ArrayList<>(); final Map
anyMethodByEventType = new HashMap<>(); final Map
subscriberClassByMethodKey = new HashMap<>(); final StringBuilder methodKeyBuilder = new StringBuilder(128); //当前订阅者 Class
subscriberClass; //当前查找的类 Class
clazz; //是否跳过父类查找 boolean skipSuperClasses; SubscriberInfo subscriberInfo; }复制代码

接着往下走,进入真正开始使用反射解析一个类的订阅方法:findUsingReflectionInSingleClass()

private void findUsingReflectionInSingleClass(FindState findState) {        Method[] methods;        try {            // 只获取非继承的方法,这个方法比getMethods()方法效率高,比如对一个Activity来说。 findState.clazz.getDeclaredMethods();        } catch (Throwable th) {            // 异常则直接获取所有方法(包括继承的), 这样就不用再检查父类了。            methods = findState.clazz.getMethods();            findState.skipSuperClasses = true;        }        // 遍历所有方法筛选出订阅方法        for (Method method : methods) {            int modifiers = method.getModifiers();            // 必须是public的、非静态、非抽象的方法            if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {                //获取到所有参数的类型                Class
[] parameterTypes = method.getParameterTypes(); // 只能有一个参数 if (parameterTypes.length == 1) { //获取@Subscribe注解的方法 Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class); if (subscribeAnnotation != null) { Class
eventType = parameterTypes[0]; //检查该eventType是否已订阅了,通常订阅者不能有多个 eventType 相同的订阅方法 if (findState.checkAdd(method, eventType)) { ThreadMode threadMode = subscribeAnnotation.threadMode(); //将订阅方法和对应的接收的Event类型以及注解参数保存 findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode, subscribeAnnotation.priority(), subscribeAnnotation.sticky())); } } } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) { String methodName = method.getDeclaringClass().getName() + "." + method.getName(); throw new EventBusException("@Subscribe method " + methodName + "must have exactly 1 parameter but has " + parameterTypes.length); } } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) { String methodName = method.getDeclaringClass().getName() + "." + method.getName(); throw new EventBusException(methodName + " is a illegal @Subscribe method: must be public, non-static, and non-abstract"); } } }复制代码

这个方法主要是通过修饰符、参数个数、指定注解和是否有EventType相同的方法这几层筛选,最终将订阅方法添加进 findStatesubscriberMethods 这个 List 中。

其中重点有一个判断:findState.checkAdd(),这个方法决定了是否订阅方法可以被保存下来进而能接收到消息。一起来看看它是如何判断的:

private boolean checkAdd(Method method, Class
eventType) { // 1.检查eventType是否已经注册过对应的方法(一般都没有) Object existing = anyMethodByEventType.put(eventType, method); if (existing == null) { return true; } else { // 2. 如果已经有方法注册了这个eventType if (existing instanceof Method) { if (!checkAddWithMethodSignature((Method) existing, eventType)) { throw new IllegalStateException(); } anyMethodByEventType.put(eventType, this); } return checkAddWithMethodSignature(method, eventType); }}复制代码

进行了两种检查,第一种是判断当前类中是否已经有这个EventType和对应的订阅方法,一般一个类不会有对同一个EventType写多个方法,会直接返回true,进行保存。

但是如果出现了同一个类中同样的EventType写了多个方法,该如何处理?

还有当findUsingReflection()中进行下一轮循环,会进行父类查找,如果子类继承了父类的订阅方法,又该如何处理呢?

答案就在上边的注释2。关键点就是checkAddWithMethodSignature()方法:

private boolean checkAddWithMethodSignature(Method method, Class
eventType) { // 以[方法名>eventType]为Key methodKeyBuilder.setLength(0); methodKeyBuilder.append(method.getName()); methodKeyBuilder.append('>').append(eventType.getName()); String methodKey = methodKeyBuilder.toString(); // 拿到新的订阅方法所属类 Class
methodClass = method.getDeclaringClass(); Class
methodClassOld = subscriberClassByMethodKey.put(methodKey, methodClass); if (methodClassOld == null || methodClassOld.isAssignableFrom(methodClass)) { // methodClassOld == null或者 methodClassOld是methodClass的父类/同一个类 return true; } else { // Revert the put, old class is further down the class hierarchy subscriberClassByMethodKey.put(methodKey, methodClassOld); return false; } }复制代码

对于同一类中同样的EventType写了多个方法,因为方法名不同,所以[方法名>eventType]Key不同,methodClassOld会为null,直接返回 true。所以这种情况会将所有相同EventType的方法都进行保存。

对于子类重写父类方法的情况,则methodClassOld(即子类)不为null,并且methodClassOld也不是methodClass的父类,所以会返回false。即对于子类重写父类订阅方法,只会保存子类的订阅方法,忽略父类的订阅方法。

至此,findState的查找任务就结束了,通过循环向父类查找,将订阅者的订阅方法都保存在了其内部变量subscriberMethods列表中。

最后,跳出循环,回到findUsingReflection()方法中,最后返回了 getMethodsAndRelease(findState)

private List
getMethodsAndRelease(FindState findState) { List
subscriberMethods = new ArrayList<>(findState.subscriberMethods); findState.recycle(); synchronized (FIND_STATE_POOL) { for (int i = 0; i < POOL_SIZE; i++) { if (FIND_STATE_POOL[i] == null) { FIND_STATE_POOL[i] = findState; break; } } } return subscriberMethods; }复制代码

很好理解,就是将findState中的subscriberMethods取出并返回。可以看到作者还是很细心的,将使用完的findState实例置空恢复后又放回实例池中,将实例回收利用,节省了新的开销。

再回到findSubscriberMethods()方法中,将查找的方法最后都存进了内存缓存METHOD_CACHE中, 对应关系是订阅类和它的订阅方法:

METHOD_CACHE.put(subscriberClass, subscriberMethods);复制代码

到这里,查找订阅方法就结束了。

订阅(subscribe())

回到register()方法中的第三步,对上一步查找到的订阅方法集合进行了遍历调用subscribe()方法。来看看subscribe()方法做了什么事:

private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {        Class
eventType = subscriberMethod.eventType; // 实例一个Subscription对象,内部持有了订阅者和订阅方法 Subscription newSubscription = new Subscription(subscriber, subscriberMethod); // subscriptionsByEventType是以EventType为Key,以它对应Subscription集合的Map CopyOnWriteArrayList
subscriptions = subscriptionsByEventType.get(eventType); if (subscriptions == null) { subscriptions = new CopyOnWriteArrayList<>(); subscriptionsByEventType.put(eventType, subscriptions); } else { if (subscriptions.contains(newSubscription)) { throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event " + eventType); } } // 根据优先级排序Subscription int size = subscriptions.size(); for (int i = 0; i <= size; i++) { if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) { subscriptions.add(i, newSubscription); break; } } // typesBySubscriber保存了订阅者对应的所有EventType List
> subscribedEvents = typesBySubscriber.get(subscriber); if (subscribedEvents == null) { subscribedEvents = new ArrayList<>(); typesBySubscriber.put(subscriber, subscribedEvents); } subscribedEvents.add(eventType); // 粘性订阅方法要立即处理 if (subscriberMethod.sticky) { // 默认为true if (eventInheritance) { // 看当前EventType是否是已有的stickyEvent的子类或父类 Set
, Object>> entries = stickyEvents.entrySet(); for (Map.Entry
, Object> entry : entries) { Class
candidateEventType = entry.getKey(); if (eventType.isAssignableFrom(candidateEventType)) { Object stickyEvent = entry.getValue(); // 立即投递事件 checkPostStickyEventToSubscription(newSubscription, stickyEvent); } } } else { Object stickyEvent = stickyEvents.get(eventType); checkPostStickyEventToSubscription(newSubscription, stickyEvent); } } }复制代码

这里其实只做了三件事:

  1. 将订阅者和订阅方法封装到subscriptionsByEventType,它可以根据EventType拿到所有的Subscription对象,Subscription对象中就有订阅者和订阅方法。这样当有EventType消息过来时,可以快速的传递给订阅者的订阅方法。
  2. 将订阅者和订阅方法封装到typesBySubscriber,它可以根据订阅类拿到所有的EventType。这样当我们调用调用 unregister(this) 时,就可以拿到EventType,又根据EventType拿到所有订阅者和方法,进行解绑了。
  3. 如果当前订阅方法是粘性方法,则立即去查找是否有本地事件,有的话就立即投递。

至此,我们的注册就完成了。可以看到某些方法栈的调用还是非常深的,但是整体流程却很简单。这也是值得我们学习的地方。

注销

注册对应的就是注销,当我们的订阅者不再需要接收事件时,可以调用unregister进行注销:

public synchronized void unregister(Object subscriber) {        // 通过订阅者拿到它订阅的所有的订阅事件类型        List
> subscribedTypes = typesBySubscriber.get(subscriber); if (subscribedTypes != null) { // 遍历事件类型集合,根据事件类型解绑 for (Class
eventType : subscribedTypes) { unsubscribeByEventType(subscriber, eventType); } // 从记录中移除订阅者 typesBySubscriber.remove(subscriber); } else { Log.w(TAG, "Subscriber to unregister was not registered before: " + subscriber.getClass()); } }复制代码

代码很简单,继续看unsubscribeByEventType

private void unsubscribeByEventType(Object subscriber, Class
eventType) { // 根据事件类型拿到所有的Subscription List
subscriptions = subscriptionsByEventType.get(eventType); if (subscriptions != null) { // 遍历所有Subscription,符合解除条件的进行remove int size = subscriptions.size(); for (int i = 0; i < size; i++) { Subscription subscription = subscriptions.get(i); if (subscription.subscriber == subscriber) { subscription.active = false; subscriptions.remove(i); i--; size--; } } } }复制代码

相比注册流程,注销流程就非常简单了,就是把对应的注册者和对应的注册信息从记录中移除即可。

发送事件

注册完毕后,我们的订阅者和订阅方法都被记录在了EventBus里,这时就可以给订阅者们发送事件了。EventBus提供了两种发送方法post()postSticky()post()发送的是非粘性的事件,postSticky()发送的是粘性事件。

post

先来看看post()发送:

public void post(Object event) {        // 从当前线程中取出PostingThreadState        PostingThreadState postingState = currentPostingThreadState.get();        // 拿到EventType队列        List eventQueue = postingState.eventQueue;        eventQueue.add(event);        // 当前线程是否有消息正在投递        if (!postingState.isPosting) {            postingState.isMainThread = Looper.getMainLooper() == Looper.myLooper();            //正在投递            postingState.isPosting = true;            if (postingState.canceled) {                throw new EventBusException("Internal error. Abort state was not reset");            }            try {                // 循环一个个进行投递                while (!eventQueue.isEmpty()) {                    // 投递出去就remove掉                    postSingleEvent(eventQueue.remove(0), postingState);                }            } finally {                // 所有消息都投递完成                postingState.isPosting = false;                postingState.isMainThread = false;            }        }    }复制代码

第一步使用ThreadLocal,是因为ThreadLocal保证了数据只对当前线程可见,其他线程是不可见的,这样的话当我们从不同的线程中去取数据,数据相当于是分开保存,设置和读取就会比较快。从当前线程中取出的是一个PostingThreadState:

final static class PostingThreadState {        final List eventQueue = new ArrayList();        boolean isPosting;        boolean isMainThread;        Subscription subscription;        Object event;        boolean canceled;    }复制代码

PostingThreadState 主要包含了当前线程的Event队列、订阅者信息、事件等数据。接下来就是循环调用postSingleEvent()方法进行投送了:

private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {        Class
eventClass = event.getClass(); boolean subscriptionFound = false; // 如果是可继承的事件 if (eventInheritance) { // 查找Event的所有父类、接口类以及父类的接口类(Event的父类和接口也是Event) List
> eventTypes = lookupAllEventTypes(eventClass); int countTypes = eventTypes.size(); for (int h = 0; h < countTypes; h++) { Class
clazz = eventTypes.get(h); // 根据查找到的所有Class(Event),逐个寻找订阅者,进行分发event subscriptionFound |= postSingleEventForEventType(event, postingState, clazz); } } else { // 如果不是可继承的事件,则直接对 subscriptionFound = postSingleEventForEventType(event, postingState, eventClass); } // 如果没有找到订阅者,报异常 if (!subscriptionFound) { if (logNoSubscriberMessages) { Log.d(TAG, "No subscribers registered for event " + eventClass); } if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class && eventClass != SubscriberExceptionEvent.class) { post(new NoSubscriberEvent(this, event)); } } }复制代码

这个方法的作用很简单,就是做了个分支处理:如果是可继承的事件,则查找到它的所有父事件,然后再往下分发。继续看postSingleEventForEventType()方法:

private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class
eventClass) { CopyOnWriteArrayList
subscriptions; // 可能同时在多个线程同时发送Event,subscriptionsByEventType是共有常量,所以需要加锁 synchronized (this) { // 根据Event的类型拿到所有订阅者 subscriptions = subscriptionsByEventType.get(eventClass); } if (subscriptions != null && !subscriptions.isEmpty()) { for (Subscription subscription : subscriptions) { // 事件还是Event,但是可能会分发到订阅它父类的订阅者中 postingState.event = event; postingState.subscription = subscription; boolean aborted = false; try { // 逐个通知订阅者 postToSubscription(subscription, event, postingState.isMainThread); aborted = postingState.canceled; } finally { postingState.event = null; postingState.subscription = null; postingState.canceled = false; } if (aborted) { break; } } return true; } return false; }复制代码

这个方法和上一个方法的职责也很单一,就是查找所有订阅者,然后遍历进行通知。查找用的是一个Map: postToSubscription,就是在订阅时生成的一个[EventType -> List<Subscription>] Map, 这样我们就可以根据EventType查找到所有订阅者。接着看postToSubscription()

private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {        // 根据订阅线程模式进行switch        switch (subscription.subscriberMethod.threadMode) {            case POSTING:                // 直接调用在本线程                invokeSubscriber(subscription, event);                break;            case MAIN:                if (isMainThread) {                    // 如果就在主线程,则直接调用                    invokeSubscriber(subscription, event);                } else {                    // 如果不在主线程,则使用mainThreadPoster                    mainThreadPoster.enqueue(subscription, event);                }                break;            case BACKGROUND:                if (isMainThread) {                    // 如果在主线程,使用backgroundPoster                    backgroundPoster.enqueue(subscription, event);                } else {                    // 如果不在主线程,则直接在当前线程调用                    invokeSubscriber(subscription, event);                }                break;            case ASYNC:                // 启动新的线程调用,asyncPoster                asyncPoster.enqueue(subscription, event);                break;            default:                throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);        }    }复制代码

这里就是EventBus的线程切换了,主要有POSTINGMAINBACKGROUNDASYNC四种模式,四种模式在开篇已经介绍过了。这里涉及到了invokeSubscriber()方法和mainThreadPosterbackgroundPosterasyncPoster三个poster,接下来我们就分别来看下:

invokeSubscriber()

void invokeSubscriber(Subscription subscription, Object event) {        try {            subscription.subscriberMethod.method.invoke(subscription.subscriber, event);        } catch (InvocationTargetException e) {            handleSubscriberException(subscription, event, e.getCause());        } catch (IllegalAccessException e) {            throw new IllegalStateException("Unexpected exception", e);        }    }复制代码

invokeSubscriber()就是直接在当前线程调用了订阅者的method对象,这里调用了反射类Method的方法invoke()直接调用执行。

mainThreadPoster

mainThreadPoster = new HandlerPoster(this, Looper.getMainLooper(), 10);复制代码

mainThreadPoster是个自定义的类HandlerPoster,它的目的是在主线程中调用订阅方法,而EventBus使用的就是我们熟悉的Handler

final class HandlerPoster extends Handler {    private final PendingPostQueue queue;    private final int maxMillisInsideHandleMessage;    private final EventBus eventBus;    private boolean handlerActive;    HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {        super(looper);        // 用来最后调用方法        this.eventBus = eventBus;        // 最大处理时间        this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;        // 一个待处理消息的队列        queue = new PendingPostQueue();    }        ...    ...}复制代码

HandlerPoster是自定义的Handler,发送消息使用的是Looper.getMainLooper()即主线程的Handler。 内部定义了一个最大处理消息时间,默认是10毫秒,所以说我们一定不要在订阅方法中做耗时操作。还维护了一个PendingPostQueue,它是自定义的一个链表队列,这里猜测HandlerPoster可能是自己维护了消息队列,来看下入队方法:

void enqueue(Subscription subscription, Object event) {        // 获取一个PendingPost实例        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);        synchronized (this) {            // 入队            queue.enqueue(pendingPost);            if (!handlerActive) {                handlerActive = true;                // 主线程的handler发送消息,发送到主线程                if (!sendMessage(obtainMessage())) {                    throw new EventBusException("Could not send handler message");                }            }        }    }复制代码

这里出现了一个新的类PendingPost,它封装了订阅者subscription实例和事件event实例;它内部又维护了一个大小为10000PendingPost(数组集合)池,用来重复利用PendingPost实例。然后将PendingPost实例入队到上一步说的PendingPostQueue队列中。接着使用主线程的Handler发送一个消息。接下来就是在handleMessage()中如何处理消息了:

@Override    public void handleMessage(Message msg) {        boolean rescheduled = false;        try {            long started = SystemClock.uptimeMillis();            while (true) {                // 从队列中取出一个pendingPost                PendingPost pendingPost = queue.poll();                // 如果队列里的消息处理完毕,就直接跳出循环。                if (pendingPost == null) {                    synchronized (this) {                        // Check again, this time in synchronized                        pendingPost = queue.poll();                        if (pendingPost == null) {                            handlerActive = false;                            return;                        }                    }                }                // 调用订阅方法并会回收pendingPost                eventBus.invokeSubscriber(pendingPost);                long timeInMethod = SystemClock.uptimeMillis() - started;                // 如果方法的执行时间超过最大执行时间(默认10毫秒)                if (timeInMethod >= maxMillisInsideHandleMessage) {                    if (!sendMessage(obtainMessage())) {                        throw new EventBusException("Could not send handler message");                    }                    rescheduled = true;                    return;                }            }        } finally {            handlerActive = rescheduled;        }    }复制代码

这里就是使用了while循环,不断从队列中去取PendingPost处理,但是加了个最大执行时间处理,因为是在主线程调用,所以一旦超时,就退出队列,并重新尝试去再进入队列。

BackgroundPoster

再来看看BackgroundPoster

BackgroundPoster的作用是将UI线程的订阅方法调度在非UI线程中。即它是要执行在新的Thread中的,而开启线程我们最常用的就是Runnable, 来看看源码:

final class BackgroundPoster implements Runnable {    private final PendingPostQueue queue;    private final EventBus eventBus;    private volatile boolean executorRunning;    BackgroundPoster(EventBus eventBus) {        this.eventBus = eventBus;        queue = new PendingPostQueue();    }        ...    ...}复制代码

果不其然,BackgroundPoster实现了Runnable接口,这样就可以被线程执行。其内部也是维护了EventBus和一个PendingPost队列。

public void enqueue(Subscription subscription, Object event) {        // 从消息池中构建一个PendingPost        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);        synchronized (this) {            // 入队            queue.enqueue(pendingPost);            if (!executorRunning) {                executorRunning = true;                // 线程池调度执行                eventBus.getExecutorService().execute(this);            }        }    }复制代码

HandlerPoster类似:新建一个新的PendingPost入队,使用了EventBus里的一个ExecutorService,它是对线程池定义的一个接口,来看看它的默认值:

public class EventBusBuilder {    private final static ExecutorService DEFAULT_EXECUTOR_SERVICE = Executors.newCachedThreadPool();        ...    ExecutorService executorService = DEFAULT_EXECUTOR_SERVICE;}复制代码

熟悉的Executors,创建了一个可缓存的线程池,用来执行BackgroundPoster这个Runnable 对象,再来看看BackgroundPosterrun()方法:

@Override    public void run() {        try {            try {                while (true) {                    // 取出队头                    PendingPost pendingPost = queue.poll(1000);                    // 如果队头为空,说明队列里没有消息了,直接退出循环                    if (pendingPost == null) {                        synchronized (this) {                            // Check again, this time in synchronized                            pendingPost = queue.poll();                            if (pendingPost == null) {                                executorRunning = false;                                return;                            }                        }                    }                    // 调用订阅方法                    eventBus.invokeSubscriber(pendingPost);                }            } catch (InterruptedException e) {                Log.w("Event", Thread.currentThread().getName() + " was interruppted", e);            }        } finally {            executorRunning = false;        }    }复制代码

也和HandlerPoster类似,但是不同之处在于:

  1. 使用了poll(int maxMillisToWait)方法,这个设计很巧妙,当取到最后发现队列为空后,会wait 1000 毫秒,当有有新的信息来临时就会唤醒线程,poll出消息。这样设计就减少了发送消息的次数,节省了资源。
  2. 因为是在子线程执行,所以就没有方法执行时间的限制了。

AsyncPoster

class AsyncPoster implements Runnable {    private final PendingPostQueue queue;    private final EventBus eventBus;    AsyncPoster(EventBus eventBus) {        this.eventBus = eventBus;        queue = new PendingPostQueue();    }    public void enqueue(Subscription subscription, Object event) {        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);        queue.enqueue(pendingPost);        //直接开启新的线程执行        eventBus.getExecutorService().execute(this);    }    @Override    public void run() {        //  直接取出消息执行        PendingPost pendingPost = queue.poll();        if(pendingPost == null) {            throw new IllegalStateException("No pending post available");        }        eventBus.invokeSubscriber(pendingPost);    }}复制代码

可以看到AsyncPosterBackgroundPoster``非常的相似,因为它们的功能也非常相似。但是不同之处在于:BackgroundPoster是尽可能使用一个后台线层去依次排队执行订阅方法;而AsyncPoster则是每条消息都直接开启新的后台线程立即执行。

至此四个Poster就讲完了,看完是真的爽,不论是从功能抽象到具体细节的把控,EventBus都处理的很好,非常值得学习。

粘性事件

粘性事件这名字一听很耳熟,没错,安卓四大组件之BroadcastReceiver就有一种广播叫做粘性广播(StickyBroadcast),而EventBus也提供了类似的功能:当注册了粘性事件后,立即能收到还没有注册时系统发出的最后一个事件。

public void postSticky(Object event) {        // 将粘性事件保存下来        synchronized (stickyEvents) {            stickyEvents.put(event.getClass(), event);        }        // Should be posted after it is putted, in case the subscriber wants to remove immediately        post(event);    }复制代码

postSticky()方法用来发送一个粘性事件,在这个方法中,直接将粘性事件保存在了一个Map集合中,而key就是Event的Class对象。接着就调用正常的post()方法了。

那为什么我们后注册的方法也能接收到之前发出的粘性事件呢,答案就在上面提到的注册方法subscribe()中的最后一段:

private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {    ...        // 如果是粘性事件,则直接发送出去    if (subscriberMethod.sticky) {            if (eventInheritance) {                // 从stickyEvents取出粘性事件的Class对象                Set
, Object>> entries = stickyEvents.entrySet(); for (Map.Entry
, Object> entry : entries) { Class
candidateEventType = entry.getKey(); // 如果订阅的事件是保存的粘性事件Class或它的父类 if (eventType.isAssignableFrom(candidateEventType)) { // 取出缓存的Event Object stickyEvent = entry.getValue(); // 将缓存的Event发送出去 checkPostStickyEventToSubscription(newSubscription, stickyEvent); } } } else { Object stickyEvent = stickyEvents.get(eventType); checkPostStickyEventToSubscription(newSubscription, stickyEvent); } }}复制代码

在我们注册订阅方法和事件时,如果是粘性事件,就直接会将事件发送给注册了相同Event的订阅者,方法中调用了checkPostStickyEventToSubscription(newSubscription, stickyEvent)方法:

private void checkPostStickyEventToSubscription(Subscription newSubscription, Object stickyEvent) {        if (stickyEvent != null) {            // If the subscriber is trying to abort the event, it will fail (event is not tracked in posting state)            // --> Strange corner case, which we don't take care of here.            postToSubscription(newSubscription, stickyEvent, Looper.getMainLooper() == Looper.myLooper());        }    }复制代码

很简单,直接又调用了postToSubscription()方法,根据指定线程分别进行分发。

总结

整体看下来,框架的结构非常的清晰全面,尤其在对方法功能的封装和细节处理上,很是值得学习。尤其在3.0后,EventBus使用了注解处理器在编译阶段就完成了订阅者的解析,这使得框架更佳的轻量和高效。整片源码读下来还是很爽的,没事还要再复习多看几遍,很多东西都值得学习。共勉!

转载于:https://juejin.im/post/5cb431c9e51d456e5c5baacb

你可能感兴趣的文章
通过Visual Studio为Linux编写C++代码
查看>>
利用Apache Spark SQL和DataFrames扩展关系数据库
查看>>
Netflix 混沌工程手册 Part 3:实践方法
查看>>
2018年开源状况:代码贡献超310亿行,而漏洞超16000个
查看>>
Java初学者如何能够把知识深入贯彻
查看>>
仅售99美元!英伟达发布最小AI计算机Jetson Nano
查看>>
写守护进程时, 需要fork两次吗?
查看>>
方面和服务,差别大吗?
查看>>
Go现在接受来自GitHub PR的补丁
查看>>
JetBrains发布WebStorm 2016.2,改进对TypeScript和React的支持
查看>>
国内首例:飞步无人卡车携手中国邮政、德邦投入日常运营
查看>>
深入理解浏览器的缓存机制
查看>>
7道常见的数据分析面试题
查看>>
《反脆弱边缘:反脆弱实践》访谈
查看>>
敏捷世界里中层经理的角色
查看>>
微服务现状综述
查看>>
使用试验和数据创新并构建客户真正使用的产品
查看>>
Kubernetes 1.14重磅来袭,多项关键特性生产可用
查看>>
Google发布Tensor2Tensor for TensorFlow
查看>>
微服务的漫长历史
查看>>