RxJava
关于RxJava最友好的文章(初级篇)
subscribe
:订阅(事件的终点)(为了保持链式调用,写在在Observable/Flowable
后,实际调用者是Observer/Subscriber
)(Subscriber
是对Observer
的封装,较常用)
subscribeOn
:指定了被观察者执行的线程(多次指定以第一次为准)
observeOn
:指定接下来代码执行的线程环境。(可能会有多个observeO
,切换多次,只对observeOn
后的代码起作用)
map
: 改发出事件的类型(完成类型转换)
filter
:传递过程中对事件进行过滤操作
flatMap
:(无序)
concatMap
:(有序)
RxJava是什么
- Rx:全程Reactive Extensions 响应式扩展
- 基于观察者模式
- 一种变成模型
- 目标是提供一致的编程接口,帮助开发者更方便地处理异步数据流
- ReactiveX结合了观察者模式、迭代器模式和函数式编程的精华
- 四个基本概念
Observable
可观察者、被观察者Observer
观察者
内置了一个实现了Observer
的抽象类:Subscriber
subscribe
订阅- 事件
如何使用RxJava
在gradle
文件的dependencies
中添加
1 | compile 'io.reactivex:rxandroid:1.2.0' |
RxJava在生产环境中的使用
RxJava与Retrofit
gradle文件的引用:
1 | compile 'com.squareup.retrofit2:retrofit:2.0.2' |
1 | Retrofit retrofit = new Retrofit.Builder() |
RxBus(替代EventBus、Otto)
封装:
1 | public class RxBus { |
使用-发送消息:
1 | RxBus.getInstance().post("SomeChange"); |
使用-接收消息:
1 | Subscription mSubscription = RxBus.getInstance().toObserverable(String.class).subscribe(new Action1<String>() { |
在适当的地方取消这个订阅(以免发生内存泄露)
1 | mSubscription.unsubscribe(); |
RxPermissions
1 | //取得相机权限和读取手机状态 |
轮询,定时操作
1 | //每隔两秒执行一次 |
1 | //在两秒后去执行一些操作(比如启动页跳转到主页面) |
节流(防止按钮的重复点击)
1 | RxView.clicks(button).debounce(300, TimeUnit.MILLISECONDS).subscribe(new Action1<Void>() { |
RxBinding
(控件绑定库)
Platform bindings:
1 | compile 'com.jakewharton.rxbinding:rxbinding:0.4.0' |
'support-v4' library bindings:
1 | compile 'com.jakewharton.rxbinding:rxbinding-support-v4:0.4.0' |
'appcompat-v7' library bindings:
1 | compile 'com.jakewharton.rxbinding:rxbinding-appcompat-v7:0.4.0' |
'design' library bindings:
1 | compile 'com.jakewharton.rxbinding:rxbinding-design:0.4.0' |
'recyclerview-v7' library bindings:
1 | compile 'com.jakewharton.rxbinding:rxbinding-recyclerview-v7:0.4.0' |
'leanback-v17' library bindings:
1 | compile 'com.jakewharton.rxbinding:rxbinding-leanback-v17:0.4.0' |
接口
OnSubscribe
实现了Action
接口,只有一个call
方法,在subscribe
时候才会去调用
Subscriber
观察者(实现Observer
的抽象类)
调用subscribe
所传的参数,里面是onNext
,onComplete
,onError
等方法
Observable
可观察者、被观察者
所有的操作符都在这个类里面,我认为最重要的其实是OnSubscribe
和Subscriber
,而Observable
其实是作为一个桥梁来链接这2者
Subscription
这个是调用subscribe
方法的返回值,用来取消这次订阅
另一篇RxJava
一、简单上下游事件接发
1 | //创建一个上游 Observable: |
通过 observable.subscribe 的 subscribe 方法来连接
输出结果:(下游的subscribe先调用)
1 | 12-02 03:37:17.818 4166-4166/zlc.season.rxjava2demo D/TAG: subscribe |
链式调用写法:
1 | Observable.create(new ObservableOnSubscribe<Integer>() { |
ObservableEmitter: Emitter是发射器的意思,那就很好猜了,这个就是用来发出事件的,它可以发出三种类型的事件,通过调用emitter
的onNext(T value)
、onComplete()
和onError(Throwable error)
就可以分别发出next事件、complete事件和error事件。
注意点:
- 可发送多个 onNext,可接收多个 onNext
- 发送 onComplete 或 onError 后,其后的事件也会继续发送。但接收到 onComplete 或 onError 后就不再接收了。
- onComplete 和 onError 是唯一互斥的(自行在代码中控制唯一互斥,否则也只接收第一个 onComplete就不接收了,但接收第二个 onError 就会导致程序崩溃)**
Disposable 字面意思是一次性用品,用完可丢弃的
注意:调用 dispose()并不会导致上游不再继续发送事件,上游会继续发送剩余的事件。(但下游接收不到事件了)
subscribe()
有多个重载方法
1 | //下游不关心上游发什么 |
二、线程切换 & 和Retrofit
结合使用
正常(默认)情况下上游和下游是工作在同一个线程中的
上游发送在子线程中,下游接收在主线程中:
1 |
|
输出:
1 | D/TAG: Observable thread is : RxNewThreadScheduler-2 |
多次指定上游的线程只有第一次指定的有效,subscirbeOn()
只有第一次有效,其余会被忽略
下游的 observeOn()
调用一次就会切换一次线程
1 | observable.subscribeOn(Schedulers.newThread()) |
结果:
1 | D/TAG: Observable thread is : RxNewThreadScheduler-1 |
RxJava 内置了很多线程供选择,如:
- Schedulers.io() 代表 io 操作的线程,通常用于网络,读写文件等 io 密集型的操作
- Schedulers.computation() 代表 CPU 计算密集型的操作,例如需要大量计算的操作
- Schedulers.newThread() 代表一个常规的新线程
- AndroidSchedulers.mainThread() 代表 Android 的主线程
Retrofit 使用
Gradle中配置Retrofit
1 | //retrofit |
定义 APi 接口
1 | public interface Api { |
创建 一个 Retrofit 客户端
1 | private static Retrofit create() { |