博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RxJava源码学习
阅读量:3746 次
发布时间:2019-05-22

本文共 11226 字,大约阅读时间需要 37 分钟。

subscribe源码分析:

简单demo

创建被监听者

Observable
observable1 = Observable.create(new Observable.OnSubscribe
() { @Override public void call(Subscriber
subscriber) { subscriber.onNext("yaoyan"); subscriber.onCompleted(); } });

创建监听者

Subscriber
subscriber = new Subscriber
() { @Override public void onStart() { super.onStart(); System.out.println("Thread.currentThread().getName() = " + Thread.currentThread().getName()); } @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(String s) { System.out.println("s2 = " + s); } };

绑定

Subscription subscribe = observable1.subscribe(subscriber);

分析

public final Subscription subscribe(Subscriber
subscriber) { return Observable.subscribe(subscriber, this); }

这里面的subscriber就是观察者,this是被观察者

进入代码:

private static 
Subscription subscribe(Subscriber
subscriber, Observable
observable) { // validate and proceed if (subscriber == null) { throw new IllegalArgumentException("observer can not be null"); } if (observable.onSubscribe == null) { throw new IllegalStateException("onSubscribe function can not be null."); /* * the subscribe function can also be overridden but generally that's not the appropriate approach * so I won't mention that in the exception */ } // new Subscriber so onStart it subscriber.onStart(); /* * See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls * to user code from within an Observer" */ // if not already wrapped if (!(subscriber instanceof SafeSubscriber)) { // assign to `observer` so we return the protected version subscriber = new SafeSubscriber
(subscriber); } // The code below is exactly the same an unsafeSubscribe but not used because it would add a sigificent depth to alreay huge call stacks. try { // allow the hook to intercept and/or decorate hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber); return hook.onSubscribeReturn(subscriber); } catch (Throwable e) { // special handling for certain Throwable/Error/Exception types Exceptions.throwIfFatal(e); // if an unhandled error occurs executing the onSubscribe we will propagate it try { subscriber.onError(hook.onSubscribeError(e)); } catch (OnErrorNotImplementedException e2) { // special handling when onError is not implemented ... we just rethrow throw e2; } catch (Throwable e2) { // if this happens it means the onError itself failed (perhaps an invalid function implementation) // so we are unable to propagate the error correctly and will just throw RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2); // TODO could the hook be the cause of the error in the on error handling. hook.onSubscribeError(r); // TODO why aren't we throwing the hook's return value. throw r; } return Subscriptions.unsubscribed(); } }

这里面主要做了三件事情

1.subscriber.onStart();
2.hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
3.return subscription;

  1. 调用了onStart方法,此方法可以做一些初始化的操作。
  2. hook.onSubscribeStart(observable, observable.onSubscribe)返回的是onSubscribe对象,该对象就是我们在创建被监听者时new的Observable.OnSubscribe对象,subscriber对象为我们
    observable1.subscribe(subscriber);
    传入进来的subscriber对象,到此为止就完成了一次回调。
  3. 返回subscription对象。

compose例子:

final Subscriber
subscriber2 = new Subscriber
() { @Override public void onCompleted() { System.out.println("MainActivity.onCompleted"); } @Override public void onError(Throwable e) { System.out.println("MainActivity.onError"); } @Override public void onNext(String course) { System.out.println("course222 = " + course); } }; //原始的OnSubscribe Observable.OnSubscribe
onSubscribe = new Observable.OnSubscribe
() { @Override public void call(Subscriber
subscriber) { System.out.println("subscriber = " + subscriber); subscriber.onNext(1111); subscriber.onCompleted(); } }; Observable.create(onSubscribe) .compose(new Observable.Transformer
() { @Override public Observable
call(Observable
integerObservable) { return integerObservable .flatMap(new Func1
>() { @Override public Observable
call(Integer integer) { return Observable.from(new String[]{ "cccc"}); } }); } }) .subscribe(subscriber2);

输出

course222 = cccc

flatMap源码分析:

简单demo

Student student=new Student();        student.setCourse(new Course("haha"));        Subscriber
subscriber = new Subscriber
() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(Course course) { System.out.println("course = " + course); } }; Func1
> func1 = new Func1
>() { @Override public Observable
call(Student student) { System.out.println("MainActivity.call"); return Observable.from(student.getCourse()); } }; //原始的OnSubscribe Observable.OnSubscribe
onSubscribe = new Observable.OnSubscribe
() { @Override public void call(Subscriber
subscriber) { System.out.println("subscriber = " + subscriber); subscriber.onNext(student); subscriber.onCompleted(); } }; Observable.create(onSubscribe) .flatMap(func1) //这里用的是新创建的Observable .subscribe(subscriber);

