RxJava 學習筆記(三)


我只是搬運工,本來想自己寫,但是理解和語言表達真心過不了關,
源碼方面,我詳細點介紹,
筆記么,好東西我就抄 給 Android 開發者的 RxJava 詳解

線程控制Scheduler

1)Scheduler的API

在不指定線程的情況下, RxJava 遵循的是線程不變的原則,即:在哪個線程調用 subscribe(),就在哪個線程生產事件;在哪個線程生產事件,就在哪個線程消費事件。如果需要切換線程,就需要用到 Scheduler(調度器)。

  • Schedulers.immediate(): 直接在當前線程運行,相當於不指定線程。這是默認的Scheduler
  • Schedulers.newThread():總是啟用新線程,並在新線程執行操作。
  • Schedulers.io():I/O 操作(讀寫文件、讀寫數據庫、網絡信息交互等)所使用的Scheduler。行為模式和newThread()差不多,區別在於 io()的內部實現是是用一個無數量上限的線程池,可以重用空閑的線程,因此多數情況下io()newThread()更有效率。不要把計算工作放在io()中,可以避免創建不必要的線程。
  • Schedulers.computation():計算所使用的 Scheduler。這個計算指的是 CPU 密集型計算,即不會被 I/O 等操作限制性能的操作,例如圖形的計算。這個 Scheduler使用的固定的線程池,大小為 CPU 核數。不要把 I/O 操作放在 computation()中,否則 I/O 操作的等待時間會浪費 CPU。
  • 另外,Android還有一個專用的AndroidSchedulers.mainThread(),它指定的操作將在 Android 主線程運行。

有了這幾個 Scheduler ,就可以使用 subscribeOn() 和 observeOn() 兩個方法來對線程進行控制了。

  • subscribeOn(): 指定 subscribe() 所發生的線程,即 Observable.OnSubscribe 被激活時所處的線程。或者叫做事件產生的線程。

  • observeOn(): 指定 Subscriber 所運行在的線程。或者叫做事件消費的線程。

Observable.just(1, 2, 3, 4)
.subscribeOn(Schedulers.io()) // 指定 subscribe() 發生在 IO 線程
.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回調發生在主線程
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer number) {
Log.d(tag, "number:" + number);
}
});

這個其實可以配合compose 使用 下一篇介紹

變換 (也是個人感覺掌握了這個就算入門了)

1)API

  • map():事件對象的直接變換
Observable.just("images/logo.png") // 輸入類型 String
.map(new Func1<String, Bitmap>() {
@Override
public Bitmap call(String filePath) { // 參數類型 String
return getBitmapFromPath(filePath); // 返回類型 Bitmap
}
})
.subscribe(new Action1<Bitmap>() {
@Override
public void call(Bitmap bitmap) { // 參數類型 Bitmap
showBitmap(bitmap);
}
});

這里出現了一個叫做 Func1的類。它和 Action1 非常相似,也是 RxJava 的一個接口,用於包裝含有一個參數的方法。Func1Action 的區別在於, Func1 包裝的是有返回值的方法。另外,和 ActionX 一樣,FuncX 也有多個,用於不同參數個數的方法。FuncXActionX的區別在 FuncX 包裝的是有返回值的方法。

  • flatMap()
Student[] students = ...;
Subscriber<Course> subscriber = new Subscriber<Course>() {
@Override
public void onNext(Course course) {
Log.d(tag, course.getName());
}
...
};
Observable.from(students)
.flatMap(new Func1<Student, Observable<Course>>() {
@Override
public Observable<Course> call(Student student) {
return Observable.from(student.getCourses());
}
})
.subscribe(subscriber);

從上面的代碼可以看出,flatMap()map() 有一個相同點:它也是把傳入的參數轉化之后返回另一個對象。但需要注意,和map()不同的是, flatMap() 中返回的是個Observable 對象,並且這個Observable 對象並不是被直接發送到了Subscriber的回調方法中。 flatMap() 的原理是這樣的:

  1. 使用傳入的事件對象創建一個 Observable對象;

  2. 並不發送這個 Observable, 而是將它激活,於是它開始發送事件;

  3. 每一個創建出來的Observable發送的事件,都被匯入同一個Observable ,而這個Observable負責將這些事件統一交給Subscriber的回調方法。這三個步驟,把事件拆成了兩級,通過一組新創建的Observable將初始的對象『鋪平』之后通過統一路徑分發了下去。而這個『鋪平』就是flatMap()所謂的 flat。

2) 變換的原理:lift()

這些變換雖然功能各有不同,但實質上都是針對事件序列的處理和再發送。而在 RxJava 的內部,它們是基於同一個基礎的變換方法: lift(Operator)。首先看一下 lift() 的內部實現(僅核心代碼):

  • 下面是 map() 方法的 內部實現 ,flatmap() 其實就是類似 最后也是用到 lift()
