Akindone's Studio.

RxBus:谁动了我的线程

字数统计: 914阅读时长: 4 min
2018/10/28 Share

我们在线上收集到一些异常:在某个RxBus的事件监听回调中抛出了在子线程更改UI的异常。可我们去查看这个RxBus.send()所在的线程的确就是主线程。一般来说订阅事件发生在哪个线程,如果没有修改过线程,那回调也会在同一个线程。事出幺蛾必有妖,我们还是要去看看代码。

尝试复现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class RxBusTest {
@Test
@Throws(Exception::class)
fun send() {
RxBus.toObservable(SomeEvent::class.java)
.doOnNext { System.out.println("$it ---${it.threadName == Thread.currentThread().name} --- ${Thread.currentThread().name}") }
.subscribe()
for (i in 0..4) {
Thread(Runnable {
for (j in 0..10) {
RxBus.send(SomeEvent(j, Thread.currentThread().name))
Thread.sleep(100)
}
}, "Thread $i").start()
}
Thread.sleep(1000)
}

}
data class SomeEvent(val number: Int, val threadName: String)

运行一下发现,的确在多线程情况下send事件会导致回调中的线程与发送的不同

RxBus的实现

1
2
3
4
5
6
7
object RxBus {
private val sBus: Subject<Any> = PublishSubject.create<Any>().toSerialized()

fun send(o: Any) = sBus.onNext(o)

fun <T> toObservable(eventType: Class<T>): Observable<T> = sBus.ofType(eventType)
}

主要看看sBus变量的创建

PublishSubject.create<Any>(),这个好理解,为了实现消息总线,需要有一个“全局的事件管理器”。PublishSubject可以发事件,同时自己也能接收事件。它里面维护了一个所有的订阅者的列表,收到事件时,它会把事件转发给订阅者。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public final class PublishSubject<T> extends Subject<T> {
/** The array of currently subscribed subscribers. */
final AtomicReference<PublishDisposable<T>[]> subscribers;

public void subscribeActual(Observer<? super T> t) {
PublishDisposable<T> ps = new PublishDisposable<T>(t, this);
t.onSubscribe(ps);
if (add(ps)) {}
...
}

/** 把订阅者加入列表,如果成功返回true,如果subject终止了就返回false */
boolean add(PublishDisposable<T> ps) {}

/** 收到事件时,它会把事件转发给订阅者 */
public void onNext(T t) {
...
for (PublishDisposable<T> s : subscribers.get()) {
s.onNext(t);
}
}
}

再对PublishSubject调用.toSerialized(),转化为SerializedSubject

1
2
3
4
5
6
public final Subject<T> toSerialized() {
if (this instanceof SerializedSubject) {
return this;
}
return new SerializedSubject<T>(this);
}

我们在调用RxBus.send()时实际调用的就是SerializedSubject.onNext。SerializedSubject里有2个重要成员emitting和queue。他们的作用是:如果当前事件还没有处理完,会先把它加到queue,等onSubscribe onNext执行到最后时去遍历queue。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
final class SerializedSubject<T> extends Subject<T> implements NonThrowingPredicate<Object> {
/** Indicates an emission is going on, guarded by this. */
boolean emitting;
/** If not null, it holds the missed NotificationLite events. */
AppendOnlyLinkedArrayList<Object> queue;

public void onSubscribe(Disposable s) {
boolean cancel;
if (!done) {
synchronized (this) {
if (done) {
cancel = true;
} else {
if (emitting) {
AppendOnlyLinkedArrayList<Object> q = queue;
if (q == null) {
q = new AppendOnlyLinkedArrayList<Object>(4);
queue = q;
}
q.add(NotificationLite.disposable(s));
return;
}
emitting = true;
cancel = false;
}
}
} else {
cancel = true;
}
if (cancel) {
s.dispose();
} else {
actual.onSubscribe(s);
emitLoop();
}
}

@Override
public void onNext(T t) {
if (done) {
return;
}
synchronized (this) {
if (done) {
return;
}
if (emitting) {
AppendOnlyLinkedArrayList<Object> q = queue;
if (q == null) {
q = new AppendOnlyLinkedArrayList<Object>(4);
queue = q;
}
q.add(NotificationLite.next(t));
return;
}
emitting = true;
}
actual.onNext(t);
emitLoop();
}

/** Loops until all notifications in the queue has been processed. */
void emitLoop() {
for (;;) {
AppendOnlyLinkedArrayList<Object> q;
synchronized (this) {
q = queue;
if (q == null) {
emitting = false;
return;
}
queue = null;
}
q.forEachWhile(this);
}
}
}

这样这个问题就划归为多线程操作同一个内存对象的问题了。也就可以理解为什么在回调里执行线程会与send事件的线程不同了。

总结

所以以后在RxBus的回调中如果要操作UI,还是加一句切线程的代码吧。
ps.bug一般来源于想当然。但实践和代码才是检验真理的标准。

CATALOG
  1. 1. 尝试复现
  2. 2. RxBus的实现
  3. 3. 总结