Netty 源码解析(三): Netty 的 Future 和 Promise

今天是猿灯塔“365篇原创计划”第三篇。

接下来的时间灯塔君持续更新Netty系列一共九篇

Netty 源码解析(一): 开始

Netty 源码解析(二): Netty 的 Channel

当前:Netty 源码解析(三): Netty 的 Future 和 Promise

Netty 源码解析(四): Netty 的 ChannelPipeline

Netty 源码解析(五): Netty 的线程池分析

Netty 源码解析(六): Channel 的 register 操作

Netty 源码解析(七): NioEventLoop 工作流程

Netty 源码解析(八): 回到 Channel 的 register 操作

Netty 源码解析(九): connect 过程和 bind 过程分析

今天呢!灯塔君跟大家讲:  

Netty 的 Future 和 Promise

Netty 中的异步编程: Future 和 Promise

Netty 中非常多的异步调用,所以在介绍更多 NIO 相关的内容之前,我们来看看它的异步接口是怎么使用的。

前面我们在介绍 Echo 例子的时候,已经用过了 ChannelFuture 这个接口了:

争取在看完本节后,读者能搞清楚上面的这几行划线部分是怎么走的。

关于 Future 接口,我想大家应该都很熟悉,用得最多的就是在使用 Java 的线程池 ThreadPoolExecutor 的时候了。在submit一个任务到线程池中的时候,返回的就是一个Future实例,通过它来获取提交的任务的执行状态和最终的执行结果,我们最常用它的isDone()和get()方法。

下面是 JDK  中的 Future 接口 java.util.concurrent.Future:

publicinterfaceFuture{

// 取消该任务

booleancancel(booleanmayInterruptIfRunning);

// 任务是否已取消

booleanisCancelled();

// 任务是否已完成

booleanisDone();

// 阻塞获取任务执行结果

Vget()throwsInterruptedException, ExecutionException;

// 带超时参数的获取任务执行结果

Vget(longtimeout, TimeUnit unit)

throwsInterruptedException, ExecutionException, TimeoutException

;

}

Netty 中的 Future 接口(同名)继承了 JDK 中的 Future 接口,然后添加了一些方法:

// io.netty.util.concurrent.Future

publicinterfaceFutureextendsjava.util.concurrent.Future{

// 是否成功

booleanisSuccess();

// 是否可取消

booleanisCancellable();

// 如果任务执行失败,这个方法返回异常信息

Throwablecause();

// 添加 Listener 来进行回调

FutureaddListener(GenericFutureListener> listener);

FutureaddListeners(GenericFutureListener>... listeners);

FutureremoveListener(GenericFutureListener> listener);

FutureremoveListeners(GenericFutureListener>... listeners);

// 阻塞等待任务结束,如果任务失败,将“导致失败的异常”重新抛出来

Futuresync()throwsInterruptedException;

// 不响应中断的 sync(),这个大家应该都很熟了

FuturesyncUninterruptibly();

// 阻塞等待任务结束,和 sync() 功能是一样的,不过如果任务失败,它不会抛出执行过程中的异常

Futureawait()throwsInterruptedException;

FutureawaitUninterruptibly();

booleanawait(longtimeout, TimeUnit unit)throwsInterruptedException;

booleanawait(longtimeoutMillis)throwsInterruptedException;

booleanawaitUninterruptibly(longtimeout, TimeUnit unit);

booleanawaitUninterruptibly(longtimeoutMillis);

// 获取执行结果,不阻塞。我们都知道 java.util.concurrent.Future 中的 get() 是阻塞的

VgetNow();

// 取消任务执行,如果取消成功,任务会因为 CancellationException 异常而导致失败

//      也就是 isSuccess()==false,同时上面的 cause() 方法返回 CancellationException 的实例。

// mayInterruptIfRunning 说的是:是否对正在执行该任务的线程进行中断(这样才能停止该任务的执行),

//       似乎 Netty 中 Future 接口的各个实现类,都没有使用这个参数

@Override

booleancancel(booleanmayInterruptIfRunning);

}

看完上面的 Netty 的 Future 接口,我们可以发现,它加了 sync() 和 await() 用于阻塞等待,还加了 Listeners,只要任务结束去回调 Listener 们就可以了,那么我们就不一定要主动调用 isDone() 来获取状态,或通过 get() 阻塞方法来获取值。

