Android RxJava的使用

Android RxJava的使用

八归少年 2,860 2020-12-14

首语

最近因为项目上线,挤不出时间,已经好久没有更新博客了😛,目前项目也做差不多了,写几篇总结类型的博客,梳理一下。
本文主要对RxJava及常用操作符的使用进行总结,同时对RxJava在Android中几种常见的使用场景进行举例。

简介

RxJava是Reactive Extensions的Java VM实现:该库用于通过使用可观察的序列来组成异步和基于事件的程序。

Rx是Reactive Extensions的缩写的简写,它是一个使用可观察数据流进行异步编程的编程接口,Rx结合了观察者模式、迭代器模式和函数式编程的精华。
RxJava是一种异步数据处理库,也是一种扩展的观察者模式。
RxJava最早是 Netflix公司为重构当前架构来减少REST调用的次数,借鉴了Microsoft公司的响应式编程(一种基于异步数据流概念的编程模式),把Microsoft的Rx库迁移到Java JVM中,其中最有名的就是RxJava。

特点

  1. 支持Java 8 Lambda。
  2. 支持异步和同步。
  3. 单一依赖关系。
  4. 简洁,优雅。

RxAndroid

对于Android开发者来说,使用RxJava时也会搭配RxAndroid,它是RxJava针对Android平台的一个扩展,用于Android 开发。它提供了响应式扩展组件。使用RxAndroid 的Schedulers(调度器)可以解决Android主线程问题, 多线程等问题。

观察者模式

四大要素
  1. Observable 被观察者
  2. Observer 观察者
  3. subscribe 订阅
  4. 事件

观察者模式
观察者订阅被观察者,一旦被观察者发出事件,观察者就可以接收到。

扩展的观察者模式

扩展的观察者模式
onNext()订阅了一个事件,当事件完成时会回调onComplete(),在完成过程中发生了异常会回调onError()。

使用

GitHub地址

https://github.com/ReactiveX/RxJava
https://github.com/ReactiveX/RxAndroid

依赖
//在Project的gradle下添加maven仓库
maven { url "https://oss.jfrog.org/libs-snapshot" }

implementation 'io.reactivex.rxjava3:rxjava:3.0.4'
implementation 'io.reactivex.rxjava3:rxandroid:3.0.0'
Hello World
//1.创建被观察者
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("hello world");
                emitter.onComplete();
            }
        });
//2.创建观察者
Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                System.out.println("onSubscribe():");
            }

            @Override
            public void onNext(@NonNull String s) {
                System.out.println("onNext():" + s);
            }

            @Override
            public void onError(@NonNull Throwable e) {
                System.out.println("onError():" + e.getMessage());
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete():");
            }
        };
//3.订阅事件
observable.subscribe(observer);

注意:onError()和onComplete()只会回调一个。

操作符

Creating Observables(创建 Observable)
Create
//链式写法
        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("hello world");
                emitter.onComplete();
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("onSubscribe():"+d.toString());
            }

            @Override
            public void onNext(String o) {
                System.out.println("onNext():" + o);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError():" + e.getMessage());
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete():");
            }
        });
Just

使用将为你创建一个Observable并自动为你调用onNext( )发射数据,just中传递的参数将直接在Observer的onNext()方法中接收到。

Observable.just("hello world").subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
            }

            @Override
            public void onNext(String s) {
             	System.out.println(s);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });
From

将一个Iterable, 一个Future, 或者一个数组转换成一个Observable,遍历集合,发送每个item。相当于多次回调onNext()方法,每次传入一个item。

 List<String> list = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            list.add("Hello" + i);
        }

        Observable.fromArray(list).subscribe(new Observer<List<String>>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(List<String> strings) {
                System.out.println(strings);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });
Defer

当观察者订阅时,才创建Observable,并且针对每个观察者创建都是一个新的Observable。
以何种方式创建这个Observable对象,当满足回调条件后,就会进行相应的回调。

		value = "2020/12/13";
        Observable<String> observable = Observable.defer(new Supplier<ObservableSource<? extends String>>() {
            @Override
            public ObservableSource<? extends String> get() throws Throwable {
                return Observable.just(value);
            }
        });
        value = "12345";
        observable.subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(String s) {
                System.out.println(s);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });
