第三方-RxJava

RxJava

关于RxJava最友好的文章(初级篇)

rxjava2思维导图

subscribe:订阅(事件的终点)(为了保持链式调用,写在在Observable/Flowable后,实际调用者是Observer/Subscriber)(Subscriber是对Observer的封装,较常用)

subscribeOn:指定了被观察者执行的线程(多次指定以第一次为准)

observeOn:指定接下来代码执行的线程环境。(可能会有多个observeO,切换多次,只对observeOn后的代码起作用)

map : 改发出事件的类型(完成类型转换)

filter:传递过程中对事件进行过滤操作

flatMap:(无序)

concatMap:(有序)

RxJava是什么

  • Rx:全程Reactive Extensions 响应式扩展
    • 基于观察者模式
    • 一种变成模型
    • 目标是提供一致的编程接口,帮助开发者更方便地处理异步数据流
    • ReactiveX结合了观察者模式、迭代器模式和函数式编程的精华
  • 四个基本概念
    • Observable可观察者、被观察者
    • Observer 观察者
      内置了一个实现了 Observer 的抽象类:Subscriber
    • subscribe 订阅
    • 事件

如何使用RxJava

gradle文件的dependencies中添加

1
2
compile 'io.reactivex:rxandroid:1.2.0'
compile 'io.reactivex:rxjava:1.1.5'

RxJava在生产环境中的使用

RxJava与Retrofit

gradle文件的引用:

1
2
3
compile 'com.squareup.retrofit2:retrofit:2.0.2'
compile 'com.squareup.retrofit2:converter-gson:2.0.2'
compile 'com.squareup.retrofit2:adapter-rxjava:2.0.2'//RxJava与Retrofit的适配器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
Retrofit retrofit = new Retrofit.Builder()
.baseUrl("http://gank.io/api/data/")
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJavaCallAdapterFactory.create())//这个就是用来适配RxJava的
.build();

RxGankService rxGankService = retrofit.create(RxGankService.class);
final Observable<GankResultBean> observable = rxGankService.getAndroidData(1);
observable
.subscribeOn(Schedulers.io()) //订阅者的回调在主线程
.observeOn(AndroidSchedulers.mainThread()) //订阅发生在io线程
.map(new Func1<GankResultBean, List<GankResultBean.ResultsBean>>() {
@Override
public List<GankResultBean.ResultsBean> call(GankResultBean gankResultBean) {
return gankResultBean.getResults();
}
}) //一般我们不会关心error字段,我们关心的只是results,所以在这里做了一个映射让用户接收的是List<GankResultBean.ResultsBean>而不是包含有error的GankResultBean
.flatMap(new Func1<List<GankResultBean.ResultsBean>, Observable<GankResultBean.ResultsBean>>() {
@Override
public Observable<GankResultBean.ResultsBean> call(List<GankResultBean.ResultsBean> resultsBeen) {
return Observable.from(resultsBeen);
}
}) //让结果一条一条的发射出去,而不是一个集合
.filter(new Func1<GankResultBean.ResultsBean, Boolean>() {
@Override
public Boolean call(GankResultBean.ResultsBean resultsBean) {
return "Android".equals(resultsBean.getType());
}
}) //只接收Type为Android的数据
.subscribe(new Subscriber<GankResultBean.ResultsBean>() {
@Override
public void onCompleted() {
Log.i("test", "onCompleted");
}

@Override
public void onError(Throwable e) {
e.printStackTrace();
}

@Override
public void onNext(GankResultBean.ResultsBean resultsBean) {
textView.append(resultsBean + "\n");
}
});

RxBus(替代EventBus、Otto)

封装:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class RxBus {
private final Subject<Object, Object> _bus;
private static class RxBusHolder {
private static final RxBus instance = new RxBus();
}

private RxBus() {
_bus = new SerializedSubject<>(PublishSubject.create());
}

public static synchronized RxBus getInstance() {
return RxBusHolder.instance;
}

public void post(Object o) {
_bus.onNext(o);
}

public <T> Observable<T> toObserverable(Class<T> eventType) {
return _bus.ofType(eventType);
}
}

使用-发送消息:

1
RxBus.getInstance().post("SomeChange");

使用-接收消息:

