Retrofit2+rxjava2源码解析(二):rxjava2源码解析

上一篇讲了retrofit2的原理,这一篇咱们重点讲讲rxjava2的实现原理。不过呢,由于rxjava2博大精深,这里篇幅有限,而且精力有限,所以这里只讲以下几个点:
1.上游被观察者Observable的创建
2.subscribeOn如何初始化调度线程者
3.observeOn如何初始化调度线程者
4.下游观察者的创建,上游被观察者的订阅事件,subscribeOn如何调度上游被观察者线程以及observeOn如何调度下游观察者线程

        //第一步
        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                //log1
                LogUtil.d("subscribe : " + Thread.currentThread().getName());
                e.onNext("a");
                e.onComplete();
            }
        }).subscribeOn(Schedulers.io())//第二步
        .observeOn(AndroidSchedulers.mainThread())//第三步
        .subscribe(new Observer<String>() {

            @Override
            public void onSubscribe(Disposable d) {
                 //log2
                LogUtil.d("onSubscribe : " + Thread.currentThread().getName());
            }

            @Override
            public void onNext(String str) {
                 //log3
                LogUtil.d("onSubscribe : " + Thread.currentThread().getName());
            }

            @Override
            public void onError(Throwable e) {
                 //log4
                LogUtil.d("onSubscribe : " + Thread.currentThread().getName());
            }

            @Override
            public void onComplete() {
                 //log5
                LogUtil.d("onSubscribe : " + Thread.currentThread().getName());
            }
        });//第四步

一、上游被观察者Observable的创建:

首先进入Observable的create方法里面去:

    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }
public final class RxJavaPlugins {
    ...代码省略...
    /**
     * Calls the associated hook function.
     * @param <T> the value type
     * @param source the hook's input value
     * @return the value returned by the hook
     */
    @SuppressWarnings({ "rawtypes", "unchecked" })
    @NonNull
    public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }

    ...代码省略...
}

RxJavaPlugins.onAssembly 是个hook方法,在这里不做详细介绍,我们可以

RxJavaPlugins.onAssembly(new ObservableCreate<T>(source))

直接看成

new ObservableCreate<T>(source)

就可以了。
下面很多地方会有类似的调用,都可以以这种形式看待,就不在说明了。
这里就返回了ObservableCreate的对象,然后把source传了进来,这里的source便是一开始new出来的ObservableOnSubscribe对象。

二、subscribeOn如何初始化调度线程者

    public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }

1.我们可以看出此时返回的是一个ObservableSubscribeOn对象,参数一个是this说明是第一步new出来的ObservableCreate对象,这个就是当前对象里面的source变量,第二个参数是scheduler即Schedulers.io()。
2.那么Schedulers.io()到底是什么呢,一起去看看吧:

    @NonNull
    public static Scheduler io() {
        return RxJavaPlugins.onIoScheduler(IO);
    }
public final class RxJavaPlugins {
    ...代码省略...

     /**
     * Calls the associated hook function.
     * @param defaultScheduler the hook's input value
     * @return the value returned by the hook
     */
    @NonNull
    public static Scheduler onIoScheduler(@NonNull Scheduler defaultScheduler) {
        Function<? super Scheduler, ? extends Scheduler> f = onIoHandler;
        if (f == null) {
            return defaultScheduler;
        }
        return apply(f, defaultScheduler);
    }

    ...代码省略...
}

其实这也是个hook方法,所以返回的就是IO。再来看下IO是个什么东西

    static {
        SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());

        COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());

        IO = RxJavaPlugins.initIoScheduler(new IOTask());

        TRAMPOLINE = TrampolineScheduler.instance();

        NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
    }

上图所知,这是个静态块,在程序加载的时候就会初始化出来

    /**
     * Calls the associated hook function.
     * @param defaultScheduler a {@link Callable} which returns the hook's input value
     * @return the value returned by the hook, not null
     * @throws NullPointerException if the callable parameter or its result are null
     */
    @NonNull
    public static Scheduler initIoScheduler(@NonNull Callable<Scheduler> defaultScheduler) {
        ObjectHelper.requireNonNull(defaultScheduler, "Scheduler Callable can't be null");
        Function<? super Callable<Scheduler>, ? extends Scheduler> f = onInitIoHandler;
        if (f == null) {
            return callRequireNonNull(defaultScheduler);
        }
        return applyRequireNonNull(f, defaultScheduler);
    }