Empty/Never/Throw

Empty是创建一个不发射任何数据但是正常终止的Observable。
Never是创建一个不发射数据也不终止的Observable。
Throw是创建一个不发射数据以一个错误终止的Observable。
这三个操作符生成的Observable行为非常特殊和受限。测试的时候很有用,有时候也用于结合其它的Observables,或者作为其它需要Observable的操作符的参数。

Observable.defer(new Supplier<ObservableSource<?>>() {
            @Override
            public ObservableSource<?> get() throws Throwable {
                return Observable.error(new Throwable("你写了个bug"));
            }
        }).subscribe(new Observer<Object>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {

            }

            @Override
            public void onNext(@NonNull Object o) {
            }

            @Override
            public void onError(@NonNull Throwable e) {
                System.out.println(e.getMessage());
            }

            @Override
            public void onComplete() {

            }
        });
Interval

创建一个按固定时间间隔发射整数序列的Observable,可用作定时器。即按照固定1秒一次调用onNext()方法。

//TrampolineScheduler不会立即执行,当其他排队任务结束时才执行,TrampolineScheduler运行在主线程。
Observable.interval(1000, TimeUnit.MILLISECONDS, Schedulers.trampoline()).subscribe(new Observer<Long>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
            }

            @Override
            public void onNext(@NonNull Long aLong) {
                System.out.println(aLong);
            }

            @Override
            public void onError(@NonNull Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });
Range

创建一个发射特定整数序列的Observable,第一个参数为起始值,第二个为发送的个数,如果为0则不发送,负数则抛异常。
代码表示发射1到20的数。即调用20次onNext()方法,依次传入1-20数字。

 Observable.range(1, 20).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Integer integer) {
                System.out.println(integer);

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });
Repeat

创建一个Observable,该Observable的事件可以重复调用。

 Observable.just(123).repeat(2).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Integer integer) {
                System.out.println(integer);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });
Start

返回一个Observable,它发射一个类似于函数声明的值。

Timer

创建一个Observable,它在一个给定的延迟后发射一个特殊的值,即表示延迟2秒后,调用onNext()方法。

 Observable.timer(2000, TimeUnit.MILLISECONDS,Schedulers.trampoline()).subscribe(new Observer<Long>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Long aLong) {
                System.out.println(aLong);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });
Transforming Observables(转换 Observable)
Map

Map就是把原来的Observable对象转换成另一个Observable对象,同时将传输的数据进行一些灵活的操作,方便Observer获得想要的数据形式。

//Integer to String
Observable.just(123).map(new Function<Integer, String>() {
            @Override
            public String apply(Integer s) throws Exception {
                return s.toString();
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(String s) {
                System.out.println(s);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });
FlatMap

flatMap对于数据的转换比map()更加彻底,如果发送的数据是集合,flatmap()重新生成一个Observable对象,并把数据转换成Observer想要的数据形式。它可以返回任何它想返回的Observable对象。

 Observable.just(1, 2, 3, 4, 5).flatMap(new Function<Integer, ObservableSource<? extends String>>() {
            @Override
            public ObservableSource<? extends String> apply(Integer integer) throws Exception {
                return Observable.just(integer.toString());
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(String o) {
                System.out.println(o);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });
GroupBy

根据规则对数据进行分组。

Observable.just(1, 2, 3, 4, 5).groupBy(new Function<Integer, String>() {
            @Override
            public String apply(Integer integer) throws Exception {
                return integer % 2==0?"偶数":"奇数";
            }
        }).subscribe(new Observer<GroupedObservable<String, Integer>>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {

            }

            @Override
            public void onNext(@NonNull final GroupedObservable<String, Integer> arg0) {
                arg0.subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Integer integer) {
                        System.out.println(arg0.getKey() + "-------" + integer);
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });
            }

            @Override
            public void onError(@NonNull Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });
Buffer

定期收集Observable的数据放进一个数据包裹,然后发射这些数据包裹,而不是一次发射一个值。

Observable.just(1, 2, 3, 4, 5,6).buffer(3).subscribe(new Observer<List<Integer>>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(List<Integer> integers) {
                System.out.println(integers);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });
Scan

将数据进行累加。

Observable.range(1, 5).scan(new BiFunction<Integer, Integer, Integer>() {
            @Override
            public Integer apply(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Integer integer) {
                System.out.println(integer);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });
Window

window和buffer相似,它返回的是一个Observable对象,它根据一系列任务规则把数据聚集到一个列表。

//        window第一个参数count:每个窗口应发射前的最大大小;第二个:在启动新窗口之前需要跳过多少项
        Observable.range(1, 5).window(5, 1).subscribe(new Observer<Observable<Integer>>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(final Observable<Integer> arg0) {
                System.out.println(arg0);
                arg0.subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Integer integer) {
                        System.out.println("---"+integer);
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });
Filtering Observables(过滤 Observable)
Debounce

操作间隔一定时间内没有做任何操作,数据才会发送到观察者。

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> arg0) throws Exception {
                for (int i = 0; i < 10; i++) {
                    Thread.sleep(2000);
                    arg0.onNext(i);
                }
                arg0.onComplete();
            }
        }).debounce(1, TimeUnit.SECONDS).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Integer o) {
                System.out.println(o);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {
            }
        });
