作者——閒魚技術鯤鳴
RxJava是Java對於反應式編程的一個實現框架,是一個基於事件的、提供實現強大且優雅的異步調用程序的代碼庫。18年以來,由淘寶技術部發起的應用架構升級項目,希望通過反應式架構、全異步化的改造,提升系統整體性能和機器資源利用率,減少網絡延時,資源的重複使用,併為業務快速創新提供敏捷的架構支撐。在閒魚的基礎鏈路諸如商品批量更新、訂單批量查詢等,都利用了RxJava的異步編程能力。
不過,RxJava是入門容易精通難,一不小心遍地坑。今天來一起看下RxJava的使用方式、基本原理、注意事項。
1.開始之前
讓我們先看下,使用RxJava之前,我們曾經寫過的回調代碼存在的痛點。
當我們的應用需要處理用戶事件、異步調用時,隨著流式事件的複雜性和處理邏輯的複雜性的增加,代碼的實現難度將爆炸式增長。比如我們有時需要處理多個事件流的組合、處理事件流的異常或超時、在事件流結束後做清理工作等,如果需要我們從零實現,勢必要小心翼翼地處理回調、監聽、併發等很多棘手問題。
還有一個被稱作“回調地獄”的問題,描述的是代碼的不可讀性。
Code 1.1
// 示例引自callbackhell.com
fs.readdir(source, function (err, files) {
if (err) {
console.log('Error finding files: ' + err)
} else {
files.forEach(function (filename, fileIndex) {
console.log(filename)
gm(source + filename).size(function (err, values) {
if (err) {
console.log('Error identifying file size: ' + err)
} else {
console.log(filename + ' : ' + values)
aspect = (values.width / values.height)
widths.forEach(function (width, widthIndex) {
height = Math.round(width / aspect)
console.log('resizing ' + filename + 'to ' + height + 'x' + height)
this.resize(width, height).write(dest + 'w' + width + '_' + filename, function(err) {
if (err) console.log('Error writing file: ' + err)
})
}.bind(this))
}
})
})
}
})
以上js代碼有兩個明顯槽點: 1.由於傳入的層層回調方法,代碼結尾出現一大堆的 }) ; 2. 代碼書寫的順序與代碼執行的順序相反:後面出現回調函數會先於之前行的代碼先執行。
而如果使用了RxJava,我們處理回調、異常等將得心應手。
2.引入RxJava
假設現在要異步地獲得一個用戶列表,然後將結果進行處理,比如展示到ui或者寫到緩存,我們使用RxJava後代碼如下:
Code 2.1
Observable<Object> observable = Observable.create(new ObservableOnSubscribe<Object>() {
@Override
public void subscribe(@NotNull ObservableEmitter<Object> emitter) throws Exception {
System.out.println(Thread.currentThread().getName() + "----TestRx.subscribe");
List<UserDo> result = userService.getAllUser();
for (UserDo st : result) {emitter.onNext(st);}
}
});
Observable<String> map = observable.map(s -> s.toString());
// 創建訂閱關係
map.subscribe(o -> System.out.println(Thread.currentThread().getName() + "----sub1 = " + o)/*更新到ui*/);
map.subscribe(o -> System.out.println(Thread.currentThread().getName() + "----sub2 = " + o)/*寫緩存*/,
e-> System.out.println("e = " + e)),
()->System.out.println("finish")));
userService.getAllUser()是一個普通的同步方法,但是我們把它包到了一個Observable中,當有結果返回時,將user逐個發送至監聽者。第一個監聽者更新ui,第二個監聽者寫到緩存。並且當上遊發生異常時,進行打印;在事件流結束時,打印finish。
另外還可以很方便的配置上游超時時間、調用線程池、fallback結果等,是不是非常強大。
需要注意的是,RxJava代碼就像上面例子中看起來很容易上手,可讀性也很強,但是如果理解不充分,很容易出現意想不到的bug:初學者可能會認為,上面的代碼中,一個user列表返回後,每個元素會被異步地發送給兩個下游的觀察者,這兩個觀察者在各自的線程內打印結果。但事實卻不是這樣:userService.getAllUser()會被調用兩次(每當建立訂閱關係時方法getAllUser()都會被重新調用),而user列表被查詢出後,會同步的發送給兩個觀察者,觀察者也是同步地打印出每個元素。即sub1 = user1,sub1 = user2,sub1 = user3,sub2 = user1,sub2 = user2,sub2 = user3。
可見,如果沒有其他配置,RxJava默認是同步阻塞的!!!那麼,我們如何使用它的異步非阻塞能力呢,我們接著往下看。
Code 2.2
Observable
.fromCallable(() -> {
System.out.println(Thread.currentThread().getName() + "----observable fromCallable");
Thread.sleep(1000); // imitate expensive computation
return "event";
})
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.single())
.map(i->{
System.out.println(Thread.currentThread().getName() + "----observable map");
return i;
})
.observeOn(Schedulers.newThread())
.subscribe(str -> System.out.println(Thread.currentThread().getName() + "----inputStr=" + str));
System.out.println(Thread.currentThread().getName() + "----end");
Thread.sleep(2000); // <--- wait for the flow to finish. In RxJava the default Schedulers run on daemon threads
我們用Observable.fromCallable()代替code2.1中最底層的Observable.create方法,來創建了一個Observable(即被觀察者)。fromCallable方法創建的是一個lazy的Observable,只有當有人監聽它時,傳入的代碼才被執行。(關於這一點,我們後面會講,這裡只是為了展示有很多種創建Observable的方式)。
然後通過subscribeOn(Schedulers.io())指定了被觀察者執行的線程池。observeOn(Schedulers.single())指定了下游觀察者(map方法實際也是一個觀察者)執行的線程池。map方法如同很多流式編程api一樣,將上游的每個元素轉化成另一個元素。最後又通過observeOn(Schedulers.newThread())制定了當前下游的觀察者,即最後的subscribe中傳入的觀察者(lambda方式)執行的線程池。
上面的代碼執行後,通過打印的線程名可以看出,被觀察者、map、觀察者均是不同的線程,並且,主線程最後的"end"會先執行,也就是實現了異步非阻塞。
3. 使用方式
本文不是RxJava的接口文檔,不會詳細介紹每個api,只是簡單講下一些常見或者特殊api,進一步闡述RxJava的能力。
3.1 基本組件
RxJava的核心原理其實非常簡單。可類比觀察者模式。Observable是被觀察者,作為數據源產生數據。Observer是觀察者,消費上游的數據源。
每個Observable可註冊多個Observer。但是默認情況下,每當有註冊發生時,Observable的生產方法subscribe都會被調用。如果想只生產一次,可以調用Observable.cached方法。
被觀察者Observable還有多個變體,如Single、Flowable。Single代表只產生一個元素的數據源。Flowable是支持背壓的數據源。通過背壓設計,下游監聽者可以向上遊反饋信息,可以達到控制發送速率的功能。
Observable和Observer是通過裝飾器模式層層包裝達到從而串聯起來。轉換API如map等,會創建一個新的ObservableMap(基層自Observable),包裝原始的Observable作為source,而在真正執行時,先做轉換操作,再發給下游的觀察者。
Scheduler是RxJava為多線程執行提供的支持類,它將可以將生產者或者消費者的執行邏輯包裝成一個Worker,提交到框架提供的公共線程池中,如Schedulers.io()、Schedulers.newThread()等。便於理解,可以將Schedulers類比做線程池,Worker類比做線程池中的線程。可以通過Observable.subscribeOn和Observable.observeOn分別制定被觀察者和觀察者執行的線程,來達到異步非阻塞。
RxJava核心架構圖如下:
3.2 轉換API
- map: 見Code 2.2,一對一轉換,如同很多流式編程api一樣,將上游的每個元素轉化成另一個元素
- flatMap: 一對多轉換,將上游的每個元素轉化成0到多個元素。類比Java8:Stream.flatMap內返回的是stream,Observerable.flatMap內返回的是Observerable。注意,本方法非常強大,很多api底層都是基於此方法。並且由於flatMap返回的多個Observerable是相互獨立的,可以基於這個特點,實現併發。
3.3 組合API
- merge:將兩個事件流合併成一個時間流,合併後的事件流的順序,與上流兩個流中元素到來的時間順序一致。
- zip: 逐個接收上游多個流的每個元素,並且一對一的組合起來,轉換後發送給下游。示例見code3.1
code 3.1
//第一個流每1秒輸出一個偶數
Observable<Long> even = Observable.interval(1000, TimeUnit.MILLISECONDS).map(i -> i * 2L);
//第二個流每3秒輸出一個奇數
Observable<Long> odd = Observable.interval(3000, TimeUnit.MILLISECONDS).map(i -> i * 2L + 1);
//zip也可以傳入多個流,這裡只傳入了兩個
Observable.zip(even, odd, (e, o) -> e + "," + o).forEach(x -> {
System.out.println("observer = " + x);
});
/* 輸出如下,可以看到,當某個流有元素到來時,會等待其他所有流都有元素到達時,才會合併處理然後發給下游
observer = 0,1
observer = 2,3
observer = 4,5
observer = 6,7
...
*/
代碼code 3.1看起來沒什麼問題,兩個流併發執行,最後用zip等待他們的結果。但是卻隱藏了一個很重要的問題:RxJava默認是同步、阻塞的!!當我們想去仿照上面的方式併發發送多個請求,最後用zip監聽所有結果時,很容易發先一個詭異的現象, code 3.2的代碼中,ob2的代碼總是在ob1執行之後才會執行,並不是我們預期的兩個請求併發執行。而打印出來的線程名也可以看到,兩個Single是在同一個線程中順序執行的!
code 3.2
// Single是隻返回一個元素的Observable的實現類
Single<String> ob1 = Single.fromCallable(() -> {
System.out.println(Thread.currentThread().getName() + "----observable 1");
TimeUnit.SECONDS.sleep(3);
return userService.queryById(1).getName();
});
Single<String> ob2 = Single.fromCallable(() -> {
System.out.println(Thread.currentThread().getName() + "----observable 2");
TimeUnit.SECONDS.sleep(1);
return userService.queryById(1).getName();
});
String s = Single.zip(ob1, ob2,
(e, o) -> {System.out.println(e + "++++" + o);
那為什麼code 3.1的兩個流能夠併發執行呢?閱讀源碼可以發現zip的實現其實就是先訂閱第一個流,再訂閱第二個流,那麼默認當然是順序執行。但是通過Observable.interval創建的流,默認會被提交到 Schedulers.computation()提供的線程池中。關於線程池,本文後面會講解。
3.4 創建API
- create :最原始的create和subscribe,其他創建方法都基於此
code 3.3
// 返回的子類是ObservableCreate
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("event");
emitter.onNext("event2");
emitter.onComplete();
}
});
// 訂閱observable
observable.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println(Thread.currentThread().getName() + " ,TestRx.onSubscribe");
}
@Override
public void onNext(String s) {
System.out.println(Thread.currentThread().getName() + " ,s = " + s);
}
@Override
public void onError(Throwable e) {}
@Override
public void onComplete() {
System.out.println(Thread.currentThread().getName() + " ,TestRx.onComplete");
}
});
- just : Observable.just("e1","e2"); 簡單的創建一個Observable,發出指定的n個元素。
- interval:code 3.1已給出示例,創建一個按一定間隔不斷產生元素的Observable,默認執行在Schedulers.comutation()提供的線程池中
- defer:產生一個延遲創建的Observable。 有點繞:Observable.create等創建出來的被觀察者雖然是延遲執行的,只有有人訂閱的時候才會真正開始生成數據。但是創建Observable的方法卻是立即執行的。而 Observable.defer方法會在有人訂閱的時候才開始創建Observable。如代碼Code3.4
public String myFun() {
String now = new Date().toString();
System.out.println("myFun = " + now);
return now;
}
public void testDefer(){
// 該代碼會立即執行myFun()
Observable<String> ob1 = Observable.just(myFun());
// 該代碼會在產生訂閱時,才會調用myFun(), 可類比Java8的Supplier接口
Observable<String> ob2 = Observable.defer(() -> Observable.just(myFun()) );
}
- fromCallable :產生一個延遲創建的Observable,簡化的defer方法。Observable.fromCallable(() -> myFun()) 等同於Observable.defer(() -> Observable.just(myFun()) );
4.基本原理
4.1 Observable.create
見代碼code 3.3,create方法接收一個ObserverableOnSubscribe接口對象,我們定義了了發送元素的代碼,create方法返回一個ObserverableCreate類型對象(繼承自Observerable抽象類)。跟進create方法原碼,直接返回new出來的ObserverableCreate,它包裝了一個source對象,即傳入的ObserverableOnSubscribe。
code4.1
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
//onAssembly默認直接返回ObservableCreate
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
Create方法就這麼簡單,只需要記住它返回了一個包裝了source的Observerble。
4.2 Observerable.subscribe(observer)
看下code3.3中創建訂閱關係時(observalbe.subscribe)發生了什麼:
code4.2
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
} catch (NullPointerException e) {... } catch (Throwable e) {... }
}
Observable是一個抽象類,定義了subscribe這個final方法,最終會調用subscribeActual(observer);而subscribeActual是由子類實現的方法,自然我們需要看ObserverableCreate實現的該方法。
code4.3
//ObserverableCreate實現的subscribeActual方法
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent); //source是ObservableOnSubscribe,即我們寫的生產元素的代碼
} catch (Throwable ex) {...}
}
- 將觀察者observer包裝到一個CreateEmitter裡。
- 調用observer的onSubscribe方法,傳入這個emitter。
- 調用source(即生產代碼接口)的subscribe方法,傳入這個emitter。
第二步中,直接調用了我們寫的消費者的onSubscribe方法,很好理解,即創建訂閱關係的回調方法。
重點在第三步,source.subscribe(parent); 這個parent是包裝了observer的emitter。還記得source就是我們寫的發送事件的代碼。其中手動調用了emitter.onNext()來發送數據。那麼我們CreateEmitter.onNext()做了什麼
code4.4
public void onNext(T t) {
if (t == null) {...}
if (!isDisposed()) { observer.onNext(t); }
}
!isDisposed()判斷若訂閱關係還沒取消,則調用observer.onNext(t);這個observer就是我們寫的消費者,code 3.3中我們重寫了它的onNext方法來print接收到的元素。
以上就是RxJava最基本的原理,其實邏輯很簡單,就是在創建訂閱關係的時候,直接調用生產邏輯代碼,然後再生產邏輯的onNext中,調用了觀察者observer.onNext。時序圖如下。
顯然,最基本的原理,完全解耦了和異步回調、多線程的關係。
4.2 Observable.map
通過最簡答的map方法,看下轉換api做了什麼。
如Code2.1中,調用map方法,傳入一個轉換函數,可以一對一地將上游的元素轉換成另一種類型的元素。
code4.5
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
code4.5是Observable定義的final的map方法,可見map方法將this(即原始的observer)和轉換函數mapper包裝到一個ObservableMap中(ObservableMap也繼承Observable),然後返回這個ObservableMap(onAssembly默認什麼都不做)。
由於ObservableMap也是一個Observable,所以他的subscribe方法會在創建訂閱者時被層層調用到,subscribe是Observable定義的final方法,最終會調用到他實現的subscribeAcutal方法。
code4.6
//ObservableMap的subscribeActual
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
可以看到ObservableMap的subscribeActual中,將原始的觀察者t和變換函數function包裝到了一個新的觀察者MapObserver中,並將它訂閱到被觀察者source上。
我們知道,發送數據的時候,觀察者的onNext會被調用,所以看下MapObserver的onNext方法
code4.7
@Override
public void onNext(T t) {
if (done) {return; }
if (sourceMode != NONE) { actual.onNext(null);return;}
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {...}
actual.onNext(v);
}
code4.7中可以看到mapper.apply(t)將變換函數mapper施加到每個元素t上,變換後得到v,最後調用actual.onNext(v)將v發送給下游觀察者actual(actual為code4.6中創建MapObserver時傳入的t)。
總結一下例如map之類的變換api的原理:
- map方法返回一個ObservableMap,包裝了原始的觀察者t和變換函數function
- ObservableMap繼承自AbstractObservableWithUpstream(它繼承自Observable)
- 訂閱發生時,observable的final方法subscribe()會調用實現類的subscribeActual
- ObservableMap.subscribeActual中創建MapObserver(包裝了原observer),訂閱到原Observable
- 發送數據onNext被調用時,先apply變換操作,再調用原observer的onNext,即傳給下游觀察者
4.3 線程調度
代碼Code 2.2中給出了線程調度的示例。subscribeOn(Schedulers.io())指定了被觀察者執行的線程池。observeOn(Schedulers.single())指定了下游觀察者執行的線程池。經過了上面的學習,很自然的能夠明白,原理還是通過裝飾器模式,將Observable和Observer層層包裝,丟到線程池裡執行。我們以observeOn()為例,見code4.8。
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
//observeOn(Scheduler) 返回ObservableObserveOn(繼承自Observable)
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
// Observable的subscribe方法最終會調用到ObservableObserveOn.subscribeActual方法
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
//創建一個ObserveOnObserver包裝了原觀察者、worker,把它訂閱到source(原observable)
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
- observeOn(Scheduler) 返回ObservableObserveOn
- ObservableObserveOn繼承自Observable
- 所以subscribe方法最終會調用到ObservableObserveOn重寫的subscribeActual方法
- subscribeActual返回一個ObserveOnObserver(是一個Observer)包裝了真實的observer和worker
根據Observer的邏輯,發送數據時onNext方法會被調用,所以要看下ObserveOnObserver的onNext方法:
code4.9
public void onNext(T t) {
if (done) { return; }
if (sourceMode != QueueDisposable.ASYNC) { queue.offer(t);}
schedule();
}
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this); //this是ObserveOnObserver,他同樣實現了Runable
}
}
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal(); //最終會調用actual.onNext(v) , 即調用被封裝的下游觀察者,v是emmiter
}
}
- 最終生產者代碼中調用onNext時,會調用schedule方法
- schedule方法中,會提交自身(ObserveOnObserver)到線程池
- 而run方法會調用onNext(emmiter)
可見,RxJava線程調度的機制就是通過observeOn(Scheduler)將發送元素的代碼onNext(emmiter)提交到線程池裡執行。
5.使用注意
5.1 適用場景
並不是所有的IO操作、異步回調都需要使用RxJava來解決,比如如果我們只是一兩個RPC服務的調用組合,或者每個請求都是獨立的處理邏輯,那麼引入RxJava並不會帶來多大的收益。下面給出幾個最佳的適用場景。
- 處理UI事件
- 異步響應和處理IO結果
- 事件或數據 是由無法控制的生產者推送過來的
- 組合接收到的事件
下面給一個閒魚商品批量補數據的使用場景:
背景:算法推薦了用戶的一些商品,目前只有基礎信息,需要調用多個業務接口,補充用戶和商品的附加業務信息,如用戶頭像、商品視頻連接、商品首圖等。並且根據商品的類型不同,填充不同的垂直業務信息。
難點:1. 多個接口存在前後依賴甚至交叉依賴;2. 每個接口都有可能超時或者報錯,繼而影響後續邏輯;3.根據不同的依賴接口特點,需要單獨控制超時和fallback。整個接口也需要設置整體的超時和fallback。
方案:如果只是多個接口獨立的異步查詢,那麼完全可以使用CompletableFuture。但基於它對組合、超時、fallback支持不友好,並不適用於此場景。我們最終採用RxJava來實現。下面是大致的代碼邏輯。代碼中的HsfInvoker是阿里內部將普通HSF接口轉為Rx接口的工具類,默認運行到單獨的線程池中,所以能實現併發調用。
// 查找當前用戶的所有商品
Single<List<IdleItemDO>> userItemsFlow =
HSFInvoker.invoke(() -> idleItemReadService.queryUserItems(userId, userItemsQueryParameter))
.timeout(300, TimeUnit.MILLISECONDS)
.onErrorReturnItem(errorRes)
.map(res -> {
if (!res.isSuccess()) {
return emptyList;
}
return res.getResult();
})
.singleOrError();
//補充商品,依賴userItemsFlow
Single<List<FilledItemInfo>> fillInfoFlow =
userItemsFlow.flatMap(userItems -> {
if (userItems.isEmpty()) {
return Single.just(emptyList);
}
Single<List<FilledItemInfo>> extraInfo =
Flowable.fromIterable(userItems)
.flatMap(item -> {
//查找商品extendsDo
Flowable<Optional<ItemExtendsDO>> itemFlow =
HSFInvoker.invoke(() -> newItemReadService.query(item.getItemId(), new ItemQueryParameter()))
.timeout(300, TimeUnit.MILLISECONDS)
.onErrorReturnItem(errorRes)
.map(res -> Optional.ofNullable(res.getData()));
//視頻url
Single<String> injectFillVideoFlow =
HSFInvoker.invoke(() -> videoFillManager.getVideoUrl(item))
.timeout(100, TimeUnit.MILLISECONDS)
.onErrorReturnItem(fallbackUrl);
//填充首圖
Single<Map<Long, FrontCoverPageDO>> frontPageFlow =
itemFlow.flatMap(item -> {
...
return frontCoverPageManager.rxGetFrontCoverPageWithTpp(item.id);
})
.timeout(200, TimeUnit.MILLISECONDS)
.onErrorReturnItem(fallbackPage);
return Single.zip(itemFlow, injectFillVideoFlow, frontPageFlow, (a, b, c) -> fillInfo(item, a, b, c));
})
.toList(); //轉成商品List
return extraInfo;
});
//頭像信息
Single<Avater> userAvaterFlow =
userAvaterFlow = userInfoManager.rxGetUserAvaters(userId).timeout(150, TimeUnit.MILLISECONDS).singleOrError().onErrorReturnItem(fallbackAvater);
//組合用戶頭像和商品信息,一併返回
return Single.zip(fillInfoFlow, userAvaterFlow,(info,avater) -> fillResult(info,avater))
.timeout(300, TimeUnit.MILLISECONDS)
.onErrorReturn(t -> errorResult)
.blockingGet(); //最後阻塞式的返回
可以看到,通過引入RxJava,對於超時控制、兜底策略、請求回調、結果組合都能更方便的支持。
5.2 Scheduler線程池
RxJava2 內置多個 Scheduler 的實現,但是我們建議使用Schedulers.from(executor)指定線程池,這樣可以避免使用框架提供的默認公共線程池,防止單個長尾任務block其他線程執行,或者創建了過多的線程導致OOM。
5.3 CompletableFuture
當我們的邏輯比較簡單,只想異步調用一兩個RPC服務的時,完全可以考慮使用Java8提供的CompletableFuture實現,它相較於Future是異步執行的,也可以實現簡單的組合邏輯。
5.4 併發
單個Observable始終是順序執行的,不允許併發地調用onNext()。
code5.1
Observable.create(emitter->{
new Thread(()->emitter.onNext("a1")).start();
new Thread(()->emitter.onNext("a2")).start();
})
但是,每個Observable可以獨立的併發執行。
code5.2
Observable ob1 = Observable.create(e->new Thread(()->e.onNext("a1")).start());
Observable ob2 = Observable.create(e->new Thread(()->e.onNext("a2")).start());
Observable ob3 = Observable.merge(ob1,ob2);
ob3中組合了ob1和ob2兩個流,每個流是獨立的。(這裡需要注意,這兩個流能併發執行,還有一個條件是他們的發送代碼運行在不同線程,就如果code3.1和code3.2中的示例一樣,雖然兩個流是獨立的,但是如果不提交到不同的線程中,還是順序執行的)。
5.5 背壓
在 RxJava 2.x 中,只有 Flowable 類型支持背壓。當然,Observable 能解決的問題,對於 Flowable 也都能解決。但是,其為了支持背壓而新增的額外邏輯導致 Flowable 運行性能要比 Observable 慢得多,因此,只有在需要處理背壓場景時,才建議使用 Flowable。如果能夠確定上下游在同一個線程中工作,或者上下游工作在不同的線程中,而下游處理數據的速度高於上游發射數據的速度,則不會產生背壓問題,就沒有必要使用Flowable。關於Flowable的使用,由於篇幅原因,就不在本文闡述。
5.6 超時
強烈建議設置異步調用的超時時間,用timeout和onErrorReturn方法設置超時的兜底邏輯,否則這個請求將一直佔用一個Observable線程,當大量請求到來時,也會導致OOM。
6.結語
目前,閒魚的多個業務場景都採用RxJava做異步化,大大降低了開發同學的異步開發成本。同時在多請求響應組合、併發處理都有很好的性能表現。自帶的超時邏輯和兜底策略,在批量業務數據處理中能保證可靠性,是用戶流暢體驗的強力支撐。