异步回调详解

简介

随着移动互联网的蓬勃发展,业务架构也随之变得错综复杂,业务系统越来越多。通常,我们处理方法是异步去调取这些接口。随着高并发系统越来越多,异步回调模式也越来越重要。

问题就来了,如何获取处理异步调用的结果呢 ?让我们一起来探讨一下吧~~

Java Future的异步回调

Callable接口

在聊Callable接口之前,先提一下Runnable接口。Runnable接口是在Java多线程中表示线程的业务代码的抽象接口。但是Runnable没有返回值,为了解决这个问题,Java定义了一个和Runnable类似的接口 --- Callable接口。并将业务处理方法名为call

@FunctionalInterface
public interface Callable<V> {
    V call() throws Exception;
}

Callable接口是一个范型接口,也声明为了函数式接口。唯一的抽象方法call有返回值,返回值类型为范型形参的实际类型

初探FutureTask类

故名思意,FutureTask类代表一个未来执行的任务,表示新线程执行的操作。同时也位于java.util.concurrent包中。源码如下:

public class FutureTask<V> implements RunnableFuture<V> {
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }
}

FutureTask类就像一座搭在Callable实例与Thread线程实例之间的桥。FutureTask内部封装了一个Callable实例,然后自身又作为Thread线程的target。

Future接口

Future接口并不复杂,主要是对并发任务的执行及获取其结果的一些操作。主要有三大功能。

  • 判断并发任务是否执行完。
  • 获取并发的任务完成后的结果
  • 取消并发执行的任务
package java.util.concurrent;
public interface Future<V> {
  boolean cancel(boolean mayInterruptIfRunning);
  boolean isCancelled();
  boolean isDone();
  V get() throws InterruptedException, ExecutionException;
  V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

详细说法如下:

  • V get():获取并发任务执行的结果。这个方法是阻塞的,如果并发任务没有执行完成调用此方法的线程会一直阻塞
  • V get(Long timeout, TimeUtil unit):获取并发任务执行的结果。也是阻塞的,但是有阻塞的时间限制,如果阻塞时间超过设定的时间,该方法将会抛出异常
  • boolean isDone():获取并发任务的状态是否结束
  • boolean isCancelled():获取并发任务的取消状态。如果任务被取消返回true
  • boolean cancel(boolean mayInterruptIfRunning):取消并发任务的执行

再探FutureTask类

在FutureTask类中,有一个Callable的私有成员,Futuretask内部有一个run方法。这个run方法是Runable接口的抽象方法,在FutureTask类的内部提供了自己的实现。在Thread线程实例执行时,会将这个run方法作为target目标去异步执行。在FutureTask内部的run方法中实际是会执行Callable的call方法。

执行完后结果会保存在私有成员-- outcome属性中

private Object outcome; // non-volatile, protected by state reads/writes

outcome负责保存结果。然后FutureTask通过get方法获取这个object的值,那这个FutureTask的任务也就能成功完成了。

public void run() {
  if (state != NEW ||
      !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                   null, Thread.currentThread()))
    return;
  try {
    Callable<V> c = callable;
    if (c != null && state == NEW) {
      V result;
      boolean ran;
      try {
        result = c.call();
        ran = true;
      } catch (Throwable ex) {
        result = null;
        ran = false;
        setException(ex);
      }
      if (ran)
        // 将result保存到outcome中
        set(result);
    }
  } finally {
    // runner must be non-null until state is settled to
    // prevent concurrent calls to run()
    runner = null;
    // state must be re-read after nulling runner to prevent
    // leaked interrupts
    int s = state;
    if (s >= INTERRUPTING)
      handlePossibleCancellationInterrupt(s);
  }
}

实例

package com.zou;

import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;


