RxJava連接操作符


RxJava系列教程:

1. RxJava使用介紹 【視頻教程】
2. RxJava操作符
  • Creating Observables(Observable的創建操作符) 【視頻教程】
  • Transforming Observables(Observable的轉換操作符) 【視頻教程】
  • Filtering Observables(Observable的過濾操作符) 【視頻教程】
  • Combining Observables(Observable的組合操作符) 【視頻教程】
  • Error Handling Operators(Observable的錯誤處理操作符) 【視頻教程】
  • Observable Utility Operators(Observable的輔助性操作符) 【視頻教程】
  • Conditional and Boolean Operators(Observable的條件和布爾操作符) 【視頻教程】
  • Mathematical and Aggregate Operators(Observable數學運算及聚合操作符) 【視頻教程】
  • 其他如observable.toList()、observable.connect()、observable.publish()等等; 【視頻教程】
3. RxJava Observer與Subcriber的關系 【視頻教程】
4. RxJava線程控制(Scheduler) 【視頻教程】
5. RxJava 並發之數據流發射太快如何辦(背壓(Backpressure)) 【視頻教程】


RxJava連接操作符

目錄

ConnectableObservable 和它的子類以及它們的操作符:

  • ConnectableObservable.connect() — 指示一個可連接的Observable開始發射數據
  • Observable.publish() — 將一個Observable轉換為一個可連接的Observable
  • Observable.replay() — 確保所有的訂閱者看到相同的數據序列,即使它們在Observable開始發射數據之后才訂閱
  • ConnectableObservable.refCount() — 讓一個可連接的Observable表現得像一個普通的Observable

一個可連接的Observable與普通的Observable差不多,除了這一點:可連接的Observable在被訂閱時並不開始發射數據,只有在它的connect()被調用時才開始。用這種方法,你可以等所有的潛在訂閱者都訂閱了這個Observable之后才開始發射數據。


Publish

Publish 操作符將普通的Observable轉換為可連接的Observable(ConnectableObservable)ConnectableObservable是Observable的子類。 可連接的Observable (connectable Observable)與普通的Observable差不多,不過它並不會在被訂閱時開始發射數據,而是直到使用了Connect操作符時才會開始,這樣可以更靈活的控制發射數據的時機。

注意:如果一個ConnectableObservable已經開始發射數據,再對其進行訂閱只能接受之后發射的數據,訂閱之前已經發射過的數據就丟失了。

publish

示例代碼

Observable<Long> observable = Observable.interval(1, TimeUnit.SECONDS);
//使用publish操作符將普通Observable轉換為可連接的Observable
ConnectableObservable<Long> connectableObservable = observable.publish();

//第一個訂閱者訂閱,不會開始發射數據
connectableObservable.subscribe(new Subscriber<Long>() {
@Override
public void onCompleted() {
System.out.println("1.onCompleted");
}
@Override
public void onError(Throwable e) {
System.out.println("1.onError");
}
@Override
public void onNext(Long value) {
System.out.println("1.onNext value :"+ value);
}
});

//如果不調用connect方法,connectableObservable則不會發射數據
connectableObservable.connect();
//第二個訂閱者延遲2s訂閱,這將導致丟失前面2s內發射的數據
connectableObservable
.delaySubscription(2, TimeUnit.SECONDS)// 0、1數據丟失
.subscribe(new Subscriber<Long>() {
@Override
public void onCompleted() {
System.out.println("2.onCompleted");
}
@Override
public void onError(Throwable e) {
System.out.println("2.onError");
}
@Override
public void onNext(Long value) {
System.out.println("2.onNext value :"+ value);
}
});
//eclipse下運行加上下面代碼,Android Studio則不需要
Thread.sleep(6000);

輸出結果如下:

1.onNext value :0
1.onNext value :1
1.onNext value :2
2.onNext value :2
1.onNext value :3
2.onNext value :3
1.onNext value :4
2.onNext value :4
1.onNext value :5
2.onNext value :5

無論connect方法何時調用,只要被調用后所有的訂閱者都能發射數據。