1
2
3
4
5
6
Subscription mSubscription = RxBus.getInstance().toObserverable(String.class).subscribe(new Action1<String>() {
@Override
public void call(String s) {
handleRxMsg(s);
}
});

在适当的地方取消这个订阅(以免发生内存泄露)

1
mSubscription.unsubscribe();

RxPermissions

1
2
3
4
5
6
7
8
9
10
11
//取得相机权限和读取手机状态
RxPermissions.getInstance(this)
.request(Manifest.permission.CAMERA,
Manifest.permission.READ_PHONE_STATE)
.subscribe(granted -> {
if (granted) {

} else {

}
});

轮询,定时操作

1
2
3
4
5
6
7
//每隔两秒执行一次
Observable.interval(2, 2, TimeUnit.SECONDS).subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
//TODO WHAT YOU WANT
}
});
1
2
3
4
5
6
7
//在两秒后去执行一些操作(比如启动页跳转到主页面)
Observable.timer(2, TimeUnit.SECONDS).subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
//TODO WHAT YOU WANT
}
});

节流(防止按钮的重复点击)

1
2
3
4
5
6
RxView.clicks(button).debounce(300, TimeUnit.MILLISECONDS).subscribe(new Action1<Void>() {
@Override
public void call(Void aVoid) {
Log.i("test", "clicked");
}
});

RxBinding(控件绑定库)

Platform bindings:

1
compile 'com.jakewharton.rxbinding:rxbinding:0.4.0'

'support-v4' library bindings:

1
compile 'com.jakewharton.rxbinding:rxbinding-support-v4:0.4.0'

'appcompat-v7' library bindings:

1
compile 'com.jakewharton.rxbinding:rxbinding-appcompat-v7:0.4.0'

'design' library bindings:

1
compile 'com.jakewharton.rxbinding:rxbinding-design:0.4.0'

'recyclerview-v7' library bindings:

1
compile 'com.jakewharton.rxbinding:rxbinding-recyclerview-v7:0.4.0'

'leanback-v17' library bindings:

1
compile 'com.jakewharton.rxbinding:rxbinding-leanback-v17:0.4.0'

接口

OnSubscribe

实现了Action接口,只有一个call方法,在subscribe时候才会去调用

Subscriber观察者(实现Observer的抽象类)

调用subscribe所传的参数,里面是onNext,onComplete,onError等方法

Observable可观察者、被观察者

所有的操作符都在这个类里面,我认为最重要的其实是OnSubscribeSubscriber,而Observable其实是作为一个桥梁来链接这2者

Subscription

这个是调用subscribe方法的返回值,用来取消这次订阅

另一篇RxJava

一、简单上下游事件接发

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
//创建一个上游 Observable:
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
});
//创建一个下游 Observer
Observer<Integer> observer = new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "subscribe");
}

@Override
public void onNext(Integer value) {
Log.d(TAG, "" + value);
}

@Override
public void onError(Throwable e) {
Log.d(TAG, "error");
}

@Override
public void onComplete() {
Log.d(TAG, "complete");
}
};
//建立连接
observable.subscribe(observer);

通过 observable.subscribe 的 subscribe 方法来连接
输出结果:(下游的subscribe先调用)

1
2
3
4
5
12-02 03:37:17.818 4166-4166/zlc.season.rxjava2demo D/TAG: subscribe
12-02 03:37:17.819 4166-4166/zlc.season.rxjava2demo D/TAG: 1
12-02 03:37:17.819 4166-4166/zlc.season.rxjava2demo D/TAG: 2
12-02 03:37:17.819 4166-4166/zlc.season.rxjava2demo D/TAG: 3
12-02 03:37:17.819 4166-4166/zlc.season.rxjava2demo D/TAG: complete

链式调用写法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "subscribe");
}

@Override
public void onNext(Integer value) {
Log.d(TAG, "" + value);
}

@Override
public void onError(Throwable e) {
Log.d(TAG, "error");
}

@Override
public void onComplete() {
Log.d(TAG, "complete");
}
});

ObservableEmitter: Emitter是发射器的意思,那就很好猜了,这个就是用来发出事件的,它可以发出三种类型的事件,通过调用emitteronNext(T value)onComplete()onError(Throwable error)就可以分别发出next事件、complete事件和error事件。

