转载

【转载】Rxjava3文档级教程一: 介绍和基本使用

温馨提示:
本文最后更新于 2024年03月04日,已超过 56 天没有更新。若文章内的图片失效(无法正常加载),请留言反馈或直接联系我

Rxjava3简介

ReactiveX的历史

ReactiveX是Reactive Extensions的缩写,一般简写为Rx,最初是LINQ的一个扩展,由微软的架构师Erik Meijer领导的团队开发,在2012年11月开源,Rx是一个编程模型,目标是提供一致的编程接口,帮助开发者更方便的处理异步数据流,Rx近几年越来越流行了,现在已经支持几乎全部的流行编程语言了,Rx的大部分语言库由ReactiveX这个组织负责维护,比较流行的有RxJava/RxJS/Rx.NET,社区网站是 reactivex.io。

什么是ReactiveX

微软给的定义是,Rx是一个函数库,让开发者可以利用可观察序列和LINQ风格查询操作符来编写异步和基于事件的程序,使用Rx,开发者可以用Observables表示异步数据流,用LINQ操作符查询异步数据流, 用Schedulers参数化异步数据流的并发处理,Rx可以这样定义:Rx = Observables + LINQ + Schedulers。

ReactiveX.io给的定义是,Rx是一个使用可观察数据流进行异步编程的编程接口,ReactiveX结合了观察者模式、迭代器模式和函数式编程的精华。

Observable拥有它的近亲Iterable的全部优雅与灵活。任何对Iterable的操作,你都可以对Observable使用。

RxJava

RxJava是响应式编程(Reactive Extensions)的java实现,它基于观察者模式的实现了异步编程接口。

Rxjava 3.x 的github官网

RxJava2将被支持到2021年2月28日,错误的会同时在2.x和3.x修复,但新功能只会在3.x上添加;

Rxjava 3.0的一些改变:官方Wiki

Rxjava 3.x 文档可以在官方javadoc中找到;

使用Rxjava3.x之前的准备工作:

添加依赖

 
arduino
复制代码
//RxJava的依赖包
implementation 'io.reactivex.rxjava3:rxandroid:3.0.0'
//RxAndroid的依赖包  
implementation 'io.reactivex.rxjava3:rxjava:3.0.0'

RxJava 在很长一段时间里以java6 为baseline( Android 运行时支持的锅),但在即将到来的 Android Studio 4预览中,一个叫做 desuging 的过程能够将许多 Java 7和8的特性,透明地转换成与 Java 6兼容的特性。因此我们可以将 RxJava 的基准提高到 java 8,并为许多 Java 8构造增加官方支持比如:Optional、Stream等,因此必须将项目的编译目标设置更改为 java8:

android {
    compileOptions {
        sourceCompatibility JavaVersion.VERSION_1_8
        targetCompatibility JavaVersion.VERSION_1_8
    }
}
 

二 Rx中的一些概念

2.1 字段含义

  • Reactive 直译为反应性的,有活性的,根据上下文一般翻译为反应式、响应式

  • Iterable 可迭代对象,支持以迭代器的形式遍历,许多语言中都存在这个概念

  • Observable 可观察对象,在Rx中定义为更强大的Iterable,在观察者模式中是被观察的对象,一旦数据产生或发生变化,会通过某种方式通知观察者或订阅者

  • Observer 观察者对象,监听Observable发射的数据并做出响应,Subscriber是它的一个特殊实现

  • emit 直译为发射,发布,发出,含义是Observable在数据产生或变化时发送通知给Observer,调用Observer对应的方法,文章里一律译为发射

  • items 直译为项目,条目,在Rx里是指Observable发射的数据项,文章里一律译为数据,数据项。

2.2 上/下流

在RxJava中,数据以流的方式组织:Rxjava包括一个源数据流,源数据流后跟着若干个用于消费数据流的步骤。

source
  .operator1()
  .operator2()
  .operator3()
  .subscribe(consumer)

在代码中,对于operator2来说,在它前面叫做上流,在它后面的叫做下流。

2.3 流对象

在RxJava的文档中,emission, emits, item, event, signal, data and message都被认为在数据流中被传递的数据对象。