然后根据hook function可以得知返回的Scheduler就是
callRequireNonNull(defaultScheduler)返回的对象。这个defaultScheduler是个IOTask对象,看下面代码

    static final class IOTask implements Callable<Scheduler> {
        @Override
        public Scheduler call() throws Exception {
            return IoHolder.DEFAULT;
        }
    }

    static final class IoHolder {
        static final Scheduler DEFAULT = new IoScheduler();
    }

IOTask有个重写方法call,返回的是IoHolder.DEFAULT即IoScheduler。那么这个call方法是什么时候调用的呢,其实就是上面的callRequireNonNull方法里面调用。

所以可以得出Schedulers.io()获取到的就是IoSchedulers对象。

三、observeOn如何初始化调度线程者

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> observeOn(Scheduler scheduler) {
        return observeOn(scheduler, false, bufferSize());
    }
  
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }

1.可以看出来这里返回的是一个ObservableObserveOn对象,这里的this参数就是第二步的ObservableSubscribeOn对象,这个就是当前对象里面的source变量。而这个scheduler参数就是传入的AndroidSchedulers.mainThread(),看名字大概能猜出来这是个主线程调度者
2.那么就随我进入AndroidSchedulers的代码里一探究竟吧:

public final class AndroidSchedulers {

    private static final class MainHolder {

        static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
    }

    private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
            new Callable<Scheduler>() {
                @Override public Scheduler call() throws Exception {
                    return MainHolder.DEFAULT;
                }
            });

    /** A {@link Scheduler} which executes actions on the Android main thread. */
    public static Scheduler mainThread() {
        return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
    }
  ...代码省略...
}

mainThread方法里面返回的是MAIN_THREAD,而这个MAIN_THREAD是通过RxAndroidPlugins.initMainThreadScheduler返回的MainHolder.DEFAULT对象,也就是HandlerScheduler对象。这个对象传入了主线程的handler,用来调度线程。

四、下游观察者的创建,上游被观察者的订阅事件,subscribeOn如何调度上游被观察者线程以及observeOn如何调度下游观察者线程

最关键的一步来了,先上代码:

    @SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

1.ObjectHelper.requireNonNull已经出现了很多次,其实就是为了判断是否为空用的
2.这里的observer就是传入的观察者,即刚刚在最外层new出来的observer。
3.这个时候进入subscribeActual方法,发现这是个抽象方法,那么是谁实现了这个方法呢。还记得我们现在的subscribe是谁调用的嘛,没错就是第三步返回的ObservableObserveOn对象,所以ObservableObserveOn实现了subscribeActual方法:

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }

这里的source就是之前步骤3.1保存的ObservableSubscribeOn对象,而observer就是最外层传入的观察者对象,由此可以看出观察者被内部类ObserveOnObserver进行了装饰。
w就是handler线程调度者创建的工作者主要用来将ObserveOnObserver(实现了runnable接口)通过handler发送到主线程上面去,从而实现线程调度
4.这个时候要调用source的subscribe了,跟ObservableObserveOn类似,之后进入到ObservableSubscribeOn的subscribeActual方法中:

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

        s.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

这里的s就是步骤4.3new出来的ObserveOnObserver对象,这里将s装饰到SubscribeOnObserver对象中,然后调用ObserveOnObserver的onSubscribe方法 ↓↓↓

        @Override
        public void onSubscribe(Disposable s) {
                ...代码省略...
                actual.onSubscribe(this);
            }
        }

这里的actual其实就是最外层的观察者,然后就会调用最外层观察者的onSubscribe方法,此时由于并没有做任何的线程调度,所以当前的操作处于主线程中。因此log5出打出来的线程应该是主线程

好了,我们再回到subscribeActual方法中,此时调用scheduler.scheduleDirect方法,将SubscribeTask对象传入:

    @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        final Worker w = createWorker();

        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        DisposeTask task = new DisposeTask(decoratedRun, w);

        w.schedule(task, delay, unit);

        return task;
    }

