组合式异步编程

  • Future 接口
    它建模了一种异步计算,返回一个执行运算结果的引用,当运算结束后,这个引用被返回给调用方。在Future中触发那些潜在耗时的操作把调用线程解放出来,让它能继续执行其他有价值的工作,不再需要呆呆等待耗时的操作完成。
    例如:使用Future以异步的方式执行一个耗时的操作

    ExecutorService executor = Executors.newCachedThreadPool();//创建ExecutorService,通过这个就可以向线程池提交任务
    Future<Double> future = executor.submit(new Callable<Double>() {//向ExecutorService提交一个Callable对象
                public Double call() {
                      return doSomeLongComputation();//以异步的方式在新的线程中执行耗时操作
                }});
    doSomethingElse();//异步操作进行的同时,可以做其他事情
    try {
       Double result = future.get(1, TimeUnit.SECONDS);//获取异步操作的结果,如果最终被阻塞,无法得到结果,那么在最多等待1秒钟之后退出
        } catch (ExecutionException ee) {
       // 计算抛出一个异常
        } catch (InterruptedException ie) {
       // 当前线程在等待过程中被中断
        } catch (TimeoutException te) {
         // 在Future对象完成之前超过已过期
        } 
    
image.png
  • 局限性
  1. 将两个异步计算合并为一个——这两个异步计算之间相互独立,同时第二个又依赖于第一个的结果。
  2. 等待Future集合中的所有任务都完成。
  3. 仅等待Future集合中最快结束的任务完成(有可能因为它们试图通过不同的方式计算同一个值),并返回它的结果。
  4. 通过编程方式完成一个Future任务的执行(即以手工设定异步操作结果的方式)。
  5. 应对Future的完成事件(即当Future的完成事件发生时会收到通知,并能使用Future计算的结果进行下一步的操作,不只是简单地阻塞等待操作的结果)。

  • 使用 CompletableFuture构建异步应用
    创建一个名为“最佳价格查询器”的应用,它会查询多个在线商店,依据给定的产品或服务找出最低的价格。
    首先,商店应该声明依据指定产品名称返回价格的方法:

    public class Shop {
       public double getPrice(String product) {
         // 待实现(该方法的内部实现会查询商店的数据库,但也有可能执行一些其他耗时的任务,比如联系其他外部服务)
       }
    }
    

    利用delay方法模拟延迟1s:

    public double getPrice(String product) {
         return calculatePrice(product);
    }
    private double calculatePrice(String product) {
         delay();
         return random.nextDouble() * product.charAt(0) + product.charAt(1);
    }
    

    此时,在调用该方法时,依旧会被阻塞,未等待同步时间完成而等待1s;

    public Future<Double> getPriceAsync(String product) {
        CompletableFuture<Double> futurePrice = new CompletableFuture<>();
        new Thread( () -> {//创建CompletableFuture 在另一个 对象,它会包含计算的结果
                    double price = calculatePrice(product);//在另一个 对象,它会包含计算的结果 线程中以异步方式执行计算
                    futurePrice.complete(price);//,结束completableFuture对象的运行,并设置变量的值
                     }).start();
                    return futurePrice;//无需等待还没结束的计算,直接返回Future对象
    }
    

    调用该方法:

    Shop shop = new Shop("BestShop");
    long start = System.nanoTime();
    Future<Double> futurePrice = shop.getPriceAsync("my favorite product");
    long invocationTime = ((System.nanoTime() - start) / 1_000_000);
    System.out.println("Invocation returned after " + invocationTime
     + " msecs");
    // 执行更多任务,比如查询其他商店
    doSomethingElse();
    // 在计算商品价格的同时
    try {
         double price = futurePrice.get();//从Future对象中读取价格,如果价格未知,则发生阻塞
         System.out.printf("Price is %.2f%n", price);
    } catch (Exception e) {
         throw new RuntimeException(e);
    }
    long retrievalTime = ((System.nanoTime() - start) / 1_000_000);
    System.out.println("Price returned after " + retrievalTime + " msecs"); 
    

    结果可能如下:

    Invocation returned after 43 msecs//异步方法调用返回远远早于最终价格计算完成的时间
    Price is 123.26
    Price returned after 1045 msecs 
    
  • 正确地管理异步任务执行过程中可能出现的错误
    如果价格计算过程中产生了错误,用于提示错误的异常会被限制在试图计算商品价格的当前线程的范围内,最终会杀死该线程,而这会导致等待get方法返回结果的客户端永久地被阻塞。

    public Future<Double> getPriceAsync(String product) {
        CompletableFuture<Double> futurePrice = new CompletableFuture<>();
        new Thread( () -> {
                          try {
                               double price = calculatePrice(product);//如果价格正常计算结束,完成Future操作并设置商品价格
                               futurePrice.complete(price);
                           } catch (Exception ex) {
                               futurePrice.completeExceptionally(ex);//否则就抛出导致失败的异常,完成这次Future操作
                           }
        }).start();
       return futurePrice;
    }
    

    此时,客户端会得到如下ExecutionException

    java.util.concurrent.ExecutionException: java.lang.RuntimeException: product
         not available
       at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2237) 
       at lambdasinaction.chap11.AsyncShopClient.main(AsyncShopClient.java:14)
       ... 5 more
    Caused by: java.lang.RuntimeException: product not available
       at lambdasinaction.chap11.AsyncShop.calculatePrice(AsyncShop.java:36)
       at lambdasinaction.chap11.AsyncShop.lambda$getPrice$0(AsyncShop.java:23)
       at lambdasinaction.chap11.AsyncShop$$Lambda$1/24071475.run(Unknown Source)
       at java.lang.Thread.run(Thread.java:744) 
    

  • 使用工厂方法supplyAsync创建CompletableFuture

    public Future<Double> getPriceAsync(String product) {
         return CompletableFuture.supplyAsync(() -> calculatePrice(product));
    } 
    

    supplyAsync方法接受一个生产者(Supplier)作为参数,返回一个CompletableFuture对象,该对象完成异步执行后会读取调用生产者方法的返回值。生产者方法会交由ForkJoinPool池中的某个执行线程(Executor)运行,但是你也可以使用supplyAsync方法的重载版本,传递第二个参数指定不同的执行线程执行生产者方法。


  • 避免阻塞
    例如一个商家列表,需要实现一个方法,接受产品名作为参数,返回一个字符串列表,这个字符串列表中包括商店的名称、该商店中指定商品的价格:

    List<Shop> shops = Arrays.asList(new Shop("BestPrice"),
                                     new Shop("LetsSaveBig"),
                                     new Shop("MyFavoriteShop"),
                                     new Shop("BuyItAll")); 
    public List<String> findPrices(String product);
    

    使用

    public List<String> findPrices(String product) {
       List<CompletableFuture<String>> priceFutures =
             shops.stream()
                  .map(shop -> CompletableFuture.supplyAsync(//使用CompletableFuture以异步方式计算每种商品的价格
                               () -> shop.getName() + " price is " +
                               shop.getPrice(product)))
                  .collect(Collectors.toList());
    //注意CompletableFuture类中的join方法和Future接口中的get有相同的含义,并且也声明在Future接口中,它们唯一的不同是join不会抛出任何检测到的异常。
       return priceFutures.stream()
                          .map(CompletableFuture::join)
                          .collect(toList());
    } 
    

    这里使用两个不同的Stream流水线,若是在一个流水线上一个接一个放置两个map操作,由于流操作之间的延迟特性,如果你在单一流水线中处理流,发向不同商家的请求只能以同步、顺序执行的方式才会成功。因此,每个创建CompletableFuture对象只能在前一个操作结束之后执行查询指定商家的动作、通知join方法返回计算结果。