2.4 背压(Backpressure)

当上下游在不同的线程中,通过Observable发射,处理,响应数据流时,如果上游发射数据的速度快于下游接收处理数据的速度,这样对于那些没来得及处理的数据就会造成积压,这些数据既不会丢失,也不会被垃圾回收机制回收,而是存放在一个异步缓存池中,如果缓存池中的数据一直得不到处理,越积越多,最后就会造成内存溢出,这便是响应式编程中的背压(backpressure)问题。

为此,RxJava带来了backpressure的概念。背压是一种流量的控制步骤,在不知道上流还有多少数据的情形下控制内存的使用,表示它们还能处理多少数据。背压是指在异步场景中,被观察者发送事件速度远快于观察者的处理速度的情况下,一种告诉上游的被观察者降低发送速度的策略。

在Rxjava1.0中,有的Observable支持背压,有的不支持,为了解决这种问题,2.0把支持背压和不支持背压的Observable区分开来:支持背压的有Flowable类,不支持背压的有Observable,Single, Maybe and Completable类。

  1. 在订阅的时候如果使用FlowableSubscriber,那么需要通过s.request(Long.MAX_VALUE)去主动请求上游的数据项。如果遇到背压报错的时候,FlowableSubscriber默认已经将错误try-catch,并通过onError()进行回调,程序并不会崩溃;

  2. 在订阅的时候如果使用Consumer,那么不需要主动去请求上游数据,默认已经调用了s.request(Long.MAX_VALUE)。如果遇到背压报错、且对Throwable的Consumer没有new出来,则程序直接崩溃;

  3. 背压策略的上游的默认缓存池是128。

背压策略:

  1. error, 缓冲区大概在128

  2. buffer, 缓冲区在1000左右

  3. drop, 把存不下的事件丢弃

  4. latest, 只保留最新的

  5. missing, 缺省设置,不做任何操作

 
public enum BackpressureStrategy {
    /**
     * OnNext events are written without any buffering or dropping.
     * Downstream has to deal with any overflow.
     * <p>Useful when one applies one of the custom-parameter onBackpressureXXX operators.
     */
    MISSING,
    /**
     * Signals a MissingBackpressureException in case the downstream can't keep up.
     */
    ERROR,
    /**
     * Buffers <em>all</em> onNext values until the downstream consumes it.
     */
    BUFFER,
    /**
     * Drops the most recent onNext value if the downstream can't keep up.
     */
    DROP,
    /**
     * Keeps only the latest onNext value, overwriting any previous value if the
     * downstream can't keep up.
     */
    LATEST
}

2.5 线程调度器(Schedulers)

对于Android开发者而言,RxJava最简单的是通过调度器来方便地切换线程。在不同平台还有不同的调度器,例如我们Android的主线程:AndroidSchedulers.mainThread()。

调度器 功能
AndroidSchedulers.mainThread() 需要引用rxandroid, 切换到UI线程
Schedulers.computation() 用于计算任务,如事件循环和回调处理,默认线程数等于处理器数量
Schedulers.io() 用于IO密集型任务,如异步阻塞IO操作,这个调度器的线程池会根据需求,它默认是一个CacheThreadScheduler
Schedulers.newThread() 为每一个任务创建一个新线程
Schedulers.trampoline() 在当前线程中立刻执行,如当前线程中有任务在执行则将其暂停, 等插入进来的任务执行完成之后,在将未完成的任务继续完成。
Scheduler.from(executor) 指定Executor作为调度器

2.6 事件调度器

RxJava事件发出去并不是置之不顾,要有合理的管理者来管理它们,在合适的时机要进行释放事件,这样才不会导致内存泄漏,这里的管理者我们称为事件调度器(或事件管理者)CompositeDisposable。

2.7 基类

RxJava 3 中的基类相比RxJava 2 没啥改变,主要有以下几个基类:

  • io.reactivex.Flowable:发送0个N个的数据,支持Reactive-Streams和背压
  • io.reactivex.Observable:发送0个N个的数据,不支持背压,
  • io.reactivex.Single:只能发送单个数据或者一个错误
  • io.reactivex.Completable:没有发送任何数据,但只处理 onComplete 和 onError 事件。
  • io.reactivex.Maybe:能够发射0或者1个数据,要么成功,要么失败。