Distinct

去掉重复数据的操作符。

Observable.just(1, 2, 3, 2, 3).distinct().subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Integer integer) {
                System.out.println(integer);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });
ElementAt

取出指定位置的数据。

 Observable.just(1, 2, 3).elementAt(2).subscribe(new MaybeObserver<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onSuccess(Integer integer) {
                System.out.println(integer);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });
Filter

对数据进行指定规则的过滤。

Observable.just(1, 2, 3, 4).filter(new Predicate<Integer>() {
            @Override
            public boolean test(Integer integer) throws Exception {

                return integer > 2;
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Integer integer) {
                System.out.println(integer);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });
First

取数据中的第一个数据。

//first参数:defaultItem: 当前Observable不发射任何内容时发出的默认项
        Observable.just(1, 2, 3,4,5).first(10).subscribe(new SingleObserver<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onSuccess(Integer integer) {
                System.out.println(integer);
            }

            @Override
            public void onError(Throwable e) {

            }
        });
IgnoreElements

忽略所有的数据,不向观察者发送数据,直接回调onError或onComplete()。

 Observable.just(6, 9, 1, 3).distinct().ignoreElements().subscribe(new CompletableObserver() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onComplete() {

            }

            @Override
            public void onError(Throwable e) {

            }

        });
Last

列表数据最后指定的数位项数据。
SingleObserver只发射一条单一的数据,或者一条异常通知,不能发射完成通知,其中数据与通知只能发射一个。

        Observable.just(6, 4, 2,4).distinct().map(new Function<Integer, String>() {
            @Override
            public String apply(Integer integer) throws Exception {
                return integer.toString();
            }
        }).last("4").subscribe(new SingleObserver<String>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onSuccess(String s) {
                System.out.println(s);
            }

            @Override
            public void onError(Throwable e) {

            }
        });
 Observable.just(6, 4, 2,4).distinct().map(new Function<Integer, String>() {
            @Override
            public String apply(Integer integer) throws Exception {
                return integer.toString();
            }
        }).last("4").subscribe(new SingleObserver<String>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onSuccess(String s) {
                System.out.println(s);
            }

            @Override
            public void onError(Throwable e) {

            }
        });
Sample

对数据源进行样本采集,发送给观察者。

 Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> arg0) throws Exception {
                for (int i = 0; i < 10; i++) {
                    Thread.sleep(1000);
                    arg0.onNext(i);
                }
                arg0.onComplete();
            }
        }).sample(4, TimeUnit.SECONDS).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Integer integer) {

                System.out.println(integer);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });
Skip

跳过指定列表项数据的指定项数据。

Observable.just(6, 3, 2, 1).skip(2).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Integer integer) {
                System.out.println(integer);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });
SkipLast