image.png
  • 使用定制的执行器
    如果线程池中线程的数量过多,最终它们会竞争稀缺的处理器和内存资源,浪费大量的时间在上下文切换上。反之,如果线程的数目过少,正如你的应用所面临的情况,处理器的一些核可能就无法充分利用。

    private final Executor executor =
            Executors.newFixedThreadPool(Math.min(shops.size(), 100),//创建一个线程池,线程池中线程的数目为100和商店数目二者中较小的一个值
                                         new ThreadFactory() {
               public Thread newThread(Runnable r) {
                     Thread t = new Thread(r);
                     t.setDaemon(true);//使用守护线程——这种方式不会阻止程序的关停
                     return t;
               }
    });
    

    Java程序无法终止或者退出一个正在运行中的线程,所以最后剩下的那个线程会由于一直等待无法发生的事件而引发问题。与此相反,如果将线程标记为守护进程,意味着程序退出时它也会被回收。这二者之间没有性能上的差异。

    CompletableFuture.supplyAsync(() -> shop.getName() + " price is " +
                                        shop.getPrice(product), executor); 
    

  • 对多个异步任务进行流水线操作
    假设所有的商店都同意使用一个集中式的折扣服务。该折扣服务提供了五个不同的折
    扣代码,每个折扣代码对应不同的折扣率。

    public class Discount {
       public enum Code {
           NONE(0), SILVER(5), GOLD(10), PLATINUM(15), DIAMOND(20);
    
           private final int percentage;
    
           Code(int percentage) {
             this.percentage = percentage;
           }
       }
      
    public static String applyDiscount(Quote quote) {//Qoute解析getPrice返回字符串
         return quote.getShopName() + " price is " +
                 Discount.apply(quote.getPrice(),
                                quote.getDiscountCode());
       }
    private static double apply(double price, Code code) {
         delay();
         return format(price * (100 - code.percentage) / 100);
       } 
    } 
    

    getPrice现在以ShopName:price:DiscountCode的格式返回一个String类型的值

    public String getPrice(String product) {
           double price = calculatePrice(product);
           Discount.Code code = Discount.Code.values()[
                                   random.nextInt(Discount.Code.values().length)];
           return String.format("%s:%.2f:%s", name, price, code);
    }
    private double calculatePrice(String product) {
           delay();
           return random.nextDouble() * product.charAt(0) + product.charAt(1);
    }
    
    • 构造异步和同步操作:


      image.png

    由于在进行字符串解析(parse)时,不涉及远程操作,也不进行I/O操作,,几乎可以在第一时间内完成,所以能够采用同步操作,不会带来太多延迟

    thenCompose方法允许你对两个异步操作进行流水线,第一个操作完成时,将其结果作为参数传递给第二个操作。当创建两个CompletableFutures对象,对第一个CompletableFuture对象调用 thenCompose,并向其传递一个函数。当第一个CompletableFuture执行完毕后,它的结果将作为该函数的参数,这个函数的返回值是以第一个CompletableFuture的返回做输入计算出的第二个CompletableFuture。对象使用这种方式,即使Future在向不同的商店收集报价,主线程还是能继续执行其他重要的操作。