所以它其实有两种使用范式

顺便说下 sync() 和 await() 的区别:sync() 内部会先调用 await() 方法,等 await() 方法返回后,会检查下这个任务是否失败,如果失败,重新将导致失败的异常抛出来。也就是说,如果使用 await(),任务抛出异常后,await() 方法会返回,但是不会抛出异常,而 sync() 方法返回的同时会抛出异常。

我们也可以看到,Future 接口没有和 IO 操作关联在一起,还是比较纯净的接口。

接下来,我们来看 Future 接口的子接口 ChannelFuture,这个接口用得最多,它将和 IO 操作中的 Channel 关联在一起了,用于异步处理 Channel 中的事件。

publicinterfaceChannelFutureextendsFuture{

// ChannelFuture 关联的 Channel

Channelchannel();

// 覆写以下几个方法,使得它们返回值为 ChannelFuture 类型 

@Override

ChannelFutureaddListener(GenericFutureListener> listener);

@Override

ChannelFutureaddListeners(GenericFutureListener>... listeners);

@Override

ChannelFutureremoveListener(GenericFutureListener> listener);

@Override

ChannelFutureremoveListeners(GenericFutureListener>... listeners);

@Override

ChannelFuturesync()throwsInterruptedException;

@Override

ChannelFuturesyncUninterruptibly();

@Override

ChannelFutureawait()throwsInterruptedException;

@Override

ChannelFutureawaitUninterruptibly();

// 用来标记该 future 是 void 的,

// 这样就不允许使用 addListener(...), sync(), await() 以及它们的几个重载方法

booleanisVoid();

}

我们看到,ChannelFuture 接口相对于 Future 接口,除了将 channel 关联进来没有增加什么东西。还有个 isVoid() 方法算是不那么重要的存在吧。其他几个都是方法覆写,为了让返回值类型变为 ChannelFuture,而不是原来的 Future。

这里有点跳,我们来介绍下 Promise 接口,它和 ChannelFuture 接口无关,而是和前面的 Future 接口相关,Promise 这个接口非常重要。

Promise 接口和 ChannelFuture 一样,也继承了 Netty 的 Future 接口,然后加了一些 Promise 的内容:

publicinterfacePromiseextendsFuture{

// 标记该 future 成功及设置其执行结果,并且会通知所有的 listeners。

// 如果该操作失败,将抛出异常(失败指的是该 future 已经有了结果了,成功的结果,或者失败的结果)

PromisesetSuccess(V result);

// 和 setSuccess 方法一样,只不过如果失败,它不抛异常,返回 false

booleantrySuccess(V result);

// 标记该 future 失败,及其失败原因。

// 如果失败,将抛出异常(失败指的是已经有了结果了)

PromisesetFailure(Throwable cause);

// 标记该 future 失败,及其失败原因。

// 如果已经有结果,返回 false,不抛出异常

booleantryFailure(Throwable cause);

// 标记该 future 不可以被取消

booleansetUncancellable();

// 这里和 ChannelFuture 一样,对这几个方法进行覆写,目的是为了返回 Promise 类型的实例

@Override

PromiseaddListener(GenericFutureListener> listener);

@Override

PromiseaddListeners(GenericFutureListener>... listeners);

@Override

PromiseremoveListener(GenericFutureListener> listener);

@Override

PromiseremoveListeners(GenericFutureListener>... listeners);

@Override

Promiseawait()throwsInterruptedException;

@Override

PromiseawaitUninterruptibly();

@Override

Promisesync()throwsInterruptedException;

@Override

PromisesyncUninterruptibly();

}

可能有些读者对 Promise 的概念不是很熟悉,这里简单说两句。

我觉得只要明白一点,Promise 实例内部是一个任务,任务的执行往往是异步的,通常是一个线程池来处理任务。Promise 提供的 setSuccess(V result) 或 setFailure(Throwable t) 将来会被某个执行任务的线程在执行完成以后调用,同时那个线程在调用 setSuccess(result) 或 setFailure(t) 后会回调 listeners 的回调函数(当然,回调的具体内容不一定要由执行任务的线程自己来执行,它可以创建新的线程来执行,也可以将回调任务提交到某个线程池来执行)。而且,一旦 setSuccess(...) 或 setFailure(...) 后,那些 await() 或 sync() 的线程就会从等待中返回。