run就是传入SubscribeTask的对象,将SubscribeTask的对象装饰到DisposeTask中,
w是IO线程调度者创建的切换线程的实际工作者,此时调用createWorker就会执行IoScheduler的createWork方法new 出EventLoopWorker对象:

    @NonNull
    @Override
    public Worker createWorker() {
        return new EventLoopWorker(pool.get());
    }

而w.schedule也就是执行了EventLoopWorker的schedule方法,将DisposeTask传入:

        @NonNull
        @Override
        public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
            if (tasks.isDisposed()) {
                // don't schedule, we are unsubscribed
                return EmptyDisposable.INSTANCE;
            }

            return threadWorker.scheduleActual(action, delayTime, unit, tasks);
        }

接着调用threadWorker里面的scheduleActual方法:

   @NonNull
    public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

        if (parent != null) {
            if (!parent.add(sr)) {
                return sr;
            }
        }

        Future<?> f;
        try {
            if (delayTime <= 0) {
                f = executor.submit((Callable<Object>)sr);
            } else {
                f = executor.schedule((Callable<Object>)sr, delayTime, unit);
            }
            sr.setFuture(f);
        } catch (RejectedExecutionException ex) {
            if (parent != null) {
                parent.remove(sr);
            }
            RxJavaPlugins.onError(ex);
        }

        return sr;
    }

因为之前传入的delayTime是0,所以此时会走 f = executor.submit((Callable<Object>)sr);这条线。然后我们看下executor是什么时候初始化的:

    public NewThreadWorker(ThreadFactory threadFactory) {
        executor = SchedulerPoolFactory.create(threadFactory);
    }
    ...代码省略...
      /**
     * Creates a ScheduledExecutorService with the given factory.
     * @param factory the thread factory
     * @return the ScheduledExecutorService
     */
    public static ScheduledExecutorService create(ThreadFactory factory) {
        final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
        if (exec instanceof ScheduledThreadPoolExecutor) {
            ScheduledThreadPoolExecutor e = (ScheduledThreadPoolExecutor) exec;
            POOLS.put(e, exec);
        }
        return exec;
    }

由此可以得出这个executor是ScheduledExecutorService对象,然后调用ScheduledExecutorService的submit方法:

   public Future<?> submit(Runnable task) {
        return schedule(task, 0, NANOSECONDS);
    }
...代码省略...
    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        RunnableScheduledFuture<Void> t = decorateTask(command,
            new ScheduledFutureTask<Void>(command, null,
                                          triggerTime(delay, unit),
                                          sequencer.getAndIncrement()));
        delayedExecute(t);
        return t;
    }

这里先是ScheduledFutureTask的初始化,将command传入,然后调用其父类的构造方法->调用Executors.callable方法返回一个RunnableAdapter对象(将command传入构造方法内,赋值给task),然后赋值给callable对象。
然后调用delayedExecute以后就会执行ScheduledFutureTask的run方法,调用父类的run方法:

    public void run() {
        if (state != NEW ||
            !U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

接着就调用callable的call方法,这个callable就是上面提到的RunnableAdapter对象,调用RunnableAdapter的call方法->task.run,此处的task便是传入的command。
那么往前追溯到NewThreadWorker的schedule中,发现传入到ScheduledThreadPoolExecutor中的command是ScheduledRunnable对象,也就是说调用了ScheduledRunnable.run->DisposeTask.run->SubscribeTask.run->SubscribeOnObserver.run->source.subscribe(parent);
所以上面的一系列操作全部都在子线程中执行

SubscribeOnObserver里面的source是之前保存的ObservableSubscribeOn的ObservableCreate对象。
parent是装饰了下游ObservableObserveOn的内部类对象(装饰了外部观察者)的ObservableSubscribeOn的内部类,这句话比较拗口,多读几次理解一下。

5.这时候就执行到ObservableCreate的subscribeActual方法了:

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

observer是装饰了下游ObservableSubscribeOn的内部类SubscribeOnObserver对象,该对象装饰了他下游ObservableObserveOn的内部类ObserveOnObserver对象(该对象装饰了外部观察者)如下图所示:


observer.png

source是一开始在创建ObservableCreate时就传入的外部被观察者对象
因此,最终source.subscribe(parent)会调用最外层被观察者的subscribe方法,因此log1打印出来的线程应该处于子线程中,

                //log1
                LogUtil.d("subscribe : " + Thread.currentThread().getName());
                e.onNext("a");
                e.onComplete();

e就是传入的CreateEmitter对象,调用createEmitter的onNext方法:

        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

会依次调用observer.png所展示的对象的onNext方法,重点看下ObservableObserveOn.ObserveOnObserver的onNext方法:

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            schedule();
        }

        void schedule() {
            if (getAndIncrement() == 0) {
                worker.schedule(this);
            }
        }

worker就是上面步骤4.3里面调用HandlerScheduler.createWorker方法new出来的HandlerWorker对象,调用HandlerWorker的schedule方法:

        @Override
        public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
            if (run == null) throw new NullPointerException("run == null");
            if (unit == null) throw new NullPointerException("unit == null");

            if (disposed) {
                return Disposables.disposed();
            }

            run = RxJavaPlugins.onSchedule(run);

            ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

            Message message = Message.obtain(handler, scheduled);
            message.obj = this; // Used as token for batch disposal of this worker's runnables.

            handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));

            // Re-check disposed state for removing in case we were racing a call to dispose().
            if (disposed) {
                handler.removeCallbacks(scheduled);
                return Disposables.disposed();
            }

            return scheduled;
        }