image.png

若是两个不相干的的CompletableFuture对象的结果整合起来,而且你不是等到第一个任务完全结束才开始第二项任务,应该使用thenCombine,它接收名为BiFunction的第二参数,这个参数定义了当两个CompletableFuture对象完成计算后,结果如何合并。

image.png
image.png

  • 响应 CompletableFuturecompletion 事件
    重构findPrices方法返回一个由Future构成的流
  public Stream<CompletableFuture<String>> findPricesStream(String product) {
         return shops.stream()
                     .map(shop -> CompletableFuture.supplyAsync(
                               () -> shop.getPrice(product), executor))
                     .map(future -> future.thenApply(Quote::parse))
                     .map(future -> future.thenCompose(quote ->
                               CompletableFuture.supplyAsync(
                                         () -> Discount.applyDiscount(quote), executor)));
}

在每个CompletableFuture上注册一个操作,该操作会在CompletableFuture完成执行后使用它的返回值。Java 8的CompletableFuture通 过thenAccept方法提供了这一功能,它接收CompletableFuture执行完毕后的返回值做参数。

findPricesStream("myPhone").map(f -> f.thenAccept(System.out::println)); 

由 于thenAccept方法已经定义了如何处理CompletableFuture返回的结果,一旦CompletableFuture计算得到结果,它就返回一个CompletableFuture<Void>。所以,map操作返回的是一个Stream<CompletableFuture<Void>>。对这个<CompletableFuture-<Void>>对象,只能等待其运行结束。

CompletableFuture[] futures = findPricesStream("myPhone")
    .map(f -> f.thenAccept(System.out::println))
    .toArray(size -> new CompletableFuture[size]);
CompletableFuture.allOf(futures).join(); 
  1. allOf工厂方法接收一个由CompletableFuture构成的数组,数组中的所有CompletableFuture对象执行完成之后,它返回一个CompletableFuture<Void>对象。这意味着,如果需要等待Stream中的所有 CompletableFuture对象执行完毕,对 allOf方法返回的CompletableFuture执行join操作即可。

  2. anyof工厂方法接收一个CompletableFuture对象构成的数组,返回由第一个执行完毕的CompletableFuture对象的返回值构成的CompletableFuture<Object>


image.png
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,324评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,303评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,192评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,555评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,569评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,566评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,927评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,583评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,827评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,590评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,669评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,365评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,941评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,928评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,159评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,880评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,399评论 2 342

推荐阅读更多精彩内容