RxJava2 方法总结
2020-12-13 03:34
但是每次看到RxJava的类中的几十个方法的时候,总是感觉心里没底。所以,我打算自己去专门写篇文章来从API的角度系统地梳理一下RxJava的各种方法和用法。
1、RxJava 基本
1.1 RxJava 简介
RxJava是一个在Java VM上使用可观测的序列来组成异步的、基于事件的程序的库。
虽然,在Android中,我们可以使用AsyncTask来完成异步任务操作,但是当任务的梳理比较多的时候,我们要为每个任务定义一个AsyncTask就变得非常繁琐。
RxJava能帮助我们在实现异步执行的前提下保持代码的清晰。
它的原理就是创建一个Observable
来完成异步任务,组合使用各种不同的链式操作,来实现各种复杂的操作,最终将任务的执行结果发射给Observer
进行处理。
当然,RxJava不仅适用于Android,也适用于服务端等各种场景。
我们总结以下RxJava的用途:
- 简化异步程序的流程;
- 使用近似于Java8的流的操作进行编程:因为想要在Android中使用Java8的流编程有诸多的限制,所以我们可以使用RxJava来实现这个目的。
在使用RxJava之前,我们需要先在自己的项目中添加如下的依赖:
compile ‘io.reactivex.rxjava2:rxjava:2.2.0‘
compile ‘io.reactivex.rxjava2:rxandroid:2.0.2‘
这里我们使用的是RxJava2,它与RxJava的第一个版本有些许不同。在本文中,我们所有的关于RxJava的示例都将基于RxJava2.
1.2 概要
下面是RxJava的一个基本的用例,这里我们定义了一个Observable
,然后在它内部使用emitter
发射了一些数据和信息(其实就相当于调用了被观察对象内部的方法,通知所有的观察者)。
然后,我们用Consumer
接口的实例作为subscribe()
方法的参数来观察发射的结果。(这里的接口的方法都已经被使用Lambda简化过,应该学着适应它。)
Observable observable = Observable.create(emitter -> {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
});
observable.subscribe(System.out::println);
这样,我们就完成了一个基本的RxJava的示例。从上面的例子中,你或许没法看出Observable
中隐藏的流的概念,看下面的例子:
Observable.range(0, 10).map(String::valueOf).forEach(System.out::println);
这里我们先用Observable.range()
方法产生一个序列,然后用map
方法将该整数序列映射成一个字符序列,最后将得到的序列输出来。从上面看出,这种操作和Java8里面的Stream编程很像。但是两者之间是有区别的:
- 所谓的“推”和“拉”的区别:Stream中是通过从流中读取数据来实现链式操作,而RxJava除了Stream中的功能之外,还可以通过“发射”数据,来实现通知的功能,即RxJava在Stream之上又多了一个观察者的功能。
- Java8中的Stream可以通过
parall()
来实现并行,即基于分治算法将任务分解并计算得到结果之后将结果合并起来;而RxJava只能通过subscribeOn()
方法将所有的操作切换到某个线程中去。 - Stream只能被消费一次,但是
Observable
可以被多次进行订阅;
RxJava除了为我们提供了Observable
之外,在新的RxJava中还提供了适用于其他场景的基础类,它们之间的功能和主要区别如下:
-
Flowable
: 多个流,响应式流和背压 -
Observable
: 多个流,无背压 -
Single
: 只有一个元素或者错误的流 -
Completable
: 没有任何元素,只有一个完成和错误信号的流 -
Maybe
: 没有任何元素或者只有一个元素或者只有一个错误的流
除了上面的几个基础类之外,还有一个Disposable
。当我们监听某个流的时候,就能获取到一个Disposable
对象。它提供了两个方法,一个是isDisposed
,可以被用来判断是否停止了观察指定的流;另一个是dispose
方法,用来放弃观察指定的流,我们可以使用它在任意的时刻停止观察操作。
1.3 总结
上面我们介绍了了关于RxJava的基本的概念和使用方式,在下面的文章中我们会按照以上定义的顺序从API的角度来讲解以下RxJava各个模块的使用方法。
2、RxJava 的使用
2.1 Observable
从上面的文章中我们可以得知,Observable
和后面3种操作功能近似,区别在于Flowable
加入了背压的概念,Observable
的大部分方法也适用于其他3个操作和Flowable
。
因此,我们这里先从Observable
开始梳理,然后我们再专门对Flowable
和背压的进行介绍。
Observable
为我们提供了一些静态的构造方法来创建一个Observable
对象,还有许多链式的方法来完成各种复杂的功能。
这里我们按照功能将它的这些方法分成各个类别并依次进行相关的说明。
2.1.1 创建操作
1.interval & intervalRange
下面的操作可以每个3秒的时间发送一个整数,整数从0开始:
Observable.interval(3, TimeUnit.SECONDS).subscribe(System.out::println);
如果想要设置从指定的数字开始也是可以的,实际上interval
提供了许多重载方法供我们是使用。下面我们连同与之功能相近的intervalRange
方法也一同给出:
public static Observable
interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler) public static Observable
interval(long period, TimeUnit unit, Scheduler scheduler) public static Observable
intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit, Scheduler scheduler)
这里的initialDelay
参数用来指示开始发射第一个整数的之前要停顿的时间,时间的单位与peroid
一样,都是通过unit
参数来指定的;period
参数用来表示每个发射之间停顿多少时间;unit
表示时间的单位,是TimeUnit
类型的;scheduler
参数指定数据发射和等待时所在的线程。
intervalRange
方法可以用来将发射的整数序列限制在一个范围之内,这里的start
用来表示发射的数据的起始值,count
表示总共要发射几个数字,其他参数与上面的interval
方法一致。
2.range & rangeLong
下面的操作可以产生一个从5开始的连续10个整数构成的序列:
Observable.range(5, 10).subscribe(i -> System.out.println("1: " + i));
该方法需要传入两个参数,与之有相同功能的方法还有rangeLong
:
public static Observable
range(final int start, final int count) public static Observable
rangeLong(long start, long count)
这里的两个参数start
用来指定用于生成的序列的开始值,count
用来指示要生成的序列总共包含多少个数字,上面的两个方法的主要区别在于一个是用来生成int型整数的,一个是用来生成long型整数的。
3.create
create
方法用于从头开始创建一个Observable
,像下面显示的那样,你需要使用create
方法并传一个发射器作为参数,在该发射器内部调用onNext
、onComplete
和onError
方法就可以将数据发送给监听者。
1 Observable.create((ObservableOnSubscribe) observableEmitter -> { 2 observableEmitter.onNext(1); 3 observableEmitter.onNext(2); 4 observableEmitter.onComplete(); 5 }).subscribe(System.out::println);
4.defer
defer
直到有观察者订阅时才创建Observable,并且为每个观察者创建一个新的Observable。defer
操作符会一直等待直到有观察者订阅它,然后它使用Observable工厂方法生成一个Observable。比如下面的代码两个订阅输出的结果是不一致的:
Observable observable = Observable.defer((Callable>>) () -> Observable.just(System.currentTimeMillis()));
observable.subscribe(System.out::print);
System.out.println();
observable.subscribe(System.out::print);
下面是该方法的定义,它接受一个Callable对象,可以在该对象中返回一个Observable的实例:
public static
5.empty & never & error
-
public static
:创建一个不发射任何数据但是正常终止的Observable;Observable empty() -
public static
:创建一个不发射数据也不终止的Observable;Observable never() -
public static
:创建一个不发射数据以一个错误终止的Observable,它有几个重载版本,这里给出其中的一个。Observable error(Throwable exception)
测试代码:
Observable.empty().subscribe(i->System.out.print("next"),i->System.out.print("error"),()->System.out.print("complete"));
Observable.never().subscribe(i->System.out.print("next"),i->System.out.print("error"),()->System.out.print("complete"));
Observable.error(new Exception()).subscribe(i->System.out.print("next"),i->System.out.print("error"),()->System.out.print("complete"));
输出结果:completeerror
6.from 系列
from
系列的方法用来从指定的数据源中获取一个Observable:
-
public static
:从数组中获取;Observable fromArray(T... items) -
public static
:从Callable中获取;Observable fromCallable(Callable extends T> supplier) -
public static
:从Future中获取,有多个重载版本,可以用来指定线程和超时等信息;Observable fromFuture(Future extends T> future) -
public static
:从Iterable中获取;Observable fromIterable(Iterable extends T> source) -
public static
:从Publisher中获取。Observable fromPublisher(Publisher extends T> publisher)
7.just 系列
just系列的方法的一个参数的版本为下面的形式:public static
,它还有许多个重载的版本,区别在于接受的参数的个数不同,最少1个,最多10个。
8.repeat
该方法用来表示指定的序列要发射多少次,下面的方法会将该序列无限次进行发送:
Observable.range(5, 10).repeat().subscribe(i -> System.out.println(i));
repeat
方法有以下几个相似方法:
public final Observable
repeat() public final Observable
repeat(long times) public final Observable
repeatUntil(BooleanSupplier stop) public final Observable
repeatWhen(Function super Observable
第1个无参的方法会无限次地发送指定的序列(实际上内部调用了第2个方法并传入了Long.MAX_VALUE),第2个方法会将指定的序列重复发射指定的次数;第3个方法会在满足指定的要求的时候停止重复发送,否则会一直发送。
9.timer
timer操作符创建一个在给定的时间段之后返回一个特殊值的Observable,它在延迟一段给定的时间后发射一个简单的数字0。比如下面的程序会在500毫秒之后输出一个数字0
。
Observable.timer(500, TimeUnit.MILLISECONDS).subscribe(System.out::print);
下面是该方法及其重载方法的定义,重载方法还可以指定一个调度器:
public static Observable
timer(long delay, TimeUnit unit) public static Observable
timer(long delay, TimeUnit unit, Scheduler scheduler)
2.1.2 变换操作
1.map & cast
-
map
操作符对原始Observable发射的每一项数据应用一个你选择的函数,然后返回一个发射这些结果的Observable。默认不在任何特定的调度器上执行。 -
cast
操作符将原始Observable发射的每一项数据都强制转换为一个指定的类型(多态),然后再发射数据,它是map的一个特殊版本:
下面的第一段代码用于将生成的整数序列转换成一个字符串序列之后并输出;第二段代码用于将Date类型转换成Object类型并进行输出,这里如果前面的Class无法转换成第二个Class就会出现异常:
Observable.range(1, 5).map(String::valueOf).subscribe(System.out::println);
Observable.just(new Date()).cast(Object.class).subscribe(System.out::print);
这两个方法的定义如下:
public final
Observable map(Function super T, ? extends R> mapper) public final Observable cast(Class clazz)
这里的mapper
函数接受两个泛型,一个表示原始的数据类型,一个表示要转换之后的数据类型,转换的逻辑写在该接口实现的方法中即可。
2.flatMap & contactMap
flatMap
将一个发送事件的上游Observable变换为多个发送事件的Observables,然后将它们发射的事件合并后放进一个单独的Observable里。需要注意的是, flatMap并不保证事件的顺序,也就是说转换之后的Observables的顺序不必与转换之前的序列的顺序一致。比如下面的代码用于将一个序列构成的整数转换成多个单个的Observable
,然后组成一个OBservable
,并被订阅。下面输出的结果仍将是一个字符串数字序列,只是顺序不一定是增序的。
Observable.range(1, 5)
.flatMap((FunctionInteger, ObservableSourceString>>) i -> Observable.just(String.valueOf(i)))
.subscribe(System.out::println);
与flatMap
对应的方法是contactMap
,后者能够保证最终输出的顺序与上游发送的顺序一致。下面是这两个方法的定义:
public final
Observable flatMap(Function super T, ? extends ObservableSource extends R>> mapper) public final
Observable concatMap(Function super T, ? extends ObservableSource extends R>> mapper)
flatMap
的重载方法数量过多,它们在数据源方面略有不同,有的支持错误等可选参数,具体可以参考源代码。
3.flatMapIterable
flatMapIterable
可以用来将上流的任意一个元素转换成一个Iterable
对象,然后我们可以对其进行消费。在下面的代码中,我们先生成一个整数的序列,然后将每个整数映射成一个Iterable
类型,最后,我们对其进行订阅和消费:
Observable.range(1, 5)
.flatMapIterable((FunctionInteger, IterableString>>) integer -> Collections.singletonList(String.valueOf(integer)))
.subscribe(s -> System.out.println("flatMapIterable : " + s));
下面是该方法及其重载方法的定义:
public final Observable flatMapIterable(Function super T, ? extends Iterable extends U>> mapper)
public final Observable
flatMapIterable(Function super T, ? extends Iterable extends U>> mapper, BiFunction super T, ? super U, ? extends V> resultSelector)
4.buffer
该方法用于将整个流进行分组。以下面的程序为例,我们会先生成一个7个整数构成的流,然后使用buffer
之后,这些整数会被3个作为一组进行输出,所以当我们订阅了buffer
转换之后的Observable
之后得到的是一个列表构成的OBservable
:
Observable.range(1, 7).buffer(3)
.subscribe(integers -> System.out.println(Arrays.toString(integers.toArray())));
下面是这个方法及其重载方法的定义,它的重载方法太多,这里我们只给出其中的两个,其他的可以参考RxJava的源码。这里的buffer应该理解为一个缓冲区,当缓冲区满了或者剩余的数据不够一个缓冲区的时候就将数据发射出去。
public final Observable
- > buffer(int count)
public final Observable
- > buffer(int count, int skip)
- ...
5.groupBy
groupBy
用于分组元素,它可以被用来根据指定的条件将元素分成若干组。它将得到一个Observable
类型的Observable
。如下面的程序所示,这里我们使用concat
方法先将两个Observable
拼接成一个Observable
,然后对其元素进行分组。这里我们的分组依据是整数的值,这样我们将得到一个Observable
类型的Observable
。然后,我们再将得到的序列拼接成一个并进行订阅输出:
Observable> observable = Observable.concat(
Observable.range(1,4), Observable.range(1,6)).groupBy(integer -> integer);
Observable.concat(observable).subscribe(integer -> System.out.println("groupBy : " + integer));
该方法有多个重载版本,这里我们用到的一个的定义是:
public final
6.scan
scan
操作符对原始Observable发射的第一项数据应用一个函数,然后将那个函数的结果作为自己的第一项数据发射。它将函数的结果同第二项数据一起填充给这个函数来产生它自己的第二项数据。它持续进行这个过程来产生剩余的数据序列。这个操作符在某些情况下被叫做accumulator。
以下面的程序为例,该程序的输结果是2 6 24 120 720
,可以看出这里的计算规则是,我们把传入到scan
中的函数记为f
,序列记为x
,生成的序列记为y
,那么这里的计算公式是y(0)=x(0); y(i)=f(y(i-1), x(i)), i>0
:
Observable.range(2, 5).scan((i1, i2) -> i1 * i2).subscribe(i -> System.out.print(i + " "));
除了上面的这种形式,scan
方法还有一个重载的版本,我们可以使用这个版本的方法来在生成序列的时候指定一个初始值。以下面的程序为例,它的输出结果是3 6 18 72 360 2160
,可以看出它的输出比上面的形式多了1个,这是因为当指定了初始值之后,生成的第一个数字就是那个初始值,剩下的按照我们上面的规则进行的。所以,用同样的函数语言来描述的话,那么它就应该是下面的这种形式:y(0)=initialValue; y(i)=f(y(i-1), x(i)), i>0
。
Observable.range(2, 5).scan(3, (i1, i2) -> i1 * i2).subscribe(i -> System.out.print(i + " "));
以上方法的定义是:
public final Observable
scan(BiFunction accumulator) public final
Observable scan(R initialValue, BiFunction accumulator)
7.window
window
Window和Buffer类似,但不是发射来自原始Observable的数据包,它发射的是Observable,这些Observables中的每一个都发射原始Observable数据的一个子集,最后发射一个onCompleted通知。
以下面的程序为例,这里我们首先生成了一个由10个数字组成的整数序列,然后使用window
函数将它们每3个作为一组,每组会返回一个对应的Observable对象。
这里我们对该返回的结果进行订阅并进行消费,因为10个数字,所以会被分成4个组,每个对应一个Observable:
Observable.range(1, 10).window(3).subscribe(
observable -> observable.subscribe(integer -> System.out.println(observable.hashCode() + " : " + integer)));
除了对数据包进行分组,我们还可以根据时间来对发射的数据进行分组。该方法有多个重载的版本,这里我们给出其中的比较具有代表性的几个:
public final Observable
> window(long count) public final Observable
> window(long timespan, long timeskip, TimeUnit unit) public final Observable
> window(ObservableSource boundary) public final Observable
> window(Callable extends ObservableSource> boundary)
2.1.3 过滤操作
1.filter
filter
用来根据指定的规则对源进行过滤,比如下面的程序用来过滤整数1到10中所有大于5的数字:
Observable.range(1,10).filter(i -> i > 5).subscribe(System.out::println);
下面是该方法的定义:
public final Observable
filter(Predicate super T> predicate)
2.elementAt & firstElement & lastElement
elementAt
用来获取源中指定位置的数据,它有几个重载方法,这里我们介绍一下最简单的一个方法的用法。下面是elementAt
的一个示例,它将获取源数据中索引为1的元素并交给观察者订阅。下面的程序将输出1
Observable.range(1, 10).elementAt(0).subscribe(System.out::print);
这里我们给出elementAt
及其相关的方法的定义,它们的使用相似。注意一下这里的返回类型:
public final Maybe
elementAt(long index) public final Single
elementAt(long index, T defaultItem) public final Single
elementAtOrError(long index)
除了获取指定索引的元素的方法之外,RxJava中还有可以用来直接获取第一个和最后一个元素的方法,这里我们直接给出方法的定义:
public final Maybe
firstElement() public final Single
first(T defaultItem) public final Single
firstOrError() public final Maybe
lastElement() public final Single
last(T defaultItem) public final Single
lastOrError()
3.distinct & distinctUntilChanged
distinct
用来对源中的数据进行过滤,以下面的程序为例,这里会把重复的数字7过滤掉:
Observable.just(1,2,3,4,5,6,7,7).distinct().subscribe(System.out::print);
与之类似的还有distinctUntilChanged
方法,与distinct
不同的是,它只当相邻的两个元素相同的时候才会将它们过滤掉。比如下面的程序会过滤掉其中的2和5,所以最终的输出结果是12345676
:
Observable.just(1,2,2,3,4,5,5,6,7,6).distinctUntilChanged().subscribe(System.out::print);
该方法也有几个功能相似的方法,这里给出它们的定义如下:
public final Observable
distinct() public final
Observable distinct(Function super T, K> keySelector) public final
Observable distinct(Function super T, K> keySelector, Callable extends Collection super K>> collectionSupplier) public final Observable
distinctUntilChanged() public final
Observable distinctUntilChanged(Function super T, K> keySelector) public final Observable
distinctUntilChanged(BiPredicate super T, ? super T> comparer)
4.skip & skipLast & skipUntil & skipWhile
skip
方法用于过滤掉数据的前n项,比如下面的程序将会过滤掉前2项,因此输出结果是345
:
Observable.range(1, 5).skip(2).subscribe(System.out::print);
与skip
方法对应的是take
方法,它用来表示只选择数据源的前n项,该方法的示例就不给出了。这里,我们说一下与之类功能类似的重载方法。skip
还有一个重载方法接受两个参数,用来表示跳过指定的时间,也就是在指定的时间之后才开始进行订阅和消费。下面的程序会在3秒之后才开始不断地输出数字:
Observable.range(1,5).repeat().skip(3, TimeUnit.SECONDS).subscribe(System.out::print);
与skip
功能相反的方法的还有skipLast
,它用来表示过滤掉后面的几项,以及最后的一段时间不进行发射等。比如下面的方法,我们会在程序开始之前进行计时,然后会不断重复输出数字,直到5秒之后结束。然后,我们用skipLast
方法表示最后的2秒不再进行发射。所以下面的程序会先不断输出数字3秒,3秒结束后停止输出,并在2秒之后结束程序:
long current = System.currentTimeMillis();
Observable.range(1,5)
.repeatUntil(() -> System.currentTimeMillis() - current > TimeUnit.SECONDS.toMillis(5))
.skipLast(2, TimeUnit.SECONDS).subscribe(System.out::print);
与上面的这些方法类似的还有一些,这里我们不再一一列举。因为这些方法的重载方法比较多,下面我们给出其中的具有代表性的一部分:
public final Observable
skip(long count) public final Observable
skip(long time, TimeUnit unit, Scheduler scheduler) public final Observable
skipLast(int count) public final Observable
skipLast(long time, TimeUnit unit, Scheduler scheduler, boolean delayError, int bufferSize) public final Observable
skipUntil(ObservableSource other) public final Observable
skipWhile(Predicate super T> predicate)
5.take & takeLast & takeUntil & takeWhile
与skip
方法对应的是take
方法,它表示按照某种规则进行选择操作。我们以下面的程序为例,这里第一段程序表示只发射序列中的前2个数据:
Observable.range(1, 5).take(2).subscribe(System.out::print);
下面的程序表示只选择最后2秒中输出的数据:
long current = System.currentTimeMillis();
Observable.range(1,5)
.repeatUntil(() -> System.currentTimeMillis() - current > TimeUnit.SECONDS.toMillis(5))
.takeLast(2, TimeUnit.SECONDS).subscribe(System.out::print);
下面是以上相关的方法的定义,同样的,我们只选择其中比较有代表性的几个:
public final Observable
take(long count) public final Observable
takeLast(long count, long time, TimeUnit unit, Scheduler scheduler, boolean delayError, int bufferSize) public final Observable
takeUntil(ObservableSource other) public final Observable
takeUntil(Predicate super T> stopPredicate) public final Observable
takeWhile(Predicate super T> predicate)
6.ignoreElements
该方法用来过滤所有源Observable产生的结果,只会把Observable的onComplete和onError事件通知给订阅者。下面是该方法的定义:
public final Completable ignoreElements()
7.throttleFirst & throttleLast & throttleLatest & throttleWithTimeout
这些方法用来对输出的数据进行限制,它们是通过时间的”窗口“来进行限制的,你可以理解成按照指定的参数对时间进行分片,然后根据各个方法的要求选择第一个、最后一个、最近的等进行发射。下面是throttleLast
方法的用法示例,它会输出每个500毫秒之间的数字中最后一个数字:
Observable.interval(80, TimeUnit.MILLISECONDS)
.throttleLast(500, TimeUnit.MILLISECONDS)
.subscribe(i -> System.out.print(i + " "));
其他的几个方法的功能大致列举如下:
-
throttleFirst
只会发射指定的Observable在指定的事件范围内发射出来的第一个数据; -
throttleLast
只会发射指定的Observable在指定的事件范围内发射出来的最后一个数据; -
throttleLatest
用来发射距离指定的时间分片最近的那个数据; -
throttleWithTimeout
仅在过了一段指定的时间还没发射数据时才发射一个数据,如果在一个时间片达到之前,发射的数据之后又紧跟着发射了一个数据,那么这个时间片之内之前发射的数据会被丢掉,该方法底层是使用debounce
方法实现的。如果数据发射的频率总是快过这里的timeout
参数指定的时间,那么将不会再发射出数据来。
下面是这些方法及其重载方法的定义(选择其中一部分):
public final Observable
throttleFirst(long skipDuration, TimeUnit unit, Scheduler scheduler) public final Observable
throttleLast(long intervalDuration, TimeUnit unit, Scheduler scheduler) public final Observable
throttleLatest(long timeout, TimeUnit unit, Scheduler scheduler, boolean emitLast) public final Observable
throttleWithTimeout(long timeout, TimeUnit unit, Scheduler scheduler)
8.debounce
debounce
也是用来限制发射频率过快的,它仅在过了一段指定的时间还没发射数据时才发射一个数据。我们通过下面的图来说明这个问题:
这里红、绿、蓝三个球发射出来的原因都是因为当反射了这个球之后的一定的时间内没有其他的球发射出来,这个时间是我们可以通过参数来指定的。
该方法的用法与throttle
之类的方法类似,上面也说过throttle
那些方法底层用了debounce
实现,所以,这里我们不再为该方法专门编写相关的测试代码。
9.sample
实际上throttleLast
的实现中内部调用的就是sample
。
2.1.4 组合操作
1.startWith & startWithArray
startWith
方法可以用来在指定的数据源的之前插入几个数据,它的功能类似的方法有startWithArray
,另外还有几个重载方法。这里我们给出一个基本的用法示例,下面的程序会在原始的数字流1-5的前面加上0,所以最终的输出结果是012345
:
Observable.range(1,5).startWith(0).subscribe(System.out::print);
下面是startWith
及其几个功能相关的方法的定义:
public final Observable
startWith(Iterable extends T> items) public final Observable
startWith(ObservableSource extends T> other) public final Observable
startWith(T item) public final Observable
startWithArray(T... items)
2.merge & mergeArray
merge
可以让多个数据源的数据合并起来进行发射,当然它可能会让merge
之后的数据交错发射。下面是一个示例,这个例子中,我们使用merge
方法将两个Observable
合并到了一起进行监听:
Observable.merge(Observable.range(1,5), Observable.range(6,5)).subscribe(System.out::print);
鉴于merge
方法及其功能类似的方法太多,我们这里挑选几个比较有代表性的方法,具体的可以查看RxJava的源代码:
public static
Observable merge(Iterable extends ObservableSource extends T>> sources) public static
Observable mergeArray(ObservableSource extends T>... sources) public static
Observable mergeDelayError(Iterable extends ObservableSource extends T>> sources) public static
Observable mergeArrayDelayError(ObservableSource extends T>... sources)
这里的mergeError
方法与merge
方法的表现一致,只是在处理由onError
触发的错误的时候有所不同。mergeError
方法会等待所有的数据发射完毕之后才把错误发射出来,即使多个错误被触发,该方法也只会发射出一个错误信息。而如果使用merger
方法,那么当有错误被触发的时候,该错误会直接被抛出来,并结束发射操作。下面是该方法的一个使用的示例,这里我们主线程停顿4秒,然后所有merge
的Observable中的一个会在线程开始的第2秒的时候触发一个错误,该错误最终会在所有的数据发射完毕之后被发射出来:
Observable.mergeDelayError(Observable.range(1,5),
Observable.range(1,5).repeat(2),
Observable.create((ObservableOnSubscribe) observableEmitter -> {
Thread.sleep(2000);
observableEmitter.onError(new Exception("error"));
})
).subscribe(System.out::print, System.out::print);
Thread.sleep(4000);
3.concat & concatArray & concatEager
该方法也是用来将多个Observable拼接起来,但是它会严格按照传入的Observable的顺序进行发射,一个Observable没有发射完毕之前不会发射另一个Observable里面的数据。下面是一个程序示例,这里传入了两个Observable,会按照顺序输出12345678910
:
Observable.concat(Observable.range(1, 5), Observable.range(6, 5)).subscribe(System.out::print);
下面是该方法的定义,鉴于该方法及其重载方法太多,这里我们选择几个比较有代表性的说明:
public static
Observable concat(Iterable extends ObservableSource extends T>> sources) public static
Observable concatDelayError(Iterable extends ObservableSource extends T>> sources) public static
Observable concatArray(ObservableSource extends T>... sources) public static
Observable concatArrayDelayError(ObservableSource extends T>... sources) public static
Observable concatEager(ObservableSource extends ObservableSource extends T>> sources) public static
Observable concatArrayEager(ObservableSource extends T>... sources)
对于concat
方法,我们之前已经介绍过它的用法;这里的conactArray
的功能与之类似;对于concatEager
方法,当一个观察者订阅了它的结果,那么就相当于订阅了它拼接的所有ObservableSource
,并且会先缓存这些ObservableSource发射的数据,然后再按照顺序将它们发射出来。而对于这里的concatDelayError
方法的作用和前面的mergeDelayError
类似,只有当所有的数据都发射完毕才会处理异常。
4.zip & zipArray & zipIterable
zip
操作用来将多个数据项进行合并,可以通过一个函数指定这些数据项的合并规则。比如下面的程序的输出结果是6 14 24 36 50
,显然这里的合并的规则是相同索引的两个数据的乘积。不过仔细看下这里的输出结果,可以看出,如果一个数据项指定的位置没有对应的值的时候,它是不会参与这个变换过程的:
Observable.zip(Observable.range(1, 6), Observable.range(6, 5), (integer, integer2) -> integer * integer2)
.subscribe(i -> System.out.print(i + " "));
zip
方法有多个重载的版本,同时也有功能近似的方法,这里我们挑选有代表性的几个进行说明:
public static
Observable zip(Iterable extends ObservableSource extends T>> sources, Function super Object[], ? extends R> zipper) ublic static
Observable zipArray(Function super Object[], ? extends R> zipper, boolean delayError, int bufferSize, ObservableSource... sources) public static
Observable zipIterable(Iterable extends ObservableSource extends T>> sources, Function super Object[], ? extends R> zipper, boolean delayError, int bufferSize)
实际上上面几个方法的用法和功能基本类似,区别在于传入的ObservableSource
的参数的形式。
5.combineLastest
与zip
操作类似,但是这个操作的输出结果与zip
截然不同,以下面的程序为例,它的输出结果是36 42 48 54 60
:
Observable.combineLatest(Observable.range(1, 6), Observable.range(6, 5), (integer, integer2) -> integer * integer2)
.subscribe(i -> System.out.print(i + " "));
利用下面的这张图可以比较容易来说明这个问题:
[图片上传失败...(image-5b184-1534261018262)]
上图中的上面的两条横线代表用于拼接的两个数据项,下面的一条横线是拼接之后的结果。combineLatest
的作用是拼接最新发射的两个数据。下面我们用上图的过程来说明该方法是如何执行的:开始第一条只有1的时候无法拼接,;当第二条出现A的时候,此时最新的数据是1和A,故组合成一个1A;第二个数据项发射了B,此时最新的数据是1和B,故组合成1B;第一条横线发射了2,此时最新的数据是2和B,因此得到了2B,依次类推。然后再回到我们上面的问题,第一个数据项连续发射了5个数据的时候,第二个数据项一个都没有发射出来,因此没有任何输出;然后第二个数据项开始发射数据,当第二个数据项发射了6的时候,此时最新的数据组合是6和6,故得36;然后,第二个数据项发射了7,此时最新的数据组合是6和7,故得42,依次类推。
该方法也有对应的combineLatestDelayError
方法,用途也是只有当所有的数据都发射完毕的时候才去处理错误逻辑。
2.1.5 辅助操作
1.delay
delay
方法用于在发射数据之前停顿指定的时间,比如下面的程序会在真正地发射数据之前停顿1秒:
Observable.range(1, 5).delay(1000, TimeUnit.MILLISECONDS).subscribe(System.out::print);
Thread.sleep(1500);
同样delay
方法也有几个重载的方法,可以供我们用来指定触发的线程等信息,这里给出其中的两个,其他的可以参考源码和文档:
public final Observable
delay(long delay, TimeUnit unit) public final Observable
delay(long delay, TimeUnit unit, Scheduler scheduler)
2.do系列
RxJava中还有一系列的方法可以供我们使用,它们共同的特点是都是以do
开头,下面我们列举一下这些方法并简要说明一下它们各自的用途:
-
public final Observable
,会在doAfterNext(Consumer super T> onAfterNext) onNext
方法之后触发; -
public final Observable
,会在Observable终止之后触发;doAfterTerminate(Action onFinally) -
public final Observable
,当doFinally(Action onFinally) onComplete
或者onError
的时候触发; -
public final Observable
,当被dispose的时候触发;doOnDispose(Action onDispose) -
public final Observable
,当complete的时候触发;doOnComplete(Action onComplete) -
public final Observable
,当每个doOnEach(final Observer super T> observer) onNext
调用的时候触发; -
public final Observable
,当调用doOnError(Consumer super Throwable> onError) onError
的时候触发; public final Observable
doOnLifecycle(final Consumer super Disposable> onSubscribe, final Action onDispose) -
public final Observable
,,会在doOnNext(Consumer super T> onNext) onNext
的时候触发; -
public final Observable
,会在订阅的时候触发;doOnSubscribe(Consumer super Disposable> onSubscribe) -
public final Observable
,当终止之前触发。doOnTerminate(final Action onTerminate)
这些方法可以看作是对操作执行过程的一个监听,当指定的操作被触发的时候会同时触发这些监听方法:
Observable.range(1, 5)
.doOnEach(integerNotification -> System.out.println("Each : " + integerNotification.getValue()))
.doOnComplete(() -> System.out.println("complete"))
.doFinally(() -> System.out.println("finally"))
.doAfterNext(i -> System.out.println("after next : " + i))
.doOnSubscribe(disposable -> System.out.println("subscribe"))
.doOnTerminate(() -> System.out.println("terminal"))
.subscribe(i -> System.out.println("subscribe : " + i));
&l
下一篇:php验证登录