2.8 Observables的"热"和"冷"

​ Observable什么时候开始发射数据序列?这取决于Observable的实现,一个"热"的Observable可能一创建完就开始发射数据,因此所有后续订阅它的观察者可能从序列中间的某个位置开始接受数据(有一些数据错过了)。一个"冷"的Observable会一直等待,直到有观察者订阅它才开始发射数据,因此这个观察者可以确保会收到整个数据序列。

在一些ReactiveX实现里,还存在一种被称作Connectable的Observable,不管有没有观察者订阅它,这种Observable都不会开始发射数据,除非Connect方法被调用。

三 RxJava的简单使用

需要知道的是,RxJava以观察者模式为骨架,有两种常见的观察者模式:

  • Observable(被观察者)/Observer(观察者)
  • Flowable(被观察者)/Subscriber(观察者)

RxJava2/3中,Observeable用于订阅Observer,是不支持背压的,而Flowable用于订阅Subscriber,是支持背压(Backpressure)的。

3.1 Observable/Observer

Observable正常用法:

 
      Observable mObservable=Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> e) throws Exception {
            e.onNext(1);
            e.onNext(2);
            e.onComplete();
        }
    });
 
    Observer mObserver=new Observer<Integer>() {
        //这是新加入的方法,在订阅后发送数据之前,
        //回首先调用这个方法,而Disposable可用于取消订阅
        @Override
        public void onSubscribe(Disposable d) {
 
        }
 
        @Override
        public void onNext(Integer value) {
            Log.e("lucas", "onNext: "+value );
        }
 
        @Override
        public void onError(Throwable e) {
 
        }
 
        @Override
        public void onComplete() {
 
        }
    };
 
    mObservable.subscribe(mObserver); 

这种观察者模型不支持背压:当被观察者快速发送大量数据时,下游不会做其他处理,即使数据大量堆积,调用链也不会报MissingBackpressureException,消耗内存过大只会OOM。所以,当我们使用Observable/Observer的时候,我们需要考虑的是,数据量是不是很大(官方给出以1000个事件为分界线作为参考)。

3.2 Flowable/Subscriber

 
        Flowable.range(0,10)
        .subscribe(new Subscriber<Integer>() {
            Subscription sub;
            //当订阅后,会首先调用这个方法,其实就相当于onStart(),
            //传入的Subscription s参数可以用于请求数据或者取消订阅
            @Override
            public void onSubscribe(Subscription s) {
                Log.w("TAG","onsubscribe start");
                sub=s;
                sub.request(1);
                Log.w("TAG","onsubscribe end");
            }
 
            @Override
            public void onNext(Integer o) {
                Log.w("TAG","onNext--->"+o);
                sub.request(1);
            }
            @Override
            public void onError(Throwable t) {
                t.printStackTrace();
            }
            @Override
            public void onComplete() {
                Log.w("TAG","onComplete");
            }
        });   

输出如下:

 

onsubscribe start
onsubscribe end
onNext--->0
onNext--->1
onNext--->2
...
onNext--->9
onComplete
 

Flowable是支持背压的,也就是说,一般而言,上游的被观察者会响应下游观察者的数据请求,下游调用request(n)来告诉上游发送多少个数据。这样避免了大量数据堆积在调用链上,使内存一直处于较低水平。

当然,Flowable也可以通过creat()来创建:

    Flowable.create(new FlowableOnSubscribe<Integer>() {
        @Override
        public void subscribe(FlowableEmitter<Integer> e) throws Exception {
            e.onNext(1);
            e.onNext(2);
            e.onNext(3);
            e.onNext(4);
            e.onComplete();
        }
    }
    //需要指定背压策略
    , BackpressureStrategy.BUFFER); 

Flowable虽然可以通过create()来创建,但是你必须指定背压的策略,以保证你创建的Flowable是支持背压的。

3.3 其他的观察者