// 比如 String 轉為 Bitmap ,這樣 T 為 String R 為 Bitmap
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
// 看到沒有 ,return lift(new OperatorMap<String , Bitmap>) 注意類型
return lift(new OperatorMap<T, R>(func));
}

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) { // <String,Bitmap>
// 看到沒有 返回的是 Observable<String> ,你可能會疑惑 為什么不是 Observable<Bitmap> ,
// 不是要String 轉為Bitmap嗎 ?
// 如果你是這樣想就錯了哦,我們這個是 對於map()的內部實現,他並沒有轉為Observalbe<Bitmap>,而是一個新的Observable<String>
// 這邊生成 了新的 被觀察者 Observable<String>
return new Observable<R>(new OnSubscribe<R>() {
@Override
public void call(Subscriber<? super R> o) {
try {
// ① hook.onLift其實就是直接返回operator
// 重寫這個方法 ,可以被修改,裝飾,或者只是取代作為傳送的返回,
// 然后調用 call方法 o的類型為 String
/* ②
public interface Func1<T, R> extends Function {
// Bitmap call(Stirng t) 這邊就是重點,就是在這邊他做了轉化,我們就是在這個方法里面 進行自定義的轉化
R call(T t);
}
*/

// 這邊生成了 第二個 新的 觀察者Subscriber<Bitmap>
Subscriber<? super T> st = hook.onLift(operator).call(o);
try {
// 這邊調用 Subscriber<Bitmap> 的 onStart()
st.onStart();
// 在 Observable 執行了 lift(Operator) 方法之后,會返回一個新的 Observable,
// 這個新的 Observable 會像一個代理一樣,負責接收原始的 Observable 發出的事件,
// 並在處理后發送給 Subscriber --》 這邊是個Action1
onSubscribe.call(st);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
st.onError(e);
}
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
o.onError(e);
}
}
});
}

//*****************************************************************************************

final OnSubscribe<T> onSubscribe;

/**
* 其實 onSubscribe 是Action1 ,這就是為什么 我們的 觀察者可以直接使用 new Action1
* Action1<Subscriber<? super T>> 我驚呆了,他的類型是一個 Subscriber,
* 真J8巧妙,我之前還懷疑, 上面那段代碼 的onSubscribe.call(st); 難道只能是Action1,那為什么Sbucribe也可以?
* 所以東西還是要清楚,看下來,才知道,才能理解透徹一點
*/

public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {
// cover for generics insanity
}

/**
* 其實 operator 是 Func1 , 這就是為什么 我們 map中是 new Func1
*/

public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>> {
// cover for generics insanity
}
  • 這邊算是對 RxJava 學習筆記(二)訂閱部分的源碼補充吧
  • 在 被觀察者Observable 中, subscribe方法 有多種參數,結合上面的源碼這樣你應該可以理解為什么可以用onSubscribe.call(str)

public final Subscription subscribe(final Observer<? super T> observer) {
if (observer instanceof Subscriber) {
return subscribe((Subscriber<? super T>)observer);
}
return subscribe(new Subscriber<T>() {

@Override
public void onCompleted() {
observer.onCompleted();
}

@Override
public void onError(Throwable e) {
observer.onError(e);
}

@Override
public void onNext(T t) {
observer.onNext(t);
}

});
}
// 就是這邊
public final Subscription subscribe(final Action1<? super T> onNext) {
if (onNext == null) {
throw new IllegalArgumentException("onNext can not be null");
}

return subscribe(new Subscriber<T>() {

@Override
public final void onCompleted() {
// do nothing
}

@Override
public final void onError(Throwable e) {
throw new OnErrorNotImplementedException(e);
}

@Override
public final void onNext(T args) {
onNext.call(args);
}

});
}

public final Subscription subscribe(final Action1<? super T> onNext, final Action1<Throwable> onError) {
if (onNext == null) {
throw new IllegalArgumentException("onNext can not be null");
}
if (onError == null) {
throw new IllegalArgumentException("onError can not be null");
}

return subscribe(new Subscriber<T>() {

@Override
public final void onCompleted() {
// do nothing
}

@Override
public final void onError(Throwable e) {
onError.call(e);
}

@Override
public final void onNext(T args) {
onNext.call(args);
}

});
}


public final Subscription subscribe(final Action1<? super T> onNext, final Action1<Throwable> onError, final Action0 onComplete) {
if (onNext == null) {
throw new IllegalArgumentException("onNext can not be null");
}
if (onError == null) {
throw new IllegalArgumentException("onError can not be null");
}
if (onComplete == null) {
throw new IllegalArgumentException("onComplete can not be null");
}

return subscribe(new Subscriber<T>() {

@Override
public final void onCompleted() {
onComplete.call();
}

@Override
public final void onError(Throwable e) {
onError.call(e);
}

@Override
public final void onNext(T args) {
onNext.call(args);
}

});
}

文章開頭那篇文章,我之前看了很多次 在lift()這一塊,但是都沒有理解,現在自己做筆記,然后看源碼,雖然說不能百分百理解,但是也差不多了


注意!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系我们删除。



 
粤ICP备14056181号  © 2014-2020 ITdaan.com