进行 “最佳价格查询器” 的开发

编程入门 行业动态 更新时间:2024-10-27 10:27:03

进行 “最佳价格<a href=https://www.elefans.com/category/jswz/34/1532071.html style=查询器” 的开发"/>

进行 “最佳价格查询器” 的开发

前置条件

public class Shop {private final String name;private final Random random;public Shop(String name) {this.name = name;random = new Random(name.charAt(0) * name.charAt(1) * name.charAt(2));}public double getPrice(String product) {return calculatePrice(product);}private double calculatePrice(String product) {try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}return random.nextDouble() * product.charAt(0) + product.charAt(1);}
}

采用顺序查询所有商店的方式

// 采用顺序查询所有商店的方式
public List<String> findPricesSequential(String product) {return shops.stream().map(shop -> Thread.currentThread().getName() + shop.getName() + "-" + shop.getPrice(product)).collect(Collectors.toList());
}

使用并行流对请求进行并行操作

// 使用并行流对请求进行并行操作
public List<String> findPricesParallel(String product) {return shops.parallelStream().map(shop -> Thread.currentThread().getName() + shop.getName() + "-" + shop.getPrice(product)).collect(Collectors.toList());
}

使用CompletableFuture发起异步请求(使用内部通用线程池)

// 使用CompletableFuture发起异步请求
public List<String> findPricesFuture(String product) {List<CompletableFuture<String>> priceFutures =shops.stream().map(shop -> CompletableFuture.supplyAsync(() -> Thread.currentThread().getName() + shop.getName() + "-" + shop.getPrice(product)))// 内部采用的通用线程池,默认都使用固定数目的线程,具体线程数取决于Runtime.getRuntime().availableProcessors()的返回值。.collect(Collectors.toList());List<String> prices = priceFutures.stream().map(CompletableFuture::join) // 对List中的所有future对象执行join操作,一个接一个地等待它们运行结束.collect(Collectors.toList());return prices;
}

使用CompletableFuture发起异步请求(使用定制的执行器)

CompletableFuture类中的join方法和Future接口中的get有相同的含义,并且也声明在Future接口中,它们唯一的不同是join不会抛出任何检测到的异常。

private final Executor executor = Executors.newFixedThreadPool(shops.size(), ExecuterThreadFactoryBuilder.build("searcher-thread-%d"));// 使用CompletableFuture发起异步请求+使用定制的执行器
public List<String> findPricesFutureCustom(String product) {List<CompletableFuture<String>> priceFutures =shops.stream().map(shop -> CompletableFuture.supplyAsync(() -> Thread.currentThread().getName() + shop.getName() + "-" + shop.getPrice(product), executor)).collect(Collectors.toList());List<String> prices = priceFutures.stream().map(CompletableFuture::join).collect(Collectors.toList());return prices;
}

性能比较

笔者电脑是16线程,所以构造测试数据时16个线程任务是个门槛

private List<Shop> shops = new ArrayList<>();
{for (int i = 0; i < 64; i++) {shops.add(new Shop("LetsSaveBig3" + i));}System.out.println(shops.size());
}StopWatch stopWatch = new StopWatch("性能比较");
execute("sequential", () -> bestPriceFinder.findPricesSequential("myPhone27S"), stopWatch);
execute("parallelStream", () -> bestPriceFinder.findPricesParallel("myPhone27S"), stopWatch);
execute("CompletableFuture", () -> bestPriceFinder.findPricesFuture("myPhone27S"), stopWatch);
execute("CompletableFutureExecuter", () -> bestPriceFinder.findPricesFutureCustom("myPhone27S"), stopWatch);
StopWatchUtils.logStopWatch(stopWatch);private static void execute(String msg, Supplier<List<String>> s, StopWatch stopWatch) {stopWatch.start(msg);System.out.println(s.get());stopWatch.stop();
}
availableProcessors() = 164线程任务8线程任务16线程任务20线程任务24线程任务28线程任务32线程任务64线程任务
Sequential4035 ms8057 ms16108 ms20154 ms24131 ms28106 ms32196 ms64325 ms
parallelStream1005 ms1021 ms1022 ms2022 ms2013 ms2008 ms2012 ms4017 ms
CompletableFuture1008 ms1019 ms2022 ms2027 ms2016 ms2006 ms3017 ms5043 ms
CompletableFutureExecuter1012 ms1007 ms1019 ms1023 ms10191012 ms1020 ms1025 ms

线程池如何选择合适的线程数目

线程池中线程的数目取决于你预计你的应用需要处理的负荷,但是你该如何选择合适的线程数目呢?