最常用的其实就是上面说的两种订阅观察者,但是一些情况下,我们也会用到一些其他的一类观察者比如

  • Single/SingleObserver
  • Completable/CompletableObserver
  • Maybe/MaybeObserver

3.3.1 Single/SingleObserver

Single类似于Observable,不同的是,它总是只发射一个值,或者一个错误通知,而不是发射一系列的值(当然就不存在背压问题),所以当你使用一个单一连续事件流,这样你可以使用Single。Single观察者只包含两个事件,一个是正常处理成功的onSuccess,另一个是处理失败的onError。因此,不同于Observable需要三个方法onNext, onError, onCompleted,订阅Single只需要两个方法:

  • onSuccess - Single发射单个的值到这个方法

  • onError - 如果无法发射需要的值,Single发射一个Throwable对象到这个方法

Single只会调用这两个方法中的一个,而且只会调用一次,调用了任何一个方法之后,订阅关系终止。

Single的操作符:

Single也可以组合使用多种操作,一些操作符让你可以混合使用Observable和Single:

操作符 返回值 说明
compose Single 创建一个自定义的操作符
concat and concatWith Observable 连接多个Single和Observable发射的数据
create Single 调用观察者的create方法创建一个Single
error Single 返回一个立即给订阅者发射错误通知的Single
flatMap Single 返回一个Single,它发射对原Single的数据执行flatMap操作后的结果
flatMapObservable Observable 返回一个Observable,它发射对原Single的数据执行flatMap操作后的结果
from Single 将Future转换成Single
just Single 返回一个发射一个指定值的Single
map Single 返回一个Single,它发射对原Single的数据执行map操作后的结果
merge Single 将一个Single(它发射的数据是另一个Single,假设为B)转换成另一个Single(它发射来自另一个Single(B)的数据)
merge and mergeWith Observable 合并发射来自多个Single的数据
observeOn Single 指示Single在指定的调度程序上调用订阅者的方法
onErrorReturn Single 将一个发射错误通知的Single转换成一个发射指定数据项的Single
subscribeOn Single 指示Single在指定的调度程序上执行操作
timeout Single 它给原有的Single添加超时控制,如果超时了就发射一个错误通知
toSingle Single 将一个发射单个值的Observable转换为一个Single
zip and zipWith Single 将多个Single转换为一个,后者发射的数据是对前者应用一个函数后的结果

操作符详细的图解可以参考英文文档:Single

    //被观察者
    Single<String> single = Single.create(new SingleOnSubscribe<String>() {
        @Override
        public void subscribe(SingleEmitter<String> e) throws Exception {
            e.onSuccess("test");
            e.onSuccess("test2");//错误写法,重复调用也不会处理,因为只会调用一次
        }
    });
 
    //订阅观察者SingleObserver
    single.subscribe(new SingleObserver<String>() {
        @Override
        public void onSubscribe(Disposable d) {
 
        }
 
        @Override
        public void onSuccess(String s) {
            //相当于onNext和onComplete
            Log.d("lucas",  s  );
        }
 
        @Override
        public void onError(Throwable e) {
 
        }
    });

//运行结果
2020-04-03 23:02:37.337 15462-15462/com.ysalliance.getfan.myapplication D/lucas: test 

3.3.2 Completable/CompletableObserver

如果你的观察者连onNext事件都不关心,可以使用Completable,它只有onComplete和onError两个事件:

 

    Completable.create(new CompletableOnSubscribe() {//被观察者
 
        @Override
        public void subscribe(CompletableEmitter e) throws Exception {
            e.onComplete();//单一onComplete或者onError
        }
 
    }).subscribe(new CompletableObserver() {//观察者
        @Override
        public void onSubscribe(Disposable d) {
 
        }
 
        @Override
        public void onComplete() {
            Log.e("lucas", "onComplete: ");
        }
 
        @Override
        public void onError(Throwable e) {
 
        }
    });
    
    //打印结果
2020-04-03 23:12:08.099 16264-16264/com.ysalliance.getfan.myapplication E/lucas: onComplete: 

要转换成其他类型的被观察者,也是可以使用toFlowable()、toObservable()等方法去转换。

3.3.3 Maybe/MaybeObserver

