Netty异步回调模式-Future和Promise剖析

学习目标

  • 为什么了解Netty异步监听?
  • Netty如何实现异步监听的?

Future简介

我们知道Netty的I/O操作都是异步的,例如bind,connect,write等操作,会返回一个Future接口。Netty源码中大量使用了异步回调处理模式。在做Netty应用开发时,我们也会用到,所以,了解Netty的异步监听,无论是做Netty应用开发还是阅读源码都是十分重要的。

Future接口剖析

image

咱们通过上图的Echo Server的bind(...)方法来分析,返回一个ChannelFuture实例,点ChannelFuture接口,来看下类层次结构图:


future-2.png

可知:ChannelFuture继承了Netty的Future,Netty的Future继承了JDK的Future。

JDK的Future,我想大家应该非常熟悉了,用的最多的就是在线程池ThreadPoolExecutor时,通过submit方法提交任务返回一个Future实例,通过它来查询任务的执行状态和执行结果,最用常用的方法isDone()和get()。

Netty的Future,在继承JDK的Future基础上,扩展了自己方法,我们来看下:

public interface Future<V> extends java.util.concurrent.Future<V> {
    // 任务执行成功,返回true
    boolean isSuccess();
    // 任务被取消,返回true
    boolean isCancellable();
    // 支付执行失败,返回异常
    Throwable cause();
    // 添加Listener,异步非阻塞处理回调结果
    Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
    Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
    // 移除Listener
    Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
    Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
    // 同步阻塞等待任务结束;执行失败话,会将“异常信息”重新抛出来
    Future<V> sync() throws InterruptedException;
    Future<V> syncUninterruptibly();
    // 同步阻塞等待任务结束,和sync方法一样,只不过不会抛出异常信息
    Future<V> await() throws InterruptedException;
    Future<V> awaitUninterruptibly();
    boolean await(long timeout, TimeUnit unit) throws InterruptedException;
    boolean await(long timeoutMillis) throws InterruptedException;
    boolean awaitUninterruptibly(long timeout, TimeUnit unit);
    boolean awaitUninterruptibly(long timeoutMillis);
    // 非阻塞,获取执行结果
    V getNow();
    // 取消任务
    @Override
    boolean cancel(boolean mayInterruptIfRunning);
}

我们知道JDK的Future的任务结果获取需要主动查询,而Netty的Future通过添加监听器Listener,可以做到异步非阻塞处理任务结果,可以称为被动回调

同步阻塞有两种方式:sync()和await(),区别:sync()方法在任务失败后,会把异常信息抛出;await()方法对异常信息不做任何处理,当我们不关心异常信息时可以用await()。通过阅读源码可知sync()方法里面其实调的就是await()方法。推荐使用:sync()方法

// DefaultPromise 类

 @Override
public Promise<V> sync() throws InterruptedException {
    await();
    rethrowIfFailed();
    return this;
}

我们可以看到Future接口没有和IO操作关联在一起,接下来让我们来看看Future的子接口ChannelFuture,它和IO操作中的channel通道关联在一起了,用于异步处理channel事件,这个接口用的最多

public interface ChannelFuture extends Future<Void> {
    // 获取channel通道
    Channel channel();
    @Override
    ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener);
    @Override
    ChannelFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
    @Override
    ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener);
    @Override
    ChannelFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
    @Override
    ChannelFuture sync() throws InterruptedException;
    @Override
    ChannelFuture syncUninterruptibly();
    @Override
    ChannelFuture await() throws InterruptedException;
    @Override
    ChannelFuture awaitUninterruptibly();
    // 标记Futrue是否为Void,如果ChannelFuture是一个void的Future,不允许调// 用addListener(),await(),sync()相关方法
    boolean isVoid();
}

ChannelFuture接口相比父类Future接口,就增加了channel()和isVoid()两个方法,其他方法都是覆盖父类接口,没有别扩展。

到这了,顺便了解下ChannelFuture的状态转换