跳过列表数据的最后几位数据。

Observable.just(1, 2, 3, 5).skipLast(2).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Integer integer) {
                System.out.println(integer);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });
Take

只取列表数据的前几项。

Observable.just(1, 2, 3, 4).take(2).takeLast(1).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Integer integer) {
                System.out.println(integer);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });
TakeLast

取列表数据项的最后几项数据。
Consumer是简易版的Observer,他有多重重载,可以自定义你需要处理的信息,他只提供一个回调接口accept,由于没有onError和onCompete,无法再 接受到onError或者onCompete之后,实现函数回调。

Observable.just(1, 2, 3, 4).takeLast(2).take(1).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                System.out.println(integer);
            }
        });
Combining Observables(组合 Observable)
Zip

通过一个函数将多个Observables的发射物结合到一起,基于这个函数的结果为每个结合体发射单个数据项。当其中一个Observable发送数据结束或异常,另外一个也停止发送。

 Observable<Integer> observable = Observable.just(10, 20, 30);
        Observable<Integer> observable1 = Observable.just(4, 8, 12, 16);
        Observable.zip(observable, observable1, new BiFunction<Integer, Integer, Integer>() {
            @Override
            public Integer apply(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {

            }

            @Override
            public void onNext(@NonNull Integer integer) {
                System.out.println(integer);
            }

            @Override
            public void onError(@NonNull Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });
Merge

合并多个Observables的发射物。

Observable<Integer> observable = Observable.just(10, 20, 30);
        Observable<Integer> observable1 = Observable.just(4, 8, 12, 16);
        Observable.merge(observable, observable1).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {

            }

            @Override
            public void onNext(@NonNull Integer integer) {
                System.out.println(integer);
            }

            @Override
            public void onError(@NonNull Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });
StartWith

在数据序列的开头插入一条指定的项。

 Observable<Integer> observable = Observable.just(10, 20, 30);
        Observable<Integer> observable1 = Observable.just(4, 8, 12, 16);
        Disposable subscribe = observable.startWith(observable1).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                System.out.println(integer);
            }
        });
CombineLatest