如果你有一个需求是可能发送一个数据或者不会发送任何数据,这时候你就需要Maybe,它类似于Single和Completable的混合体。

  Maybe可能会调用以下其中一种情况(也就是所谓的Maybe):

  • onSuccess或者onError

  • onComplete或者onError 可以看到onSuccess和onComplete是互斥的存在,例子代码如下:
  //被观察者
  Maybe<String> maybe = Maybe.create(new MaybeOnSubscribe<String>() {
      @Override
      public void subscribe(MaybeEmitter<String> e) throws Exception {
          e.onSuccess("test");//发送一个数据的情况,或者onError,不需要再调用onComplete(调用了也不会触发onComplete回调方法)
          //e.onComplete();//不需要发送数据的情况,或者onError
      }
  });
   
  //订阅观察者
  maybe.subscribe(new MaybeObserver<String>() {
      @Override
      public void onSubscribe(Disposable d) {
   
      }
   
      @Override
      public void onSuccess(String s) {
          //发送一个数据时,相当于onNext和onComplete,但不会触发另一个方法onComplete
          Log.i("lucas", s);
      }
   
      @Override
      public void onComplete() {
          //无数据发送时候的onComplete事件
          Log.i("lucas", "onComplete");
      }
   
      @Override
      public void onError(Throwable e) {
   
      }
  });
  
  //打印结果
2020-04-03 23:14:40.266 16558-16558/com.ysalliance.getfan.myapplication I/lucas: test 

要转换成其他类型的被观察者,也是可以使用toFlowable()、toObservable()等方法去转换。

//判断是否登陆
Maybe.just(isLogin())
    //可能涉及到IO操作,放在子线程
    .subscribeOn(Schedulers.newThread())
    //取回结果传到主线程
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new MaybeObserver<Boolean>() {
            @Override
            public void onSubscribe(Disposable d) {
 
            }
 
            @Override
            public void onSuccess(Boolean value) {
                if(value){
                    ...
                }else{
                    ...
                }
            }
 
            @Override
            public void onError(Throwable e) {
 
            }
 
            @Override
            public void onComplete() {
 
            }
        });复制代码

上面就是Maybe/MaybeObserver的普通用法,你可以看到,实际上,这种观察者模式并不用于发送大量数据,而是发送单个数据,也就是说,当你只想要某个事件的结果(true or false)的时候,你可以用这种观察者模式

3.4 事件调度器释放事件


public class Main {
 
    private static CompositeDisposable mRxEvent = new CompositeDisposable();
 
    public static void main(String[] args) {
        Disposable subscribe = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
                e.onNext("俊俊俊很帅");
                e.onNext("你值得拥有");
                e.onNext("取消关注");
                e.onNext("但还是要保持微笑");
                e.onComplete();
            }
        }).subscribe(
                new Consumer<String>() {
                    @Override
                    public void accept(@NonNull String s) throws Exception {
                        //对应onNext()
                        System.out.println("accept=" + s);
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(@NonNull Throwable throwable) throws Exception {
                        //对应onError()
                    }
                }, new Action() {
                    @Override
                    public void run() throws Exception {
                        //对应onComplete()
                    }
                }, new Consumer<Disposable>() {
                    @Override
                    public void accept(@NonNull Disposable disposable) throws Exception {
                        //对应onSubscribe()
                    }
                });
        
        mRxEvent.add(subscribe);
        mRxEvent.clear();
    }
}

CompositeDisposable提供的方法中,都是对事件的管理

  • dispose():释放所有事件
  • clear():释放所有事件,实现同dispose()
  • add():增加某个事件
  • addAll():增加所有事件
  • remove():移除某个事件并释放
  • delete():移除某个事件

3.5 小结

这是上面那些基类被观察者的上层接口:

 
//Observable接口
interface ObservableSource<T> {
    void subscribe(Observer<? super T> observer);
}
//Single接口
interface SingleSource<T> {
    void subscribe(SingleObserver<? super T> observer);
}
//Completable接口
interface CompletableSource {
    void subscribe(CompletableObserver observer);
}
//Maybe接口
interface MaybeSource<T> {
    void subscribe(MaybeObserver<? super T> observer);
}
//Flowable接口
public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}