* <pre>
 *                                      +---------------------------+
 *                                      | Completed successfully    |
 *                                      +---------------------------+
 *                                 +---->      isDone() = true      |
 * +--------------------------+    |    |   isSuccess() = true      |
 * |        Uncompleted       |    |    +===========================+
 * +--------------------------+    |    | Completed with failure    |
 * |      isDone() = false    |    |    +---------------------------+
 * |   isSuccess() = false    |----+---->      isDone() = true      |
 * | isCancelled() = false    |    |    |       cause() = non-null  |
 * |       cause() = null     |    |    +===========================+
 * +--------------------------+    |    | Completed by cancellation |
 *                                 |    +---------------------------+
 *                                 +---->      isDone() = true      |
 *                                      | isCancelled() = true      |
 *                                      +---------------------------+
 * </pre>

ChannelFuture就两种状态Uncompleted(未完成)和Completed(完成),Completed包括三种,执行成功,执行失败和任务取消。注意:执行失败和任务取消都属于Completed

让我们在来看下Future的另一个子接口Promise,它是个可写的Future,到底是写什么东西呢?

public interface Promise<V> extends Future<V> {
    // 执行成功,设置返回值,并通知所有listener,如果已经设置,则会抛出异常
    Promise<V> setSuccess(V result);
    // 设置返回值,如果已经设置,返回false
    boolean trySuccess(V result);
    // 执行失败,设置异常,并通知所有listener
    Promise<V> setFailure(Throwable cause);
    boolean tryFailure(Throwable cause);
    // 标记Future不可取消
    boolean setUncancellable();

    @Override
    Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
    @Override
    Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
    @Override
    Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
    @Override
    Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
    @Override
    Promise<V> await() throws InterruptedException;
    @Override
    Promise<V> awaitUninterruptibly();
    @Override
    Promise<V> sync() throws InterruptedException;
    @Override
    Promise<V> syncUninterruptibly();
}
  • Future接口只提供了获取返回值的get()方法,不可设置返回值
  • Promise接口在Future基础上,还提供了设置返回值和异常信息,并立即通知listeners。而且,一旦 setSuccess(...) 或 setFailure(...) 后,那些 await() 或 sync() 的线程就会从等待中返回。

接下来,让我们来看看ChannelFuture的可写的子接口ChannelPromise:

public interface ChannelPromise extends ChannelFuture, Promise<Void> {
    // 覆盖ChannelFuture接口
    @Override
    Channel channel();
    // 覆盖Promise接口
    @Override
    ChannelPromise setSuccess(Void result);
    ChannelPromise setSuccess();
    boolean trySuccess();
    @Override
    ChannelPromise setFailure(Throwable cause);

    @Override
    ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener);
    @Override
    ChannelPromise addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
    @Override
    ChannelPromise removeListener(GenericFutureListener<? extends Future<? super Void>> listener);
    @Override
    ChannelPromise removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
    @Override
    ChannelPromise sync() throws InterruptedException;
    @Override
    ChannelPromise syncUninterruptibly();
    @Override
    ChannelPromise await() throws InterruptedException;
    @Override
    ChannelPromise awaitUninterruptibly();
    ChannelPromise unvoid();
}

ChannelPromise接口只是综合了ChannelFuture和Promise接口,没有新增功能

截至到目前,异步监听相关的接口已经介绍完了,让我们通过一张图来概括下:


image

Promise的实现类

看看DefaultPromise的主要属性:

// setSuccess设置result为null时,设置成SUCCESS
private static final Object SUCCESS = new Object();
// 不可取消值
private static final Object UNCANCELLABLE = new Object();
// 取消异常信息持有者
private static final CauseHolder CANCELLATION_CAUSE_HOLDER = new CauseHolder(
        StacklessCancellationException.newInstance(DefaultPromise.class, "cancel(...)"));
//执行结果,使用volatile修饰,保证可见性
private volatile Object result;
// 当Promise执行完成时需要通知Listener,此时就使用这个executor
private final EventExecutor executor;
// 要通知的listener,因为它可能是不同的类型,所以定义为Object类型,使用时再判断类型
private Object listeners;
// 要使用wait()/notifyAll()机制,这个变量记录了waiter的数量
private short waiters;
// 是否正在通知listener
private boolean notifyingListeners;

