如何实现异步调用

如何实现异步调用

同步调用是指调用方会被一直阻塞, 直到调用方收到结果。异步是指调用方不回阻塞。传统的socket网络请求就是一个同步调用。那么如何实现一个异步调用?

首先设计自己的异步调用的api
如果只想要进行异步调用,那只需一个线程池或者一个线程不断执行从main线程添加的任务即可,可以写成类似如下代码:

public class ThreadPoolTest2 {
    static List<Event> eventList = Collections.synchronizedList(new ArrayList<>());

    interface Event<V> {
        V doSomething();
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        new Thread(() -> {
            while (true) {
                for (int i = 0; i < eventList.size(); i++) {
                    Event event = eventList.get(i);
                    Object result = event.doSomething();
                    eventList.remove(event);
                }
            }
        }, "thread-1").start();
        doSomethingAsync(() -> {
            System.out.println(Thread.currentThread().getName());
            return 1;
        });
        System.out.println(Thread.currentThread().getName());
    }

    private static <T> void doSomethingAsync(Event<T> event) {
        eventList.add(event);
    }

}

如此一来确实可以异步执行部分代码,然而只是这样处理无法掌握异步任务的执行结果,所以需要doSomethingAsync函数能有一个返回值来获得异步任务的执行结果。其中java提供Future接口完美契合,Future一般用做为异步调用的返回值,他的接口设计如下:
Future是在juc下的一个接口,功能是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。最关键的是通过get方法获取执行结果,该方法会阻塞直到任务返回结果。

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);

    boolean isCancelled();

    boolean isDone();

    V get() throws InterruptedException, ExecutionException;

    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

Future配合线程池的用法如下:

ExecutorService executor = Executors.newCachedThreadPool();
//一个线程池,做了一些操作后,返回结果
Future<Integer> result = executor.submit(() -> {
      //do something
      return 1;
});

然而线程池功能很丰富导致源码量也很多且复杂,如果只想要最简单的异步功能,并不需要那么多代码,可以只考虑实现Future的代码(不考虑性能),类似如下:

public class AsyncDemo{
    static List<EventFutureImp> eventList = Collections.synchronizedList(new ArrayList<>());

    interface Event<V> {
        V doSomething();
    }

    interface EventFuture<V> extends Event<V>, Future<V> {

    }

    static class EventFutureImp<V> implements EventFuture<V> {
        Event<V> event;
        CountDownLatch countDownLatch;
        V result;

        public EventFutureImp(Event<V> event) {
            this.event = event;
            countDownLatch = new CountDownLatch(1);
        }

        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            throw new UnsupportedOperationException();
        }

        @Override
        public boolean isCancelled() {
            throw new UnsupportedOperationException();
        }

        @Override
        public boolean isDone() {
            throw new UnsupportedOperationException();
        }

        @Override
        public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
            throw new UnsupportedOperationException();
        }

        //-----以上方法不提供实现,只实现最简单的功能-------------
        @Override
        public V get() throws InterruptedException, ExecutionException {
            countDownLatch.await();
            return result;
        }

        @Override
        public V doSomething() {
            return event.doSomething();
        }

        public void done(V result) {
            this.result = result;
            countDownLatch.countDown();
        }
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        new Thread(() -> {
            while (true) {
                for (int i = 0; i < eventList.size(); i++) {
                    EventFutureImp event = eventList.get(i);
                    Object result = event.doSomething();
                    event.done(result);
                    eventList.remove(event);
                }
            }
        }, "thread-1").start();
        Future<Integer> future = doSomethingAsync(() -> {
            System.out.println(Thread.currentThread().getName());
            return 1;
        });
        System.out.println(Thread.currentThread().getName());
        System.out.println("result :" + future.get());
    }


    private static <T> Future<T> doSomethingAsync(Event<T> event) {
        EventFutureImp<T> eventFuture = new EventFutureImp<>(event);
        eventList.add(eventFuture);
        return eventFuture;
    }

}

逻辑图如下所示:

逻辑图.png

这样设计的一个关键是利用CountDownLatch的特性使得future.get()在未得到结果之前是阻塞的,而得到结果后又马上释放。

网络的异步调用

以上的异步调用其实是单进程内的异步调用,如果要实现一个网络的异步调用,那又比之前复杂了一些。

1. 自定义协议的异步调用

因为服务器的收发行为是可以自定义的,当发送的请求并不是先到达的回包又或者不是每个请求都有回包,这时候主要的问题在于,异步的发送一个网络请求后,并不知道请求的返回应该对应哪个请求。其实也可以通过自定义网络协议设计来解决这个问题:
需要在双方协议中设置requestIdresponseId,由调用方(server)指定requestId并且接受方收到请求后把responseId设置为和requestId一样的值,这样异步请求的调用方也能轻松得到与请求对应的返回包。
具体可以这样做:

  • 首先定义一个map:
 Map<String, Future> map = new ConcurrentHashMap<>();
  • 发送的的时候put:
        EventFutureImp future= new EventFutureImp ();
        map.put(requestId, future);
        //发送
        channel.writeAndFlush(request);
  • 在得到返回的时候remove:
        EventFutureImp future = map.remove(responseId);
        if (future!= null) {
            future .done(response);
        }

关键在于:请求的requestId和回包responseId是一样的,并且不同请求的requestId各自不同。

2. 其他网络协议异步调用

要为现在已有的一些数据库或者服务器实现一个异步调用又该如何做呢。其实很多协议并不会出现发送的请求并不是先到达的回包又或者不是每个请求都有回包这种情况,以http服务器为例,http没有类似requestId这样的字段但对同一个http连接http服务器总是顺序返回请求,那么如果要自己实现一个http异步请求的话,就可以按如下步骤:

  • 首先定义一个queue:
    static Queue<Future> queue = new ConcurrentLinkedDeque<>();
  • 发送的的时候add:
        EventFutureImp future= new EventFutureImp ();
        queue.add(future);
        //发送
        ...send()
  • 在得到返回的时候poll:
        EventFutureImp future = queue.poll(responseId);
        if (future!= null) {
            future .done(response);
        }

如此,由于http的顺序返回请求特性,异步请求的结果也不会发生错乱。

总结

异步的调用肯定能提升外部调用的速度,有时候能解决性能上的瓶颈,把资源利用最大化。但也增加了更多的临时对象以及线程切换的开销,同时也比同步编程模型更复杂,难调试。
参考博客: Java并发编程:Callable、Future和FutureTask

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,980评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,178评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,868评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,498评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,492评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,521评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,910评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,569评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,793评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,559评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,639评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,342评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,931评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,904评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,144评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,833评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,350评论 2 342

推荐阅读更多精彩内容