其实我们可以看到,每一种观察者都继承自各自的接口,这也就把他们能完全的区分开,各自独立(特别是Observable和Flowable),保证了他们各自的拓展或者配套的操作符不会相互影响。

例如flatMap操作符实现:

//Flowable中flatMap的定义
Flowable<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper);

//Observable中flatMap的定义
Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper);

假如你想为Flowable写一个自定义的操作符,那么只要保证Function< Publisher >中的类型实现了Publisher接口即可。这么说可能很抽象,大家不理解其实也没关系,因为并不推荐大家自定义操作符,RxJava中的操纵符的组合已经可以满足大家的需求了。

当然,你也会注意到上面那些接口中的subscribe()方法的返回类型为void了,在1.X中,这个方法一般会返回一个Subscription对象,用于取消订阅。现在,这个功能的对象已经被放到观察者Observer或者subscriber的内部实现方法中了,

Flowable/Subscriber

public interface Subscriber<T> {  
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}

public interface Subscription {
    public void request(long n);
    public void cancel();
}

上面的实例中,onSubscribe(Subscription s)传入的参数s就肩负着取消订阅的功能,当然,他也可以用于请求上游的数据。

在Observable/observer中,传入的参数是另一个对象

Observable/Observer

public interface Observer<T> {
   void onSubscribe(Disposable d);
    void onNext(T value);
    void onError(Throwable e);
    void onComplete();
}

public interface Disposable {
    /**
     * Dispose the resource, the operation should be idempotent.
     */
    void dispose();
    /**
     * Returns true if this resource has been disposed.
     * @return true if this resource has been disposed
     */
    boolean isDisposed();
}

在Observer接口中,onSubscribe(Disposable d)方法传入的Disposable也是用于取消订阅,基本功能是差不多的,只不过命名不一致,大家知道就好。

其实这种设计可以说还是符合逻辑的,因为取消订阅这个动作就只有观察者(Observer等)才能做的,现在把它并入到观察者内部,也算顺理成章吧。

Rxjava中,被观察者不能接收null作为数据源。

补充 1 Rxjava 3.x 主要更新内容如下API changes:

  • 将eagerTruncate添加到replay运算符,以便head节点将在截断时丢失它保留的项引用 (#6532)
  • 新增 X.fromSupplier() (#6529)
  • 使用 Scheduler 添加 concatMap,保证 mapper 函数的运行位置 (#6538)
  • 新增 startWithItem 和 startWithIterable (#6530)
  • ConnectableFlowable/ConnetableFlowable 重新设计 (#6519)
  • 将 as() 并入 to() (#6514)
  • 更改 Maybe.defaultIfEmpty() 以返回 Single (#6517)
  • 用 Supplier 代替 Callable (#6511)
  • 将一些实验操作符推广到标准 (#6537)
  • 从某些主题/处理器中删除 getValues() (#6516)
  • 删除 replay(Scheduler) 及其重载 (#6539)
  • 删除 dematerialize() (#6539)
  • 删除 startWith(T|Iterable) (#6530)
  • 删除 as() (#6514)
  • Maybe.toSingle(T) (#6517)
  • 删除 Flowable.subscribe(4 args) (#6517)
  • 删除 Observable.subscribe(4 args) (#6517)
  • 删除 Single.toCompletable() (#6517)
  • 删除 Completable.blockingGet() (#6517)

参考文章:

因为写RxJava系列的文章时进行了很多阅读和参考,因此不分一二三等,将全系列的参考引用统一如下:

RxJava3 Wiki

RxJava3官方github

ReactiveX文档中文翻译

single

操作符系列讲的很好的文章:blog.csdn.net/weixin_4204…

基础介绍:blog.csdn.net/weixin_4204…

RxJava3的一些简介:juejin.im/post/5d1eef…

观察者被观察者入门RxJava的一篇好文章:juejin.im/post/580103…

关于背压一个很好的介绍:juejin.im/post/582d41…

RxLifecycle:github.com/trello/RxLi…

 

作者:许进进
链接:https://juejin.cn/post/7020665574682263560
来源:稀土掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
正文到此结束