本文共 11226 字,大约阅读时间需要 37 分钟。
创建被监听者
Observableobservable1 = Observable.create(new Observable.OnSubscribe () { @Override public void call(Subscriber subscriber) { subscriber.onNext("yaoyan"); subscriber.onCompleted(); } });
创建监听者
Subscribersubscriber = new Subscriber () { @Override public void onStart() { super.onStart(); System.out.println("Thread.currentThread().getName() = " + Thread.currentThread().getName()); } @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(String s) { System.out.println("s2 = " + s); } };
绑定
Subscription subscribe = observable1.subscribe(subscriber);
public final Subscription subscribe(Subscriber subscriber) { return Observable.subscribe(subscriber, this); }
这里面的subscriber就是观察者,this是被观察者
进入代码:
private staticSubscription subscribe(Subscriber subscriber, Observable observable) { // validate and proceed if (subscriber == null) { throw new IllegalArgumentException("observer can not be null"); } if (observable.onSubscribe == null) { throw new IllegalStateException("onSubscribe function can not be null."); /* * the subscribe function can also be overridden but generally that's not the appropriate approach * so I won't mention that in the exception */ } // new Subscriber so onStart it subscriber.onStart(); /* * See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls * to user code from within an Observer" */ // if not already wrapped if (!(subscriber instanceof SafeSubscriber)) { // assign to `observer` so we return the protected version subscriber = new SafeSubscriber (subscriber); } // The code below is exactly the same an unsafeSubscribe but not used because it would add a sigificent depth to alreay huge call stacks. try { // allow the hook to intercept and/or decorate hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber); return hook.onSubscribeReturn(subscriber); } catch (Throwable e) { // special handling for certain Throwable/Error/Exception types Exceptions.throwIfFatal(e); // if an unhandled error occurs executing the onSubscribe we will propagate it try { subscriber.onError(hook.onSubscribeError(e)); } catch (OnErrorNotImplementedException e2) { // special handling when onError is not implemented ... we just rethrow throw e2; } catch (Throwable e2) { // if this happens it means the onError itself failed (perhaps an invalid function implementation) // so we are unable to propagate the error correctly and will just throw RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2); // TODO could the hook be the cause of the error in the on error handling. hook.onSubscribeError(r); // TODO why aren't we throwing the hook's return value. throw r; } return Subscriptions.unsubscribed(); } }
这里面主要做了三件事情
1.subscriber.onStart(); 2.hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber); 3.return subscription;observable1.subscribe(subscriber);
传入进来的subscriber对象,到此为止就完成了一次回调。final Subscribersubscriber2 = new Subscriber () { @Override public void onCompleted() { System.out.println("MainActivity.onCompleted"); } @Override public void onError(Throwable e) { System.out.println("MainActivity.onError"); } @Override public void onNext(String course) { System.out.println("course222 = " + course); } }; //原始的OnSubscribe Observable.OnSubscribe onSubscribe = new Observable.OnSubscribe () { @Override public void call(Subscriber subscriber) { System.out.println("subscriber = " + subscriber); subscriber.onNext(1111); subscriber.onCompleted(); } }; Observable.create(onSubscribe) .compose(new Observable.Transformer () { @Override public Observable call(Observable integerObservable) { return integerObservable .flatMap(new Func1 >() { @Override public Observable call(Integer integer) { return Observable.from(new String[]{ "cccc"}); } }); } }) .subscribe(subscriber2);
输出
course222 = ccccStudent student=new Student(); student.setCourse(new Course("haha")); Subscribersubscriber = new Subscriber () { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(Course course) { System.out.println("course = " + course); } }; Func1 > func1 = new Func1 >() { @Override public Observable call(Student student) { System.out.println("MainActivity.call"); return Observable.from(student.getCourse()); } }; //原始的OnSubscribe Observable.OnSubscribe onSubscribe = new Observable.OnSubscribe () { @Override public void call(Subscriber subscriber) { System.out.println("subscriber = " + subscriber); subscriber.onNext(student); subscriber.onCompleted(); } }; Observable.create(onSubscribe) .flatMap(func1) //这里用的是新创建的Observable .subscribe(subscriber);
flatMap(func1)
public finalObservable flatMap(Func1 > func) { if (getClass() == ScalarSynchronousObservable.class) { return ((ScalarSynchronousObservable )this).scalarFlatMap(func); } return merge(map(func)); }
这里面会调用map(func)
public finalObservable map(Func1 func) { return lift(new OperatorMap (func)); }
这里创建了一个OperatorMap,进入到构造方法:
private final Func1 transformer; public OperatorMap(Func1 transformer) { this.transformer = transformer; }
看到了将transformer这个变量赋值了,到此告一段落。
lift() 这个方法是最主要的方法,不管是map还是flatmap,最终都会进入到这个方法中来。 下面源码:public finalObservable lift(final Operator operator) { return new Observable (new OnSubscribe () { @Override public void call(Subscriber o) { try { Subscriber st = hook.onLift(operator).call(o); try { // new Subscriber created and being subscribed with so 'onStart' it st.onStart(); onSubscribe.call(st); } catch (Throwable e) { // localized capture of errors rather than it skipping all operators // and ending up in the try/catch of the subscribe method which then // prevents onErrorResumeNext and other similar approaches to error handling if (e instanceof OnErrorNotImplementedException) { throw (OnErrorNotImplementedException) e; } st.onError(e); } } catch (Throwable e) { if (e instanceof OnErrorNotImplementedException) { throw (OnErrorNotImplementedException) e; } // if the lift function failed all we can do is pass the error to the final Subscriber // as we don't have the operator available to us o.onError(e); } } }); }
在第二行返回了一个Observable对象,这里很关键,当我们demo中调用的subscribe(subscriber); 实际是这里返回的Observable对象,所以当我们开始订阅事件的时候,会进入到lift方法源码中的call方法。 这里面的onSubscribe.call(st);的onSubscribe对象实际是下面代码中的onSubscribe。
//原始的OnSubscribe Observable.OnSubscribeonSubscribe = new Observable.OnSubscribe () { @Override public void call(Subscriber subscriber) { System.out.println("subscriber = " + subscriber); subscriber.onNext(student); subscriber.onCompleted(); } };
上面代码中的public void call(Subscriber<? super Student> subscriber)
,subscriber对象就是下面新New的Subscriber对象
return new Subscriber(o) { @Override public void onCompleted() { o.onCompleted(); } @Override public void onError(Throwable e) { o.onError(e); } @Override public void onNext(T t) { try { o.onNext(transformer.call(t)); } catch (Throwable e) { Exceptions.throwIfFatal(e); onError(OnErrorThrowable.addValueAsLastCause(e, t)); } } };
当调用subscriber.onNext(student),就会进入上面onNext方法
@Override public void onNext(T t) { try { o.onNext(transformer.call(t)); } catch (Throwable e) { Exceptions.throwIfFatal(e); onError(OnErrorThrowable.addValueAsLastCause(e, t)); } }
transformer.call(t)
这时进入就会调用:Func1> func1 = new Func1 >() { @Override public Observable call(Student student) { System.out.println("MainActivity.call"); return Observable.from(new Course[]{student.getCourse()}); } };
在这里面我们将Student的Course对象返回
再接着o.onNext(),这里面的o也是我们在demo中创建的subscriber对象,通过onNext方法将Course方法发送出去,接着我们就会在onNext方法中接收到:
Subscribersubscriber = new Subscriber () { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(Course course) { System.out.println("course = " + course); } };
转载地址:http://stbsn.baihongyu.com/