最近准备写一个知乎日报客户端,主要是练习和验证 RxJava 在实际项目中的价值。RxJava 在国外社区很火热,目前支持的编程语言包括 Java、JavaScript、C#、Scala、Clojure、C++、Python、Ruby、Kotlin、Swift 等。

Rx(Reactive Extensions)从官方定义来看,是基于观察者模式的事件驱动异步编程框架。它扩展了观察者模式,抽象并简化了线程调度、同步、线程安全、并发数据结构和非阻塞 I/O 等基础操作。

推荐先阅读扔物线(朱凯)的文章:给 Android 开发者的 RxJava 详解

Scheduler(调度器)

RxJava 提供多种 Scheduler 来控制代码运行在哪个线程上:

Scheduler用途说明
Schedulers.immediate()当前线程默认 Scheduler,直接在当前线程运行
Schedulers.newThread()新线程每次都启用新线程,无复用
Schedulers.io()I/O 操作读写文件、数据库、网络调用等。使用无上限缓存线程池,可复用空闲线程,比 newThread() 更高效。不要在此执行 CPU 密集型计算
Schedulers.computation()计算操作CPU 密集型计算,线程数固定为 CPU 核数。不要在此执行 I/O 操作
AndroidSchedulers.mainThread()Android 主线程UI 更新专用

通过 subscribeOn()observeOn() 控制线程:

  • subscribeOn():指定事件产生的线程(即 Observable.OnSubscribe 被激活的线程)。在链中只有第一次调用生效。
  • observeOn():指定事件消费的线程(即 Subscriber 接收事件的线程)。可在链中多次调用,每次切换后续操作的线程。

创建 Observable

操作符说明
Create通过编程方式调用 Observer 方法创建 Observable
Defer直到有观察者订阅时才创建 Observable,每个订阅者获得独立实例
Empty / Never / Throw创建特定行为的 Observable:空、永不发射、发射错误
From将数组或 Iterable 转为 Observable
Interval按固定时间间隔发射递增整数序列
Just将一个或一组对象转为发射这些对象的 Observable
Range发射指定范围内的连续整数
Repeat重复发射某个项或序列
Start发射某个函数的返回值
Timer延迟一段时间后发射一个项

变换 Observable

操作符说明
Buffer定期收集发射物到集合中,再发射该集合
FlatMap将每个发射项转换为另一个 Observable,然后合并所有结果
GroupBy按 key 将原始 Observable 分组为多个子 Observable
Map对每个发射项应用函数进行变换
Scan依次对每个发射项应用函数,并发射每次的中间结果
Window类似 Buffer,但发射的是 Observable 而非集合

目前正在用 RxAndroid 写一个实际应用。基础用法已掌握,但在真实项目中仍有不少需要摸索的地方,后续继续补充。

References