从属性中可以看出, DefaultPromise 通过 Object 的 wait/notify 机制实现线程间的同步,通过 volatile 属性保证线程间的可见性。

通过setSuccess(...)方法,来了解下 执行成功后设置返回值并通知Listener的过程:

@Override
public Promise<V> setSuccess(V result) {
    // 第一次成功设置返回值后,返回Promise对象
    if (setSuccess0(result)) {
        return this;
    }
    // 否则抛出异常
    throw new IllegalStateException("complete already: " + this);
}
private boolean setSuccess0(V result) {
    // result为null,设置为SUCCESS
    return setValue0(result == null ? SUCCESS : result);
}
private boolean setValue0(Object objResult) {
    // CAS设置result值,只有当result为null或者UNCANCELLABLE,才可以执行成功
    if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
        RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
        // 唤醒等待的waiter,同时判断是否存在listener
        if (checkNotifyWaiters()) {
            // 通知所有的listener
            notifyListeners();
        }
        return true;
    }
    return false;
}
private synchronized boolean checkNotifyWaiters() {
    if (waiters > 0) {
        // 通知所有waiters
        notifyAll();
    }
    // 判断是否添加了监听者listeners
    return listeners != null;
}
private void notifyListeners() {
    EventExecutor executor = executor();
    // 是否是同一个线程
    if (executor.inEventLoop()) {
        final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
        final int stackDepth = threadLocals.futureListenerStackDepth();
        if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
            threadLocals.setFutureListenerStackDepth(stackDepth + 1);
            try {
                notifyListenersNow();
            } finally {
                threadLocals.setFutureListenerStackDepth(stackDepth);
            }
            return;
        }
    }
    // 用自己的executor执行
    safeExecute(executor, new Runnable() {
        @Override
        public void run() {
            notifyListenersNow();
        }
    });
}

DefaultPromise的方法实现逻辑挺简单,不再全部讲解了,只需知道通过 Object 的 wait/notify 机制实现线程间的同步观察者设计模式进行通知就可以了。

DefaultPromise还有个子类DefaultChannelPromise,这个类在Netty中用的最多的,其内部逻辑调用的都是父类DefaultPromise的方法。DefaultChannelPromise类层次结构图如下:

image

应用实战

public class ChannelFuture_01 {

    public static void main(String[] args) throws InterruptedException {

        EventExecutor executor = new SelfEventExecutor();
        final Promise<String> promise = new DefaultPromise<String>(executor);


        promise.addListener(new GenericFutureListener<Future<? super String>>() {
            @Override
            public void operationComplete(Future<? super String> future) throws Exception {
                System.out.println("执行完成结果:" + future.get());
            }
        });


        final Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(3000);
                    promise.setSuccess("hello world!");
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        });
        thread.start();

        promise.sync();
        System.out.println("==================");
    }

    static class SelfEventExecutor extends SingleThreadEventExecutor {


        public SelfEventExecutor() {
            this(null);
        }

        public SelfEventExecutor(EventExecutorGroup parent) {
            this(parent, new DefaultThreadFactory(SelfEventExecutor.class));
        }

        public SelfEventExecutor(EventExecutorGroup parent, ThreadFactory threadFactory) {
            super(parent, threadFactory, true);
        }

        @Override
        protected void run() {
            Runnable task = takeTask();
            if (task != null) {
                task.run();
            }
        }
    }
}

Future总结

Netty的Future继承JDK的Future,通过 Object 的 wait/notify机制,实现了线程间的同步;使用观察者设计模式,实现了异步非阻塞回调处理。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,242评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,769评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,484评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,133评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,007评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,080评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,496评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,190评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,464评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,549评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,330评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,205评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,567评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,889评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,160评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,475评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,650评论 2 335

推荐阅读更多精彩内容