Connect

RxJava中connect是ConnectableObservable接口的一個方法,使用publish操作符可以將一個普通的Observable轉換為一個ConnectableObservable。

調用ConnectableObservable的connect方法會讓它后面的Observable開始給發射數據給訂閱者。

connect方法返回一個Subscription對象,可以調用它的unsubscribe方法讓Observable停止發射數據給觀察者。

即使沒有任何訂閱者訂閱它,你也可以使用connect方法讓一個Observable開始發射數據(或者開始生成待發射的數據)。這樣,你可以將一個”冷”的Observable變為”熱”的。

示例代碼

Observable<Long> observable = Observable.interval(1, TimeUnit.SECONDS);
//使用publish操作符將普通Observable轉換為可連接的Observable
ConnectableObservable<Long> connectableObservable = observable.publish();
//開始發射數據,如果不調用connect方法,connectableObservable則不會發射數據
Subscription subscription = connectableObservable.connect();

//第二個訂閱者延遲2s訂閱,這將導致丟失前面2s內發射的數據
connectableObservable
.delaySubscription(2, TimeUnit.SECONDS)// 0、1數據丟失
.subscribe(new Subscriber<Long>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
@Override
public void onError(Throwable e) {
System.out.println("onError");
}
@Override
public void onNext(Long value) {
System.out.println("onNext value :"+ value);
}
});

//5秒后取消訂閱
Observable.interval(1, TimeUnit.SECONDS)
.take(5)
.subscribe(new Subscriber<Long>() {
@Override
public void onCompleted() {
System.out.println("onCompleted2");
subscription.unsubscribe();//取消訂閱
}
@Override
public void onError(Throwable e) {
System.out.println("onError2");
}
@Override
public void onNext(Long along) {
System.out.println("onNext2:"+along);
}
});

//eclipse下運行加上下面代碼,Android Studio則不需要
Thread.sleep(10000);

輸出結果如下:

onNext2:0
onNext2:1
onNext value :2
onNext2:2
onNext value :3
onNext2:3
onNext value :4
onNext2:4
onCompleted2

RefCount

RefCount操作符可以看做是Publish的逆向,它能將一個ConnectableObservable對象再重新轉化為一個普通的Observable對象,如果轉化后有訂閱者對其進行訂閱將會開始發射數據,后面如果有其他訂閱者訂閱,將只能接受后面的數據(這也是轉化之后的Observable 與普通的Observable的一點區別 )。

還有一個操作符叫share,它的作用等價於對一個Observable同時應用publish和refCount操作。

RefCount

示例代碼

//創建一個可連接的Observable
ConnectableObservable<Long> connectableObservable = Observable.interval(1, TimeUnit.SECONDS).take(6)
.publish();

connectableObservable.subscribe(new Subscriber<Long>() {
@Override
public void onCompleted() {
System.out.println("onCompleted1.");
}

@Override
public void onError(Throwable e) {
System.out.println("onError1: " + e.getMessage());
}

@Override
public void onNext(Long along) {
System.out.println("onNext1: " + along);
}
});

connectableObservable.delaySubscription(3, TimeUnit.SECONDS)
.subscribe(new Subscriber<Long>() {
@Override
public void onCompleted() {
System.out.println("onCompleted2.");
}

@Override
public void onError(Throwable e) {
System.out.println("onError2: " + e.getMessage());
}

@Override
public void onNext(Long along) {
System.out.println("onNext2: " + along);
}
});

//如果不調用connect方法,connectableObservable則不會發射數據
connectableObservable.connect();

System.out.println("------after refCount()------");

Observable<Long> observable = connectableObservable.refCount();

observable.subscribe(new Subscriber<Long>() {
@Override
public void onCompleted() {
System.out.println("onCompleted3.");
}

@Override
public void onError(Throwable e) {
System.out.println("onError3: " + e.getMessage());
}

@Override
public void onNext(Long along) {
System.out.println("onNext3: " + along);
}
});