如果线程池中线程的数量过多,最终它们会竞争稀缺的处理器和内存资源,浪费大量的时间在上下文切换上。
如果线程的数目过少,处理器的一些核可能就无法充分利用。

《Java并发编程实战》作者 Brian Goetz 建议,线程池大小与处理器的利用率之比可以使用下面的公式进行估算:
N(threads) = N(CPU) * U(CPU) * (1 + W/C)
其中:
·N(CPU)是处理器的核的数目,可以通过Runtime.getRuntime().availableProcessors()得到
·U(CPU)是期望的CPU利用率(该值应该介于0和1之间)
·W/C是等待时间与计算时间的比率

公式理解:
C / (C+W) = N(CPU) * U(CPU) / N(threads) → 计算时间占比 = 有效CPU在线程数中的占比

线程极限阈值数计算
你的应用99%的时间都在等待商店的响应,所以估算出的W/C比率为100。且CPU利用率是100%,则根据公式极限阈值为16*1*100=1600 ,即创建一个拥有1600个线程的线程池。

你的应用99%的时间都在等待商店的响应,所以估算出的W/C比率为100。这意味着如果你期望的CPU利用率是100%,你需要创建一个拥有1600个线程的线程池。实际操作中,如果你创建的线程数比商店的数目更多,反而是一种浪费,因为这样做之后,你线程池中的有些线程根本没有机会被使用。出于这种考虑,我们建议你将执行器使用的线程数,与你需要查询的商店数目设定为同一个值,这样每个商店都应该对应一个服务线程。不过,为了避免发生由于商店的数目过多导致服务器超负荷而崩溃,你还是需要设置一个上限,比如100个线程。代码清单如下所示。

private final Executor executor = Executors.newFixedThreadPool(Math.min(shops.size(), 100), ExecuterThreadFactoryBuilder.build("searcher-thread-%d"));public List<String> findPricesFutureCustom(String product) {List<CompletableFuture<String>> priceFutures =shops.stream().map(shop -> CompletableFuture.supplyAsync(() -> Thread.currentThread().getName() + "-" + shop.getName() + "-" + shop.getPrice(product), executor)).collect(Collectors.toList());List<String> prices = priceFutures.stream().map(CompletableFuture::join).collect(Collectors.toList());return prices;
}
Processors=164线程任务(ms)816202428326410050010001600320040008000
Sequential40358057161082015424131281063219664325
parallelStream10051021102220222013200820124017711032240
CompletableFuture10081019202220272016200630175043705834177
newFixedThreadPool shops.size()101210071019102310191012102010251029108113651330240716623129
newFixedThreadPool min(shops.size(),100)109310435116101923243480658

public static ThreadFactory build(String nameFormat) {return new ThreadFactoryBuilder().setDaemon(true).setNameFormat(nameFormat).build();
}

注意,当前创建的是一个由守护线程构成的线程池。Java程序无法终止或者退出一个正在运行中的线程,所以最后剩下的那个线程会由于一直等待无法发生的事件而引发问题。如果将线程标记为守护进程,意味着程序退出时它也会被回收。这二者之间没有性能上的差异。

综上比较可知,CompletableFuture + Executer方式最高效。一般而言,这种状态会一直持续,直到商店的数目达到我们之前计算的 阈值 1600。这个例子证明了要创建更适合你的应用特性的执行器,利用CompletableFutures向其提交任务执行是个不错的主意。处理需大量使用异步操作的情况时,这几乎是最有效的策略。


并行——使用流还是CompletableFutures?
目前为止,你已经知道对集合进行并行计算有两种方式:要么将其转化为并行流,利用map这样的操作开展工作,要么枚举出集合中的每一个元素,创建新的线程,在CompletableFuture内对其进行操作。后者提供了更多的灵活性,你可以调整线程池的大小,而这能帮助你确保整体的计算不会因为线程都在等待I/O而发生阻塞。
我们对使用这些API的建议如下。
1、如果你进行的是计算密集型的操作,并且没有I/O,那么推荐使用Stream接口,因为实现简单,同时效率也可能是最高的(如果所有的线程都是计算密集型的,那就没有必要创建比处理器核数更多的线程)。
2、如果你并行的工作单元还涉及等待I/O的操作(包括网络连接等待),那么使用CompletableFuture灵活性更好,你可以像前文讨论的那样,依据等待/计算,或者W/C的比率设定需要使用的线程数。这种情况不使用并行流的另一个原因是,处理流的流水线中如果发生I/O等待,流的延迟特性会让我们很难判断到底什么时候触发了等待。

参考

《Java8 实战》第11章 CompletableFuture:组合式异步编程

更多推荐

进行 “最佳价格查询器” 的开发

本文发布于:2023-11-15 16:14:16,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1602569.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:查询器   价格

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!