博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RxJava如何结合观察者与链式处理
阅读量:6257 次
发布时间:2019-06-22

本文共 3640 字,大约阅读时间需要 12 分钟。

RxJava如何结合观察者与链式处理

Author: Dorae

Date: 2018年12月3日17:10:31 转载请注明出处


一、概述

首先问自己几个问题,如果非常清楚这几个问题的目的与答案,那么恭喜你,不用继续往下看了-_-。

  1. RxJava是干什么的;
  2. 链式调用中当存在数个Observable.subscribeOn()时的处理方式;
  3. 链式调用中当存在数个Observable.observeOn()时的处理方式;
  4. 数据是如何经过操作符进行的处理。

回顾

观察者模式

如图1-1所示

图 1-1

Java8的stream

二、RxJava是什么

一款为了简化异步调用,且功能比较全面的框架。

三、RxJava如何结合了观察者模式与链式处理

参见Java8中的sink链,在RxJava中同样实现了链式处理。如代码片段code1-1所示,我们对其结构进行分析:

code1-1

Observable.create(new ObservableOnSubscribe
() { @Override public void subscribe(ObservableEmitter
emitter) throws Exception { emitter.onNext("Dorae"); } }) .filter(e -> e.contains("o")) .map(e -> "AfterMap: " + e) .filter(e -> e.contains("D")) .subscribe(new Observer
() { @Override public void onNext(@NonNull String o) { System.out.println("观察者 onNext: " + o); } @Override public void onSubscribe(Disposable d) { System.out.println("观察者onSubscribe: " + d + "### " + Thread.currentThread().getName()); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } });复制代码

1、首先看下其输出结果:

观察者onSubscribe: io.reactivex.internal.operators.observable.ObservableFilter$FilterObserver@52d455b8### main

观察者 onNext: AfterMap: Dorae

2、现在来对如何输出这段结果进行分析

首先了解下RxJava中的几种基本角色:

  • Observable,是RxJava描述的事件流,可以说其与Observer构成了RxJava的基础。在链式调用中,事件从创建到加工到最后被Observer接受,其实就是由一条Observerable链串起来的。
  • Observer,RxJava中的订阅者,也就是需要对事件进行响应的一个角色。其实除了我们通常自己实现的一个Observer,在链中的每一步都会产生一个Observer,然后这些Observer构成一条链,最终完成整个链的计算。
  • ObservableOnSubscribe,整个事件流的源头,通常需要我们自己实现,其依赖一个Emitter。
  • Emitter,可以将其理解为触发器,推动整个流程的运转。
  • Scheduler,这个其实不用太过关心,RxJava用其封装了Thread,用于完成线程切换等任务。

是不是感觉上边的一堆废话非常枯燥?先上一张RxJava的核心结构,如图3-1所示。

图 3-1

现在我们再来看看code1-1,其最终形成的Observable链如图3-2所示,每次调用map、filter等操作,都会生成一个新对象,并且保持了一个对上游的引用(用于生成Observer链)。

图 3-2

Observer链如图3-3所示,整个事件流程由CreateEmitter触发,最终交由我们的实现Observer$1处理。

图 3-3

看了上边几张图之后,是不是感觉清晰了很多?那么让我们进一步看下Rxjava如何完成了一键线程切换。

四、RxJava如何实现线程切换

通常我们使用RxJava的线程切换功能时,只需要在调用链中加上一句subscribeOn()或observeOn(),其中Scheduler如上所述,其实就是一个包装了ThreadPool的调度器。那么我们先来看下相关源码。

1、subscribeOn

如代码code4-1所示,为subscribeOn的核心代码。很明显,其中在新线程中只是简单的直接调用了source,也就是说这里之后的所有操作均在一个新线程中进行,和单线程并没有什么区别。

code 4-1

public final Observable
subscribeOn(Scheduler scheduler) { return new ObservableSubscribeOn
() { @Override public void subscribeActual(final Observer
observer) { scheduler.createWorker().schedule(new SubscribeTask() { @Override public void run() { source.subscribe(e); } }); } };}复制代码

2、observeOn

如代码段code4-2所示,为observeOn的核心逻辑,可以看出其在订阅阶段(生成Observer链的阶段)还是在当前线程执行,只有触发之后,到了ObserverOn的Observer的节点时才会真正的切换到新线程。

code 4-2

public final Observable
observeOn(Scheduler scheduler) { return new ObservableOnSubscribe
() { @Override public void subscribeActual(@NonNull Observer
e) { source.subscribe(new Observer
() { @Override public void onNext(T var1) { scheduler.createWorker().schedule(new Runnable() { @Override public void run() { e.onNext(var1); } }); } }); } };}复制代码

多次Observable.subscribeOn()、多次Observable.observeOn()会发生什么

通过上述code4-1、code4-2的分析,是不是可以推断出当多次subscribeOn时会发生什么?没错,虽然每次subscribeOn都会产生一次线程切换,但是真正起作用的只有最开始的一次subscribeOn,也就相当于只在最初的位置调用了subscribeOn;对于observeOn也是类似,每次都会产生新线程,但是每次都会产生一定的影响,也就是每个线程都承担了一部分工作。

小结

通过本文,我们可以简要了解到RxJava的基本原理,但是对于其丰富的api还需要在实践中进行磨合。但是,RxJava既然作为一个异步框架,其必然有一定的局限,比如其切换线程时无法阻塞当前线程(这种对于Android等需要渲染或者网络IO的需求来说非常适用),但是对于常见的服务端业务来说,还需要额外引入阻塞当前线程的操作(因为大部分的server代码还是单线程模型),倘若完全不用线程切换在服务端强行引入,可能会得不偿失。个人更推荐Java8的CompletableFuture。

参考

转载于:https://juejin.im/post/5c3bf302f265da615d72d96a

你可能感兴趣的文章
本地推送
查看>>
Beta 冲刺 (7/7)
查看>>
区块链实现简单的电商交易(以太坊)
查看>>
VMware报错:"激活连接失败:No suitable device found for this connection."
查看>>
maven设置
查看>>
个人考场VIM配置
查看>>
adobe
查看>>
微信小程序中的分享事件
查看>>
HDU 6069 Counting Divisors【区间素筛】【经典题】【好题】
查看>>
使用HAXM为QEMU for Windows加速
查看>>
配置tomcat下war包可以自压缩
查看>>
idea中artifacts、facets、modules是什么意思?
查看>>
大数据下的Distinct Count(一):序
查看>>
android 打包
查看>>
FUCKED-BUG之临时对象的生死
查看>>
一句话开启XP_CMDSHELL
查看>>
【100题】第四十五题 雅虎面试两道题(矩阵判断、数组划分)
查看>>
MySQL基础知识
查看>>
HTML页面优化
查看>>
centos6下安装docker
查看>>