RxJava is built around chained calls; by specifying different schedulers along the chain, you can switch threads flexibly and run each Task on the thread where it belongs.
Let’s take a look at how this switching pattern is implemented.
Scheduler
Scheduler is the abstract parent class of all RxJava schedulers. Subclasses must override createWorker() to return a Worker instance that accepts and executes Tasks; they can also override scheduleDirect() to decide how a Task is assigned to a Worker. An abbreviated version of the Scheduler source code follows.
    public abstract class Scheduler {
        ...
        @NonNull
        public abstract Worker createWorker();
        // Schedule a one-off timed Task; the details are wrapped in the Runnable passed in
        @NonNull
        public Disposable scheduleDirect(
                @NonNull Runnable run, long delay, @NonNull TimeUnit unit
        ) {
            // Create a new Worker
            final Worker w = createWorker();
            // Wrap the Runnable we want to run in a static proxy; the details can be ignored
            final Runnable decoratedRun
                    = RxJavaPlugins.onSchedule(run);
            DisposeTask task = new DisposeTask(decoratedRun, w);
            // Hand the Task to the newly created Worker
            w.schedule(task, delay, unit);
            return task;
        }
        // Worker is itself an abstract class
        public abstract static class Worker implements Disposable {
            ...
            // Run the timed Task that was assigned;
            // note that a Worker may also maintain its own Task scheduling policy internally
            @NonNull
            public abstract Disposable schedule(
                    @NonNull Runnable run, long delay,
                    @NonNull TimeUnit unit);
        }
    }
In short, the default Scheduler implementation creates a new Worker instance for every incoming Task and assigns the Task to it; the Worker may in turn maintain a Task scheduling policy of its own.
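To make the "one new Worker per Task" default concrete, here is a minimal JDK-only sketch. It is not RxJava code: MiniScheduler, MiniWorker and SingleThreadWorker are hypothetical names invented for illustration, and disposing the worker once the task finishes is an assumption of this sketch rather than something shown in the abbreviated source above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, JDK-only stand-in for the Scheduler/Worker pair (not RxJava's classes).
class MiniSchedulerSketch {

    // Counterpart of Scheduler.Worker: accepts a task and runs it, possibly after a delay.
    interface MiniWorker {
        void schedule(Runnable task, long delay, TimeUnit unit);
        void dispose();
    }

    // Counterpart of Scheduler: subclasses only say how a worker is created.
    abstract static class MiniScheduler {
        abstract MiniWorker createWorker();

        // Default policy: one fresh worker per task.
        // Assumption of this sketch: the worker is released once the task has run.
        void scheduleDirect(Runnable task, long delay, TimeUnit unit) {
            MiniWorker w = createWorker();
            w.schedule(() -> {
                try {
                    task.run();
                } finally {
                    w.dispose();
                }
            }, delay, unit);
        }
    }

    // A worker backed by its own single-threaded ScheduledExecutorService.
    static final class SingleThreadWorker implements MiniWorker {
        private final ScheduledExecutorService executor =
                Executors.newScheduledThreadPool(1, r -> new Thread(r, "mini-worker"));

        @Override
        public void schedule(Runnable task, long delay, TimeUnit unit) {
            executor.schedule(task, delay, unit);
        }

        @Override
        public void dispose() {
            executor.shutdown();
        }
    }

    // Runs one task through scheduleDirect() and reports which thread executed it.
    static String runOnce() {
        MiniScheduler scheduler = new MiniScheduler() {
            @Override
            MiniWorker createWorker() {
                return new SingleThreadWorker();
            }
        };
        AtomicReference<String> threadName = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        scheduler.scheduleDirect(() -> {
            threadName.set(Thread.currentThread().getName());
            done.countDown();
        }, 10, TimeUnit.MILLISECONDS);
        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("task did not run in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return threadName.get();
    }

    public static void main(String[] args) {
        System.out.println("task ran on: " + runOnce()); // prints "task ran on: mini-worker"
    }
}
```

The point of the split is that a concrete scheduler only has to answer "what does a worker look like?", while the "where does each task go?" policy stays in one overridable method.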
newThread
RxJava’s newThread scheduler starts a new thread for each new Task to execute it. Let’s take newThread as an example and see how the simplest Scheduler is implemented.
The familiar Schedulers.newThread() returns a singleton NewThreadScheduler instance. Here is the corresponding source code for NewThreadScheduler.
    public final class NewThreadScheduler extends Scheduler {
        final ThreadFactory threadFactory;
        ...
        @NonNull
        @Override
        public Worker createWorker() {
            return new NewThreadWorker(threadFactory);
        }
    }
As you can see, NewThreadScheduler does not override the default behavior of scheduleDirect(), which is to "create a new Worker instance and assign the Task to it whenever a new Task arrives"; it only overrides createWorker() to return a concrete NewThreadWorker instance.
Let’s look at the corresponding source code for NewThreadWorker.
    public class NewThreadWorker extends Scheduler.Worker
            implements Disposable {
        private final ScheduledExecutorService executor;
        public NewThreadWorker(ThreadFactory threadFactory) {
            // Ends up being a pool created via Executors.newScheduledThreadPool with a core size of 1
            executor = SchedulerPoolFactory.create(threadFactory);
        }
        @NonNull
        @Override
        public Disposable schedule(
                @NonNull final Runnable action, long delayTime,
                @NonNull TimeUnit unit
        ) {
            ...
            return scheduleActual(action, delayTime, unit, null);
        }
        @NonNull
        public ScheduledRunnable scheduleActual(
                final Runnable run, long delayTime,
                @NonNull TimeUnit unit,
                @Nullable DisposableContainer parent
        ) {
            // Wrap the Runnable we want to run in a static proxy; the details can be ignored
            Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
            ScheduledRunnable sr =
                    new ScheduledRunnable(decoratedRun, parent);
            ...
            Future<?> f;
            try {
                if (delayTime <= 0) {
                    // Run immediately
                    f = executor.submit((Callable<Object>) sr);
                } else {
                    // Schedule with a delay
                    f = executor.schedule(
                            (Callable<Object>) sr, delayTime, unit);
                }
                sr.setFuture(f);
            } catch (RejectedExecutionException ex) {
                ...
            }
            return sr;
        }
    }
Putting NewThreadScheduler and NewThreadWorker together, you can see that each new Task is run, either immediately or after a delay, by a newly created thread pool of size 1. That pool is a ScheduledExecutorService, the scheduled multi-threaded executor that ships with the JDK. The other RxJava scheduler implementations are not covered here; check the corresponding source code if you are interested.
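The behavior described above can be reproduced with the JDK alone. The following is a rough sketch, not RxJava code: the class SingleThreadPoolSketch and its methods are hypothetical, and the thread names are made up. It creates a fresh one-thread ScheduledExecutorService per task and picks submit() or schedule() depending on the delay, mirroring the branch in scheduleActual().

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical JDK-only illustration of "one single-thread pool per task".
class SingleThreadPoolSketch {

    // Same decision as in scheduleActual(): submit() without a delay, schedule() with one.
    static <T> Future<T> scheduleActual(
            ScheduledExecutorService executor, Callable<T> task, long delay, TimeUnit unit) {
        if (delay <= 0) {
            return executor.submit(task);
        }
        return executor.schedule(task, delay, unit);
    }

    // Creates a fresh one-thread pool (as a new worker would), runs a single task on it,
    // and returns the name of the thread that executed the task.
    static String runOnFreshPool(String threadName, long delayMillis) {
        ScheduledExecutorService executor =
                Executors.newScheduledThreadPool(1, r -> new Thread(r, threadName));
        try {
            Future<String> f = scheduleActual(
                    executor, () -> Thread.currentThread().getName(),
                    delayMillis, TimeUnit.MILLISECONDS);
            return f.get(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnFreshPool("worker-1", 0));  // prints "worker-1"
        System.out.println(runOnFreshPool("worker-2", 50)); // prints "worker-2"
    }
}
```

Each call gets its own pool and therefore its own thread, which is the essence of the newThread strategy; the price is that nothing is reused between tasks.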
Chained Calls
Now that we understand how a Scheduler is implemented, we also need to know how it works within chained calls. For how chained calls themselves work in RxJava, I recommend my previous article on the principles of RxJava chained calls; I won't repeat that here. Instead, we focus on the two thread-switching operators, subscribeOn and observeOn. Note that this article discusses the Observable implementations of these operators.
subscribeOn sets the thread on which the Observable starts executing; observeOn sets the thread on which the operators downstream of the observeOn call run. A typical thread-switching scenario looks like this.
    Observable
        .create(...) // runs on the io scheduler
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(...) // runs on the Android main thread
Let’s look at the corresponding source code for subscribeOn.
    public abstract class Observable<@NonNull T>
            implements ObservableSource<T> {
        ...
        @NonNull
        public final Observable<T> subscribeOn(
                @NonNull Scheduler scheduler
        ) {
            ...
            return RxJavaPlugins.onAssembly(
                    new ObservableSubscribeOn<>(this, scheduler)
            );
        }
    }
Let’s look at the source code for ObservableSubscribeOn.
    public final class ObservableSubscribeOn<T>
            extends AbstractObservableWithUpstream<T, T> {
        final Scheduler scheduler;
        public ObservableSubscribeOn(
                ObservableSource<T> source, Scheduler scheduler
        ) {
            super(source);
            this.scheduler = scheduler;
        }
        @Override
        public void subscribeActual(
                final Observer<? super T> observer
        ) {
            // Wrap the Observer passed in (the downstream one) in a static proxy; the details can be ignored
            final SubscribeOnObserver<T> parent =
                    new SubscribeOnObserver<>(observer);
            ...
            parent.setDisposable(
                    scheduler.scheduleDirect(new SubscribeTask(parent))
            );
        }
        ...
        final class SubscribeTask implements Runnable {
            private final SubscribeOnObserver<T> parent;
            SubscribeTask(SubscribeOnObserver<T> parent) {
                this.parent = parent;
            }
            // The method required by the Runnable interface;
            // it lets subscribe() run on the corresponding Scheduler
            @Override
            public void run() {
                // source is the upstream Observable,
                // parent is the downstream Observer
                source.subscribe(parent);
            }
        }
    }
As you can see, subscribeOn wraps the upstream Observable's subscribe() call in a Task and calls the Scheduler's scheduleDirect() to switch threads, thus achieving the purpose of "setting the thread where the Observable starts execution".
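The same idea fits in a few lines of plain Java. The sketch below is only an illustration under simplifying assumptions: Source is a hypothetical one-method stand-in for an Observable, a java.util.function.Consumer plays the Observer, and an Executor plays the Scheduler. None of these are RxJava types.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

// Hypothetical JDK-only model of the subscribeOn mechanism.
class SubscribeOnSketch {

    // Minimal source: pushes values to whoever subscribes.
    interface Source<T> {
        void subscribe(Consumer<? super T> observer);
    }

    // Wraps upstream.subscribe(...) in a Runnable and hands it to the executor,
    // so the subscription (and therefore the emission code) runs on the executor's thread.
    static <T> Source<T> subscribeOn(Source<T> upstream, Executor executor) {
        return observer -> executor.execute(() -> upstream.subscribe(observer));
    }

    // Subscribes from the calling thread and returns the thread the source emitted on.
    static String demo() {
        ExecutorService io =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "sketch-io"));
        // The source emits the name of whatever thread it is subscribed on.
        Source<String> source =
                observer -> observer.accept(Thread.currentThread().getName());
        CompletableFuture<String> emittedOn = new CompletableFuture<>();
        try {
            subscribeOn(source, io).subscribe(emittedOn::complete);
            return emittedOn.get(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            io.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("emitted on: " + demo()); // prints "emitted on: sketch-io"
    }
}
```

Because only the subscribe() call is moved, everything the source does as a consequence of being subscribed follows it onto the new thread, while the caller's thread returns immediately.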
Next, let’s look at the source code for observeOn.
    public abstract class Observable<@NonNull T>
            implements ObservableSource<T> {
        ...
        @NonNull
        public final Observable<T> observeOn(
                @NonNull Scheduler scheduler,
                boolean delayError, int bufferSize
        ) {
            ...
            return RxJavaPlugins.onAssembly(
                    new ObservableObserveOn<>(
                            this, scheduler, delayError, bufferSize)
            );
        }
    }
Let’s look at the source code for ObservableObserveOn.
    public final class ObservableObserveOn<T>
            extends AbstractObservableWithUpstream<T, T> {
        final Scheduler scheduler;
        public ObservableObserveOn(
                ObservableSource<T> source, Scheduler scheduler,
                boolean delayError, int bufferSize
        ) {
            super(source);
            this.scheduler = scheduler;
            ...
        }
        @Override
        public void subscribeActual(
                final Observer<? super T> observer
        ) {
            if (scheduler instanceof TrampolineScheduler) {
                ...
            } else {
                // Create a new Worker instance directly
                Scheduler.Worker w = scheduler.createWorker();
                // source is the upstream Observable,
                // observer is the downstream Observer;
                // an ObserveOnObserver is created here as a middleman:
                // it subscribes to source and calls the matching methods of observer in its callbacks,
                // again an application of the static proxy pattern
                source.subscribe(new ObserveOnObserver<>(
                        observer, w, delayError, bufferSize));
            }
        }
        ...
        static final class ObserveOnObserver<T>
                extends BasicIntQueueDisposable<T>
                implements Observer<T>, Runnable {
            ...
            final Observer<? super T> downstream;
            final Scheduler.Worker worker;
            ObserveOnObserver(
                    Observer<? super T> actual, Scheduler.Worker worker,
                    boolean delayError, int bufferSize
            ) {
                this.downstream = actual;
                this.worker = worker;
                ...
            }
            // Just like creating a new Observer when calling subscribe(),
            // override the four methods below; the implementations are fairly involved and omitted here
            @Override
            public void onSubscribe(Disposable d) { ... }
            @Override
            public void onNext(T t) { ... }
            @Override
            public void onError(Throwable t) { ... }
            @Override
            public void onComplete() { ... }
            // The main logic that calls downstream lives here;
            // this is the method required by the Runnable interface,
            // and it lets downstream run on the corresponding Scheduler
            @Override
            public void run() { ... }
            // The real call path has many hops, but the thread switch ultimately happens here
            void schedule() {
                if (getAndIncrement() == 0) {
                    worker.schedule(this);
                }
            }
            ...
        }
    }
As you can see, observeOn wraps the logic that calls the downstream Observer in a Task and has the designated Worker instance run it, which is how it achieves the purpose of "setting the thread for the operators downstream of the point where observeOn is called".
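The schedule() method with its getAndIncrement() == 0 check is easier to appreciate in a stripped-down form. The following is a simplified sketch under my own assumptions, not the real ObserveOnObserver: HandOff is a hypothetical class, an Executor stands in for the Worker, a Consumer stands in for the downstream Observer, and the counter is a plain AtomicInteger field rather than an inherited one. Error handling, completion and disposal are left out entirely.

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical JDK-only model of the observeOn hand-off.
class ObserveOnSketch {

    // Queues items on the producer's thread and drains them on the worker's thread.
    static final class HandOff<T> implements Runnable {
        private final Queue<T> queue = new ConcurrentLinkedQueue<>();
        private final AtomicInteger wip = new AtomicInteger(); // "work in progress" counter
        private final Executor worker;
        private final Consumer<? super T> downstream;

        HandOff(Executor worker, Consumer<? super T> downstream) {
            this.worker = worker;
            this.downstream = downstream;
        }

        // Called on the producer's thread: enqueue, then make sure a drain is scheduled.
        void onNext(T item) {
            queue.offer(item);
            schedule();
        }

        // Only the caller that moves the counter from 0 to 1 submits a drain task.
        void schedule() {
            if (wip.getAndIncrement() == 0) {
                worker.execute(this);
            }
        }

        // Runs on the worker's thread: deliver queued items, and loop again
        // if more schedule() calls arrived while draining.
        @Override
        public void run() {
            int missed = 1;
            for (;;) {
                T item;
                while ((item = queue.poll()) != null) {
                    downstream.accept(item);
                }
                missed = wip.addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }
    }

    // Emits 1..3 from the calling thread and records where each item was delivered.
    static List<String> demo() {
        ExecutorService main =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "sketch-main"));
        List<String> seen = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(3);
        HandOff<Integer> handOff = new HandOff<>(main, item -> {
            seen.add(item + "@" + Thread.currentThread().getName());
            done.countDown();
        });
        try {
            for (int i = 1; i <= 3; i++) {
                handOff.onNext(i);
            }
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("items were not delivered in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            main.shutdown();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "[1@sketch-main, 2@sketch-main, 3@sketch-main]"
    }
}
```

The counter guarantees that at most one drain task is in flight at a time, so items reach the downstream one by one and in order even if the producer calls onNext() faster than the worker can deliver.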
You may have noticed something here: from the Scheduler source code above, the default scheduleDirect() also hands the Task to a new Worker instance created by createWorker(). So why does subscribeOn go through scheduleDirect() while observeOn calls createWorker() directly, instead of both sharing one implementation? If you're interested, take a look at the source code of the single scheduler: there the two methods are each customized more fully and are not necessarily directly related to each other. As long as the underlying dispatching logic is correct, either path works.
Finally, even now that Kotlin, another JVM-based language, supports coroutines, RxJava is still a library well worth learning and using: it handles thread switching elegantly using nothing but the multithreading API provided by the JDK. The author does not consider it obsolete.