EventBus
是一个用于组件间通信的框架。它为开发提供一种非常简便的方式来是实现组件间解耦通信,并且提供了线程切换、优先级设置等功能。
从官方的示意图中不难看出,EventBus
使用的是观察者模式
:Subscriber
注册到EventBus
, 当Publisher
使用post
方法将Event
发送给EventBus
,EventBus
就会回调Subscriber
的onEvent
方法。观察者模式能将观察者和订阅者最大程度的解耦,这也是EventBus
的功能所在。
具体用法就不多说了,具体可见官方主页
本文解析使用的EventBus
版本是3.0.0
。
进行源码分析之前,先思考一下,如果是自己,会如何实现?
首先,我们需要将注册的类、声明的订阅方法,以及方法执行的线程、优先级都保存下来; 其次,我们要可以根据接收的事件去快速查找到对应的订阅者,当有消息通知时,可以高效通知; 再者,我们还需要自己去处理线程间的切换以满足不同的应用场景; 最后,我们应该提供注销功能以便取消消息订阅。
带着思路,一起来看看源码是如何一步一步实现的。本文根据我们使用EventBus
的步骤来进行讲解的。
注解@Subscribe
在声明订阅方法时,要求使用 @Subscribe
注解,先来看下它的具体定义
@Documented@Retention(RetentionPolicy.RUNTIME)@Target({ElementType.METHOD})public @interface Subscribe { ThreadMode threadMode() default ThreadMode.POSTING; boolean sticky() default false; int priority() default 0;}复制代码
先来看三个元注解:
@Documented
:生成java文档时,会将该注解也写进文档里
@Retention(RetentionPolicy.RUNTIME)
:有效周期是在运行时
@Target({ElementType.METHOD})
:指定对方法有效。
注解里声明了三个成员变量:
threadMode
(订阅方法执行的线程)sticky
(是否是粘性事件)priority
(优先级)。
ThreadMode
是一个枚举类,定义了四种线程模式:
public enum ThreadMode { POSTING, MAIN, BACKGROUND, ASYNC}复制代码
POSTING: 和发送事件的线程在同一个线程,避免了线程切换开销。
MAIN:订阅在主线程,事件将排队等待交付(非阻塞)。使用此模式的订阅者必须快速返回,以避免阻塞主线程。
BACKGROUND:如果是在主线程发布,则会订阅在一个后台线程,依次排队执行;如果不是在主线程发布,则会订阅在发布所在的线程。
ASYNC: 在非主线程和发布线程中订阅。当处理事件的方法 是耗时的,需要使用此模式。尽量避免同时触发大量的耗时较长的异步操作,EventBus 使用线程池高效的复用已经完成异步操作的线程。
EventBus
之所以要求在订阅方法上加上@Subscribe
注解,就是相当于给订阅者打标签,框架根据注解去找到订阅者。
注册
先来看看注册的过程
public void register(Object subscriber) { //拿到订阅者的运行时类型Class Class subscriberClass = subscriber.getClass(); //利用订阅者的Class去查找类中声明的订阅方法 ListsubscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass); //循环遍历逐个将订阅者和订阅方法订阅到EventBus synchronized (this) { for (SubscriberMethod subscriberMethod : subscriberMethods) { subscribe(subscriber, subscriberMethod); } }}复制代码
主要有三步,第一步拿到订阅者的类对象,第二步通过订阅者类对象找到类中的所有订阅方法,第三步进行订阅。
查找订阅方法
在上述第二步使用了一个SubscriberMethodFinder
实例来进行方法查找。SubscriberMethodFinder
这个类是专门用来查找订阅方法的,findSubscriberMethods()
最后返回了一个SubscriberMethod
集合。SubscriberMethod
类则就是对我们声明的订阅方法和参数的封装。可以先略过。
接着看findSubscriberMethods()
是如何通过订阅者的Class对象来进行方法查找的。
ListfindSubscriberMethods(Class subscriberClass) { //从缓存中查找,如果已经有,则直接返回。 List subscriberMethods = METHOD_CACHE.get(subscriberClass); if (subscriberMethods != null) { return subscriberMethods; } //如果缓存中没有查找到,就通过订阅者Class对象进行查找。 if (ignoreGeneratedIndex) { subscriberMethods = findUsingReflection(subscriberClass); } else { // 使用apt提前解析的订阅者信息 subscriberMethods = findUsingInfo(subscriberClass); } // 将查找到的方法存入缓存并返回 if (subscriberMethods.isEmpty()) { throw new EventBusException("Subscriber " + subscriberClass + " and its super classes have no public methods with the @Subscribe annotation"); } else { METHOD_CACHE.put(subscriberClass, subscriberMethods); return subscriberMethods; } }复制代码
这里做了个缓存处理METHOD_CACHE
,因为查找是比较耗时的操作,缓存可以提高效率。
private static final Map, List > METHOD_CACHE = new ConcurrentHashMap<>();复制代码
METHOD_CACHE
是一个以订阅者Class为Key
, 订阅方法集合为Value的线程安全的的HashMap
。
第一次执行肯定是没有缓存的,然后会根据ignoreGeneratedIndex
来执行不同的方法。从方法名来看,一个是使用反射去查找,另一个是使用已有的信息去查找。
其实这里就是3.0.0引入的优化点:3.0.0引入了APT(Annotation Processing Tool)
,它可以在编译阶段就提前解析注解,提前检索到订阅方法。这样就提高了运行时效率。
ignoreGeneratedIndex
这个值默认是false,因为反射开销大,所以默认是走findUsingInfo()
分支,但是在findUsingInfo()
方法中会检查本地是否有apt
预先解析出的订阅者信息,如果没有,还是会执行反射方法findUsingReflectionInSingleClass()
。
启动apt
的部分涉及到了注解处理器,所以会单独起一篇来讲解,先来看看不使用apt
的情况:
private ListfindUsingReflection(Class subscriberClass) { //准备一个FindeState实例 FindState findState = prepareFindState(); //将订阅者Class与FindeState实例关联 findState.initForSubscriber(subscriberClass); //从子类到父类去逐一查找订阅方法 while (findState.clazz != null) { //使用反射查找一个类的订阅方法 findUsingReflectionInSingleClass(findState); //将父类赋给findState.clazz,往上进行查找 findState.moveToSuperclass(); } //返回订阅方法集合并回收FindState实例 return getMethodsAndRelease(findState); }复制代码
这里去查找订阅方法,因为子类会继承父类的方法,所以当子类找不到时,需要去查找父类。这里使用了一个while循环递归进行查找。查找过程使用了一个新的对象FindState
,它是用来存储查找过程中的一些信息,方便进行迭代查找。它的类定义:
static class FindState { //订阅方法集合 final ListsubscriberMethods = new ArrayList<>(); final Map anyMethodByEventType = new HashMap<>(); final Map subscriberClassByMethodKey = new HashMap<>(); final StringBuilder methodKeyBuilder = new StringBuilder(128); //当前订阅者 Class subscriberClass; //当前查找的类 Class clazz; //是否跳过父类查找 boolean skipSuperClasses; SubscriberInfo subscriberInfo; }复制代码
接着往下走,进入真正开始使用反射解析一个类的订阅方法:findUsingReflectionInSingleClass()
:
private void findUsingReflectionInSingleClass(FindState findState) { Method[] methods; try { // 只获取非继承的方法,这个方法比getMethods()方法效率高,比如对一个Activity来说。 findState.clazz.getDeclaredMethods(); } catch (Throwable th) { // 异常则直接获取所有方法(包括继承的), 这样就不用再检查父类了。 methods = findState.clazz.getMethods(); findState.skipSuperClasses = true; } // 遍历所有方法筛选出订阅方法 for (Method method : methods) { int modifiers = method.getModifiers(); // 必须是public的、非静态、非抽象的方法 if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) { //获取到所有参数的类型 Class [] parameterTypes = method.getParameterTypes(); // 只能有一个参数 if (parameterTypes.length == 1) { //获取@Subscribe注解的方法 Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class); if (subscribeAnnotation != null) { Class eventType = parameterTypes[0]; //检查该eventType是否已订阅了,通常订阅者不能有多个 eventType 相同的订阅方法 if (findState.checkAdd(method, eventType)) { ThreadMode threadMode = subscribeAnnotation.threadMode(); //将订阅方法和对应的接收的Event类型以及注解参数保存 findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode, subscribeAnnotation.priority(), subscribeAnnotation.sticky())); } } } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) { String methodName = method.getDeclaringClass().getName() + "." + method.getName(); throw new EventBusException("@Subscribe method " + methodName + "must have exactly 1 parameter but has " + parameterTypes.length); } } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) { String methodName = method.getDeclaringClass().getName() + "." + method.getName(); throw new EventBusException(methodName + " is a illegal @Subscribe method: must be public, non-static, and non-abstract"); } } }复制代码
这个方法主要是通过修饰符、参数个数、指定注解和是否有EventType
相同的方法这几层筛选,最终将订阅方法添加进 findState
的 subscriberMethods
这个 List 中。
其中重点有一个判断:findState.checkAdd()
,这个方法决定了是否订阅方法可以被保存下来进而能接收到消息。一起来看看它是如何判断的:
private boolean checkAdd(Method method, Class eventType) { // 1.检查eventType是否已经注册过对应的方法(一般都没有) Object existing = anyMethodByEventType.put(eventType, method); if (existing == null) { return true; } else { // 2. 如果已经有方法注册了这个eventType if (existing instanceof Method) { if (!checkAddWithMethodSignature((Method) existing, eventType)) { throw new IllegalStateException(); } anyMethodByEventType.put(eventType, this); } return checkAddWithMethodSignature(method, eventType); }}复制代码
进行了两种检查,第一种是判断当前类中是否已经有这个EventType
和对应的订阅方法,一般一个类不会有对同一个EventType
写多个方法,会直接返回true
,进行保存。
但是如果出现了同一个类中同样的EventType
写了多个方法,该如何处理?
还有当findUsingReflection()
中进行下一轮循环,会进行父类查找,如果子类继承了父类的订阅方法,又该如何处理呢?
答案就在上边的注释2。关键点就是checkAddWithMethodSignature()
方法:
private boolean checkAddWithMethodSignature(Method method, Class eventType) { // 以[方法名>eventType]为Key methodKeyBuilder.setLength(0); methodKeyBuilder.append(method.getName()); methodKeyBuilder.append('>').append(eventType.getName()); String methodKey = methodKeyBuilder.toString(); // 拿到新的订阅方法所属类 Class methodClass = method.getDeclaringClass(); Class methodClassOld = subscriberClassByMethodKey.put(methodKey, methodClass); if (methodClassOld == null || methodClassOld.isAssignableFrom(methodClass)) { // methodClassOld == null或者 methodClassOld是methodClass的父类/同一个类 return true; } else { // Revert the put, old class is further down the class hierarchy subscriberClassByMethodKey.put(methodKey, methodClassOld); return false; } }复制代码
对于同一类中同样的EventType
写了多个方法,因为方法名不同,所以[方法名>eventType]
的Key
不同,methodClassOld
会为null
,直接返回 true
。所以这种情况会将所有相同EventType
的方法都进行保存。
对于子类重写父类方法的情况,则methodClassOld
(即子类)不为null
,并且methodClassOld
也不是methodClass
的父类,所以会返回false
。即对于子类重写父类订阅方法,只会保存子类的订阅方法,忽略父类的订阅方法。
至此,findState
的查找任务就结束了,通过循环向父类查找,将订阅者的订阅方法都保存在了其内部变量subscriberMethods
列表中。
最后,跳出循环,回到findUsingReflection()
方法中,最后返回了 getMethodsAndRelease(findState)
private ListgetMethodsAndRelease(FindState findState) { List subscriberMethods = new ArrayList<>(findState.subscriberMethods); findState.recycle(); synchronized (FIND_STATE_POOL) { for (int i = 0; i < POOL_SIZE; i++) { if (FIND_STATE_POOL[i] == null) { FIND_STATE_POOL[i] = findState; break; } } } return subscriberMethods; }复制代码
很好理解,就是将findState
中的subscriberMethods
取出并返回。可以看到作者还是很细心的,将使用完的findState
实例置空恢复后又放回实例池中,将实例回收利用,节省了新的开销。
再回到findSubscriberMethods()
方法中,将查找的方法最后都存进了内存缓存METHOD_CACHE中, 对应关系是订阅类和它的订阅方法:
METHOD_CACHE.put(subscriberClass, subscriberMethods);复制代码
到这里,查找订阅方法就结束了。
订阅(subscribe())
回到register()
方法中的第三步,对上一步查找到的订阅方法集合进行了遍历调用subscribe()
方法。来看看subscribe()
方法做了什么事:
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) { Class eventType = subscriberMethod.eventType; // 实例一个Subscription对象,内部持有了订阅者和订阅方法 Subscription newSubscription = new Subscription(subscriber, subscriberMethod); // subscriptionsByEventType是以EventType为Key,以它对应Subscription集合的Map CopyOnWriteArrayListsubscriptions = subscriptionsByEventType.get(eventType); if (subscriptions == null) { subscriptions = new CopyOnWriteArrayList<>(); subscriptionsByEventType.put(eventType, subscriptions); } else { if (subscriptions.contains(newSubscription)) { throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event " + eventType); } } // 根据优先级排序Subscription int size = subscriptions.size(); for (int i = 0; i <= size; i++) { if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) { subscriptions.add(i, newSubscription); break; } } // typesBySubscriber保存了订阅者对应的所有EventType List > subscribedEvents = typesBySubscriber.get(subscriber); if (subscribedEvents == null) { subscribedEvents = new ArrayList<>(); typesBySubscriber.put(subscriber, subscribedEvents); } subscribedEvents.add(eventType); // 粘性订阅方法要立即处理 if (subscriberMethod.sticky) { // 默认为true if (eventInheritance) { // 看当前EventType是否是已有的stickyEvent的子类或父类 Set , Object>> entries = stickyEvents.entrySet(); for (Map.Entry , Object> entry : entries) { Class candidateEventType = entry.getKey(); if (eventType.isAssignableFrom(candidateEventType)) { Object stickyEvent = entry.getValue(); // 立即投递事件 checkPostStickyEventToSubscription(newSubscription, stickyEvent); } } } else { Object stickyEvent = stickyEvents.get(eventType); checkPostStickyEventToSubscription(newSubscription, stickyEvent); } } }复制代码
这里其实只做了三件事:
- 将订阅者和订阅方法封装到
subscriptionsByEventType
,它可以根据EventType
拿到所有的Subscription
对象,Subscription
对象中就有订阅者和订阅方法。这样当有EventType
消息过来时,可以快速的传递给订阅者的订阅方法。 - 将订阅者和订阅方法封装到
typesBySubscriber
,它可以根据订阅类拿到所有的EventType
。这样当我们调用调用unregister(this)
时,就可以拿到EventType
,又根据EventType
拿到所有订阅者和方法,进行解绑了。 - 如果当前订阅方法是粘性方法,则立即去查找是否有本地事件,有的话就立即投递。
至此,我们的注册就完成了。可以看到某些方法栈的调用还是非常深的,但是整体流程却很简单。这也是值得我们学习的地方。
注销
注册对应的就是注销,当我们的订阅者不再需要接收事件时,可以调用unregister
进行注销:
public synchronized void unregister(Object subscriber) { // 通过订阅者拿到它订阅的所有的订阅事件类型 List> subscribedTypes = typesBySubscriber.get(subscriber); if (subscribedTypes != null) { // 遍历事件类型集合,根据事件类型解绑 for (Class eventType : subscribedTypes) { unsubscribeByEventType(subscriber, eventType); } // 从记录中移除订阅者 typesBySubscriber.remove(subscriber); } else { Log.w(TAG, "Subscriber to unregister was not registered before: " + subscriber.getClass()); } }复制代码
代码很简单,继续看unsubscribeByEventType
:
private void unsubscribeByEventType(Object subscriber, Class eventType) { // 根据事件类型拿到所有的Subscription Listsubscriptions = subscriptionsByEventType.get(eventType); if (subscriptions != null) { // 遍历所有Subscription,符合解除条件的进行remove int size = subscriptions.size(); for (int i = 0; i < size; i++) { Subscription subscription = subscriptions.get(i); if (subscription.subscriber == subscriber) { subscription.active = false; subscriptions.remove(i); i--; size--; } } } }复制代码
相比注册流程,注销流程就非常简单了,就是把对应的注册者和对应的注册信息从记录中移除即可。
发送事件
注册完毕后,我们的订阅者和订阅方法都被记录在了EventBus
里,这时就可以给订阅者们发送事件了。EventBus
提供了两种发送方法post()
和postSticky()
。post()
发送的是非粘性的事件,postSticky()
发送的是粘性事件。
post
先来看看post()
发送:
public void post(Object event) { // 从当前线程中取出PostingThreadState PostingThreadState postingState = currentPostingThreadState.get(); // 拿到EventType队列 List
第一步使用ThreadLocal
,是因为ThreadLocal
保证了数据只对当前线程可见,其他线程是不可见的,这样的话当我们从不同的线程中去取数据,数据相当于是分开保存,设置和读取就会比较快。从当前线程中取出的是一个PostingThreadState
:
final static class PostingThreadState { final List
PostingThreadState
主要包含了当前线程的Event
队列、订阅者信息、事件等数据。接下来就是循环调用postSingleEvent()
方法进行投送了:
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error { Class eventClass = event.getClass(); boolean subscriptionFound = false; // 如果是可继承的事件 if (eventInheritance) { // 查找Event的所有父类、接口类以及父类的接口类(Event的父类和接口也是Event) List> eventTypes = lookupAllEventTypes(eventClass); int countTypes = eventTypes.size(); for (int h = 0; h < countTypes; h++) { Class clazz = eventTypes.get(h); // 根据查找到的所有Class(Event),逐个寻找订阅者,进行分发event subscriptionFound |= postSingleEventForEventType(event, postingState, clazz); } } else { // 如果不是可继承的事件,则直接对 subscriptionFound = postSingleEventForEventType(event, postingState, eventClass); } // 如果没有找到订阅者,报异常 if (!subscriptionFound) { if (logNoSubscriberMessages) { Log.d(TAG, "No subscribers registered for event " + eventClass); } if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class && eventClass != SubscriberExceptionEvent.class) { post(new NoSubscriberEvent(this, event)); } } }复制代码
这个方法的作用很简单,就是做了个分支处理:如果是可继承的事件,则查找到它的所有父事件,然后再往下分发。继续看postSingleEventForEventType()
方法:
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class eventClass) { CopyOnWriteArrayListsubscriptions; // 可能同时在多个线程同时发送Event,subscriptionsByEventType是共有常量,所以需要加锁 synchronized (this) { // 根据Event的类型拿到所有订阅者 subscriptions = subscriptionsByEventType.get(eventClass); } if (subscriptions != null && !subscriptions.isEmpty()) { for (Subscription subscription : subscriptions) { // 事件还是Event,但是可能会分发到订阅它父类的订阅者中 postingState.event = event; postingState.subscription = subscription; boolean aborted = false; try { // 逐个通知订阅者 postToSubscription(subscription, event, postingState.isMainThread); aborted = postingState.canceled; } finally { postingState.event = null; postingState.subscription = null; postingState.canceled = false; } if (aborted) { break; } } return true; } return false; }复制代码
这个方法和上一个方法的职责也很单一,就是查找所有订阅者,然后遍历进行通知。查找用的是一个Map: postToSubscription
,就是在订阅时生成的一个[EventType -> List<Subscription>]
Map, 这样我们就可以根据EventType
查找到所有订阅者。接着看postToSubscription()
:
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) { // 根据订阅线程模式进行switch switch (subscription.subscriberMethod.threadMode) { case POSTING: // 直接调用在本线程 invokeSubscriber(subscription, event); break; case MAIN: if (isMainThread) { // 如果就在主线程,则直接调用 invokeSubscriber(subscription, event); } else { // 如果不在主线程,则使用mainThreadPoster mainThreadPoster.enqueue(subscription, event); } break; case BACKGROUND: if (isMainThread) { // 如果在主线程,使用backgroundPoster backgroundPoster.enqueue(subscription, event); } else { // 如果不在主线程,则直接在当前线程调用 invokeSubscriber(subscription, event); } break; case ASYNC: // 启动新的线程调用,asyncPoster asyncPoster.enqueue(subscription, event); break; default: throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode); } }复制代码
这里就是EventBus
的线程切换了,主要有POSTING
、MAIN
、BACKGROUND
、ASYNC
四种模式,四种模式在开篇已经介绍过了。这里涉及到了invokeSubscriber()
方法和mainThreadPoster
、backgroundPoster
、asyncPoster
三个poster,接下来我们就分别来看下:
invokeSubscriber()
void invokeSubscriber(Subscription subscription, Object event) { try { subscription.subscriberMethod.method.invoke(subscription.subscriber, event); } catch (InvocationTargetException e) { handleSubscriberException(subscription, event, e.getCause()); } catch (IllegalAccessException e) { throw new IllegalStateException("Unexpected exception", e); } }复制代码
invokeSubscriber()
就是直接在当前线程调用了订阅者的method
对象,这里调用了反射类Method
的方法invoke()
直接调用执行。
mainThreadPoster
mainThreadPoster = new HandlerPoster(this, Looper.getMainLooper(), 10);复制代码
mainThreadPoster
是个自定义的类HandlerPoster
,它的目的是在主线程中调用订阅方法,而EventBus
使用的就是我们熟悉的Handler
:
final class HandlerPoster extends Handler { private final PendingPostQueue queue; private final int maxMillisInsideHandleMessage; private final EventBus eventBus; private boolean handlerActive; HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) { super(looper); // 用来最后调用方法 this.eventBus = eventBus; // 最大处理时间 this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage; // 一个待处理消息的队列 queue = new PendingPostQueue(); } ... ...}复制代码
HandlerPoster
是自定义的Handler
,发送消息使用的是Looper.getMainLooper()
即主线程的Handler
。 内部定义了一个最大处理消息时间,默认是10
毫秒,所以说我们一定不要在订阅方法中做耗时操作。还维护了一个PendingPostQueue
,它是自定义的一个链表队列,这里猜测HandlerPoster
可能是自己维护了消息队列,来看下入队方法:
void enqueue(Subscription subscription, Object event) { // 获取一个PendingPost实例 PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event); synchronized (this) { // 入队 queue.enqueue(pendingPost); if (!handlerActive) { handlerActive = true; // 主线程的handler发送消息,发送到主线程 if (!sendMessage(obtainMessage())) { throw new EventBusException("Could not send handler message"); } } } }复制代码
这里出现了一个新的类PendingPost
,它封装了订阅者subscription
实例和事件event
实例;它内部又维护了一个大小为10000
的PendingPost
(数组集合)池,用来重复利用PendingPost
实例。然后将PendingPost
实例入队到上一步说的PendingPostQueue
队列中。接着使用主线程的Handler
发送一个消息。接下来就是在handleMessage()
中如何处理消息了:
@Override public void handleMessage(Message msg) { boolean rescheduled = false; try { long started = SystemClock.uptimeMillis(); while (true) { // 从队列中取出一个pendingPost PendingPost pendingPost = queue.poll(); // 如果队列里的消息处理完毕,就直接跳出循环。 if (pendingPost == null) { synchronized (this) { // Check again, this time in synchronized pendingPost = queue.poll(); if (pendingPost == null) { handlerActive = false; return; } } } // 调用订阅方法并会回收pendingPost eventBus.invokeSubscriber(pendingPost); long timeInMethod = SystemClock.uptimeMillis() - started; // 如果方法的执行时间超过最大执行时间(默认10毫秒) if (timeInMethod >= maxMillisInsideHandleMessage) { if (!sendMessage(obtainMessage())) { throw new EventBusException("Could not send handler message"); } rescheduled = true; return; } } } finally { handlerActive = rescheduled; } }复制代码
这里就是使用了while循环,不断从队列中去取PendingPost
处理,但是加了个最大执行时间处理,因为是在主线程调用,所以一旦超时,就退出队列,并重新尝试去再进入队列。
BackgroundPoster
再来看看BackgroundPoster
BackgroundPoster
的作用是将UI线程的订阅方法调度在非UI线程中。即它是要执行在新的Thread
中的,而开启线程我们最常用的就是Runnable
, 来看看源码:
final class BackgroundPoster implements Runnable { private final PendingPostQueue queue; private final EventBus eventBus; private volatile boolean executorRunning; BackgroundPoster(EventBus eventBus) { this.eventBus = eventBus; queue = new PendingPostQueue(); } ... ...}复制代码
果不其然,BackgroundPoster
实现了Runnable
接口,这样就可以被线程执行。其内部也是维护了EventBus
和一个PendingPost
队列。
public void enqueue(Subscription subscription, Object event) { // 从消息池中构建一个PendingPost PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event); synchronized (this) { // 入队 queue.enqueue(pendingPost); if (!executorRunning) { executorRunning = true; // 线程池调度执行 eventBus.getExecutorService().execute(this); } } }复制代码
和HandlerPoster
类似:新建一个新的PendingPost
入队,使用了EventBus
里的一个ExecutorService
,它是对线程池定义的一个接口,来看看它的默认值:
public class EventBusBuilder { private final static ExecutorService DEFAULT_EXECUTOR_SERVICE = Executors.newCachedThreadPool(); ... ExecutorService executorService = DEFAULT_EXECUTOR_SERVICE;}复制代码
熟悉的Executors
,创建了一个可缓存的线程池,用来执行BackgroundPoster
这个Runnable
对象,再来看看BackgroundPoster
的run()
方法:
@Override public void run() { try { try { while (true) { // 取出队头 PendingPost pendingPost = queue.poll(1000); // 如果队头为空,说明队列里没有消息了,直接退出循环 if (pendingPost == null) { synchronized (this) { // Check again, this time in synchronized pendingPost = queue.poll(); if (pendingPost == null) { executorRunning = false; return; } } } // 调用订阅方法 eventBus.invokeSubscriber(pendingPost); } } catch (InterruptedException e) { Log.w("Event", Thread.currentThread().getName() + " was interruppted", e); } } finally { executorRunning = false; } }复制代码
也和HandlerPoster
类似,但是不同之处在于:
- 使用了
poll(int maxMillisToWait)
方法,这个设计很巧妙,当取到最后发现队列为空后,会wait
1000 毫秒,当有有新的信息来临时就会唤醒线程,poll
出消息。这样设计就减少了发送消息的次数,节省了资源。 - 因为是在子线程执行,所以就没有方法执行时间的限制了。
AsyncPoster
class AsyncPoster implements Runnable { private final PendingPostQueue queue; private final EventBus eventBus; AsyncPoster(EventBus eventBus) { this.eventBus = eventBus; queue = new PendingPostQueue(); } public void enqueue(Subscription subscription, Object event) { PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event); queue.enqueue(pendingPost); //直接开启新的线程执行 eventBus.getExecutorService().execute(this); } @Override public void run() { // 直接取出消息执行 PendingPost pendingPost = queue.poll(); if(pendingPost == null) { throw new IllegalStateException("No pending post available"); } eventBus.invokeSubscriber(pendingPost); }}复制代码
可以看到AsyncPoster
和BackgroundPoster``非常的相似,因为它们的功能也非常相似。但是不同之处在于:BackgroundPoster
是尽可能使用一个后台线层去依次排队执行订阅方法;而AsyncPoster
则是每条消息都直接开启新的后台线程立即执行。
至此四个Poster就讲完了,看完是真的爽,不论是从功能抽象到具体细节的把控,EventBus
都处理的很好,非常值得学习。
粘性事件
粘性事件这名字一听很耳熟,没错,安卓四大组件之BroadcastReceiver
就有一种广播叫做粘性广播(StickyBroadcast
),而EventBus
也提供了类似的功能:当注册了粘性事件后,立即能收到还没有注册时系统发出的最后一个事件。
public void postSticky(Object event) { // 将粘性事件保存下来 synchronized (stickyEvents) { stickyEvents.put(event.getClass(), event); } // Should be posted after it is putted, in case the subscriber wants to remove immediately post(event); }复制代码
postSticky()
方法用来发送一个粘性事件,在这个方法中,直接将粘性事件保存在了一个Map集合中,而key
就是Event
的Class对象。接着就调用正常的post()
方法了。
那为什么我们后注册的方法也能接收到之前发出的粘性事件呢,答案就在上面提到的注册方法subscribe()
中的最后一段:
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) { ... // 如果是粘性事件,则直接发送出去 if (subscriberMethod.sticky) { if (eventInheritance) { // 从stickyEvents取出粘性事件的Class对象 Set, Object>> entries = stickyEvents.entrySet(); for (Map.Entry , Object> entry : entries) { Class candidateEventType = entry.getKey(); // 如果订阅的事件是保存的粘性事件Class或它的父类 if (eventType.isAssignableFrom(candidateEventType)) { // 取出缓存的Event Object stickyEvent = entry.getValue(); // 将缓存的Event发送出去 checkPostStickyEventToSubscription(newSubscription, stickyEvent); } } } else { Object stickyEvent = stickyEvents.get(eventType); checkPostStickyEventToSubscription(newSubscription, stickyEvent); } }}复制代码
在我们注册订阅方法和事件时,如果是粘性事件,就直接会将事件发送给注册了相同Event
的订阅者,方法中调用了checkPostStickyEventToSubscription(newSubscription, stickyEvent)
方法:
private void checkPostStickyEventToSubscription(Subscription newSubscription, Object stickyEvent) { if (stickyEvent != null) { // If the subscriber is trying to abort the event, it will fail (event is not tracked in posting state) // --> Strange corner case, which we don't take care of here. postToSubscription(newSubscription, stickyEvent, Looper.getMainLooper() == Looper.myLooper()); } }复制代码
很简单,直接又调用了postToSubscription()
方法,根据指定线程分别进行分发。
总结
整体看下来,框架的结构非常的清晰全面,尤其在对方法功能的封装和细节处理上,很是值得学习。尤其在3.0后,EventBus
使用了注解处理器在编译阶段就完成了订阅者的解析,这使得框架更佳的轻量和高效。整片源码读下来还是很爽的,没事还要再复习多看几遍,很多东西都值得学习。共勉!