当两个Observables中的任何一个发射了数据时,使用一个函数结合每个Observable发射的最近数据项,并且基于这个函数的结果发射数据。

 Observable<Integer> observable = Observable.just(1, 3, 5);
        Observable<Integer> observable1 = Observable.just(2, 4, 6);
        Observable.combineLatest(observable, observable1, new BiFunction<Integer, Integer, Integer>() {
            @Override
            public Integer apply(Integer integer, Integer integer2) throws Throwable {
                System.out.println("integer:" + integer + "---" + "integer2:" + integer2);
                return integer + integer2;
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {

            }

            @Override
            public void onNext(@NonNull Integer integer) {
                System.out.println(integer);
            }

            @Override
            public void onError(@NonNull Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });
Join

任何时候,只要在另一个Observable发射的数据定义的时间窗口内,这个Observable发射了一条数据,就结合两个Observable发射的数据。

 		String[] args1 = new String[]{"张欣1", "张欣2", "张欣3", "张欣4", "张欣5"};
        String[] args2 = new String[]{"春晓1", "春晓2", "春晓3", "春晓4"};
        Observable<String> o1 = Observable.fromArray(args1);
        Observable<String> o2 = Observable.fromArray(args2);
        //相同的数组可以进行合并
        o2.join(o1, new Function<String, Observable<Long>>() {
            @Override
            public Observable<Long> apply(String s) throws Exception {
                return Observable.timer(2, TimeUnit.SECONDS);
            }
        }, new Function<String, Observable<Long>>() {
            @Override
            public Observable<Long> apply(String s) {
                return Observable.timer(2, TimeUnit.SECONDS);
            }
        }, new BiFunction<String, String, String>() {
            @Override
            public String apply(String s, String s2) throws Exception {
                return s + "-&--" + s2;
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {

            }

            @Override
            public void onNext(@NonNull String s) {
                System.out.println(s);
            }

            @Override
            public void onError(@NonNull Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });
SwitchOnNext

将一个发射多个Observables的Observable转换成另一个单独的Observable,后者发射那些Observables最近发射的数据项。

        final Observable<Long> observable1 = Observable.intervalRange(1, 5, 1, 500, TimeUnit.MILLISECONDS);
        final Observable<Long> observable2 = Observable.intervalRange(10, 5, 1, 500, TimeUnit.MILLISECONDS);
        Observable<Observable<Long>> sources = Observable.create(new ObservableOnSubscribe<Observable<Long>>() {
            @Override
            public void subscribe(ObservableEmitter<Observable<Long>> emitter) throws Exception {
                emitter.onNext(observable1);
                Thread.sleep(1000);
                // 此时发射一个新的observable2,将会取消订阅observable1
                emitter.onNext(observable2);
                emitter.onComplete();
            }
        });

        // 创建发射含有Error通知的Observable序列的Observable
        Observable<Observable<Long>> sourcesError = Observable.create(new ObservableOnSubscribe<Observable<Long>>() {

            @Override
            public void subscribe(ObservableEmitter<Observable<Long>> emitter) throws Exception {
                emitter.onNext(observable1);
//                emitter.onNext(Observable.error(new Exception("Error Test1!"))); // 发射一个发射Error通知的Observable
//                emitter.onNext(Observable.error(new Exception("Error Test2!"))); // 发射一个发射Error通知的Observable
                Thread.sleep(1000);
                // 此时发射一个新的observable2,将会取消订阅observable1
                emitter.onNext(observable2);
                emitter.onComplete();
            }
        });

        // 1. switchOnNext(ObservableSource<ObservableSource> sources, int bufferSize)
        // 可选参数 bufferSize: 缓存数据项大小
        // 接受一个发射Observable序列的Observable类型的sources,
        // 当sources发射一个新的Observable后,则会取消订阅前面的旧observable,直接开始接受新Observable的数据
        Disposable subscribe = Observable.switchOnNext(sources)
                .subscribe(new Consumer<Long>() {

                    @Override
                    public void accept(Long integer) throws Exception {
                        System.out.println("--> accept(1): " + integer);
                    }
                });
        System.out.println("--------------------------------------------------------------------");
        // 2. switchOnNextDelayError(ObservableSource<ObservableSource> sources, int prefetch)
        // 可选参数 prefetch: 与读取数据项大小
        // 当sources发射一个新的Observable后,则会取消订阅前面的旧observable,直接开始接受新Observable的数据,
        // 保留onError通知直到合并后的Observable所有的数据发射完成,在那时它才会把onError传递给观察者
        Observable.switchOnNextDelayError(sourcesError)
                .subscribe(new Observer<Long>() {

                    @Override
                    public void onSubscribe(Disposable d) {
                        System.out.println("--> onSubscribe(2)");
                    }

                    @Override
                    public void onNext(Long t) {
                        System.out.println("--> onNext(2): " + t);
                    }

                    @Override
                    public void onError(Throwable e) {
                        // 判断是否是CompositeException对象(发生多个Observable出现Error时会发送的对象)
                        if (e instanceof CompositeException) {
                            CompositeException compositeException = (CompositeException) e;
                            List<Throwable> exceptions = compositeException.getExceptions();
                            System.out.println("--> onError(2): " + exceptions);
                        } else {
                            System.out.println("--> onError(2): " + e);
                        }
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("--> onComplete(2)");
                    }
                });
Error Handling Operators(处理错误)
Catch

Catch操作符拦截原始Observable的onError通知,将它替换为其它的数据项或数据序列,让产生的Observable能够正常终止或者根本不终止。
还有一个叫onErrorResumeNext的操作符,它的行为与Catch相似。
RxJava将Catch实现为三个不同的操作符:

  • onErrorReturn
    让Observable遇到错误时发射一个特殊的项并且正常终止。
  • onErrorResumeNext
    让Observable在遇到错误时开始发射第二个Observable的数据序列。
  • onExceptionResumeNext
    让Observable在遇到错误时继续发射后面的数据项。
 Observable.just(1,2,3).onErrorReturn(new Function<Throwable, Integer>() {
            @Override
            public Integer apply(Throwable throwable) throws Throwable {
                return  null;
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {

            }

            @Override
            public void onNext(@NonNull Integer integer) {
                System.out.println(integer);
            }

            @Override
            public void onError(@NonNull Throwable e) {
                System.out.println(e.getMessage());
            }

            @Override
            public void onComplete() {

            }
        });
Retry

如果原始Observable遇到错误,重新订阅它期望它能正常终止。
retryWhen和retry类似,区别是,retryWhen将onError中的Throwable传递给一个函数,这个函数产生另一个Observable,retryWhen观察它的结果再决定是不是要重新订阅原始的Observable。如果这个Observable发射了一项数据,它就重新订阅,如果这个Observable发射的是onError通知,它就将这个通知传递给观察者然后终止。
retryWhen默认在trampoline调度器上执行,你可以通过参数指定其它的调度器。
场景:网络请求失败重试操作。

final AtomicInteger atomicInteger = new AtomicInteger(3);
        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {
                emitter.onNext(String.valueOf(System.currentTimeMillis()));
                emitter.onError(new Error(String.valueOf(atomicInteger.decrementAndGet())));
            }
        }).retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
            @Override
            public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Throwable {
                return throwableObservable;
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {

            }

            @Override
            public void onNext(@NonNull String s) {
                System.out.println(s);
            }

            @Override
            public void onError(@NonNull Throwable e) {
                System.out.println(e.getMessage());
            }

            @Override
            public void onComplete() {

            }
        });

更多资料

Schedulers(调度器)

它是RxJava以一种及其简单的方式解决多线程问题的机制。

种类

io()
用于I/O操作。
computation()
计算,计算工作默认的调度器,与I/O操作无关。
immediate()
立即执行,允许立即在当前线程执行你指定的工作。
newThread()
新线程,为指定任务创建新线程。
trampoline()
顺序处理,按需处理队列,并运行队列的每一个任务。

AndroidSchedulers

RxAndroid提供在Android平台的调度器(指定观察者在主线程)。

  • SubscribeOn 方法用于每个Observable对象
  • ObserveOn 方法用于每个Subscriber(Observer)对象
 observable.subscribeOn(Schedulers.newThread())
                    .unsubscribeOn(Schedulers.newThread())
                    .observeOn(AndroidSchedulers.mainThread())
                    .onErrorResumeNext(new HttpErrorHandler<T>())
                    .subscribe(observer);

使用场景

与Retrofit结合使用

Retrofit使用参考之前的博文:Retrofit
Retrofit支持RxJava适配器,目前Retrofit官方最新的RxJava适配器已经更新到Rx3.x版本了。
Retrofit支持RxJava3.0

使用
retrofitBuilder = new Retrofit.Builder();
        retrofitBuilder.client(okHttpClient)
                .addConverterFactory(ScalarsConverterFactory.create())
                .addConverterFactory(GsonConverterFactory.create())
                .addCallAdapterFactory(RxJava2CallAdapterFactory.create());
                
public <T> void doCall(LifecycleOwner owner, Observable<T> observable, final HttpCallBack<T> httpCallBack) {

        if (observable == null || httpCallBack == null) {
            throw new IllegalArgumentException("observable或HttpCallBack为空");
        }

        //观察者_网络请求状态
        BaseObserver<T> observer = new BaseObserver<T>() {
            @Override
            public void onNext(T t) {
                try {
                    if (t != null) {
                        httpCallBack.onSuccess(t);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    httpCallBack.onFailure(e);
                }
            }

            @Override
            public void onError(Throwable e) {
                httpCallBack.onFailure(e);
            }

        };

        if (owner == null) {
            //被观察者订阅观察者,根据生命周期取消订阅,子线程订阅主线程观察
            observable.subscribeOn(Schedulers.newThread())
                    .unsubscribeOn(Schedulers.newThread())
                    .observeOn(AndroidSchedulers.mainThread())
                    .map(getAppErrorHandler())
                    .onErrorResumeNext(new HttpErrorHandler<T>())
                    .subscribe(observer);
        } else {
            //被观察者订阅观察者,根据生命周期取消订阅,子线程订阅主线程观察
            observable.subscribeOn(Schedulers.newThread())
                    .unsubscribeOn(Schedulers.newThread())
                    .observeOn(AndroidSchedulers.mainThread())
                    .map(getAppErrorHandler())
                    .onErrorResumeNext(new HttpErrorHandler<T>())
                    .as(AutoDispose.autoDisposable(AndroidLifecycleScopeProvider.from(owner, Lifecycle.Event.ON_DESTROY)))
                    .subscribe(observer);
        }
    }
与RxPermission结合使用

RxPermission是基于RxJava的Android动态权限申请框架。
Github地址:https://github.com/tbruyelle/RxPermissions

使用(简单封装)
    public void initPermissions(String[] permissions, PermissionResult permissionResult) {
        if (rxPermissions == null) {
            rxPermissions = new RxPermissions(this);
        }
        rxPermissions.requestEachCombined(permissions)
                .subscribe(permission -> {
                    if (permission.granted) {
                        permissionResult.onSuccess();
                    } else if (permission.shouldShowRequestPermissionRationale) {
                        permissionResult.onFailure();
                    } else {
                        permissionResult.onFailureWithNeverAsk();
                    }
                });
    }
代替EventBus

EventBus是一个Android端优化的publish/subscribe消息总线,简化了应用程序内各组件间、组件与后台线程间的通信。更多相关请参考Android事件总线之EventBus
RxJava也可以实现事件总线,因为它们都依据于观察者模式。我们使用RxJava替换EventBus,可以减少App的体积。

使用
	private static volatile RxBus instance;
    private PublishSubject<Object> mRxtBus;
    
    public static RxBus getDefault() {
        if (instance == null) {
            synchronized (RxBus.class) {
                instance = new RxBus();
            }
        }
        return instance;
    }

    private RxBus() {
        mRxtBus = PublishSubject.create();
    }

    public void post(String tag, Object event) {
        Message msg = new Message(tag, event);
        mRxtBus.onNext(msg);
    }

    public <T> Observable<T> toEvent(Class<T> eventType) {
        return mRxtBus.ofType(eventType);
    }
 }
  • 发送
RxBus.getDefault().post("payValue",code);
  • 接收
subscribe = RxBus.getDefault().toEvent(RxBus.Message.class).subscribe(new Consumer<RxBus.Message>() {
            @Override
            public void accept(RxBus.Message message) throws Throwable {
                if ("payValue".equals(message.getTag())) {
                    Log.e("yhj", "accept: " + message.getEvent().toString());
                }
            }
        });
  • 解绑
if (subscribe != null && !subscribe.isDisposed()) {
            subscribe.dispose();
        }

Rxjava内存泄漏的处理

Rxjava的使用不当会导致内存泄漏,使用AutoDispose可以解决这个问题,它是一个随Android生命周期事件自动解绑Rxjava订阅的方便工具。
Github地址:https://github.com/uber/AutoDispose

使用

结合JetPack的LifeCycle(生命周期感知型组件),根据生命周期取消订阅。

 //被观察者订阅观察者,根据生命周期取消订阅,子线程订阅主线程观察
            observable.subscribeOn(Schedulers.newThread())
                    .unsubscribeOn(Schedulers.newThread())
                    .observeOn(AndroidSchedulers.mainThread())
                    .map(getAppErrorHandler())
                    .onErrorResumeNext(new HttpErrorHandler<T>())
                    .as(AutoDispose.autoDisposable(AndroidLifecycleScopeProvider.from(owner, Lifecycle.Event.ON_DESTROY)))
                    .subscribe(observer);

总结

本文主要是对RxJava使用及Android常见使用场景进行总结,掌握这些还远远不够,RxJava还有许多强大的功能,诸如从磁盘/内存中获取缓存数据,背压策略,联想搜索优化等等。后面在项目开发中遇到相关场景再进行总结,更新。本文若有不当之处,请批评指正。


Copyright: 采用 知识共享署名4.0 国际许可协议进行许可

Links: https://www.yanghujun.com/archives/rxjava