flatMap(func1)

public final 
Observable
flatMap(Func1
> func) { if (getClass() == ScalarSynchronousObservable.class) { return ((ScalarSynchronousObservable
)this).scalarFlatMap(func); } return merge(map(func)); }

这里面会调用map(func)

public final 
Observable
map(Func1
func) { return lift(new OperatorMap
(func)); }

这里创建了一个OperatorMap,进入到构造方法:

private final Func1
transformer; public OperatorMap(Func1
transformer) { this.transformer = transformer; }

看到了将transformer这个变量赋值了,到此告一段落。

lift()
这个方法是最主要的方法,不管是map还是flatmap,最终都会进入到这个方法中来。 下面源码:

public final 
Observable
lift(final Operator
operator) { return new Observable
(new OnSubscribe
() { @Override public void call(Subscriber
o) { try { Subscriber
st = hook.onLift(operator).call(o); try { // new Subscriber created and being subscribed with so 'onStart' it st.onStart(); onSubscribe.call(st); } catch (Throwable e) { // localized capture of errors rather than it skipping all operators // and ending up in the try/catch of the subscribe method which then // prevents onErrorResumeNext and other similar approaches to error handling if (e instanceof OnErrorNotImplementedException) { throw (OnErrorNotImplementedException) e; } st.onError(e); } } catch (Throwable e) { if (e instanceof OnErrorNotImplementedException) { throw (OnErrorNotImplementedException) e; } // if the lift function failed all we can do is pass the error to the final Subscriber // as we don't have the operator available to us o.onError(e); } } }); }

在第二行返回了一个Observable对象,这里很关键,当我们demo中调用的subscribe(subscriber); 实际是这里返回的Observable对象,所以当我们开始订阅事件的时候,会进入到lift方法源码中的call方法。 这里面的onSubscribe.call(st);的onSubscribe对象实际是下面代码中的onSubscribe。

//原始的OnSubscribe        Observable.OnSubscribe
onSubscribe = new Observable.OnSubscribe
() { @Override public void call(Subscriber
subscriber) { System.out.println("subscriber = " + subscriber); subscriber.onNext(student); subscriber.onCompleted(); } };

上面代码中的public void call(Subscriber<? super Student> subscriber),subscriber对象就是下面新New的Subscriber对象

return new Subscriber
(o) { @Override public void onCompleted() { o.onCompleted(); } @Override public void onError(Throwable e) { o.onError(e); } @Override public void onNext(T t) { try { o.onNext(transformer.call(t)); } catch (Throwable e) { Exceptions.throwIfFatal(e); onError(OnErrorThrowable.addValueAsLastCause(e, t)); } } };

当调用subscriber.onNext(student),就会进入上面onNext方法

@Override            public void onNext(T t) {                try {                    o.onNext(transformer.call(t));                } catch (Throwable e) {                    Exceptions.throwIfFatal(e);                    onError(OnErrorThrowable.addValueAsLastCause(e, t));                }            }

transformer.call(t)

这时进入就会调用:

Func1
> func1 = new Func1
>() { @Override public Observable
call(Student student) { System.out.println("MainActivity.call"); return Observable.from(new Course[]{student.getCourse()}); } };

在这里面我们将Student的Course对象返回

再接着o.onNext(),这里面的o也是我们在demo中创建的subscriber对象,通过onNext方法将Course方法发送出去,接着我们就会在onNext方法中接收到:

Subscriber
subscriber = new Subscriber
() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(Course course) { System.out.println("course = " + course); } };

转载地址:http://stbsn.baihongyu.com/

你可能感兴趣的文章
SpringCloud详细教程6-Zookeeper
查看>>
Freemarker使用mht制作导出word模板
查看>>
Freemarker使用xml写word模板-遇到的坑
查看>>
PyQt5基础用法ui转py后需要修改的地方
查看>>
Scanner类
查看>>
基本类型包装类
查看>>
System类常用方法
查看>>
Runtime类、Math类和Random类的常用方法
查看>>
数据处理类常用方法
查看>>
Collections和Character类 常用静态方法
查看>>
HTML之Javascript——BOM浏览器对象模型
查看>>
JAVA基础中的基础
查看>>
JDBC基础操作
查看>>
连接池
查看>>
Servlet的使用——重定向和转发
查看>>
JSP技术的使用——好像过时了唉。。。。。
查看>>
MVC模式概述
查看>>
Web之过滤器Filter
查看>>
JSON和AJAX
查看>>
web之监听器listener
查看>>