public class futureTask {
    public static void main(String[] args) throws Exception {
        FutureTask<Boolean> TaskA = new FutureTask<>(() -> {
            try {
                TimeUnit.SECONDS.sleep(2);
                System.out.println("A任务准备好了👌");
            } catch (Exception e) {
                System.out.println("A任务出问题了");
                return false;
            }
            System.out.println("A任务运行结束");
            return true;
        });

        FutureTask<Boolean> TaskB = new FutureTask<>(() -> {
            try {
                TimeUnit.SECONDS.sleep(4);
                System.out.println("B任务准备好了👌");
            } catch (Exception e) {
                System.out.println("B任务出问题了");
                return false;
            }
            System.out.println("B任务运行结束");
            return true;
        });

        Thread threadA = new Thread(TaskA);
        Thread threadB = new Thread(TaskB);

        threadA.start();
        threadB.start();

        Thread.currentThread().setName("主线程");
        try {
            boolean a = TaskA.get();
            boolean b = TaskB.get();
            isReady(a, b);
        } catch (Exception e) {
            System.out.println("发生了中断");
        }
        System.out.println("运行结束");
    }
    public static void isReady(boolean a, boolean b) {
        if (a && b) {
            System.out.println("都准备好了");
        } else if (!a) {
            System.out.println("A没准备好");
        } else {
            System.out.println("B没准备好");
        }
    }
}

运行结果为:

A任务准备好了👌
A任务运行结束
B任务准备好了👌
B任务运行结束
都准备好了
运行结束

要是将上面的代码跑一下会发现,这里的FutureTask类的get方法,异步获取结果的同时,主线程是阻塞的。所以可以将其归为异步阻塞模式。

异步阻塞的效率往往是比较低的,被阻塞的主线程不能干任何事。并没有实现非阻塞的异步结果获取方法。如果需要用到获取异步结果,则需要引入一些框架,先介绍一下Google的Guava框架

Guava的异步回调

Guava是Google提供的Java扩展包,提供一种异步回调的解决方案。相关的源码在com.google.common.util.concurrent包中。包中很多类,都是对java.util.concurrent能力的扩展和增强。比如Guava的异步任务接口ListenableFuture, 实现了非阻塞获取异步结果的功能。

对于异步回调,Guava主要做了以下增强:

  • 引入一个新的接口ListenableFuture,继承了Java的Future接口,使得Java的Future异步任务,在Guava中能被监控和获得非阻塞异步执行的结果。
  • 引入一个新的接口FutureCallback,这是一个独立的新接口。该接口的目的是在异步任务执行完成后,根据异步结果,完成不同的回调处理,并且可以处理回调结果

详解FutureCallback

FutureCallback是一个新增的接口,用来填写异步任务执行完后的监听逻辑。有两个回调方法:

  • onSuccess方法,在异步任务执行成功后回调;调用时,异步任务的执行结果作为onSuccess方法的参数传入
  • onFailure方法,在异步任务执行过程中,抛出异常时被回调;调用时异步任务所抛出的异常,作为onFailure方法的参数传入。

FutureCallback的源代码如下:

public interface FutureCallback<V> {
  /** Invoked with the result of the {@code Future} computation when it is successful. */
  void onSuccess(@Nullable V result);

  /**
   * Invoked when a {@code Future} computation fails or is canceled.
   *
   * <p>If the future's {@link Future#get() get} method throws an {@link ExecutionException}, then
   * the cause is passed to this method. Any other thrown object is passed unaltered.
   */
  void onFailure(Throwable t);
}

注意,Guava的FutureCallable与Java的Callable,名字相近,但实质不同,存在本质的区别:

  1. Java的Callable接口,代表的是一部执行的逻辑
  2. Guava的FutureCallback接口,代表的是Callable异步逻辑执行完之后,根据成功或者失败的两种情况的善后工作

那么问题来了,Guava如何实现异步任务Callable和FutureCallable结果回调之间的监控关系呢?Guava引入了一个新接口ListenableFuture,它继承了Java的Future接口,增强了监控能力。

详解ListenableFuture

GuavaListenableFuture接口是对Java的Future接口的扩展,可以理解为异步任务的实例。源代码如下:

public interface ListenableFuture<V> extends Future<V> {
  void addListener(Runnable listener, Executor executor);
}

ListenableFuture仅仅增加了一个方法 -- addListener方法。他的作用是将前一小节的FutureCallback善后回调工作,封装成一个内部的Runnable异步回调任务,在Callable异步任务完成后,回调FutureCallback进行善后处理

在实际编程中,如何将FutureCallback回调逻辑绑定到异步的ListenableFuture任务呢?可以使用Guaba的Futures工具类,他有一个addCallback静态方法,可以将FutureCallback的回调实例绑定到ListenableFuture异步任务。类似这样的绑定:

Futures.addCallback(ListenableFuture, new FutureCallback<Object>() {

  @Override
  public void onSuccess(@Nullable Object result) {

  }

  @Override
  public void onFailure(Throwable t) {

  }
}, executors);

同时,问题来了,Guava都是对Future异步任务的扩展,但是Guava的异步任务从何而来?

获取ListenableFuture异步任务

要获取Guava的ListenableFuture异步任务实例,主要是通过向线程池提交Callable任务的方式来获取。不过这里所说的线程池不是Java的线程池,而是Guava自己的线程池。

Guava线程池是对Java线程池的一种装饰,创建Guava线程池的方式如下:

ExecutorService jPool = Executors.newFixedThreadPool(10);
ListeningExecutorService Pool = MoreExecutors.listeningDecorator(jPool);

先创建一个Java的线程池,在作为Guava线程池的参数传进去得到Guava的线程池,然后我们通过subimt提交任务就可以获得ListenableFuture异步任务实例了

ListenableFuture<Boolean> task = Pool.submit(() -> {
    return true;
});

Futures.addCallback(task, new FutureCallback<Boolean>() {
    @Override
    public void onSuccess(@Nullable Boolean result) {
      
    }

    @Override
    public void onFailure(Throwable t) {

    }
}, jPool);

实例

package com.zou;

import com.google.common.util.concurrent.*;
import org.checkerframework.checker.nullness.qual.Nullable;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class guava {
    public static void main(String[] args) throws Exception {
        // 创建 Guava线程池
        ExecutorService jPool = Executors.newFixedThreadPool(10);
        ListeningExecutorService Pool = MoreExecutors.listeningDecorator(jPool);

        // 获取ListenableFuture异步任务
        ListenableFuture<Boolean> task = Pool.submit(() -> {
            TimeUnit.SECONDS.sleep(2);
            System.out.println("副线程开始执行");
            return true;
        });
                
        // 做回调
        Futures.addCallback(task, new FutureCallback<Boolean>() {
            @Override
            public void onSuccess(@Nullable Boolean result) {
                System.out.println("执行成功");
            }

            @Override
            public void onFailure(Throwable t) {
                System.out.println("执行失败");
                t.printStackTrace();
            }
        }, jPool);

        TimeUnit.SECONDS.sleep(1);
        System.out.println("主线程执行完成");

    }
}

结果为:

主线程执行完成
副线程开始执行
执行成功

可以发现程序已经是异步非阻塞了。

Guava异步回调和Java的FutureTask异步回调,本质的不同在于;

  • Guava是非阻塞的异步回调,调用线程是不阻塞的,可以继续执行自己的业务逻辑
  • FutureTask是阻塞的异步回调,调用线程是阻塞的,在获取异步结果的过程中,一直阻塞,等待异步线程返回结果

Netty的异步回调

Netty官方文档中指出Netty的网络操作都是异步的。在Netty源码中,大量使用异步回调处理模式。在Netty的业务开发层面,Netty应用的Handler处理器中的业务处理代码,也都是异步执行的。所以,了解Netty的异步回调是很有必要而且很重要的。

同样,Netty继承和扩展了JDK Future系列异步回调的API,定义了自身的Future系列接口和类,实现了异步任务的监控,异步执行结果的获取。