所以这里就有两种编程方式,一种是用 await(),等 await() 方法返回后,得到 promise 的执行结果,然后处理它;另一种就是提供 Listener 实例,我们不太关心任务什么时候会执行完,只要它执行完了以后会去执行 listener 中的处理方法就行。

接下来,我们再来看下ChannelPromise,它继承了前面介绍的 ChannelFuture 和 Promise 接口。

ChannelPromise 接口在 Netty 中使用得比较多,因为它综合了 ChannelFuture 和 Promise 两个接口:

/**

* Special {@linkChannelFuture} which is writable.

*/

publicinterfaceChannelPromiseextendsChannelFuture,Promise{

// 覆写 ChannelFuture 中的 channel() 方法,其实这个方法一点没变

@Override

Channelchannel();

// 下面几个方法是覆写 Promise 中的接口,为了返回值类型是 ChannelPromise

@Override

ChannelPromisesetSuccess(Void result);

ChannelPromisesetSuccess();

booleantrySuccess();

@Override

ChannelPromisesetFailure(Throwable cause);

// 到这里大家应该都熟悉了,下面几个方法的覆写也是为了得到 ChannelPromise 类型的实例

@Override

ChannelPromiseaddListener(GenericFutureListener> listener);

@Override

ChannelPromiseaddListeners(GenericFutureListener>... listeners);

@Override

ChannelPromiseremoveListener(GenericFutureListener> listener);

@Override

ChannelPromiseremoveListeners(GenericFutureListener>... listeners);

@Override

ChannelPromisesync()throwsInterruptedException;

@Override

ChannelPromisesyncUninterruptibly();

@Override

ChannelPromiseawait()throwsInterruptedException;

@Override

ChannelPromiseawaitUninterruptibly();

/**

* Returns a new {@linkChannelPromise} if {@link#isVoid()} returns {@codetrue} otherwise itself.

*/

// 我们忽略这个方法吧。

ChannelPromiseunvoid();

}

我们可以看到,它综合了 ChannelFuture 和 Promise 中的方法,只不过通过覆写将返回值都变为 ChannelPromise 了而已,没有增加什么新的功能

小结一下,我们上面介绍了几个接口,Future 以及它的子接口 ChannelFuture 和 Promise,然后是 ChannelPromise 接口同时继承了 ChannelFuture 和 Promise。

我把这几个接口的主要方法列在一起,这样大家看得清晰些:

接下来,我们需要来一个实现类,这样才能比较直观地看出它们是怎么使用的,因为上面的这些都是接口定义,具体还得看实现类是怎么工作的。

下面,我们来介绍下DefaultPromise这个实现类,这个类很常用,它的源码也不短,我们先介绍几个关键的内容,然后介绍一个示例使用。

首先,我们看下它有哪些属性:

publicclassDefaultPromiseextendsAbstractFutureimplementsPromise{

// 保存执行结果

privatevolatileObject result;

// 执行任务的线程池,promise 持有 executor 的引用,这个其实有点奇怪了

// 因为“任务”其实没必要知道自己在哪里被执行的

privatefinalEventExecutor executor;

// 监听者,回调函数,任务结束后(正常或异常结束)执行

privateObject listeners;

// 等待这个 promise 的线程数(调用sync()/await()进行等待的线程数量)

privateshortwaiters;

// 是否正在唤醒等待线程,用于防止重复执行唤醒,不然会重复执行 listeners 的回调方法

privatebooleannotifyingListeners;

......

}

可以看出,此类实现了 Promise,但是没有实现 ChannelFuture,所以它和 Channel 联系不起来。

别急,我们后面会碰到另一个类 DefaultChannelPromise 的使用,这个类是综合了 ChannelFuture 和 Promise 的,但是它的实现其实大部分都是继承自这里的 DefaultPromise 类的。

说完上面的属性以后,大家可以看下 setSuccess(V result) 、trySuccess(V result) 和 setFailure(Throwable cause) 、 tryFailure(Throwable cause) 这几个方法:


看出 setSuccess(result) 和 trySuccess(result) 的区别了吗?