observable.delaySubscription(3, TimeUnit.SECONDS)
.subscribe(new Subscriber<Long>() {
@Override
public void onCompleted() {
System.out.println("onCompleted4.");
}

@Override
public void onError(Throwable e) {
System.out.println("onError4: " + e.getMessage());
}

@Override
public void onNext(Long along) {
System.out.println("onNext4: " + along);
}
});
Thread.sleep(10000);

輸出結果如下:

------after refCount()------
onNext1: 0
onNext3: 0
onNext1: 1
onNext3: 1
onNext1: 2
onNext3: 2
onNext1: 3
onNext3: 3
onNext2: 3
onNext4: 3
onNext1: 4
onNext3: 4
onNext2: 4
onNext4: 4
onNext1: 5
onNext3: 5
onNext2: 5
onNext4: 5
onCompleted1.
onCompleted3.
onCompleted2.
onCompleted4.

由運行結果可以看出,RefCount操作符將一個Connectable Observable 對象重新轉化為一個普通的Observable對象,這時候訂閱者進行訂閱將會觸發數據的發射。

Replay

使用Replay操作符返回的ConnectableObservable 會緩存訂閱者訂閱之前已經發射的數據,這樣即使有訂閱者在其發射數據開始之后進行訂閱也能收到之前發射過的數據。Replay操作符能指定緩存的大小或者時間,這樣能避免耗費太多內存。

示例代碼:

//創建一個可連接的Observable
ConnectableObservable<Long> connectableObservable = Observable.interval(1, TimeUnit.SECONDS).take(5)
.publish();
//如果不調用connect方法,connectableObservable則不會發射數據
connectableObservable.connect();
connectableObservable.delaySubscription(3, TimeUnit.SECONDS)//延時訂閱
.subscribe(new Subscriber<Long>() {
@Override
public void onCompleted() {
System.out.println("onCompleted1.");
}

@Override
public void onError(Throwable e) {
System.out.println("onError1: " + e.getMessage());
}

@Override
public void onNext(Long along) {
System.out.println("onNext1: " + along);
}
});

//創建一個可連接的Observable
ConnectableObservable<Long> connectableObservable2 = Observable.interval(1, TimeUnit.SECONDS).take(6)
.replay(1);//這里不在使用publish,replay(1)緩存1個數據

//如果不調用connect方法,connectableObservable則不會發射數據
connectableObservable2.connect();
connectableObservable2.delaySubscription(3, TimeUnit.SECONDS)//延時訂閱
.subscribe(new Subscriber<Long>() {
@Override
public void onCompleted() {
System.out.println("onCompleted2.");
}

@Override
public void onError(Throwable e) {
System.out.println("onError2: " + e.getMessage());
}

@Override
public void onNext(Long along) {
System.out.println("onNext2: " + along);
}
});

//創建一個可連接的Observable
ConnectableObservable<Long> connectableObservable3 = Observable.interval(1, TimeUnit.SECONDS).take(6)
.replay(3, TimeUnit.SECONDS);//這里不在使用publish,replay(3, TimeUnit.SECONDS)緩存3s內的數據

//如果不調用connect方法,connectableObservable則不會發射數據
connectableObservable3.connect();
connectableObservable3.delaySubscription(3, TimeUnit.SECONDS)//延時訂閱
.subscribe(new Subscriber<Long>() {
@Override
public void onCompleted() {
System.out.println("onCompleted3.");
}

@Override
public void onError(Throwable e) {
System.out.println("onError3: " + e.getMessage());
}

@Override
public void onNext(Long along) {
System.out.println("onNext3: " + along);
}
});

輸出結果如下:

onNext3: 0
onNext3: 1
onNext3: 2

onNext2: 2

onNext1: 3
onNext2: 3
onNext3: 3
onNext1: 4
onNext2: 4
onNext3: 4
onNext1: 5
onCompleted1.
onNext3: 5
onNext2: 5
onCompleted2.
onCompleted3.

注意!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系我们删除。



 
粤ICP备14056181号  © 2014-2021 ITdaan.com