总体来说,Netty对JavaFuture异步任务扩展如下:

  • 继承Java的Future接口,得到一个新的属于Netty自己的Future异步任务接口,该接口对原有的接口进行了增强,使得Netty异步任务能够以非阻塞的方式处理回调的结果。Netty没有修改Future的名称,只是调整了所在的包名
  • 引入了一个新接口 -- GenericFutureListener, 用于表示异步执行完的监听器。Netty使用了监听器模式,异步任务的执行完成后的回调逻辑抽象成了Listener监听器接口。可以将Netty的GenericFutureListener监听器接口加入Netty异步任务Future中,实现对异步任务执行状态的事件监听。

总体来说设计思路和Guava差不多。对应关系为:

  • Netty的Future接口,可以对应到Guava的ListenableFuture接口。
  • Netty的GenericFutureListener接口,可以对应到Guava的FutureCallback接口。

详解GenericFutureListener接口

前面提到,和Guava的FutureCallback一样,Netty新增了一个接口来封装异步非阻塞回调的逻辑 ----- 它就是GenericFutureListener接口。

GenericFutureListener位于io.netty.util.concurrent包中,源码如下:

public interface GenericFutureListener<F extends Future<?>> extends EventListener {

    /**
     * Invoked when the operation associated with the {@link Future} has been completed.
     *
     * @param future  the source {@link Future} which called this callback
     */
    void operationComplete(F future) throws Exception;
}

GenericFutureListener拥有一个回调方法:operationComplete,表示异步任务操作完成。在Future异步任务执行完成后,将回调此方法。

这里的EventListener是一个空接口,没有任何抽象方法,是一个仅仅具有表示作用的接口。

详解Netty的Future接口

Netty的future接口对一系列的方法做了扩展,对执行的过程进行了监控,对异步回调完成事件进行了监听。Netty的Future接口的源代码如下:

public interface Future<V> extends java.util.concurrent.Future<V> {
  // 增加异步任务是否完成的监听器
    Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
  // 移除异步任务是否执行完成的监听器
  Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
  .........
}

Netty的Future接口一般不会直接使用,而是会使用子接口。Netty有一系列的子接口,代表不同类型的异步任务,如ChannelFuture接口。

ChannelFuture子接口表示通道IO操作的异步任务;如果在通道的异步IO操作完成后,需要执行回调操作,就需要使用到ChannelFuture。

ChannelFuture的使用

在Netty网络编程中,网络连接通道的输入和输出处理都是异步进行的,都会返回一个ChannelFuture接口的实例。通过返回的异步任务实例,可以为它增加异步回调的监听器。在异步任务真正结束后,回调才执行。

Netty的网络连接的异步回调,实例代码如下:

Bootstrap bootstrap = new Bootstrap();
ChannelFuture connect = bootstrap.connect("localhost", 6666);

connect.addListener(new ChannelFutureListener() {
  @Override
  public void operationComplete(ChannelFuture future) throws Exception {
    if (future.isSuccess()) {
      // 成功
      System.out.println("yes");
    } else {
      // 失败
      System.out.println("exception");
      future.cause().printStackTrace();
    }
  }
});

GenericFutureListener接口在Netty中是一个基础类型接口。在网络编程的异步回调中,一般使用Netty中提供的某个子接口,如ChannelFutureListener接口。

总结

好啦,异步回调的部分基本就到这里了。随着高并发系统越来越多,异步回调模式也越来越重要。我们来回忆一下主要讲了那些异步回调吧~

Java自带的异步回调

  • Future作为接口,对应的FutureTask中的get方法作为结果的回调。但此是异步阻塞的

Guava异步回调

  • ListenableFuture作为接口,对应的FutureCallback做结果的异步回调。异步非阻塞

Netty异步回调

  • Future作为接口(和Java自带的同名不同包),对应的GenericFutureListener做结果的异步回调。异步非阻塞
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,242评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,769评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,484评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,133评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,007评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,080评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,496评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,190评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,464评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,549评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,330评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,205评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,567评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,889评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,160评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,475评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,650评论 2 335

推荐阅读更多精彩内容