将runnable封装到消息对象里面发送给主线程,主线程执行run方法,从而完成了线程调度,这时操作处理已经进入到了主线程当中执行ScheduledRunnable的run方法:

        @Override
        public void run() {
            try {
                delegate.run();
            } catch (Throwable t) {
                IllegalStateException ie =
                    new IllegalStateException("Fatal Exception thrown on Scheduler.", t);
                RxJavaPlugins.onError(ie);
                Thread thread = Thread.currentThread();
                thread.getUncaughtExceptionHandler().uncaughtException(thread, ie);
            }
        }

这里的delegate就是ObservableObserveOn.ObserveOnObserver对象,所以重新进入ObservableObserveOn.ObserveOnObserver代码的run方法中:

        @Override
        public void run() {
            if (outputFused) {
                drainFused();
            } else {
                drainNormal();
            }
        }

        void drainNormal() {
            int missed = 1;

            final SimpleQueue<T> q = queue;
            final Observer<? super T> a = actual;

            for (;;) {
                if (checkTerminated(done, q.isEmpty(), a)) {
                    return;
                }

                for (;;) {
                    boolean d = done;
                    T v;

                    try {
                        v = q.poll();
                    } catch (Throwable ex) {
                        Exceptions.throwIfFatal(ex);
                        s.dispose();
                        q.clear();
                        a.onError(ex);
                        worker.dispose();
                        return;
                    }
                    boolean empty = v == null;

                    if (checkTerminated(d, empty, a)) {
                        return;
                    }

                    if (empty) {
                        break;
                    }

                    a.onNext(v);
                }

                missed = addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }

run-> drainNormal->actual.onNext(v);
这里的actual便是最外层的观察者,所以观察者里面的log3,log4,log5都会是在主线程中打印出来。

接下来我们打印出来看看:

08-08 14:09:03.138 4928-4928/com.zkyl.zkyl D/(MainActivity.java:57): onSubscribe  onSubscribe : main
08-08 14:09:03.142 4928-4974/com.zkyl.zkyl D/(MainActivity.java:47): subscribe  subscribe : RxCachedThreadScheduler-1
08-08 14:09:03.162 4928-4928/com.zkyl.zkyl D/(MainActivity.java:62): onNext  onSubscribe : main
08-08 14:09:03.162 4928-4928/com.zkyl.zkyl D/(MainActivity.java:72): onComplete  onSubscribe : main

确实正如代码分析一般只有log1是处于子线程中,其他都处于主线程当中。

感想

rxjava2的设计实在是过于庞大,精髓远远不止这些。研究到这里已经非常累了,所以rxjava2的研究道路还是非常遥远漫长的

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 201,784评论 5 474
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,745评论 2 378
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 148,702评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,229评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,245评论 5 363
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,376评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,798评论 3 393
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,471评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,655评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,485评论 2 318
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,535评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,235评论 3 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,793评论 3 304
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,863评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,096评论 1 258
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,654评论 2 348
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,233评论 2 341

推荐阅读更多精彩内容