上面几个方法都非常简单,先设置好值,然后执行监听者们的回调方法。notifyListeners() 方法感兴趣的读者也可以看一看,不过它还涉及到 Netty 线程池的一些内容,我们还没有介绍到线程池,这里就不展开了。上面的代码,在 setSuccess0 或 setFailure0 方法中都会唤醒阻塞在 sync() 或 await() 的线程

另外,就是可以看下 sync() 和 await() 的区别,其他的我觉得随便看看就好了。

@Override

publicPromisesync()throwsInterruptedException{

await();

// 如果任务是失败的,重新抛出相应的异常

rethrowIfFailed();

returnthis;

}

接下来,我们来写个实例代码吧:

 publicstaticvoidmain(String[] args){

// 构造线程池

EventExecutor executor =newDefaultEventExecutor();

// 创建 DefaultPromise 实例

Promise promise =newDefaultPromise(executor);

// 下面给这个 promise 添加两个 listener

promise.addListener(newGenericFutureListener>() {

@Override

publicvoidoperationComplete(Future future)throwsException{

if(future.isSuccess()) {

System.out.println("任务结束,结果:"+ future.get());

}else{

System.out.println("任务失败,异常:"+ future.cause());

}

}

}).addListener(newGenericFutureListener>() {

@Override

publicvoidoperationComplete(Future future)throwsException{

System.out.println("任务结束,balabala...");

}

});

// 提交任务到线程池,五秒后执行结束,设置执行 promise 的结果

executor.submit(newRunnable() {

@Override

publicvoidrun(){

try{

Thread.sleep(5000);

}catch(InterruptedException e) {

}

// 设置 promise 的结果

// promise.setFailure(new RuntimeException());

promise.setSuccess(123456);

}

});

// main 线程阻塞等待执行结果

try{

promise.sync();

}catch(InterruptedException e) {

}

}

运行代码,两个 listener 将在 5 秒后将输出:

任务结束,结果:123456

任务结束,balabala...

读者这里可以试一下 sync() 和 await() 的区别,在任务中调用 promise.setFailure(new RuntimeException()) 试试看。

上面的代码中,大家可能会对线程池 executor 和 promise 之间的关系感到有点迷惑。读者应该也要清楚,具体的任务不一定就要在这个 executor 中被执行。任务结束以后,需要调用 promise.setSuccess(result) 作为通知。

通常来说,promise 代表的 future 是不需要和线程池搅在一起的,future 只关心任务是否结束以及任务的执行结果,至于是哪个线程或哪个线程池执行的任务,future 其实是不关心的。

不过 Netty 毕竟不是要创建一个通用的线程池实现,而是和它要处理的 IO 息息相关的,所以我们只不过要理解它就好了。

这节就说这么多吧,我们回过头来再看一下这张图,看看大家是不是看懂了这节内容:

我们就说说上图左边的部分吧,虽然我们还不知道 bind() 操作中具体会做什么工作,但是我们应该可以猜出一二。

显然,main 线程调用 b.bind(port) 这个方法会返回一个 ChannelFuture,bind() 是一个异步方法,当某个执行线程执行了真正的绑定操作后,那个执行线程一定会标记这个 future 为成功(我们假定 bind 会成功),然后这里的 sync() 方法(main 线程)就会返回了。

如果 bind(port) 失败,我们知道,sync() 方法会将异常抛出来,然后就会执行到 finally 块了。

一旦绑定端口 bind 成功,进入下面一行,f.channel() 方法会返回该 future 关联的 channel。

channel.closeFuture() 也会返回一个 ChannelFuture,然后调用了 sync() 方法,这个 sync() 方法返回的条件是:有其他的线程关闭了 NioServerSocketChannel,往往是因为需要停掉服务了,然后那个线程会设置 future 的状态( setSuccess(result) 或 setFailure(cause) ),这个 sync() 方法才会返回。

这篇文章就到这里,希望大家对 Netty 中的异步编程有些了解,后续碰到源码的时候能知道是怎么使用的了。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,324评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,303评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,192评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,555评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,569评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,566评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,927评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,583评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,827评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,590评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,669评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,365评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,941评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,928评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,159评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,880评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,399评论 2 342

推荐阅读更多精彩内容