注意点

  • 可发送多个 onNext,可接收多个 onNext
  • 发送 onComplete 或 onError 后,其后的事件也会继续发送。但接收到 onComplete 或 onError 后就不再接收了。
  • onComplete 和 onError 是唯一互斥的(自行在代码中控制唯一互斥,否则也只接收第一个 onComplete就不接收了,但接收第二个 onError 就会导致程序崩溃)**

Disposable 字面意思是一次性用品,用完可丢弃的
注意:调用 dispose()并不会导致上游不再继续发送事件,上游会继续发送剩余的事件。(但下游接收不到事件了)

subscribe()有多个重载方法

1
2
3
4
5
6
7
8
//下游不关心上游发什么
public final Disposable subscribe() {}
//下游只关心 onNext 事件,其他事件不管
public final Disposable subscribe(Consumer<? super T> onNext) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
public final void subscribe(Observer<? super T> observer) {}

二、线程切换 & 和Retrofit结合使用

正常(默认)情况下上游和下游是工作在同一个线程中的
上游发送在子线程中,下游接收在主线程中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);

Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "Observable thread is : " + Thread.currentThread().getName());
Log.d(TAG, "emit 1");
emitter.onNext(1);
}
});

Consumer<Integer> consumer = new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "Observer thread is :" + Thread.currentThread().getName());
Log.d(TAG, "onNext: " + integer);
}
};
observable.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(consumer);
}

输出:

1
2
3
4
D/TAG: Observable thread is : RxNewThreadScheduler-2  
D/TAG: emit 1
D/TAG: Observer thread is :main
D/TAG: onNext: 1

多次指定上游的线程只有第一次指定的有效,subscirbeOn()只有第一次有效,其余会被忽略
下游的 observeOn()调用一次就会切换一次线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
observable.subscribeOn(Schedulers.newThread())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "After observeOn(mainThread), current thread is: " + Thread.currentThread().getName());
}
})
.observeOn(Schedulers.io())
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "After observeOn(io), current thread is : " + Thread.currentThread().getName());
}
})
.subscribe(consumer);

结果:

1
2
3
4
5
D/TAG: Observable thread is : RxNewThreadScheduler-1
D/TAG: emit 1
D/TAG: After observeOn(mainThread), current thread is: main
D/TAG: After observeOn(io), current thread is : RxCachedThreadScheduler-2
D/TAG: Observer thread is :RxCachedThreadScheduler-2 D/TAG: onNext: 1

RxJava 内置了很多线程供选择,如:

  • Schedulers.io() 代表 io 操作的线程,通常用于网络,读写文件等 io 密集型的操作
  • Schedulers.computation() 代表 CPU 计算密集型的操作,例如需要大量计算的操作
  • Schedulers.newThread() 代表一个常规的新线程
  • AndroidSchedulers.mainThread() 代表 Android 的主线程

Retrofit 使用

Gradle中配置Retrofit

1
2
3
4
5
6
7
8
9
//retrofit
compile 'com.squareup.retrofit2:retrofit:2.1.0'
//Gson converter
compile 'com.squareup.retrofit2:converter-gson:2.1.0'
//RxJava2 Adapter
compile 'com.jakewharton.retrofit:retrofit2-rxjava2-adapter:1.0.0'
//okhttp
compile 'com.squareup.okhttp3:okhttp:3.4.1'
compile 'com.squareup.okhttp3:logging-interceptor:3.4.1'

定义 APi 接口

1
2
3
4
5
6
7
public interface Api {
@GET
Observable<LoginResponse> login(@Body LoginRequest request);

@GET
Observable<RegisterResponse> register(@Body RegisterRequest request);
}

创建 一个 Retrofit 客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private static Retrofit create() {
OkHttpClient.Builder builder = new OkHttpClient().newBuilder();
builder.readTimeout(10, TimeUnit.SECONDS);
builder.connectTimeout(9, TimeUnit.SECONDS);

if (BuildConfig.DEBUG) {
HttpLoggingInterceptor interceptor = new HttpLoggingInterceptor();
interceptor.setLevel(HttpLoggingInterceptor.Level.BODY);
builder.addInterceptor(interceptor);
}

return new Retrofit.Builder().baseUrl(ENDPOINT)
.client(builder.build())
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.build();
}